This article collects typical usage examples of the urljoin function from Python's urllib.parse module. If you have been wondering what exactly urljoin does and how to use it, the curated code examples below may help.
A total of 20 urljoin code examples are shown, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code samples.
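
Before the examples, here is a minimal sketch of how urljoin resolves relative references (the example.com URLs are placeholders; the expected results are shown in the comments):

from urllib.parse import urljoin

# A relative path is resolved against the base URL; the trailing slash matters.
print(urljoin("http://example.com/a/", "b"))    # http://example.com/a/b
print(urljoin("http://example.com/a", "b"))     # http://example.com/b
# A path starting with "/" replaces the entire path of the base URL.
print(urljoin("http://example.com/a/b", "/c"))  # http://example.com/c
# An absolute URL as the second argument is returned unchanged.
print(urljoin("http://example.com/a/", "http://other.org/x"))  # http://other.org/x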
Example 1: post_to_hastebin

def post_to_hastebin(data, url="http://hastebin.com/"):
    if isinstance(data, str):
        data = data.encode()
    response = requests.post(urljoin(url, "documents"), data)
    response.raise_for_status()
    result = response.json()
    return urljoin(url, result['key'])

Developer: FuelRats, Project: pipsqueak, Lines: 7, Source: hastebin.py
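
For reference, this is how the two urljoin calls above resolve, using the default hastebin URL and a hypothetical document key:

from urllib.parse import urljoin

url = "http://hastebin.com/"
print(urljoin(url, "documents"))  # http://hastebin.com/documents
print(urljoin(url, "abc123"))     # http://hastebin.com/abc123 (hypothetical key returned by the API)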
Example 2: report_from

def report_from(result, year_range):
    link = result.select("a")[0]
    title = link.text
    landing_url = urljoin(REPORTS_URL, link.get('href'))
    report_id_node, published_node = result.select("div.release_info")
    report_id = report_id_node.text.strip().replace(",", "")
    published_on = datetime.datetime.strptime(published_node.text, '%b %d, %Y')
    if published_on.year not in year_range:
        logging.debug("[%s] Skipping, not in requested range." % landing_url)
        return
    logging.debug("Scraping landing url: %s", landing_url)
    landing_page = beautifulsoup_from_url(landing_url)
    summary = landing_page.select("div.left_col")[0].text.strip()
    pdf_link = landing_page.select("#link_bar > a")[0]
    report_url = urljoin(REPORTS_URL, pdf_link.get('href'))
    text_link = landing_page.select("#add_material a")[-1]
    text_report_url = urljoin(REPORTS_URL, text_link.get('href'))
    report = {
        'inspector': 'gao',
        'inspector_url': 'http://www.gao.gov/about/workforce/ig.html',
        'agency': 'gao',
        'agency_name': 'Government Accountability Office',
        'report_id': report_id,
        'url': report_url,
        'text_url': text_report_url,
        'landing_url': landing_url,
        'title': title,
        'published_on': datetime.datetime.strftime(published_on, "%Y-%m-%d"),
    }
    return report

Developer: BunsenMcDubbs, Project: inspectors-general, Lines: 35, Source: gao.py
Example 3: test_checksending

def test_checksending(self):
    httpretty.register_uri(
        httpretty.POST,
        urljoin(SmsAero.URL_GATE, '/checksending/'),
        body='{"reason": {"33460579": "smsc reject", \
            "33460580": "delivery success"}, \
            "result": "accepted"}',
        status=200,
        content_type='text/json',
    )
    self.api.checksending(322)
    httpretty.register_uri(
        httpretty.POST,
        urljoin(SmsAero.URL_GATE, '/checksending/'),
        body='{"reason": "empty field", "result": "reject"}',
        status=200,
        content_type='text/json',
    )
    try:
        self.api.checksending('')
        self.assertTrue(False)
    except SmsAeroError:
        pass

Developer: sheregeda, Project: smsaero, Lines: 26, Source: test_api.py
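
Because '/checksending/' starts with a slash, urljoin keeps only the scheme and host of SmsAero.URL_GATE and drops any path the gateway URL may carry. A minimal illustration (the gateway URL below is a made-up placeholder):

from urllib.parse import urljoin

URL_GATE = "https://gate.smsaero.ru/v1/"    # hypothetical value, not the library's actual constant
print(urljoin(URL_GATE, "/checksending/"))  # https://gate.smsaero.ru/checksending/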
Example 4: root

def root():
    fp = request.fullpath
    try:
        numpkgs = len(list(packages()))
    except:
        numpkgs = 0
    return """<html><head><title>Welcome to pypiserver!</title></head><body>
<h1>Welcome to pypiserver!</h1>
<p>This is a PyPI compatible package index serving %(NUMPKGS)s packages.</p>
<p> To use this server with pip, run the following command:
<blockquote><pre>
pip install -i %(URL)ssimple/ PACKAGE [PACKAGE2...]
</pre></blockquote></p>
<p> To use this server with easy_install, run the following command:
<blockquote><pre>
easy_install -i %(URL)ssimple/ PACKAGE
</pre></blockquote></p>
<p>The complete list of all packages can be found <a href="%(PACKAGES)s">here</a> or via the <a href="%(SIMPLE)s">simple</a> index.</p>
<p>This instance is running version %(VERSION)s of the <a href="http://pypi.python.org/pypi/pypiserver">pypiserver</a> software.</p>
</body></html>
""" % dict(URL=request.url, VERSION=__version__, NUMPKGS=numpkgs,
           PACKAGES=urljoin(fp, "packages/"),
           SIMPLE=urljoin(fp, "simple/"))

Developer: cecedille1, Project: pypiserver, Lines: 29, Source: _app.py
Example 5: setUpClass

def setUpClass(cls):
    """Create an RPM repository with a valid feed and sync it.

    Do the following:

    1. Reset Pulp, including the Squid cache.
    2. Create a repository with the "background" download policy.
    3. Sync and publish the repository.
    4. Download an RPM from the repository.
    """
    super(BackgroundTestCase, cls).setUpClass()
    if (selectors.bug_is_untestable(1905, cls.cfg.version) and
            _os_is_rhel6(cls.cfg)):
        raise unittest.SkipTest('https://pulp.plan.io/issues/1905')
    # Required to ensure content is actually downloaded.
    utils.reset_squid(cls.cfg)
    utils.reset_pulp(cls.cfg)
    # Create, sync and publish a repository.
    repo = _create_repo(cls.cfg, 'background')
    cls.resources.add(repo['_href'])
    report = utils.sync_repo(cls.cfg, repo['_href']).json()
    # Record the tasks spawned when syncing the repository, and the state
    # of the repository itself after the sync.
    client = api.Client(cls.cfg)
    cls.repo = client.get(repo['_href'], params={'details': True}).json()
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, report))
    # Download an RPM.
    path = urljoin('/pulp/repos/', repo['id'] + '/')
    path = urljoin(path, RPM)
    cls.rpm = client.get(path)

Developer: release-engineering, Project: pulp-smash, Lines: 34, Source: test_download_policies.py
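
The two chained urljoin calls depend on each intermediate segment ending with a slash; otherwise the second join would replace the repository id instead of appending to it. A small sketch with a hypothetical repository id and RPM file name:

from urllib.parse import urljoin

path = urljoin('/pulp/repos/', 'my-repo' + '/')   # /pulp/repos/my-repo/   (repo id is hypothetical)
path = urljoin(path, 'bear-4.1-1.noarch.rpm')     # /pulp/repos/my-repo/bear-4.1-1.noarch.rpm
print(path)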
Example 6: fake

def fake(base_url, username, password, tourney_id):
    url_opener = _utils.login_and_enter_arcade(base_url, username, password)
    # calculate some more URLs
    tourneys_url = urljoin(base_url, "arcade.php?&do=viewtournaments")
    join_tourney_url = urljoin(base_url, "arcade.php?&do=registertourney&tid={0}".format(
        tourney_id
    ))
    #view_tourney_url = urljoin(base_url, "arcade.php?&do=viewtourney&tid={0}".format(
    #    tourney_id
    #))
    # go to tourneys
    print("entering tourneys page")
    tourneys_response = url_opener.open(tourneys_url)
    tourneys_response.read()
    # go to tourney creation form
    print("joining tourney")
    join_tourney_response = url_opener.open(join_tourney_url)
    join_tourney_response.read()
    # look at tourney to make sure it sticks
    #print("looking at tourney")
    #view_tourney_response = url_opener.open(view_tourney_url)
    #view_tourney_response.read()
    print("done")

Developer: RavuAlHemio, Project: vbcbbot, Lines: 28, Source: register_to_tourney.py
Example 7: test_entry_feed_enclosure

def test_entry_feed_enclosure(self):
    entry = self.create_published_entry()
    feed = EntryFeed()
    self.assertEquals(
        feed.item_enclosure_url(entry), 'http://example.com/image.jpg')
    self.assertEquals(feed.item_enclosure_length(entry), '100000')
    self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
    entry.content = 'My test content with image <img src="image.jpg" />'
    entry.save()
    self.assertEquals(
        feed.item_enclosure_url(entry), 'http://example.com/image.jpg')
    self.assertEquals(feed.item_enclosure_length(entry), '100000')
    self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
    entry.content = 'My test content with image ' \
                    '<img src="http://test.com/image.jpg" />'
    entry.save()
    self.assertEquals(
        feed.item_enclosure_url(entry), 'http://test.com/image.jpg')
    self.assertEquals(feed.item_enclosure_length(entry), '100000')
    self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
    path = default_storage.save('enclosure.png', ContentFile('Content'))
    entry.image = path
    entry.save()
    self.assertEquals(feed.item_enclosure_url(entry),
                      urljoin('http://example.com', entry.image.url))
    self.assertEquals(feed.item_enclosure_length(entry), '7')
    self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/png')
    default_storage.delete(path)
    entry.image = 'invalid_image_without_extension'
    entry.save()
    self.assertEquals(feed.item_enclosure_url(entry),
                      urljoin('http://example.com', entry.image.url))
    self.assertEquals(feed.item_enclosure_length(entry), '100000')
    self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')

Developer: Damgaard, Project: django-blog-zinnia, Lines: 34, Source: feeds.py
Example 8: MyParser

def MyParser(url, index):
    global links, A, num
    if (not IsInTheList(url, links)) and (len(links) <= num) and Is_ntut_web(url):
        try:
            soup = BeautifulSoup(urlopen(url), "lxml")
            result = soup.find("meta", attrs={"http-equiv": "refresh"})
            meta = str(soup.html.head.meta)
            if result:
                links.append(url)
                wait, text = result["content"].split(";")
                if text.lower().startswith("url="):
                    pice = text[4:]
                    tempUrl = urljoin('http://www.ntut.edu.tw', pice)
                    print(url)
                    MyParser(tempUrl, FindIndex(url, links))
                    if index != FindIndex(url, links):
                        A[FindIndex(url, links), index] = 1
            elif meta.find('text/html;') >= 0:
                links.append(url)
                for link in soup.findAll('a'):
                    #print(A[:,0])
                    tempUrl = link.get('href')
                    tempUrl = urljoin("http://www.ntut.edu.tw", tempUrl)
                    MyParser(tempUrl, FindIndex(url, links))
                    if index != FindIndex(url, links):
                        A[FindIndex(url, links), index] = 1
        except:
            pass
    elif IsInTheList(url, links) and (len(links) <= num + 1):
        if index != FindIndex(url, links):
            A[FindIndex(url, links), index] = 1

Developer: brian41005, Project: My_EM_Project, Lines: 31, Source: HtmlGetLink.py
Example 9: compose_url

def compose_url(season, year=None, sport=None):
    if year and sport:
        return urljoin(URL, season + '/' + year + '/' + sport)
    elif year:
        return urljoin(URL, season + '/' + year)
    else:
        return urljoin(URL, season)

Developer: mrpatiwi, Project: olympic-sport-scraper, Lines: 7, Source: scrapper.py
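
Note that this only appends the season segments if URL ends with a slash; otherwise urljoin would replace the base URL's last path component. A hedged illustration with a hypothetical base:

from urllib.parse import urljoin

URL = "http://example.org/olympics/"          # hypothetical base; the trailing "/" is what makes appending work
print(urljoin(URL, "summer" + "/" + "2016"))  # http://example.org/olympics/summer/2016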
Example 10: getStreamURLs

def getStreamURLs(self):
    time = self.time
    logging.debug("%s: Starting update of streamURL array", threading.current_thread().name)
    for i in range(0, self.length):
        if re.findall(r"(^.*Helios-HSS.*$)", self.playlist.getPlaylistUrl()):
            url = urljoin(
                self.baseUrl,
                "IRDETO-HSS-H/QualityLevels("
                + str(self.qualityLevels)
                + ")/Fragments(video="
                + str(int(time))
                + ")",
            )
            # print(self.baseUrl, "IS Helios VOD")
        elif re.findall(r"(^.*\.vod.*$)", self.baseUrl):
            url = urljoin(
                self.baseUrl,
                "IRDETO-HSS-O/QualityLevels("
                + str(self.qualityLevels)
                + ")/Fragments(video="
                + str(int(time))
                + ")",
            )
            # print(self.baseUrl, "IS Orion VOD")
        else:
            url = urljoin(
                self.baseUrl,
                "QualityLevels(" + str(self.qualityLevels) + ")/Fragments(video=" + str(int(time)) + ")",
            )
            # print(self.baseUrl, "IS LIVE")
        self.streamUrls.append(url)
        time = time + int(self.deltaArray[i])
        # print(self.streamUrls[i], 'index : ', i)
    logging.debug("%s: Completed updating streamURL array", threading.current_thread().name)
    return self

Developer: dexpetkovic, Project: smooth-streaming-validator, Lines: 35, Source: streamURL.py
Example 11: adaptionset

def adaptionset(element, url, baseurl=None, offset_sec=None, duration_sec=None):
    streams = {}
    dirname = os.path.dirname(url) + "/"
    if baseurl:
        dirname = urljoin(dirname, baseurl)
    template = element[0].find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate")
    represtation = element[0].findall(".//{urn:mpeg:dash:schema:mpd:2011}Representation")
    for i in represtation:
        files = []
        segments = False
        filename = dirname
        bitrate = int(i.attrib["bandwidth"]) / 1000
        idnumber = i.attrib["id"]
        if i.find("{urn:mpeg:dash:schema:mpd:2011}BaseURL") is not None:
            filename = urljoin(filename, i.find("{urn:mpeg:dash:schema:mpd:2011}BaseURL").text)
        if i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentBase") is not None:
            segments = True
            files.append(filename)
        if template is not None:
            segments = True
            files = templateelemt(template, filename, idnumber, offset_sec, duration_sec)
        elif i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") is not None:
            segments = True
            files = templateelemt(i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate"), filename, idnumber, offset_sec, duration_sec)
        if files:
            streams[bitrate] = {"segments": segments, "files": files}
    return streams

Developer: spaam, Project: svtplay-dl, Lines: 34, Source: dash.py
Example 12: get_ENCODE

def get_ENCODE(obj_id, connection, frame="object"):
    '''GET an ENCODE object as JSON and return as dict'''
    if frame is None:
        if '?' in obj_id:
            url = urljoin(connection.server, obj_id + '&limit=all')
        else:
            url = urljoin(connection.server, obj_id + '?limit=all')
    elif '?' in obj_id:
        url = urljoin(connection.server, obj_id + '&limit=all&frame=' + frame)
    else:
        url = urljoin(connection.server, obj_id + '?limit=all&frame=' + frame)
    logging.debug('GET %s' % (url))
    response = requests.get(url, auth=connection.auth, headers=connection.headers)
    logging.debug('GET RESPONSE code %s' % (response.status_code))
    try:
        if response.json():
            logging.debug('GET RESPONSE JSON: %s' % (json.dumps(response.json(), indent=4, separators=(',', ': '))))
    except:
        logging.debug('GET RESPONSE text %s' % (response.text))
    if not response.status_code == 200:
        if response.json().get("notification"):
            logging.warning('%s' % (response.json().get("notification")))
        else:
            logging.warning('GET failure. Response code = %s' % (response.text))
    return response.json()

Developer: ENCODE-DCC, Project: pyencoded-tools, Lines: 25, Source: encodedcc.py
Example 13: parse_homework

def parse_homework(words):
    n, gist, id, time = words
    dirname = os.path.join(OUTPUT, 'homework', n)
    name = id
    url = 'http://nbviewer.ipython.org/%s' % gist
    text = infopen(url)
    if text is None:
        url = 'http://gist.github.com/%s' % gist
        text = infopen(url)
        assert text is not None
        soup = BS(text)
        a = soup.find('a', title='View Raw')
        assert a is not None
        content = infopen(urljoin(url, a['href']))
        assert content is not None
        good = False
    else:
        soup = BS(text)
        a = soup.find('a', text='Download Notebook')
        if a is None:
            content = text
            good = False
        else:
            content = infopen(urljoin(url, a['href']))
            assert content is not None
            good = True
    return Bunch(
        dirname=dirname,
        name=name,
        content=content,
        good=good,
        time=time,
        title='homework %s' % n,
        author=id
    )

Developer: arisumukyu, Project: python-course.2013, Lines: 35, Source: archive.py
Example 14: parse_susetags

def parse_susetags(repo, baseurl):
    url = urljoin(baseurl, 'content')
    content = requests.get(url)
    if content.status_code != requests.codes.ok:
        return False
    f = tempfile.TemporaryFile()
    f.write(content.content)
    f.flush()
    os.lseek(f.fileno(), 0, os.SEEK_SET)
    repo.add_content(solv.xfopen_fd(None, f.fileno()), 0)
    defvendorid = repo.meta.lookup_id(solv.SUSETAGS_DEFAULTVENDOR)
    descrdir = repo.meta.lookup_str(solv.SUSETAGS_DESCRDIR)
    if not descrdir:
        descrdir = 'suse/setup/descr'
    url = urljoin(baseurl, descrdir + '/packages.gz')
    with requests.get(url, stream=True) as packages:
        if packages.status_code != requests.codes.ok:
            raise Exception(url + ' does not exist')
        content = gzip.GzipFile(fileobj=io.BytesIO(packages.content))
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        f.write(content.read())
        f.flush()
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        repo.add_susetags(f, defvendorid, None, solv.Repo.REPO_NO_INTERNALIZE|solv.Repo.SUSETAGS_RECORD_SHARES)
        return True
    return False

Developer: openSUSE, Project: osc-plugin-factory, Lines: 30, Source: update_repo_handler.py
Example 15: main

def main():
    # Specify the seed page
    base_url = 'https://www.zhihu.com/'
    seed_url = urljoin(base_url, 'explore')
    # Create a Redis client
    client = Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
    # Set a user agent (otherwise the request will be rejected)
    headers = {'user-agent': 'Baiduspider'}
    # Send a GET request via the requests module with the given user agent
    resp = requests.get(seed_url, headers=headers)
    # Create a BeautifulSoup object using lxml as the parser
    soup = BeautifulSoup(resp.text, 'lxml')
    href_regex = re.compile(r'^/question')
    # Reduce each URL to a SHA1 digest (fixed length, more compact)
    hasher_proto = sha1()
    # Find all <a> tags whose href attribute starts with /question
    for a_tag in soup.find_all('a', {'href': href_regex}):
        # Get the href attribute of the tag and assemble the full URL
        href = a_tag.attrs['href']
        full_url = urljoin(base_url, href)
        # Generate a SHA1 digest from the URL
        hasher = hasher_proto.copy()
        hasher.update(full_url.encode('utf-8'))
        field_key = hasher.hexdigest()
        # If the Redis hash stored under the key 'zhihu' does not contain this
        # URL digest yet, fetch the page and cache it
        if not client.hexists('zhihu', field_key):
            html_page = requests.get(full_url, headers=headers).text
            # Serialize and compress the page
            zipped_page = zlib.compress(pickle.dumps(html_page))
            # Store the URL digest and its page content in the hash
            client.hset('zhihu', field_key, zipped_page)
    # Show how many pages have been cached in total
    print('Total %d question pages found.' % client.hlen('zhihu'))

Developer: 460708485, Project: Python-100-Days, Lines: 33, Source: example06.py
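
Since every matched href begins with /question, the urljoin call simply attaches it to the site root. A small illustration (the question id is hypothetical):

from urllib.parse import urljoin

base_url = 'https://www.zhihu.com/'
print(urljoin(base_url, '/question/12345678'))  # https://www.zhihu.com/question/12345678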
Example 16: parse_repomd

def parse_repomd(repo, baseurl):
    url = urljoin(baseurl, 'repodata/repomd.xml')
    repomd = requests.get(url)
    if repomd.status_code != requests.codes.ok:
        return False
    ns = {'r': 'http://linux.duke.edu/metadata/repo'}
    root = ET.fromstring(repomd.content)
    primary_element = root.find('.//r:data[@type="primary"]', ns)
    location = primary_element.find('r:location', ns).get('href')
    sha256_expected = primary_element.find('r:checksum[@type="sha256"]', ns).text
    f = tempfile.TemporaryFile()
    f.write(repomd.content)
    f.flush()
    os.lseek(f.fileno(), 0, os.SEEK_SET)
    repo.add_repomdxml(solv.xfopen_fd(None, f.fileno()), 0)
    url = urljoin(baseurl, location)
    with requests.get(url, stream=True) as primary:
        if primary.status_code != requests.codes.ok:
            raise Exception(url + ' does not exist')
        sha256 = hashlib.sha256(primary.content).hexdigest()
        if sha256 != sha256_expected:
            raise Exception('checksums do not match {} != {}'.format(sha256, sha256_expected))
        content = gzip.GzipFile(fileobj=io.BytesIO(primary.content))
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        f.write(content.read())
        f.flush()
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        repo.add_rpmmd(solv.xfopen_fd(None, f.fileno()), None, 0)
        return True
    return False

Developer: openSUSE, Project: osc-plugin-factory, Lines: 34, Source: update_repo_handler.py
Example 17: startElementNS

def startElementNS(self, name, qname, attrs):
    stack = self.stack
    stack.append(ElementHandler())
    current = self.current
    parent = self.parent
    base = attrs.get(BASE, None)
    if base is not None:
        base, frag = urldefrag(base)
        if parent and parent.base:
            base = urljoin(parent.base, base)
        else:
            systemId = self.locator.getPublicId() \
                or self.locator.getSystemId()
            if systemId:
                base = urljoin(systemId, base)
    else:
        if parent:
            base = parent.base
        if base is None:
            systemId = self.locator.getPublicId() \
                or self.locator.getSystemId()
            if systemId:
                base, frag = urldefrag(systemId)
    current.base = base
    language = attrs.get(LANG, None)
    if language is None:
        if parent:
            language = parent.language
    current.language = language
    current.start(name, qname, attrs)

Developer: 0038lana, Project: Test-Task, Lines: 30, Source: rdfxml.py
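
The base handling above mirrors how nested xml:base values combine: strip the fragment, then resolve the child base against the parent base. Roughly, with hypothetical URIs:

from urllib.parse import urljoin, urldefrag

parent_base = "http://example.org/data/doc.rdf"  # hypothetical document URI
child_base, _frag = urldefrag("graphs/g1#top")   # strip the fragment first -> "graphs/g1"
print(urljoin(parent_base, child_base))          # http://example.org/data/graphs/g1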
Example 18: _crawl

def _crawl(self):
    uri = urljoin(self.__uri, self.__next)
    self.__class__._log("debug", "%s crawls url: %s" % (self.__class__.__name__, uri))
    (page, base, _) = self.__class__._fetch_remote_html(uri)
    if not page:
        self.__class__._log("debug", "%s crawled EMPTY url: %s" % (self.__class__.__name__, uri))
        return
    # get more content ("scroll down")
    # to know what page to parse next
    # update new last URI when we're not on first run
    _next = None
    _more = page.find("div", {"id": "more_loading"})
    if _more:
        _more = _more.find("a", {"href": True})
        if _more:
            _next = urljoin(base, _more["href"])
    if _next:
        self.__next = _next
    else:
        self.__class__._log("debug", "%s found no `next` on url: %s" % (self.__class__.__name__, uri))
    # for every found imageContainer
    # add img-src to map if not blacklisted
    images_added = 0
    for con in page.find_all("div", {"class": "imagecontainer"}):
        image = con.find("img", {"src": True})
        if image:
            if self._add_image(urljoin(base, image["src"]), self.__site):
                images_added += 1
    if not images_added:
        self.__class__._log("debug", "%s found no images on url: %s" % (self.__class__.__name__, uri))

Developer: omgwtflaserguns, Project: nichtparasoup, Lines: 34, Source: soupio.py
Example 19: search_film

def search_film(self, search_query):
    logging.info('Searching film for query: {}'.format(search_query))
    search_url = urljoin(self.site_url, "/search/movies/")
    search_url = urljoin(search_url, quote_plus(search_query))
    search_page = self.fetch_page(search_url)
    pq = PyQuery(search_page)
    dom_search_list = pq(u".list_item")
    film_list = []
    for dom_item in dom_search_list:
        name = pq(dom_item).find('img[border="0"]').show().attr('alt')
        category = "Film"
        film = Media(name=name, category=category)
        # set description
        desc = pq(dom_item).find('.plot').text()
        film.description = re.sub('\s', ' ', str(desc))  # remove newlines from description
        film.rating = pq(dom_item).find('span.rank_value').text()
        # set page url
        href = pq(dom_item).find('a.panel').attr('href')
        film.url = urljoin(self.site_url, href)
        # set thumbnail url
        href_thumbnail = pq(dom_item).find('img[border="0"]').show().attr('src')
        film.thumbnail = urljoin(self.site_url, href_thumbnail)
        film_list.append(film)
    return film_list

Developer: marcwebbie, Project: pyfetcher, Lines: 34, Source: crawler_tubeplus.py
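
The chained joins that build search_url rely on the trailing slash of "/search/movies/"; the quoted query then appends cleanly. A sketch with a hypothetical site URL and query:

from urllib.parse import urljoin, quote_plus

site_url = "http://www.tubeplus.me/"                  # hypothetical value for self.site_url
search_url = urljoin(site_url, "/search/movies/")     # http://www.tubeplus.me/search/movies/
print(urljoin(search_url, quote_plus("the matrix")))  # http://www.tubeplus.me/search/movies/the+matrix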
Example 20: urls_for

def urls_for(self):
    only = self.options.get('topics')
    if only:  # if only...
        only = set(only.split(','))
        only = [(o, TOPIC_TO_REPORT_TYPE[o]) if o in TOPIC_TO_REPORT_TYPE else o
                for o in only]
        yield from self.urls_for_topics(only)
        # If there are topics selected, ONLY yield URLs for those.
        return
    # First yield the URLs for the topics that are tangential to the main
    # Calendar Year reports.
    yield from self.urls_for_topics(ADDITIONAL_TOPICS)
    # Not getting reports from specific topics, iterate over all Calendar Year
    # reports.
    page = BeautifulSoup(utils.download(BASE_URL))
    # Iterate over each "Calendar Year XXXX" link
    for li in page.select('.field-items li'):
        md = RE_CALENDAR_YEAR.search(li.text)
        if md:
            cur_year = int(md.group(1))
            if cur_year >= self.year_range[0] and cur_year <= self.year_range[-1]:
                href = li.select('a')[0]['href']
                next_url = urljoin(BASE_URL, href)
                # The first page of reports is yielded.
                yield next_url
                # Next, read all the pagination links for the page and yield those. So
                # far, I haven't seen a page that doesn't have all of the following
                # pages enumerated.
                next_page = BeautifulSoup(utils.download(next_url))
                for link in next_page.select('li.pager-item a'):
                    yield urljoin(BASE_URL, link['href'])

Developer: slobdell, Project: inspectors-general, Lines: 35, Source: energy.py
Note: The urllib.parse.urljoin examples in this article were compiled by 纯净天空 from GitHub, MSDocs and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by many developers, and copyright remains with the original authors. Please consult each project's license before redistributing or reusing the code; do not reproduce without permission.