This article collects typical usage examples of the tldextract.extract function in Python. If you are wrestling with questions such as: how exactly is the extract function used? how do I call extract? what does real-world extract code look like? then the hand-picked examples below may help.
A total of 20 code examples of the extract function are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your votes help the system recommend better Python code examples.
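As a baseline for the examples that follow, here is a minimal sketch (not taken from any of the projects below; the URL and expected values are illustrative only) of what tldextract.extract returns: a named result with subdomain, domain and suffix fields, plus a registered_domain convenience attribute. Note that on first use tldextract may fetch the public suffix list over the network unless a cached or bundled copy is available.

import tldextract

ext = tldextract.extract('http://forums.news.cnn.com/path?x=1')
assert ext.subdomain == 'forums.news'
assert ext.domain == 'cnn'
assert ext.suffix == 'com'
# registered_domain joins domain and suffix, and is '' if either part is missing
assert ext.registered_domain == 'cnn.com'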
Example 1: insert
def insert(data):
    if data.strip():
        con = MySQLdb.connect(host="localhost",  # your host, usually localhost
                              user="root",       # your username
                              passwd="1234",     # your password
                              db="rabbitmq")     # name of the database
        cur = con.cursor()
        query = "insert into rabbitmq (url,domain,ttl,class,type,ip,worker) values (%s,%s,%s,%s,%s,%s,%s)"
        tld = ""
        try:
            tld = tldextract.extract(data).registered_domain
        except:
            traceback.print_exc()
        try:
            digs = os.popen("dig +tries=1 +timeout=1 +noall +answer " + tldextract.extract(tld).registered_domain).read()
            digs = str(digs).split('\n')
            for dig in digs:
                if dig.strip():
                    try:
                        dig = dig.replace("\t\t", "\t")
                        dig = dig.replace("\t\t", "\t")
                        temp = dig.split('\t')
                        print "Data: " + temp[0] + "\t Data: " + temp[1] + "\t Data: " + temp[2] + "\t Data: " + temp[3] + "\t Data: " + temp[4]
                        params = (data.strip(), tld.strip(), temp[1].strip(), temp[2].strip(), temp[3].strip(), temp[4].strip(), worker)
                        cur.execute(query, params)
                    except:
                        params = (data.strip(), tld.strip(), "", "", "", "", worker)
                        cur.execute(query, params)
        except:
            params = (data.strip(), tld.strip(), "", "", "", "", worker)
            cur.execute(query, params)
        con.commit()
        cur.close()
        con.close()
Developer: dudhaneviraj, Project: RabbitMQ, Lines: 35, Source file: Worker9.py
Example 2: _cache_html_to_df
def _cache_html_to_df(self, html):
    company = BeautifulSoup(html)
    title = company.find('div', {'class': 'companyTitle'})
    description = company.find('div', {'class': 'companyDescription'})
    revenue = company.find('div', {'class': 'companyRevenue'})
    address = company.find('div', {'class': 'companyAddress'})
    employee_count = company.find('p', {'class': 'companyEmployeeCountText'})
    website = company.find('div', {'class': 'website'})
    phone = company.find('span', {'class': 'hq'})
    industries = company.find('p', {'class': 'industry'})
    industries = industries.find_all('span') if industries else []
    industries = [industry.text for industry in industries]
    data = [title, description, revenue, address, employee_count,
            website, phone]
    columns = ["name", "description", "revenue", "address",
               "headcount", "website", "phone"]
    # add industries
    data = [val.text.strip() if val else "" for val in data]
    data = dict(zip(columns, data))
    data["industry"] = industries
    print data
    data["domain"] = "{}.{}".format(tldextract.extract(data["website"]).domain,
                                    tldextract.extract(data["website"]).tld)
    try:
        data['logo'] = company.find('img', {'class': 'companyLogo'})['src']
    except:
        data['logo'] = ""
    data["source"] = "zoominfo"
    data['headcount'] = data['headcount'].split('Employees')[0]
    data['description'] = data['description'].split('Company Description')[-1]
    data['revenue'] = data['revenue'].split('in Revenue')[0]
    # add fullcontact address support
    print data
    return data
Developer: john2x, Project: clearspark, Lines: 35, Source file: zoominfo.py
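A note on the .tld attribute used in Example 2 above (and in several later examples): older tldextract releases exposed the public suffix as tld, while current releases name it suffix and also provide registered_domain, which performs the same join in one step. A minimal sketch, assuming a recent tldextract version:

import tldextract

ext = tldextract.extract("http://www.example.co.uk/about")
domain = "{}.{}".format(ext.domain, ext.suffix)  # 'example.co.uk'
# registered_domain returns the same value, and '' when no known suffix is present
assert domain == ext.registered_domain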
Example 3: _html_to_dict
def _html_to_dict(self, url):
    #r = requests.get(url).text
    r = Crawlera().get(url).text
    print url
    try:
        company_name = BeautifulSoup(r).find('h1', {'itemprop': 'name'})
        company_name = company_name.find('strong').text
    except:
        return {"handle": url}
    address = BeautifulSoup(r).find('h1', {'itemprop': 'name'}).find('span').text
    city = BeautifulSoup(r).find('span', {'itemprop': 'addressLocality'}).text
    state = BeautifulSoup(r).find('span', {'itemprop': 'addressRegion'}).text
    postal_code = BeautifulSoup(r).find('span', {'itemprop': 'postalCode'}).text
    description = BeautifulSoup(r).find('article', {'itemprop': 'description'}).text.strip().replace('\nMore...', '')
    logo = BeautifulSoup(r).find('figure').find('img')['src']
    website = BeautifulSoup(r).find('li', {'class': 'website'}).find('a')['href'].split('gourl?')[-1]
    domain = "{}.{}".format(tldextract.extract(website).domain, tldextract.extract(website).tld)
    ''' Phone '''
    main = BeautifulSoup(r).find('li', {'class': 'phone'}).find('strong', {'class': 'primary'}).text
    numbers = BeautifulSoup(r).find('li', {'class': 'phone'}).findAll('li')
    nums = [number.find('span').text for number in numbers]
    names = [number.text.split(number.find('span').text)[0] for number in numbers]
    numbers = dict(zip(names, nums))
    numbers['main'] = main
    _vars = [company_name, address, city, state, postal_code, description, logo, website, domain]
    labels = ["name", "address", "city", "state", "postal_code", "description", "logo", "website", "domain"]
    company = dict(zip(labels, _vars))
    company["numbers"] = numbers
    company["handle"] = url
    return company
Developer: john2x, Project: scaling-fortnight, Lines: 31, Source file: company_db.py
Example 4: compare_host
def compare_host(host1, host2):
    """ True if the domain.suffix part of both hosts is the same TAB05 """
    (_, domain1, suffix1) = tldextract.extract(host1)
    (_, domain2, suffix2) = tldextract.extract(host2)
    return domain1 == domain2 and suffix1 == suffix2
Developer: jsoffer, Project: eilat, Lines: 7, Source file: InterceptNAM.py
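compare_host unpacks the extract result positionally, which ties the code to the exact tuple layout of ExtractResult; that layout is not guaranteed across tldextract versions. An attribute-based version of the same check (a sketch, not from the eilat project) is less fragile:

import tldextract

def same_registered_domain(host1, host2):
    """True if the domain.suffix part of both hosts is the same."""
    ext1 = tldextract.extract(host1)
    ext2 = tldextract.extract(host2)
    return ext1.domain == ext2.domain and ext1.suffix == ext2.suffix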
Example 5: test_tldextract
def test_tldextract():
    '''
    verify that tldextract parses just the netloc
    This is neither documented nor tested by tldextract (!)
    '''
    assert tldextract.extract('example.com').registered_domain == 'example.com'
    assert tldextract.extract('www.example.com').registered_domain == 'example.com'
Developer: cocrawler, Project: cocrawler, Lines: 7, Source file: test_urls.py
Example 6: loadLists
def loadLists(writer=sys.stdout):
    if isStale(suspect_file):
        print >> writer, "Updating ISC Suspicious Domains..."
        new_file = requests.get(isc_url)
        with open(suspect_file, 'w') as sf_buffer:
            sf_buffer.write(new_file.content)
    if safebrowsing_bootstrap:
        print("Initial download of SafeBrowsing DB... this will take a few minutes.")
        updateSafebrowsing()
    elif isStale(safebrowsing_db, maxTime=259200):
        print >> writer, "Updating Google Safebrowsing DB..."
        updateSafebrowsing()
    if isStale(topthousand_file, maxTime=2629743):
        print >> writer, "Updating Alexa Top 1000..."
        new_file = requests.get(topmillion_url)
        with zipfile.ZipFile(StringIO(new_file.content), 'r') as zipData:
            with zipData.open('top-1m.csv', 'r') as oneMil:
                with open(topthousand_file, 'w') as topThousand:
                    for i in range(0, 1000):
                        topThousand.write(oneMil.readline())
    for sf_read in open(suspect_file):
        badDomain = tldextract.extract(sf_read)
        ISC_LIST.append(badDomain)
    for topthousand_read in open(topthousand_file):
        cleaned_line = topthousand_read.split(",")[1].strip()
        valuableDomain = tldextract.extract(cleaned_line)
        ALEXA_LIST.append(valuableDomain)
Developer: jcjones, Project: beta_whitelist_manager, Lines: 31, Source file: malicious_url_check.py
Example 7: start
def start(self):
    for ext in file_extensions:
        if ext in url_file(self.url):
            db.collections.update_one({
                'structure': '#URLEntry',
                'url': self.url
            }, {'$set': {'last_scraped': time.strftime("%Y-%m-%d %H:%M:%S")}})
            print('Skipping: {}'.format(self.url))
            return None
    try:
        with self.sess as sess:
            html_doc = sess.get(self.url, timeout=3).text
    except (InvalidSchema, ConnectionError, Timeout, TooManyRedirects):
        db.collections.remove(
            {
                'structure': '#URLEntry',
                'url': self.url
            }
        )
        return None
    soup = BeautifulSoup(html_doc, 'html.parser')
    urls = self.get_urls(soup)
    for url in urls:
        existing = db.collections.find_one({
            'structure': '#URLEntry',
            'url': url
        })
        if existing is None:
            try:
                tld = tldextract.extract(url).suffix
            except:
                tld = '*'
            entry = URLEntry(domain=self.get_domain(url), url=url, tld=tld)
            db.collections.insert_one(entry.export())
    this_existing = db.collections.find_one({
        'structure': '#URLEntry',
        'domain': self.get_domain(self.url),
        'url': self.url
    })
    if this_existing is not None:
        db.collections.update_one({
            'structure': '#URLEntry',
            'domain': self.get_domain(self.url),
            'url': self.url
        }, {'$set': {'last_scraped': time.strftime("%Y-%m-%d %H:%M:%S")}})
    else:
        try:
            tld = tldextract.extract(self.url).suffix
        except:
            tld = '*'
        entry = URLEntry(domain=self.get_domain(self.url), url=self.url, tld=tld)
        db.collections.insert_one(entry.export())
Developer: sebbekarlsson, Project: webster, Lines: 60, Source file: Spider.py
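In Example 7 the try/except around tldextract.extract is mostly defensive: extract itself rarely raises, and for IP addresses or hosts without a recognised public suffix it simply returns an empty suffix. Checking for that empty string is an alternative way to fall back to the '*' placeholder; a small sketch, not part of the webster project:

import tldextract

def get_tld(url, default='*'):
    """Return the public suffix of a URL, or a placeholder when none is known."""
    suffix = tldextract.extract(url).suffix
    return suffix if suffix else default

# e.g. get_tld('http://example.co.uk/')   -> 'co.uk'
#      get_tld('http://127.0.0.1:8000/')  -> '*'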
Example 8: crawlList
def crawlList(list):
    main_dict = parsedDictionary.parsedDictionary()
    # iterate through domains
    for i in range(0, len(list)):
        print "Scripts present at " + list[i]
        scripts = getScripts(list[i])
        printList(scripts)
        # iterate through this domain's scripts
        # this code checks if the script is linked externally or is hosted on the same domain (given by a relative URL)
        dict = parsedDictionary.parsedDictionary()
        for y in range(0, len(scripts)):
            full = ''
            if (scripts[y].startswith("//")) or (scripts[y].startswith("http")):
                full = tldextract.extract(scripts[y])
                if len(full.domain) <= 1:
                    full = tldextract.extract(list[i])
            else:
                full = tldextract.extract(list[i])
            link = full.domain + '.' + full.suffix
            if not dict.exists(link):
                dict.addElement(link)
        main_dict.add(dict)
        print main_dict.Dict
        print "}}}}}"
        print dict.Dict
        print "\n -------------------------------"
    sortedlist = main_dict.sortByValue()
    print " \n Top scripts: "
    printList(sortedlist)
Developer: marcb1, Project: python-indexer, Lines: 33, Source file: basicFunctions.py
Example 9: process_item
def process_item(self, item, spider):
    domain_name = tldextract.extract(item['url']).domain
    db = self.connection[domain_name]  # use the domain name as the database name
    self.collection = db[settings['MONGODB_COLLECTION']]
    valid = True
    for data in item:
        if not data:
            valid = False
            raise DropItem("Missing {0}!".format(data))
    if valid:
        if domain_name in spider.crawledPagesPerSite and spider.crawledPagesPerSite[domain_name] > spider.maximumPagesPerSite:
            return None
        self.collection.insert(dict(item))
        if domain_name in spider.crawledPagesPerSite:
            spider.crawledPagesPerSite[domain_name] += 1
        else:
            spider.crawledPagesPerSite[domain_name] = 1
        print "crawledPagesPerSite", spider.crawledPagesPerSite[domain_name]
        print "spider.allowed_domains", spider.allowed_domains
        print "spider.maximumPagesPerSite", spider.maximumPagesPerSite
        print "domain_name", domain_name, item['url']
        if spider.crawledPagesPerSite[domain_name] > spider.maximumPagesPerSite:
            suffix = tldextract.extract(item['url']).suffix
            domain_and_suffix = domain_name + "." + suffix
            print domain_and_suffix
            if domain_and_suffix in spider.allowed_domains:
                spider.allowed_domains.remove(domain_and_suffix)
                spider.dynamic_deny_domain.append(domain_name)
                #spider.rules[0].link_extractor.allow_domains.remove(domain_and_suffix)
                spider.rules[0].link_extractor.deny_domains.add(domain_and_suffix)
                print "spider.allowed_domains", spider.allowed_domains
            return None
        log.msg("Item added to MongoDB database!", level=log.DEBUG, spider=spider)
        return item
Developer: ningyuwhut, Project: crawler, Lines: 35, Source file: pipelines.py
Example 10: handle
def handle(self):
    SO_ORIGINAL_DST = 80
    # self.request is the client connection/socket
    dst = self.request.getsockopt(socket.SOL_IP, SO_ORIGINAL_DST, 16)  # Get the original destination IP before iptables redirect
    _, dst_port, ip1, ip2, ip3, ip4 = struct.unpack("!HHBBBB8x", dst)
    dst_ip = '%s.%s.%s.%s' % (ip1, ip2, ip3, ip4)
    peername = '%s:%s' % (self.request.getpeername()[0], self.request.getpeername()[1])
    print success('Client %s -> %s:443' % (peername, dst_ip))
    RemoteHostnames[dst_ip] = getCertHostnamesCached(dst_ip)
    #RemoteHostnames[dst_ip] = ['*.*.*.*','*.*.*','*.*','*'] # example fixed wildcard cert
    CN = RemoteHostnames[dst_ip][0]  # SSL_Certificate_CN2 module will return CN as first list element
    if add_extra_hostnames:
        import tldextract
        domain = tldextract.extract(CN).domain
        tld = tldextract.extract(CN).tld
        bonus_hostnames = []  # kludge to work around lack of good support for SNI (server name indication) in python
        bonus_hostnames.append('www.%s.%s' % (domain, tld))
        bonus_hostnames.append('*.%s.%s' % (domain, tld))
        bonus_hostnames.append('%s.%s' % (domain, tld))  # without this, requests to (e.g.) https://google.com fail as the CN is
        for extra_name in bonus_hostnames:                # www.google.com and there is no subjectAltName 'google.com' in the cert.
            if extra_name not in RemoteHostnames[dst_ip]:
                # however, adding extra hostnames as subjectAltNames makes other certs fail to validate, so disabled by default
                RemoteHostnames[dst_ip].append(extra_name)
    PhoneConnected = False
    CreateSignedX509Certificate(ip=dst_ip, hostnames=RemoteHostnames[dst_ip], peername=peername)
    try:
        (certfile, keyfile) = GeneratedCert[dst_ip]
        #print 'Setting up SSL socket using %s' % certfile
        stream_phone = ssl.wrap_socket(self.request, server_side=True, certfile=certfile,
                                       keyfile=keyfile, ssl_version=ssl.PROTOCOL_TLSv1)
        PhoneConnected = True
    except (ssl.SSLError), e:
        print error('SSLError on connection to phone (%s)' % e)
        self.finish()
Developer: Exceltior, Project: iSniff, Lines: 34, Source file: iSniff.py
Example 11: same_domain
def same_domain(url1, url2):
    url1_extract = tldextract.extract(url1)
    url2_extract = tldextract.extract(url2)
    if url1_extract.domain == url2_extract.domain:
        return True
    else:
        return False
Developer: geekpycoder, Project: urlcollector, Lines: 7, Source file: collector.py
Example 12: is_same_domain
def is_same_domain(url1, url2):
    """Check whether the seed URL and another URL belong to the same domain.
    >>> is_same_domain("http://kracekumar.wordpress.com", "http://wordpress.com")
    True
    >>> is_same_domain("http://kracekumar.com", "http://tumblr.com")
    False
    """
    return tldextract.extract(url1).domain == tldextract.extract(url2).domain
Developer: kracekumar, Project: crawlit, Lines: 8, Source file: crawlit.py
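Examples 11 and 12 (like the check_match helpers in Examples 16 and 17) compare only the domain field, which is what lets kracekumar.wordpress.com match wordpress.com; note that it also treats example.com and example.org as the same site. When the suffix should count too, comparing registered_domain is the stricter variant; a sketch, not part of crawlit:

import tldextract

def is_same_registered_domain(url1, url2):
    """Stricter check: domain and public suffix must both match."""
    return (tldextract.extract(url1).registered_domain ==
            tldextract.extract(url2).registered_domain)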
Example 13: email_pattern_research
def email_pattern_research():
    website = request.args['domain']
    domain = "{}.{}".format(tldextract.extract(website).domain,
                            tldextract.extract(website).tld)
    api_key = "9a31a1defcdc87a618e12970435fd44741d7b88794f7396cbec486b8"
    name = request.args['name'] if "name" in request.args.keys() else ""
    q.enqueue(EmailGuess().search_sources, domain, name, api_key, timeout=6000)
    return {'email_research_started': True}
Developer: john2x, Project: clearspark, Lines: 8, Source file: api.py
def mxsniff(email_or_domain, ignore_errors=False, cache=None):
"""
Lookup MX records for a given email address, URL or domain name and identify the email service provider(s)
from an internal list of known service providers.
:param str email_or_domain: Email, domain or URL to lookup
:return: Identified service provider, or a list if there's more than one (in unusual circumstances)
>>> mxsniff('example.com')['match']
['nomx']
>>> mxsniff('__invalid_domain_name__.com')['match']
['nomx']
>>> mxsniff('[email protected]')['match']
['google-gmail']
>>> sorted(mxsniff('https://google.com/').items())
[('domain', 'google.com'), ('match', ['google-apps']), ('mx', [(10, 'aspmx.l.google.com'), (20, 'alt1.aspmx.l.google.com'), (30, 'alt2.aspmx.l.google.com'), (40, 'alt3.aspmx.l.google.com'), (50, 'alt4.aspmx.l.google.com')]), ('mx_tld', ['google.com']), ('query', 'https://google.com/')]
"""
domain = get_domain(email_or_domain)
if cache and domain in cache:
return cache[domain]
result = []
tld = []
try:
answers = [] # Default value in case of verbose mode where an error occurs
answers = sorted([(rdata.preference, rdata.exchange.to_text(omit_final_dot=True).lower())
for rdata in dns.resolver.query(domain, 'MX')])
for preference, exchange in answers:
rdomain = tldextract.extract(exchange).registered_domain
if rdomain not in tld:
tld.append(rdomain)
provider = provider_domains.get(exchange)
if provider and provider not in result:
result.append(provider)
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.NoNameservers):
pass
except dns.exception.DNSException as e:
if ignore_errors:
pass
else:
raise MXLookupException('{exc} {error} ({domain})'.format(
exc=e.__class__.__name__, error=text_type(e), domain=domain))
if not result:
# Check for self-hosted email servers; identify them with the label 'self'
if tldextract.extract(domain).registered_domain in tld:
result.append('self')
if not result:
if answers:
result.append('unknown') # We don't know this one's provider
else:
result.append('nomx') # This domain has no mail servers
result = {'query': email_or_domain, 'domain': domain, 'match': result, 'mx': answers, 'mx_tld': tld}
if cache:
cache[domain] = result
return result
开发者ID:jace,项目名称:mxsniff,代码行数:58,代码来源:__init__.py
Example 15: check_domain_limit
def check_domain_limit(self, url):
    for domain in self.limit_domain:
        ext = tldextract.extract(domain)
        # A "*" (or empty) subdomain matches every subdomain of the site; otherwise only the exact domain is matched.
        if ((ext[0] == "*" or ext[0] == "") and tldextract.extract(url)[1] == ext[1]) or \
                (".".join(tldextract.extract(url)) == domain):
            return True
    return False
Developer: LoRexxar, Project: Pansidong, Lines: 9, Source file: WebSpider.py
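One subtlety in Example 15: '.'.join(tldextract.extract(url)) keeps empty parts, so a URL with no subdomain joins to something like '.example.com' and will never equal 'example.com'. The fqdn attribute, or filtering out empty parts, avoids that. A small sketch, assuming a tldextract version where ExtractResult is still a plain namedtuple (and therefore iterable):

import tldextract

ext = tldextract.extract("http://example.com/page")
assert ".".join(ext) == ".example.com"   # leading dot: the empty subdomain is kept
assert ext.fqdn == "example.com"         # fqdn drops the empty parts
assert ".".join(part for part in ext if part) == "example.com"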
Example 16: check_match
def check_match(_url, url):
    target_url = _url['target_url']
    allowed_domains = _url['allowed_domains']
    match = False
    url_domain = tldextract.extract(url).domain.lower()
    target_url_domain = tldextract.extract(target_url).domain.lower()
    if url_domain == target_url_domain or url_domain in allowed_domains:
        match = True
    return match
Developer: thequbit, Project: iddt, Lines: 9, Source file: utils.py
Example 17: _check_match
def _check_match(self, url):
    match = False
    url_domain = tldextract.extract(url).domain.lower()
    target_url_domain = tldextract.extract(
        self._data['url_data']['target_url']
    ).domain.lower()
    if url_domain == target_url_domain or \
            url_domain in self._data['url_data']['allowed_domains']:
        match = True
    return match
Developer: thequbit, Project: BarkingOwl, Lines: 10, Source file: scraper.py
Example 18: extract_HAR_features
def extract_HAR_features(harfile):
    """
    Opens a HAR file (JSON), extracts features from it and stores them in a dict.
    Returns the dict with the features.
    """
    har_features = {}
    har = json.loads(open(harfile).read())
    domain = har["log"]["pages"][0]["id"]

    # Extract domain
    ext = tldextract.extract(domain)
    domain = ext.domain + '.' + ext.suffix
    domainNoTLD = ext.domain

    # initialize variables
    domainStringSent, firstparty_data, thirdparty_data, firstparty_html, thirdparty_html, firstparty_requests, thirdparty_requests = 0, 0, 0, 0, 0, 0, 0

    for entry in har["log"]["entries"]:
        requestUrl = str(entry["request"]["url"])
        ext = tldextract.extract(requestUrl)
        requestDomain = ext.domain + '.' + ext.suffix

        # Check if the domainNoTLD is passed in the parameters of the request
        url_parameters = re.search('https?:\/\/.*\/(.*)', requestUrl)
        if url_parameters:
            if domainNoTLD in url_parameters.group(1):
                domainStringSent += 1

        # Check if this is a first-party request (Request domain == site domain)
        result = re.search('https?:\/\/(.*)\/.*', requestUrl)
        if result:
            if domain in result.group(1):
                # print requestUrl, 'is FIRST party request of size', entry["response"]["bodySize"]
                firstparty_requests += 1
                firstparty_data += int(entry["response"]["bodySize"])
                if entry["response"]["content"]["mimeType"]:
                    mimeType = entry["response"]["content"]["mimeType"]
                    if 'text' in mimeType or 'javascript' in mimeType:
                        firstparty_html += entry["response"]["bodySize"]
            else:
                # print requestUrl, 'is THIRD party request of size', entry["response"]["bodySize"]
                thirdparty_requests += 1
                thirdparty_data += int(entry["response"]["bodySize"])
                if entry["response"]["content"]["mimeType"]:
                    mimeType = entry["response"]["content"]["mimeType"]
                    if 'text' in mimeType or 'javascript' in mimeType:
                        thirdparty_html += entry["response"]["bodySize"]

    har_features['TP_DataRatio'] = safe_division(thirdparty_data, firstparty_data + thirdparty_data)
    har_features['TP_HtmlRatio'] = safe_division(thirdparty_html, firstparty_html + thirdparty_html)
    har_features['TP_RequestRatio'] = safe_division(thirdparty_requests, firstparty_requests + thirdparty_requests)
    har_features['domainStringSent'] = domainStringSent
    har_features['initialResponseSize'] = har["log"]["entries"][0]["response"]["bodySize"]
    har_features['initialResponseRatio'] = safe_division(har_features['initialResponseSize'], firstparty_data + thirdparty_data)
    return har_features
Developer: ysya, Project: Domain-Parking-Sensors-Auto-Script, Lines: 54, Source file: feature_extractor.py
Example 19: url_as_diff
def url_as_diff(new, old):
    if new == old:
        return '<same>'
    if new == '-':
        return new

    old_parse = urlparse.urlsplit(old)
    new_parse = urlparse.urlsplit(new)
    changed = set()
    for f in old_parse._fields:
        new_f = getattr(new_parse, f)
        if new_f and new_f == getattr(old_parse, f):
            new_parse = new_parse._replace(**{f: '<{}>'.format(f)})
        elif new_f:
            changed.add(f)
    if tuple(changed) == ('scheme',):
        return '{}://<same>'.format(new_parse.scheme)

    if (not new_parse.netloc.startswith('<') and
            new_parse.port is None and old_parse.port is None):
        new_domain = tldextract.extract(new_parse.netloc)
        old_domain = tldextract.extract(old_parse.netloc)
        for f in old_domain._fields:
            new_f = getattr(new_domain, f)
            if new_f and new_f == getattr(old_domain, f):
                new_domain = new_domain._replace(**{f: '<{}>'.format(f)})
        new_domain = '.'.join(new_domain).replace('<domain>.<suffix>',
                                                  '<domain+>')
        new_parse = new_parse._replace(netloc=new_domain)

    if new_parse.path == old_parse.path + '/':
        new_parse = new_parse._replace(path='<path>/')
    if new_parse.path.startswith('/') and old_parse.path.startswith('/'):
        new_dirs = new_parse.path[1:].split('/')
        old_dirs = old_parse.path[1:].split('/')
        if new_dirs[-1] and new_dirs[-1] == old_dirs[-1]:
            new_dirs[-1] = '<basename>'
        old_dirs = {d: i for i, d in enumerate(old_dirs)}
        for i, new_dir in enumerate(new_dirs):
            if new_dir in old_dirs:
                new_dirs[i] = '<dir{}>'.format(old_dirs[new_dir] + 1)
        new_parse = new_parse._replace(path='/' + '/'.join(new_dirs))

    if (old_parse.query and new_parse.query and
            not new_parse.query.startswith('<')):
        old_query = set(old_parse.query.split('&'))
        new_query = set(new_parse.query.split('&'))
        if new_query > old_query:
            new_params = '&'.join(sorted(map(urllib.quote,
                                             new_query - old_query)))
            new_parse = new_parse._replace(query='<query>' + '&' + new_params)

    out = new_parse.geturl()
    return out
Developer: schwa-lab, Project: sharingnews, Lines: 54, Source file: cleaning.py
Example 20: validRedirect
def validRedirect(history):
    first = history[0]
    last = history[-1]
    if "blogspot" in first and "blogspot" in last:
        return True
    first = tldextract.extract(first)
    last = tldextract.extract(last)
    if first.domain != last.domain:
        return False
    else:
        return True
Developer: kiarashplusplus, Project: bo, Lines: 11, Source file: views.py
Note: The tldextract.extract examples in this article were compiled by 纯净天空 from GitHub, MSDocs and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by many developers; copyright in the code remains with the original authors. Consult each project's license before using or redistributing the code, and do not republish without permission.