
Python parse.urljoin Function Code Examples


This article collects typical usage examples of the urljoin function from Python's urllib.parse module. If you have been wondering what urljoin does, how to call it, or what real-world uses look like, the curated examples below should help.



Twenty urljoin code examples are shown below, sorted by popularity by default. Upvote the ones you like or find useful; your ratings help the site recommend better Python code examples.
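Before diving into the examples, it may help to recall how urljoin resolves a target against a base URL, since almost every snippet below leans on these rules. A minimal sketch of the standard behavior:

from urllib.parse import urljoin

# A relative path replaces the last segment of the base path...
print(urljoin('http://example.com/a/b', 'c'))    # http://example.com/a/c
# ...unless the base path ends with a slash, in which case it is appended.
print(urljoin('http://example.com/a/b/', 'c'))   # http://example.com/a/b/c
# A leading slash resets to the root of the host.
print(urljoin('http://example.com/a/b/', '/c'))  # http://example.com/c
# An absolute URL replaces the base entirely.
print(urljoin('http://example.com/a/', 'http://other.org/x'))  # http://other.org/x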

Example 1: post_to_hastebin

import requests
from urllib.parse import urljoin

def post_to_hastebin(data, url="http://hastebin.com/"):
    if isinstance(data, str):
        data = data.encode()
    # POST the payload to <url>/documents; the API answers with a JSON key.
    response = requests.post(urljoin(url, "documents"), data)
    response.raise_for_status()
    result = response.json()
    # The paste is then reachable at <url>/<key>.
    return urljoin(url, result['key'])
Author: FuelRats, Project: pipsqueak, Lines: 7, Source: hastebin.py
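Both joins assume a base whose path ends in a slash, as the default url does, so urljoin appends a segment instead of replacing the last one. A hypothetical call (the returned key is invented for illustration):

post_to_hastebin("hello world")
# POSTs to http://hastebin.com/documents; if the API answers {"key": "abc123"},
# the function returns "http://hastebin.com/abc123".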


Example 2: report_from

import datetime
import logging
from urllib.parse import urljoin

# REPORTS_URL and beautifulsoup_from_url are defined elsewhere in gao.py.
def report_from(result, year_range):
  link = result.select("a")[0]
  title = link.text
  landing_url = urljoin(REPORTS_URL, link.get('href'))
  report_id_node, published_node = result.select("div.release_info")
  report_id = report_id_node.text.strip().replace(",", "")
  published_on = datetime.datetime.strptime(published_node.text, '%b %d, %Y')

  if published_on.year not in year_range:
    logging.debug("[%s] Skipping, not in requested range." % landing_url)
    return

  logging.debug("Scraping landing url: %s", landing_url)
  landing_page = beautifulsoup_from_url(landing_url)
  summary = landing_page.select("div.left_col")[0].text.strip()

  pdf_link = landing_page.select("#link_bar > a")[0]
  report_url = urljoin(REPORTS_URL, pdf_link.get('href'))

  text_link = landing_page.select("#add_material a")[-1]
  text_report_url = urljoin(REPORTS_URL, text_link.get('href'))

  report = {
    'inspector': 'gao',
    'inspector_url': 'http://www.gao.gov/about/workforce/ig.html',
    'agency': 'gao',
    'agency_name': 'Government Accountability Office',
    'report_id': report_id,
    'url': report_url,
    'text_url': text_report_url,
    'landing_url': landing_url,
    'title': title,
    'published_on': datetime.datetime.strftime(published_on, "%Y-%m-%d"),
  }
  return report
Author: BunsenMcDubbs, Project: inspectors-general, Lines: 35, Source: gao.py


Example 3: test_checksending

    def test_checksending(self):
        httpretty.register_uri(
            httpretty.POST,
            urljoin(SmsAero.URL_GATE, '/checksending/'),
            body='{"reason": {"33460579": "smsc reject", \
                "33460580": "delivery success"}, \
                "result": "accepted"}',
            status=200,
            content_type='text/json',
        )

        self.api.checksending(322)

        httpretty.register_uri(
            httpretty.POST,
            urljoin(SmsAero.URL_GATE, '/checksending/'),
            body='{"reason": "empty field", "result": "reject"}',
            status=200,
            content_type='text/json',
        )

        try:
            self.api.checksending('')
            self.assertTrue(False)
        except SmsAeroError:
            pass
Author: sheregeda, Project: smsaero, Lines: 26, Source: test_api.py


Example 4: root

def root():
    fp = request.fullpath

    try:
        numpkgs = len(list(packages()))
    except:
        numpkgs = 0

    return """<html><head><title>Welcome to pypiserver!</title></head><body>
<h1>Welcome to pypiserver!</h1>
<p>This is a PyPI compatible package index serving %(NUMPKGS)s packages.</p>

<p> To use this server with pip, run the following command:
<blockquote><pre>
pip install -i %(URL)ssimple/ PACKAGE [PACKAGE2...]
</pre></blockquote></p>

<p> To use this server with easy_install, run the following command:
<blockquote><pre>
easy_install -i %(URL)ssimple/ PACKAGE
</pre></blockquote></p>

<p>The complete list of all packages can be found <a href="%(PACKAGES)s">here</a> or via the <a href="%(SIMPLE)s">simple</a> index.</p>

<p>This instance is running version %(VERSION)s of the <a href="http://pypi.python.org/pypi/pypiserver">pypiserver</a> software.</p>
</body></html>
""" % dict(URL=request.url, VERSION=__version__, NUMPKGS=numpkgs,
           PACKAGES=urljoin(fp, "packages/"),
           SIMPLE=urljoin(fp, "simple/"))
Author: cecedille1, Project: pypiserver, Lines: 29, Source: _app.py


Example 5: setUpClass

    def setUpClass(cls):
        """Create an RPM repository with a valid feed and sync it.

        Do the following:

        1. Reset Pulp, including the Squid cache.
        2. Create a repository with the "background" download policy.
        3. Sync and publish the repository.
        4. Download an RPM from the repository.
        """
        super(BackgroundTestCase, cls).setUpClass()
        if (selectors.bug_is_untestable(1905, cls.cfg.version) and
                _os_is_rhel6(cls.cfg)):
            raise unittest.SkipTest('https://pulp.plan.io/issues/1905')

        # Required to ensure content is actually downloaded.
        utils.reset_squid(cls.cfg)
        utils.reset_pulp(cls.cfg)

        # Create, sync and publish a repository.
        repo = _create_repo(cls.cfg, 'background')
        cls.resources.add(repo['_href'])
        report = utils.sync_repo(cls.cfg, repo['_href']).json()

        # Record the tasks spawned when syncing the repository, and the state
        # of the repository itself after the sync.
        client = api.Client(cls.cfg)
        cls.repo = client.get(repo['_href'], params={'details': True}).json()
        cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, report))

        # Download an RPM.
        path = urljoin('/pulp/repos/', repo['id'] + '/')
        path = urljoin(path, RPM)
        cls.rpm = client.get(path)
Author: release-engineering, Project: pulp-smash, Lines: 34, Source: test_download_policies.py
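The two-step join at the end of setUpClass depends on trailing slashes: '/pulp/repos/' and the repository id (with '/' appended) each keep the base open, so urljoin adds a segment rather than replacing one. A small sketch with a made-up repository id and RPM filename:

from urllib.parse import urljoin

path = urljoin('/pulp/repos/', 'my-repo/')     # '/pulp/repos/my-repo/'
path = urljoin(path, 'bear-4.1-1.noarch.rpm')  # '/pulp/repos/my-repo/bear-4.1-1.noarch.rpm'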


Example 6: fake

# _utils is a helper module from the same project; urljoin comes from urllib.parse.
def fake(base_url, username, password, tourney_id):
    url_opener = _utils.login_and_enter_arcade(base_url, username, password)

    # calculate some more URLs
    tourneys_url = urljoin(base_url, "arcade.php?&do=viewtournaments")
    join_tourney_url = urljoin(base_url, "arcade.php?&do=registertourney&tid={0}".format(
        tourney_id
    ))
    #view_tourney_url = urljoin(base_url, "arcade.php?&do=viewtourney&tid={0}".format(
    #    tourney_id
    #))

    # go to tourneys
    print("entering tourneys page")
    tourneys_response = url_opener.open(tourneys_url)
    tourneys_response.read()

    # go to tourney creation form
    print("joining tourney")
    join_tourney_response = url_opener.open(join_tourney_url)
    join_tourney_response.read()

    # look at tourney to make sure it sticks
    #print("looking at tourney")
    #view_tourney_response = url_opener.open(view_tourney_url)
    #view_tourney_response.read()

    print("done")
Author: RavuAlHemio, Project: vbcbbot, Lines: 28, Source: register_to_tourney.py


Example 7: test_entry_feed_enclosure

    def test_entry_feed_enclosure(self):
        entry = self.create_published_entry()
        feed = EntryFeed()
        self.assertEquals(
            feed.item_enclosure_url(entry), 'http://example.com/image.jpg')
        self.assertEquals(feed.item_enclosure_length(entry), '100000')
        self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
        entry.content = 'My test content with image <img src="image.jpg" />'
        entry.save()
        self.assertEquals(
            feed.item_enclosure_url(entry), 'http://example.com/image.jpg')
        self.assertEquals(feed.item_enclosure_length(entry), '100000')
        self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
        entry.content = 'My test content with image ' \
                        '<img src="http://test.com/image.jpg" />'
        entry.save()
        self.assertEquals(
            feed.item_enclosure_url(entry), 'http://test.com/image.jpg')
        self.assertEquals(feed.item_enclosure_length(entry), '100000')
        self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
        path = default_storage.save('enclosure.png', ContentFile('Content'))
        entry.image = path
        entry.save()
        self.assertEquals(feed.item_enclosure_url(entry),
                          urljoin('http://example.com', entry.image.url))
        self.assertEquals(feed.item_enclosure_length(entry), '7')
        self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/png')
        default_storage.delete(path)
        entry.image = 'invalid_image_without_extension'
        entry.save()
        self.assertEquals(feed.item_enclosure_url(entry),
                          urljoin('http://example.com', entry.image.url))
        self.assertEquals(feed.item_enclosure_length(entry), '100000')
        self.assertEquals(feed.item_enclosure_mime_type(entry), 'image/jpeg')
Author: Damgaard, Project: django-blog-zinnia, Lines: 34, Source: feeds.py


Example 8: MyParser

def MyParser(url,index):
    global links,A,num
    if (not IsInTheList(url, links)) and (len(links) <= num) and Is_ntut_web(url):
        try:
            soup = BeautifulSoup(urlopen(url), "lxml")
            result = soup.find("meta",attrs={"http-equiv":"refresh"})
            meta = str(soup.html.head.meta)
            if result:
                links.append(url)
                wait,text=result["content"].split(";")
                if text.lower().startswith("url="):
                    pice=text[4:]
                    tempUrl = urljoin('http://www.ntut.edu.tw',pice)
                    print(url)
                    MyParser(tempUrl,FindIndex(url,links))
                    if index != FindIndex(url,links):
                        A[FindIndex(url,links),index]=1
            elif meta.find('text/html;') >= 0:
                links.append(url)
                for link in soup.findAll('a'):
                    #print(A[:,0])
                    tempUrl = link.get('href')
                    tempUrl = urljoin("http://www.ntut.edu.tw",tempUrl)
                    MyParser(tempUrl,FindIndex(url,links))
                    if index != FindIndex(url,links):
                        A[FindIndex(url,links),index]=1
        except:
            pass
    elif IsInTheList(url, links) and (len(links) <= num+1):
        if index != FindIndex(url,links):
            A[FindIndex(url,links),index]=1
Author: brian41005, Project: My_EM_Project, Lines: 31, Source: HtmlGetLink.py


Example 9: compose_url

from urllib.parse import urljoin

# URL is a module-level constant holding the scraper's base URL.
def compose_url(season, year=None, sport=None):
    if year and sport:
        return urljoin(URL, season + '/' + year + '/' + sport)
    elif year:
        return urljoin(URL, season + '/' + year)
    else:
        return urljoin(URL, season)
Author: mrpatiwi, Project: olympic-sport-scraper, Lines: 7, Source: scrapper.py
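For these joins to nest as intended, the URL constant must end with a trailing slash; otherwise urljoin would replace the last path segment instead of appending. A sketch with an assumed stand-in for the module's URL constant:

from urllib.parse import urljoin

URL = 'https://example.org/games/'  # assumed value for illustration

print(compose_url('summer'))                   # https://example.org/games/summer
print(compose_url('summer', '2016'))           # https://example.org/games/summer/2016
print(compose_url('summer', '2016', 'judo'))   # https://example.org/games/summer/2016/judo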


Example 10: getStreamURLs

    def getStreamURLs(self):
        time = self.time
        logging.debug("%s: Starting update of streamURL array", threading.current_thread().name)
        for i in range(0, self.length):
            if re.findall(r"(^.*Helios-HSS.*$)", self.playlist.getPlaylistUrl()):
                url = urljoin(
                    self.baseUrl,
                    "IRDETO-HSS-H/QualityLevels("
                    + str(self.qualityLevels)
                    + ")/Fragments(video="
                    + str(int(time))
                    + ")",
                )
                # print(self.baseUrl, "IS Helios VOD")
            elif re.findall(r"(^.*\.vod.*$)", self.baseUrl):
                url = urljoin(
                    self.baseUrl,
                    "IRDETO-HSS-O/QualityLevels("
                    + str(self.qualityLevels)
                    + ")/Fragments(video="
                    + str(int(time))
                    + ")",
                )
                # print(self.baseUrl, "IS Orion VOD")
            else:
                url = urljoin(
                    self.baseUrl,
                    "QualityLevels(" + str(self.qualityLevels) + ")/Fragments(video=" + str(int(time)) + ")",
                )
                # print(self.baseUrl, "IS LIVE")
            self.streamUrls.append(url)
            time = time + int(self.deltaArray[i])
            # print(self.streamUrls[i], 'index : ', i)
        logging.debug("%s: Completed updating streamURL array", threading.current_thread().name)
        return self
Author: dexpetkovic, Project: smooth-streaming-validator, Lines: 35, Source: streamURL.py


Example 11: adaptionset

def adaptionset(element, url, baseurl=None, offset_sec=None, duration_sec=None):
    streams = {}

    dirname = os.path.dirname(url) + "/"
    if baseurl:
        dirname = urljoin(dirname, baseurl)

    template = element[0].find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate")
    represtation = element[0].findall(".//{urn:mpeg:dash:schema:mpd:2011}Representation")

    for i in represtation:
        files = []
        segments = False
        filename = dirname
        bitrate = int(i.attrib["bandwidth"]) / 1000
        idnumber = i.attrib["id"]

        if i.find("{urn:mpeg:dash:schema:mpd:2011}BaseURL") is not None:
            filename = urljoin(filename, i.find("{urn:mpeg:dash:schema:mpd:2011}BaseURL").text)

        if i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentBase") is not None:
            segments = True
            files.append(filename)
        if template is not None:
            segments = True
            files = templateelemt(template, filename, idnumber, offset_sec, duration_sec)
        elif i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate") is not None:
            segments = True
            files = templateelemt(i.find("{urn:mpeg:dash:schema:mpd:2011}SegmentTemplate"), filename, idnumber, offset_sec, duration_sec)

        if files:
            streams[bitrate] = {"segments": segments, "files": files}

    return streams
Author: spaam, Project: svtplay-dl, Lines: 34, Source: dash.py


Example 12: get_ENCODE

import json
import logging

import requests
from urllib.parse import urljoin

def get_ENCODE(obj_id, connection, frame="object"):
    '''GET an ENCODE object as JSON and return it as a dict'''
    if frame is None:
        if '?' in obj_id:
            url = urljoin(connection.server, obj_id+'&limit=all')
        else:
            url = urljoin(connection.server, obj_id+'?limit=all')
    elif '?' in obj_id:
        url = urljoin(connection.server, obj_id+'&limit=all&frame='+frame)
    else:
        url = urljoin(connection.server, obj_id+'?limit=all&frame='+frame)
    logging.debug('GET %s' % (url))
    response = requests.get(url, auth=connection.auth, headers=connection.headers)
    logging.debug('GET RESPONSE code %s' % (response.status_code))
    try:
        if response.json():
            logging.debug('GET RESPONSE JSON: %s' % (json.dumps(response.json(), indent=4, separators=(',', ': '))))
    except:
        logging.debug('GET RESPONSE text %s' % (response.text))
    if not response.status_code == 200:
        if response.json().get("notification"):
            logging.warning('%s' % (response.json().get("notification")))
        else:
            logging.warning('GET failure.  Response code = %s' % (response.text))
    return response.json()
Author: ENCODE-DCC, Project: pyencoded-tools, Lines: 25, Source: encodedcc.py
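The '?' check above exists because urljoin simply resolves the path-plus-query reference against the server: the function must decide whether to start a query string with '?' or extend an existing one with '&'. A sketch with assumed values (the server URL and accession are made up):

from urllib.parse import urljoin

server = 'https://www.encodeproject.org/'  # assumed server value
# obj_id without a query string: start one with '?'
print(urljoin(server, '/biosamples/ENCBS000AAA/' + '?limit=all&frame=object'))
# https://www.encodeproject.org/biosamples/ENCBS000AAA/?limit=all&frame=object
# obj_id that already carries a query: extend it with '&'
print(urljoin(server, '/search/?type=Experiment' + '&limit=all&frame=object'))
# https://www.encodeproject.org/search/?type=Experiment&limit=all&frame=object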


Example 13: parse_homework

# OUTPUT, infopen, BS (BeautifulSoup), and Bunch are imported or defined elsewhere in archive.py.
def parse_homework(words):
    n, gist, id, time = words
    dirname = os.path.join(OUTPUT, 'homework', n)
    name = id
    url = 'http://nbviewer.ipython.org/%s' % gist
    text = infopen(url)
    if text is None:
        url = 'http://gist.github.com/%s' % gist
        text = infopen(url)
        assert text is not None
        soup = BS(text)
        a = soup.find('a', title='View Raw')
        assert a is not None
        content = infopen(urljoin(url, a['href']))
        assert content is not None
        good = False
    else:
        soup = BS(text)
        a = soup.find('a', text='Download Notebook')
        if a is None:
            content = text
            good = False
        else:
            content = infopen(urljoin(url, a['href']))
            assert content is not None
            good = True
    return Bunch(
        dirname=dirname,
        name=name,
        content=content,
        good=good,
        time=time,
        title='homework %s' % n,
        author=id
    )
Author: arisumukyu, Project: python-course.2013, Lines: 35, Source: archive.py


Example 14: parse_susetags

def parse_susetags(repo, baseurl):
    url = urljoin(baseurl, 'content')
    content = requests.get(url)
    if content.status_code != requests.codes.ok:
        return False

    f = tempfile.TemporaryFile()
    f.write(content.content)
    f.flush()
    os.lseek(f.fileno(), 0, os.SEEK_SET)
    repo.add_content(solv.xfopen_fd(None, f.fileno()), 0)

    defvendorid = repo.meta.lookup_id(solv.SUSETAGS_DEFAULTVENDOR)
    descrdir = repo.meta.lookup_str(solv.SUSETAGS_DESCRDIR)
    if not descrdir:
        descrdir = 'suse/setup/descr'

    url = urljoin(baseurl, descrdir + '/packages.gz')
    with requests.get(url, stream=True) as packages:
        if packages.status_code != requests.codes.ok:
            raise Exception(url + ' does not exist')

        content = gzip.GzipFile(fileobj=io.BytesIO(packages.content))
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        f.write(content.read())
        f.flush()
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        repo.add_susetags(f, defvendorid, None, solv.Repo.REPO_NO_INTERNALIZE|solv.Repo.SUSETAGS_RECORD_SHARES)
        return True
    return False
Author: openSUSE, Project: osc-plugin-factory, Lines: 30, Source: update_repo_handler.py


Example 15: main

import pickle
import re
import zlib
from hashlib import sha1
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from redis import Redis


def main():
    # Seed page to start crawling from
    base_url = 'https://www.zhihu.com/'
    seed_url = urljoin(base_url, 'explore')
    # Create a Redis client
    client = Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
    # Set a user agent (requests are rejected without one)
    headers = {'user-agent': 'Baiduspider'}
    # Send a GET request through the requests module with the user agent attached
    resp = requests.get(seed_url, headers=headers)
    # Create a BeautifulSoup object, using lxml as the parser
    soup = BeautifulSoup(resp.text, 'lxml')
    href_regex = re.compile(r'^/question')
    # URLs are stored as SHA1 digests (fixed length and more compact)
    hasher_proto = sha1()
    # Find all <a> tags whose href attribute starts with /question
    for a_tag in soup.find_all('a', {'href': href_regex}):
        # Read the href attribute and build the full URL
        href = a_tag.attrs['href']
        full_url = urljoin(base_url, href)
        # Generate the SHA1 digest of the URL
        hasher = hasher_proto.copy()
        hasher.update(full_url.encode('utf-8'))
        field_key = hasher.hexdigest()
        # If the Redis hash 'zhihu' has no entry for this digest,
        # fetch the page and cache it
        if not client.hexists('zhihu', field_key):
            html_page = requests.get(full_url, headers=headers).text
            # Serialize and compress the page
            zipped_page = zlib.compress(pickle.dumps(html_page))
            # Store the URL digest and the page body in the hash
            client.hset('zhihu', field_key, zipped_page)
    # Report how many pages were cached in total
    print('Total %d question pages found.' % client.hlen('zhihu'))
Author: 460708485, Project: Python-100-Days, Lines: 33, Source: example06.py


Example 16: parse_repomd

def parse_repomd(repo, baseurl):
    url = urljoin(baseurl, 'repodata/repomd.xml')
    repomd = requests.get(url)
    if repomd.status_code != requests.codes.ok:
        return False

    ns = {'r': 'http://linux.duke.edu/metadata/repo'}
    root = ET.fromstring(repomd.content)
    primary_element = root.find('.//r:data[@type="primary"]', ns)
    location = primary_element.find('r:location', ns).get('href')
    sha256_expected = primary_element.find('r:checksum[@type="sha256"]', ns).text

    f = tempfile.TemporaryFile()
    f.write(repomd.content)
    f.flush()
    os.lseek(f.fileno(), 0, os.SEEK_SET)
    repo.add_repomdxml(solv.xfopen_fd(None, f.fileno()), 0)
    url = urljoin(baseurl, location)
    with requests.get(url, stream=True) as primary:
        if primary.status_code != requests.codes.ok:
            raise Exception(url + ' does not exist')
        sha256 = hashlib.sha256(primary.content).hexdigest()
        if sha256 != sha256_expected:
            raise Exception('checksums do not match {} != {}'.format(sha256, sha256_expected))

        content = gzip.GzipFile(fileobj=io.BytesIO(primary.content))
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        f.write(content.read())
        f.flush()
        os.lseek(f.fileno(), 0, os.SEEK_SET)
        repo.add_rpmmd(solv.xfopen_fd(None, f.fileno()), None, 0)
        return True

    return False
Author: openSUSE, Project: osc-plugin-factory, Lines: 34, Source: update_repo_handler.py


Example 17: startElementNS

    def startElementNS(self, name, qname, attrs):
        stack = self.stack
        stack.append(ElementHandler())
        current = self.current
        parent = self.parent
        base = attrs.get(BASE, None)
        if base is not None:
            base, frag = urldefrag(base)
            if parent and parent.base:
                base = urljoin(parent.base, base)
            else:
                systemId = self.locator.getPublicId() \
                    or self.locator.getSystemId()
                if systemId:
                    base = urljoin(systemId, base)
        else:
            if parent:
                base = parent.base
            if base is None:
                systemId = self.locator.getPublicId() \
                    or self.locator.getSystemId()
                if systemId:
                    base, frag = urldefrag(systemId)
        current.base = base
        language = attrs.get(LANG, None)
        if language is None:
            if parent:
                language = parent.language
        current.language = language
        current.start(name, qname, attrs)
Author: 0038lana, Project: Test-Task, Lines: 30, Source: rdfxml.py


Example 18: _crawl

    def _crawl(self):
        uri = urljoin(self.__uri, self.__next)
        self.__class__._log("debug", "%s crawls url: %s" % (self.__class__.__name__, uri))

        (page, base, _) = self.__class__._fetch_remote_html(uri)
        if not page:
            self.__class__._log("debug", "%s crawled EMPTY url: %s" % (self.__class__.__name__, uri))
            return

        # get more content ("scroll down")
        # to know what page to parse next
        # update new last URI when we're not on first run
        _next = None
        _more = page.find("div", {"id": "more_loading"})
        if _more:
            _more = _more.find("a", {"href": True})
            if _more:
                _next = urljoin(base, _more["href"])
        if _next:
            self.__next = _next
        else:
            self.__class__._log("debug", "%s found no `next` on url: %s" % (self.__class__.__name__, uri))

        # for every found imageContainer
        # add img-src to map if not blacklisted
        images_added = 0
        for con in page.find_all("div", {"class": "imagecontainer"}):
            image = con.find("img", {"src": True})
            if image:
                if self._add_image(urljoin(base, image["src"]), self.__site):
                    images_added += 1

        if not images_added:
            self.__class__._log("debug", "%s found no images on url: %s" % (self.__class__.__name__, uri))
Author: omgwtflaserguns, Project: nichtparasoup, Lines: 34, Source: soupio.py


Example 19: search_film

    def search_film(self, search_query):
        logging.info('Searching film for query: {}'.format(search_query))

        search_url = urljoin(self.site_url, "/search/movies/")
        search_url = urljoin(search_url, quote_plus(search_query))

        search_page = self.fetch_page(search_url)
        pq = PyQuery(search_page)

        dom_search_list = pq(u".list_item")
        film_list = []
        for dom_item in dom_search_list:
            name = pq(dom_item).find('img[border="0"]').show().attr('alt')
            category = "Film"

            film = Media(name=name, category=category)

            # set description
            desc = pq(dom_item).find('.plot').text()
            film.description = re.sub(r'\s', ' ', str(desc))  # remove newlines from description

            film.rating = pq(dom_item).find('span.rank_value').text()

            # set page url
            href = pq(dom_item).find('a.panel').attr('href')
            film.url = urljoin(self.site_url, href)

            # set thumbnail url
            href_thumbnail = pq(dom_item).find('img[border="0"]').show().attr('src')
            film.thumbnail = urljoin(self.site_url, href_thumbnail)

            film_list.append(film)

        return film_list
Author: marcwebbie, Project: pyfetcher, Lines: 34, Source: crawler_tubeplus.py


Example 20: urls_for

  def urls_for(self):
    only = self.options.get('topics')
    if only: # if only...
      only = set(only.split(','))
      only = [(o, TOPIC_TO_REPORT_TYPE[o]) if o in TOPIC_TO_REPORT_TYPE else o
              for o in only]
      yield from self.urls_for_topics(only)
      # If there are topics selected, ONLY yield URLs for those.
      return

    # First yield the URLs for the topics that are tangential to the main
    # Calendar Year reports.
    yield from self.urls_for_topics(ADDITIONAL_TOPICS)

    # Not getting reports from specific topics, iterate over all Calendar Year
    # reports.
    page = BeautifulSoup(utils.download(BASE_URL))

    # Iterate over each "Calendar Year XXXX" link
    for li in page.select('.field-items li'):
      md = RE_CALENDAR_YEAR.search(li.text)
      if md:
        cur_year = int(md.group(1))
        if cur_year >= self.year_range[0] and cur_year <= self.year_range[-1]:
          href = li.select('a')[0]['href']
          next_url = urljoin(BASE_URL, href)
          # The first page of reports is yielded.
          yield next_url

          # Next, read all the pagination links for the page and yield those. So
          # far, I haven't seen a page that doesn't have all of the following
          # pages enumerated.
          next_page = BeautifulSoup(utils.download(next_url))
          for link in next_page.select('li.pager-item a'):
            yield urljoin(BASE_URL, link['href'])
Author: slobdell, Project: inspectors-general, Lines: 35, Source: energy.py



Note: The urllib.parse.urljoin examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and reuse or redistribution should follow each project's license. Do not republish without permission.

