本文整理汇总了Python中urllib2.urlparse.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, announce, piece_length=262144, **kw):
    """Initialise the torrent metainfo dictionary.

    announce      -- tracker announce URL; must carry a URL scheme.
    piece_length  -- piece size in bytes (default 256 KiB).
    Keyword options:
      comment      -- free-text comment stored in the metainfo.
      httpseeds    -- list of HTTP seed URLs.
      announcelist -- list of tracker tiers (list of lists of URLs).

    Raises ValueError for scheme-less URLs and TypeError for wrongly
    typed httpseeds/announcelist values.
    """
    self.piece_length = piece_length
    if not urlparse.urlparse(announce).scheme:
        raise ValueError('No schema present for url')
    self.tdict = {
        'announce': announce,
        'creation date': int(time()),
        'info': {
            'piece length': self.piece_length
        }
    }
    comment = kw.get('comment')
    if comment:
        self.tdict['comment'] = comment
    httpseeds = kw.get('httpseeds')
    if httpseeds:
        if not isinstance(httpseeds, list):
            raise TypeError('httpseeds must be a list')
        self.tdict['httpseeds'] = httpseeds
    announcelist = kw.get('announcelist')
    if announcelist:
        if not isinstance(announcelist, list):
            raise TypeError('announcelist must be a list of lists')
        # Every tier must itself be a list ...
        if any(not isinstance(tier, list) for tier in announcelist):
            raise TypeError('announcelist must be a list of lists')
        # ... and the first URL of each tier must carry a scheme.
        if any(not urlparse.urlparse(tier[0]).scheme for tier in announcelist):
            raise ValueError('No schema present for url')
        self.tdict['announce-list'] = announcelist
开发者ID:TheTerrasque,项目名称:makeTorrent,代码行数:27,代码来源:makeTorrent.py
示例2: classify_link
def classify_link(link):
    """Classify *link* according to its domain.

    Resolves short-link services (up to 10 redirects), then matches the
    host against the blacklist, trying the shortest domain suffix first.
    Returns (resolved_url, site_type); unknown domains count as news.
    """
    if link is None:
        return link, SITE_TYPE['junk']
    original_url = link
    parsed = urlparse.urlparse(link)
    redirects = 0
    # Keep resolving while the host is a known URL-shortening service.
    while parsed.netloc in _SHORT_SERVICE:
        if redirects >= 10:
            # Too many redirects: give up and treat the link as news.
            return link, SITE_TYPE['news']
        # Resolve one level of short-link indirection.
        original_url = _get_original_link(original_url)
        parsed = urlparse.urlparse(original_url)
        redirects += 1
    domain_token = parsed.netloc.split('.')
    # Check suffixes from two labels ("example.com") up to the full host.
    for start in range(len(domain_token) - 2, -1, -1):
        candidate = '.'.join(domain_token[start:])
        if candidate in _BLACK_SITE_LIST:
            return original_url, _BLACK_SITE_LIST[candidate]
    # Unclassified links default to news.
    return original_url, SITE_TYPE['news']
开发者ID:qwang2505,项目名称:VoteHelper,代码行数:26,代码来源:linkclass.py
示例3: __getParentPage
def __getParentPage(self):
    '''
    This will get the Parent Page info

    Assembles a metadata dict for the thread's parent page (hierarchy,
    title, per-thread statistics, thread id, crawl bookkeeping fields)
    and appends it to self.pages.

    Returns True when the entry was added; False when the page is
    already known via session info, the session update was rejected,
    or the entry could not be built.
    '''
    page = {}
    try:
        # The breadcrumb <li> items form the thread hierarchy.
        self.hierarchy = page['et_thread_hierarchy'] = [stripHtml(x.renderContents()) for x in self.soup.find('div',{'class':'rd Microsoft_Msn_Boards_Read_List Web_Bindings_Base'}).findAll('li')]
    except:
        log.info(self.log_msg('Thread hierarchy is not found'))
    try:
        self.forum_title = page['title'] = stripHtml(self.soup.find('h2').renderContents())
    except:
        log.info(self.log_msg('Title Not Found'))
        page['title'] = ''
    # Bail out if this parent uri was already processed in this session.
    if checkSessionInfo(self.genre, self.session_info_out, self.parent_uri, self.task.instance_data.get('update')):
        log.info(self.log_msg('Session info return True'))
        return False
    # Copy through any per-thread statistics the task already scraped.
    for each in ['et_author_name','ei_thread_replies_count','ei_thread_view_count','ei_author_count','et_last_post_author','edate_last_post_date','posted_date']:
        try:
            page[each] = self.task.pagedata[each]
        except:
            log.info(self.log_msg('Page data cannot be extracted for %s'%each))
    try:
        # Thread id is the ThreadId= value in the uri's query string
        # (urlparse index 4 is the query component).
        page['ei_thread_id'] = int(urlparse.urlparse(self.currenturi)[4].split('&')[0].split('ThreadId=')[1])
    except:
        log.info(self.log_msg('Thread id not found'))
    try:
        post_hash = get_hash(page)
        id = None
        if self.session_info_out == {}:
            # First entry of the crawl: tag the session record with the task id.
            id = self.task.id
        result = updateSessionInfo(self.genre, self.session_info_out, self.parent_uri, post_hash, 'Post', self.task.instance_data.get('update'),Id=id)
        if not result['updated']:
            return False
        # Standard bookkeeping fields shared by every crawled page.
        page['path'] = [self.parent_uri]
        page['parent_path'] = []
        page['uri'] = normalize(self.currenturi)
        page['uri_domain'] = unicode(urlparse.urlparse(page['uri'])[1])
        page['priority'] = self.task.priority
        page['level'] = self.task.level
        page['pickup_date'] = datetime.strftime(datetime.utcnow(),'%Y-%m-%dT%H:%M:%SZ')
        page['connector_instance_log_id'] = self.task.connector_instance_log_id
        page['connector_instance_id'] = self.task.connector_instance_id
        page['workspace_id'] = self.task.workspace_id
        page['client_id'] = self.task.client_id
        page['client_name'] = self.task.client_name
        page['last_updated_time'] = page['pickup_date']
        page['versioned'] = False
        page['data'] = ''
        page['task_log_id']=self.task.id
        page['entity'] = 'Post'
        page['category']=self.task.instance_data.get('category','')
        self.pages.append(page)
        log.info(page)
        log.info(self.log_msg('Parent Page added'))
        return True
    except :
        log.exception(self.log_msg("parent post couldn't be parsed"))
        return False
开发者ID:jsyadav,项目名称:CrawlerFramework,代码行数:59,代码来源:moneycentralconnector.py
示例4: completeurl
def completeurl(fullurl, partialurl):
    """Build an absolute URL from *partialurl* using *fullurl*'s origin.

    Takes the scheme and host from *fullurl* and the path, params,
    query and fragment from *partialurl*, and returns the combined URL.

    fullurl    -- an absolute URL supplying scheme and netloc.
    partialurl -- a (possibly relative) URL supplying the rest.
    """
    # The original module targeted Python 2 only; fall back to the
    # Python 3 location of the same API so the helper works on both.
    try:
        from urllib2 import urlparse          # Python 2
    except ImportError:
        from urllib import parse as urlparse  # Python 3
    parsed_jobsurl = urlparse.urlparse(fullurl)
    parsed_joburl = urlparse.urlparse(partialurl)
    fulljoburl = urlparse.urlunparse([parsed_jobsurl.scheme, parsed_jobsurl.netloc,
                                      parsed_joburl.path, parsed_joburl.params,
                                      parsed_joburl.query, parsed_joburl.fragment])
    return fulljoburl
开发者ID:petrbouchal,项目名称:czgov-jobs,代码行数:8,代码来源:lib_minscrapers.py
示例5: convert
def convert(self, value, context, ctx_opts):
    """Extract and validate the domain part of a URL-ish string.

    Prepends 'http://' when *value* does not start with 'http', parses
    out the network location, and reports 'invalid_domain' via
    self.error() when no plausible domain (at least two dot-separated
    labels, no spaces) can be found. Returns the domain lower-cased.
    """
    if not value.startswith(u'http'):
        value = u'http://%s' % value
    domain = urlparse.urlparse(value)[1]
    if not domain or domain == u'':
        # Retry with an explicit scheme prefix in case the first parse
        # swallowed the host into the path component.
        domain = urlparse.urlparse(u'http://%s' % value)[1]
    invalid = (not domain
               or len(domain.split(u'.')) < 2
               or len(domain.split(u' ')) > 1)
    if invalid:
        self.error('invalid_domain', value, context, ctx_opts)
    return domain.lower()
开发者ID:stevenkampen,项目名称:Bolognium,代码行数:10,代码来源:filters.py
示例6: homepage_url
def homepage_url(self):
    """Return the homepage, prefixing 'http://' when no scheme is set.

    Ensures user templates never render a scheme-less (relative) link.
    Empty/falsy homepages and URLs that already carry a scheme are
    returned unchanged.
    """
    homepage = self.homepage
    if not homepage:
        return homepage
    if urlparse.urlparse(homepage).scheme:
        # Already absolute; nothing to do.
        return homepage
    candidate = 'http://' + homepage
    # Vague sanity check: only trust the prefixed form if it now parses
    # with an http scheme.
    if urlparse.urlparse(candidate).scheme == 'http':
        return candidate
    return homepage
开发者ID:lelutin,项目名称:wafer,代码行数:15,代码来源:models.py
示例7: check_config
def check_config():
    """
    Check crucial configuration details for existence and workability.
    Runs checks to see whether bugtracker's URL is reachable, whether
    backend is available at the right filename, and whether the script has
    the key arguments it needs to run: URL, backend, and database details.
    The filename for the backend in the backends/ directory needs to be the
    same as the configuration argument specifying that backend. For
    instance, invoking the Launchpad backend uses 'lp', and so the filename
    is 'lp.py'.
    """
    # 'url' and 'backend' are mandatory configuration parameters.
    Config.check_params(['url', 'backend'])
    # The backend name must correspond to a <name>.py file in backends/.
    if Config.backend + ".py" not in Backend.get_all_backends():
        raise InvalidConfig('Backend "' + Config.backend + '" does not exist')
    # Probe only the site root (scheme://netloc/), not the full tracker URL.
    url = urlparse.urlparse(Config.url)
    check_url = urlparse.urljoin(url.scheme + '://' + url.netloc, '')
    print("Checking URL: " + check_url)
    req = Request(check_url)
    # NOTE(review): the reachability probe is skipped for the github
    # backend — presumably because its API is auth/rate limited; confirm.
    if Config.backend != 'github':
        try:
            response = urlopen(req)
        # Python 2 exception syntax — this module predates Python 3.
        except HTTPError, e:
            raise InvalidConfig('The server could not fulfill the request '
                + str(e.msg) + '(' + str(e.code) + ')')
        except URLError, e:
            raise InvalidConfig('We failed to reach a server. ' + str(e.reason))
开发者ID:davidziman,项目名称:Bicho,代码行数:31,代码来源:config.py
示例8: __addPost
def __addPost(self, post):
    """
    This will take the post tag , and fetch data and meta data and add it to
    self.pages

    Dedupes on a hash of the whole page dict via the session info.
    Returns True to continue with the next post; False when the post is
    already present in the session.
    """
    try:
        page = self.__getData(post)
        if not page:
            log.info(self.log_msg('page contains empty data, getdata \
                    returns False for uri %s'%self.currenturi))
            return True
        # Hash of the page contents acts as the dedup key.
        unique_key = get_hash(page)
        if checkSessionInfo(self.__genre, self.session_info_out, unique_key,\
                self.task.instance_data.get('update')):
            log.info(self.log_msg('Session info returns True for uri %s'%unique_key))
            return False
        result = updateSessionInfo(self.__genre, self.session_info_out, unique_key, \
                get_hash( page ),'forum', self.task.instance_data.get('update'))
        if result['updated']:
            # Attach crawl bookkeeping before queueing the page.
            page['parent_path'] = []
            page['path'] = [unique_key]
            page['uri'] = self.currenturi
            page['uri_domain'] = urlparse.urlparse(page['uri'])[1]
            log.info(page)
            page.update(self.__task_elements_dict)
            self.pages.append(page)
        else:
            log.info(self.log_msg('Update session info returns False for \
                url %s'%self.currenturi))
    except:
        log.exception(self.log_msg('Cannot add the post for the uri %s'%self.currenturi))
    return True
开发者ID:jsyadav,项目名称:CrawlerFramework,代码行数:32,代码来源:mrrebatesconnector.py
示例9: __addPost
def __addPost(self, post):
    '''It will add the post

    Builds a review-page dict for *post*, dedupes it against the session
    info (keyed on a hash of the review text only) and appends it to
    self.pages. Returns True to continue with the next post; False when
    the post already exists or an error occurred.
    '''
    try:
        page = self.__getData(post)
        if not page:
            return True
        # Dedup key is a hash of the review body only, not the whole page.
        unique_key = get_hash( {'data' : page['data'] })
        if checkSessionInfo('review', self.session_info_out, unique_key,\
                self.task.instance_data.get('update'),parent_list\
                = [self.currenturi]):
            log.info(self.log_msg('Session info returns True'))
            return False
        result=updateSessionInfo('review', self.session_info_out, unique_key, \
                get_hash( page ),'Review', self.task.instance_data.get('update'),\
                parent_list=[self.currenturi])
        if not result['updated']:
            log.info(self.log_msg('Update session info returns False'))
            return True
        # Crawl bookkeeping fields for the queued review.
        page['path'] = [self.currenturi]
        page['parent_path'] = []
        #page['path'].append(unique_key)
        page['uri'] = self.currenturi
        page['uri_domain'] = urlparse.urlparse(page['uri'])[1]
        page['entity'] = 'post'
        page.update(self.__task_elements_dict)
        self.pages.append(page)
        log.info(page)
        log.info(self.log_msg('Post Added'))
        return True
    except:
        log.exception(self.log_msg('Error while adding session info'))
        return False
开发者ID:jsyadav,项目名称:CrawlerFramework,代码行数:35,代码来源:bankguideconnector.py
示例10: __setParentPage
def __setParentPage(self):
    """This will get the parent info

    Extracts the thread hierarchy/title from the breadcrumb trail and
    queues a 'thread' entry in self.pages, unless the thread is already
    recorded in the session info.
    """
    page = {}
    try:
        # Breadcrumbs look like "Home > Forum > Thread"; drop the first
        # crumb and keep the rest as the hierarchy. The last crumb is the
        # thread title.
        page['et_thread_hierarchy'] = self.__hierarchy = [x.strip() for x in stripHtml(self.soup.find('div', 'deck breadcrumbs').renderContents()).split('>') if x.strip()][1:]
        page['data'] = page['title'] = page['et_thread_hierarchy'][-1]
    except:
        log.exception(self.log_msg('Thread hierarchy and Title Not found for uri\
            %s'%self.currenturi))
        return
    if checkSessionInfo(self.__genre, self.session_info_out, self.task.instance_data['uri'], \
        self.task.instance_data.get('update')):
        log.info(self.log_msg('Session info return True, Already exists'))
        return
    try:
        result = updateSessionInfo('review', self.session_info_out, self.\
            task.instance_data['uri'], get_hash( page ), 'forum', self.task.instance_data.get('update'))
        if result['updated']:
            # Crawl bookkeeping fields for the queued thread entry.
            page['path'] = [self.task.instance_data['uri']]
            page['parent_path'] = []
            page['uri'] = self.currenturi
            page['uri_domain'] = unicode(urlparse.urlparse(page['uri'])[1])
            page['data'] = ''
            page['entity'] = 'thread'
            page.update(self.__task_elements_dict)
            page['posted_date'] = page['pickup_date']
            self.pages.append(page)
            log.info(self.log_msg('Parent Page Added'))
        else:
            # NOTE(review): this format string has no %s placeholder, so the
            # '%' application below would raise TypeError if ever reached —
            # confirm and fix upstream.
            log.info(self.log_msg('Result[updated] returned True for \
                uri'%self.currenturi))
    except:
        log.exception(self.log_msg("parent post couldn't be parsed"))
开发者ID:jsyadav,项目名称:CrawlerFramework,代码行数:34,代码来源:everydayhealthconnector.py
示例11: generate_cookie
def generate_cookie(self, url_path, session_id, expiration=None, add_header=False):
    '''
    Return a session cookie string containing the session id.

    The cookie is constrained to *url_path*, marked HTTP-only, and only
    sent on secure (SSL) connections; its domain is taken from the
    configured XML-RPC URI.

    :parameters:
      url_path
        The cookie will be returned in a request if it begins with this
        url path.
      session_id
        The session id identified by the session cookie.
      expiration
        Expiry timestamp; falsy values (including zero unix timestamps)
        mean no explicit expiry.
      add_header
        If true, prefix the result with a 'Set-Cookie: ' header name.
    :returns:
      cookie string
    '''
    # Normalise zero unix timestamps (and other falsy values) to None.
    if not expiration:
        expiration = None
    session_cookie = Cookie(self.session_cookie_name, session_id,
                            domain=urlparse.urlparse(api.env.xmlrpc_uri).netloc,
                            path=url_path, httponly=True, secure=True,
                            expires=expiration)
    if add_header:
        return 'Set-Cookie: %s' % session_cookie
    return str(session_cookie)
开发者ID:andygabby,项目名称:freeipa,代码行数:32,代码来源:session.py
示例12: test_compare_triples
def test_compare_triples():
    """Compare live FDP output against stored reference dumps.

    For every supported MIME type and every endpoint URL, fetch the
    endpoint with the matching Accept header, parse both the response
    and the on-disk reference dump into RDF graphs, and assert that the
    live graph contains no triples absent from the reference.
    """
    for mime, fext in MIME_TYPES.items():
        dump_path = path.join(DUMP_DIR, path.basename(mime))
        for url in URLs:
            # Reference filename = URL path basename + serialization extension.
            # urlparse moved between Python 2 and 3, hence the branch.
            if six.PY2:
                fname = '%s.%s' % (path.basename(urlparse.urlparse(url).path), fext)
            else:
                fname = '%s.%s' % (path.basename(urlparse(url).path), fext)
            fname = path.join(dump_path, fname)
            req = Request(url)
            req.add_header('Accept', mime)
            res = urlopen(req)
            # Parse the live response and the stored dump, then diff them.
            g_fdp.parse(data=res.read(), format=mime)
            g_dump.parse(fname, format=mime)
            both, first, second = graph_diff(g_fdp, g_dump)
            # 'first' holds triples only in the live graph; any such triple
            # is a deviation from the reference.
            n_first = len(first)
            # n_second = len(second)
            # n_both = len(both)
            assert_equals(
                n_first, 0, '{} triple(s) different from reference:\n\n{}===\n{}\n'.format(
                    n_first, first.serialize(format='turtle'), second.serialize(format='turtle')))
开发者ID:NLeSC,项目名称:ODEX-FAIRDataPoint,代码行数:27,代码来源:test_fdp.py
示例13: on_navigation_requested
def on_navigation_requested(self, view, frame, req, data=None):
    """Intercept navigations that hit the OAuth callback host.

    When the requested URI's hostname appears inside the registered
    callback URL, harvest the access token from the parsed URI and
    return True to cancel the navigation; otherwise return False to let
    it proceed.
    """
    parsed = urlparse.urlparse(req.get_uri())
    # NOTE: '> 0' (not '>= 0') — a hostname matching at index 0 is not
    # treated as the callback, mirroring the original behaviour.
    if self.url_callback.find(parsed.hostname) > 0:
        self.getAccessToken(parsed)
        return True
    return False
开发者ID:creturn,项目名称:workSnippet-python,代码行数:7,代码来源:oauth.py
示例14: is_url
def is_url(name):
    """Return True when *name* parses as an http/https/file/ftp URL."""
    try:
        parsed = urlparse.urlparse(name)
    except Exception:
        # Unparseable input is simply not a URL.
        return False
    return parsed.scheme in ('http', 'https', 'file', 'ftp')
开发者ID:msabramo,项目名称:pythonz,代码行数:7,代码来源:util.py
示例15: __call__
def __call__(self, **kwargs):
    """Format the context's 'provenances' field for display.

    Each provenance row that has at least one of title/link/owner is
    turned into {'source': {...}, 'owner': {...}}. The formatted list is
    stored under self.info['provenances'] and self.info is returned.
    """
    field = self.context.getField('provenances')
    provenances = field.getAccessor(self.context)()
    formatted_provenances = []
    for provenance in provenances:
        title = provenance.get('title', '')
        link = provenance.get('link', '')
        owner = provenance.get('owner', '')
        # Skip rows where every field is empty.
        if title != '' or owner != '' or link != '':
            formatted_provenance = {'source':{}, 'owner':{}}
            formatted_provenance['source']['title'] = title
            formatted_provenance['source']['url'] = link
            if owner != '':
                if hasVocab:
                    # Resolve the owner token to its display title via the
                    # organisations vocabulary when one is available.
                    owner_title = tmpOrganisationsVocabulary.\
                        getDisplayList(self.context).getValue(owner)
                else:
                    owner_title = owner
                formatted_provenance['owner']['title'] = owner_title
                # Use the owner value as the URL only when it is an
                # absolute URL (has both scheme and netloc); otherwise
                # fall back to the source link.
                parser = urlparse.urlparse(owner)
                if all((parser.scheme, parser.netloc)):
                    formatted_provenance['owner']['url'] = owner
                else:
                    formatted_provenance['owner']['url'] = link
            formatted_provenances.append(formatted_provenance)
    self.info['provenances'] = formatted_provenances
    return self.info
开发者ID:dbitouze,项目名称:eea.daviz,代码行数:30,代码来源:data.py
示例16: __addPost
def __addPost(self, post, is_question=False):
    """Add one forum post (or the opening question) to self.pages.

    The numeric id embedded in the post tag's 'id' attribute is used as
    the dedup key against the session info. Returns True to continue
    crawling; False when the post is already known.
    """
    try:
        # First run of digits in the tag id is the post id.
        unique_key = re.search(r'(\d+)', post['id']).groups()[0]
        if checkSessionInfo(self.__genre, self.session_info_out, unique_key, \
                self.task.instance_data.get('update'),parent_list\
                = [self.task.instance_data['uri']]):
            log.info(self.log_msg('Session info returns True for %s'%unique_key))
            return False
        page = self.__getData(post, is_question)
        if not page:
            log.info(self.log_msg('page contains empty data, getdata \
                returns False for uri %s'%self.currenturi))
            return True
        result = updateSessionInfo(self.__genre, self.session_info_out, unique_key, \
                get_hash( page ),'forum', self.task.instance_data.get('update'),\
                parent_list=[self.task.instance_data['uri']])
        if result['updated']:
            # Crawl bookkeeping; uri points at the post's permalink.
            page['parent_path'] = [self.task.instance_data['uri']]
            page['path'] = [ self.task.instance_data['uri'], unique_key]
            page['uri'] = self.__baseuri + 'showpost.php?p=' + unique_key
            page['uri_domain'] = urlparse.urlparse(page['uri'])[1]
            page.update(self.__task_elements_dict)
            self.pages.append(page)
        else:
            log.info(self.log_msg('Update session info returns False for \
                url %s'%self.currenturi))
    except:
        log.exception(self.log_msg('Cannot add the post for the uri %s'%self.currenturi))
    return True
开发者ID:jsyadav,项目名称:CrawlerFramework,代码行数:29,代码来源:dubaiforumsconnector.py
示例17: get_document
def get_document(self, url):
    """
    Connects to the server and retrieves the document

    Performs a plain HTTP GET of *url* with this plugin's User-Agent,
    raising radio.RadioException on connection failure or any non-200
    response. Returns the raw response body.
    """
    set_status(_('Contacting SomaFM server...'))
    parsed = urlparse.urlparse(url)
    try:
        conn = httplib.HTTPConnection(parsed.netloc, timeout=20)
    except TypeError:
        # Very old Pythons: HTTPConnection has no timeout parameter.
        conn = httplib.HTTPConnection(parsed.netloc)
    try:
        conn.request('GET', parsed.path,
                     headers={'User-Agent': self.user_agent})
        response = conn.getresponse()
    except (socket.timeout, socket.error):
        raise radio.RadioException(_('Error connecting to SomaFM server.'))
    if response.status != 200:
        raise radio.RadioException(_('Error connecting to SomaFM server.'))
    document = response.read()
    conn.close()
    # Clear the status message now that the fetch is complete.
    set_status('')
    return document
开发者ID:eri-trabiccolo,项目名称:exaile,代码行数:27,代码来源:__init__.py
示例18: send_email
def send_email(self, to='', subject='', body='', cc='', bcc=''):
    """Send a mail through Gmail's basic-HTML (ui=html) interface.

    Non-empty to/subject/body/cc/bcc arguments are UTF-8 encoded and
    posted as form fields to the compose endpoint. Returns True;
    asserts that Gmail's confirmation string appears in the response.
    """
    log.info('sending a mail')
    data = dict(nvp_bu_send='Send')
    # Collect each non-empty named argument via vars(); the parameter
    # names double as the Gmail form field names.
    for name in 'to subject body cc bcc'.split():
        if vars()[name]:
            data[name] = vars()[name].encode('utf-8')
    if not hasattr(self, 'sendpath'):
        # Discover the compose/send path once by following the basic-HTML
        # mail URL's redirect, then cache it on self for later sends.
        response = self.internal_http_opener.open(self.internalBaseMailUrl + '?ui=html')
        from urllib2 import urlparse
        respurl = urlparse.urlparse(response.geturl())
        try:
            response.close()
        except: pass
        del response
        self.sendpath = respurl.path
    url = 'https://mail.google.com' + self.sendpath
    try:
        at = self.gmail_at
    except KeyError:
        # No "at" action token available yet; send without it.
        at = ''
    params = dict(at=at, v='b', pv='tl', s='s', fv='b', cpt='c', cs='c')
    if not self.hosted:
        params.update(fv='b', cpt='c', cs='c')
    else:
        # Hosted (Google Apps) accounts need slightly different flags.
        params.update(cs='b', s='s')
    url = UrlQuery(url, params)
    response = self.webrequest(url, follow_js_redirects=True, **data)
    log.info('sent a mail')
    assert response and ('Your message has been sent.' in response)
    log.info('send mail success: %r', bool('Your message has been sent.' in response))
    return True
开发者ID:AlexUlrich,项目名称:digsby,代码行数:34,代码来源:gmail.py
示例19: write
def write(self, url, start_time, end_time, ok, reason):
    '''
    Record one timed HTTP request in the access log.

    @param url: full request URL, e.g. http://127.0.0.1:8080/xxx/?sdfa=fas
    @param ok: boolean success flag
    @param reason: failure reason string; must not contain a comma (',')
        because the comma log format uses it as the field separator
    '''
    elapsed_ms = int((end_time - start_time) * 1000)  # milliseconds
    parsed = urlparse.urlparse(url)
    host = '%s%s' % (parsed.hostname, (':%s' % parsed.port) if parsed.port else '')
    request_path = parsed.path or '/'
    # Drop requests whose path matches a configured ignore prefix.
    for ignored_prefix in self.ignore_url:
        if request_path.startswith(ignored_prefix):
            return
    ok_flag = 1 if ok else 0
    if self.log_format == 'comma':
        msg = '%s,%s,%s,%s,%s,%s,%s,%s' % (self.server_name, self.project_name,
                                           host, request_path, elapsed_ms,
                                           ok_flag, int(start_time), reason)
    elif self.log_format == 'json':
        msg = simplejson.dumps({
            'server_name': self.server_name, 'project_name': self.project_name,
            'host': host,
            'url': request_path, 'create_time': int(start_time),
            'process_time': elapsed_ms,
            'ok': ok_flag, 'reason': reason})
    self.write_log(msg)
开发者ID:yayoec,项目名称:python,代码行数:26,代码来源:api_log.py
示例20: fetch_photos_from_msg
def fetch_photos_from_msg(self, album, msg=None):
    """Fetch the next batch of Facebook photos for *album*.

    Resumes paging from msg.next_page (an 'awaiting' Message row),
    downloads each photo into a Photo object, queues a new Message for
    the following page, and finally marks *msg* done.
    """
    u = album.user
    token = get_access_token(u)
    graph = facebook.GraphAPI(token)
    if msg.status == 'awaiting':
        # Extract the 'after' paging cursor from the stored next-page URL.
        parts = urlparse.urlparse(msg.next_page)
        qs = urlparse.parse_qs(parts.query)
        after = qs.get('after')[0]
        photos = graph.get_object(album.fb_album_id + "/photos", fields='id,source', limit=2, after=after)
        # Queue the next page before processing the current one.
        new_next_page = photos.get('paging').get('next')
        new_msg = Message.objects.create(next_page=new_next_page, user=u, status='awaiting')
        for photo in photos.get('data'):
            # Download the image into a temp file and hand it to Django's
            # File wrapper so the storage backend persists it.
            img_temp = NamedTemporaryFile(delete=True)
            img_temp.write(urlopen(photo.get('source')).read())
            img_temp.flush()
            photo_object = Photo.objects.create(title=photo.get('id'),
                                                description=photo.get('created_time'),
                                                album=album,
                                                file=File(img_temp))
            pprint(photo_object.filename)
            self.stdout.write('Successfully fetched photo for source "%s"\n' % photo.get('source'))
        msg.status = 'done'
        msg.save()
        self.stdout.write('Finished this queue "%s"\n' % new_msg.next_page)
开发者ID:dynamicguy,项目名称:photomatic,代码行数:25,代码来源:album.py
注:本文中的urllib2.urlparse.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论