• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python uritools.urisplit函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中uritools.urisplit函数的典型用法代码示例。如果您正苦于以下问题:Python urisplit函数的具体用法?Python urisplit怎么用?Python urisplit使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了urisplit函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_getquery

 def test_getquery(self):
     """Check that query strings parse into the expected list and dict forms.

     Each case is ``(uri, expected getquerylist(), expected getquerydict())``.
     """
     cases = [
         ("?", [], {}),
         ("?&", [], {}),
         ("?&&", [], {}),
         ("?=",
          [('', '')],
          {'': ['']}),
         ("?=a",
          [('', 'a')],
          {'': ['a']}),
         ("?a",
          [('a', None)],
          {'a': [None]}),
         ("?a=",
          [('a', '')],
          {'a': ['']}),
         ("?&a=b",
          [('a', 'b')],
          {'a': ['b']}),
         ("?a=a+b&b=b+c",
          [('a', 'a+b'), ('b', 'b+c')],
          {'a': ['a+b'], 'b': ['b+c']}),
         ("?a=a%20b&b=b%20c",
          [('a', 'a b'), ('b', 'b c')],
          {'a': ['a b'], 'b': ['b c']}),
         ("?a=1&a=2",
          [('a', '1'), ('a', '2')],
          {'a': ['1', '2']}),
     ]
     for query, querylist, querydict in cases:
         result = urisplit(query)
         # The failure messages name the accessor that is actually being
         # checked (they were swapped before, which misdirected debugging).
         self.assertEqual(result.getquerylist(), querylist,
                          'Error parsing query list for %r' % query)
         self.assertEqual(result.getquerydict(), querydict,
                          'Error parsing query dict for %r' % query)
开发者ID:solidsnack,项目名称:uritools,代码行数:35,代码来源:test_split.py


示例2: fileref

def fileref(path_or_handle_or_url, pfx='file:///'):
    """Coerce a path, open handle or URL into a ``uritools`` split result.

    An existing ``uritools.SplitResult`` is returned untouched. Anything
    else is reduced to a string-like tail, prefixed with *pfx* and split.
    """
    target = path_or_handle_or_url
    if isinstance(target, uritools.SplitResult):
        # Already a URL: nothing to do.
        return target

    if isinstance(target, py.path.local):
        # Path objects are stringified.
        tail = str(target)
    elif hasattr(target, 'name'):
        # Open handles contribute the name they carry.
        tail = target.name
    else:
        # Anything else is assumed to concatenate with the prefix as-is.
        tail = target
    return uritools.urisplit(pfx + tail)
开发者ID:drcloud,项目名称:arx,代码行数:11,代码来源:files.py


示例3: test_invalid_ip_literal

 def test_invalid_ip_literal(self):
     """gethost() must raise ValueError for malformed bracketed hosts.

     Every URI is checked twice: as text and as its ASCII-encoded bytes.
     """
     malformed = (
         'http://::12.34.56.78]/',
         'http://[::1/foo/',
         'ftp://[::1/foo/bad]/bad',
         'http://[::1/foo/bad]/bad',
         'http://[foo]/',
         'http://[v7.future]',
     )
     for text in malformed:
         for candidate in (text, text.encode('ascii')):
             with self.assertRaises(ValueError, msg='%r' % candidate):
                 urisplit(candidate).gethost()
开发者ID:tkem,项目名称:uritools,代码行数:14,代码来源:test_split.py


示例4: browse

 def browse(self, uri):
     """Return the refs found below the location addressed by *uri*.

     The path component of *uri* selects what is listed: the literal
     ``'root'`` yields whatever ``self._media_dirs()`` returns; any other
     value is treated as a filesystem directory whose entries become
     directory refs (sub-directories) or track refs (regular files).

     :param uri: URI whose path is ``'root'`` or a directory path
     :returns: list of refs sorted by their ``name`` attribute
     :raises OSError: if the directory cannot be listed or a visible entry
         cannot be stat'ed
     """
     logger.debug(u'browse called with uri %s' % uri)
     localpath = urisplit(uri).path
     if localpath == 'root':
         result = self._media_dirs()
     else:
         result = []
         directory = localpath
         logger.debug(u'directory is %s' % directory)
         for name in os.listdir(directory):
             # Filter hidden entries before touching the filesystem so that
             # they cost no stat call and a hidden dangling symlink cannot
             # abort the whole listing.
             # NOTE(review): the b'.' prefix assumes listdir yields byte
             # strings (Python 2 str) -- confirm before running on Python 3.
             if not self.backend._show_hidden and name.startswith(b'.'):
                 continue
             child = os.path.join(directory, name)
             # Separate name so the *uri* argument is not shadowed.
             child_uri = uricompose(self.URI_PREFIX, None, child)
             if self.backend._follow_symlinks:
                 st = os.stat(child)
             else:
                 st = os.lstat(child)
             if stat.S_ISDIR(st.st_mode):
                 result.append(models.Ref.directory(name=name, uri=child_uri))
             elif stat.S_ISREG(st.st_mode):
                 result.append(models.Ref.track(name=name, uri=child_uri))
             else:
                 # Neither directory nor regular file: report it and move on.
                 logger.warning(u'Strange file encountered %s' % child)
     result.sort(key=operator.attrgetter('name'))
     return result
开发者ID:rawdlite,项目名称:mopidy-filebrowser,代码行数:28,代码来源:library.py


示例5: get_con_pool

def get_con_pool(host,
                 key_file=None,
                 cert_file=None,
                 socket_timeout=15.0,
                 max_pool_size=3,
                 verify_https=True):
    """
    Build a blocking connection pool for ``host`` via ``connection_from_url``.

    :param host: URL of the host; its scheme decides whether TLS options apply
    :param key_file: client key file, used only together with ``cert_file``
    :param cert_file: client certificate file, used only with ``key_file``
    :param socket_timeout:
        socket timeout for each connection in seconds
    :param max_pool_size: passed to the pool as ``maxsize``
    :param verify_https: when true, https pools require a valid certificate
    """
    kwargs = dict(timeout=socket_timeout, maxsize=max_pool_size, block=True)

    # Client certificates are only configured when both halves are given.
    if not (key_file is None or cert_file is None):
        kwargs.update(key_file=key_file, cert_file=cert_file)

    uses_tls = urisplit(host).scheme == "https"
    if uses_tls:
        # NOTE(review): this pins TLS 1.0, which current Python documents as
        # deprecated -- confirm whether a newer protocol constant can be used.
        kwargs["ssl_version"] = ssl.PROTOCOL_TLSv1
    if uses_tls and verify_https:
        kwargs["cert_reqs"] = "CERT_REQUIRED"
        # CA bundle comes from settings, with a system path as fallback.
        kwargs["ca_certs"] = getattr(settings, "RESTCLIENTS_CA_BUNDLE",
                                     "/etc/ssl/certs/ca-bundle.crt")

    return connection_from_url(host, **kwargs)
开发者ID:uwwebservices,项目名称:uw-registry,代码行数:29,代码来源:url_lib_wrapper.py


示例6: _browse_directory

    def _browse_directory(self, uri, order=('type', 'name')):
        """Browse a directory URI whose query string carries the filters.

        ``type`` and ``role`` are taken out of the query; whatever remains
        is forwarded to ``schema.browse`` as keyword filters. Albums and
        artists in the result are turned into nested directory refs.
        """
        filters = dict(uritools.urisplit(uri).getquerylist())
        kind = filters.pop('type', None)
        role = filters.pop('role', None)

        # TODO: handle these in schema (generically)?
        if kind == 'date':
            date_format = filters.get('format', '%Y-%m-%d')
            return map(_dateref, schema.dates(self._connect(), format=date_format))  # noqa
        if kind == 'genre':
            return map(_genreref, schema.list_distinct(self._connect(), 'genre'))  # noqa

        # Fix #38: keep sort order of album tracks; this also applies
        # to composers and performers
        if kind == Ref.TRACK and 'album' in filters:
            order = ('disc_no', 'track_no', 'name')

        roles = role or ('artist', 'albumartist')  # FIXME: re-think 'roles'...

        refs = []
        for ref in schema.browse(self._connect(), kind, order, role=roles, **filters):  # noqa
            # Tracks, and everything in an unfiltered listing, pass through.
            if ref.type == Ref.TRACK or (not filters and not role):
                refs.append(ref)
                continue
            if ref.type == Ref.ALBUM:
                child_query = dict(filters, type=Ref.TRACK, album=ref.uri)
            elif ref.type == Ref.ARTIST:
                child_query = dict(filters, **{role: ref.uri})
            else:
                logger.warn('Unexpected SQLite browse result: %r', ref)
                continue
            refs.append(Ref.directory(
                uri=uritools.uricompose('local', None, 'directory', child_query),  # noqa
                name=ref.name))
        return refs
开发者ID:siam28,项目名称:mopidy-local-sqlite,代码行数:34,代码来源:library.py


示例7: __loop

	def __loop(self):
		"""Drive the client state machine until a step asks to stop.

		Each pass dispatches on ``self.state``. A ``socket.error`` switches
		the state back to CONNECT (unless it is OFFLINE) and makes the next
		pass sleep one second before trying again.
		"""
		continueReading = True
		mustWait = False  # set after a socket error to delay the next pass
		while continueReading:
			try:
				if (mustWait):
					time.sleep (1)
					mustWait = False

				if (self.state == ClientState.OFFLINE):
					# OFFLINE ends the loop.
					continueReading = False
				elif (self.state == ClientState.CONNECT):
					if (self.discover):
						# NOTE(review): 1000 is presumably a timeout -- confirm against discoverURI.
						self.uri = self.discoverURI(self.id, 1000, self.strategy)
					split = urisplit(self.uri)
					# NOTE(review): int() raises if the URI has no port; only socket.error is caught below.
					continueReading = self.__initsocket(split.host, int(split.port))
				elif (self.state == ClientState.READFRAME):
					self.__readAll()
					continueReading = self.__readFrame()
				elif (self.state == ClientState.READFRAMESIZE):
					self.__readAll()
					continueReading = self.__readFrameSize()
			except socket.error as err:
				# Keep looping and retry after a pause instead of exiting.
				mustWait = True
				continueReading = True
				print ("Lost Connection. Reconnect. ",err)
				if (self.state != ClientState.OFFLINE):
					self.state = ClientState.CONNECT
开发者ID:FScherzinger,项目名称:glowing-chainsaw,代码行数:28,代码来源:PSClient.py


示例8: _notify_metadata

    def _notify_metadata(self, iv, metadata, source_peer):
        """Write one ``metadata`` measurement point describing *iv*.

        Tags are built from the prefixed static tags of *iv*, the prefixed
        *source_peer* entries and the authority/path/scheme of ``iv.uri``.
        Field values whose exact type is not a key of
        ``InfluxDBArchiver._types_from_py_to_db`` are stored JSON-encoded
        under a ``json_``-prefixed key.
        """
        # We are already in a green thread here
        if not isinstance(metadata, dict):
            metadata = {'metadata': metadata}

        uri = urisplit(iv.uri)

        tags = self._prefix_keys(iv.static_tags, 's_')
        tags.update(self._prefix_keys(source_peer, 'd_'))
        for component in ('authority', 'path', 'scheme'):
            tags[component] = getattr(uri, component)

        storable_types = InfluxDBArchiver._types_from_py_to_db.keys()
        metadata_to_write = {}
        for key, value in metadata.items():
            if type(value) in storable_types:
                metadata_to_write[key] = value
            else:
                metadata_to_write['json_' + key] = json.dumps(value)

        point = {
            'measurement': 'metadata',
            'fields': metadata_to_write,
            'tags': tags
        }
        logger.info('Writing metadata for %s: %s', uri, metadata)

        self._write_data([point])
开发者ID:Alidron,项目名称:alidron-archiver-influxdb,代码行数:28,代码来源:alidron_archiver.py


示例9: __parseuri

 def __parseuri(self, uri):
     """Map *uri* to a ``(server URI, object path)`` pair.

     When the server lookup raises ValueError the input is handed back
     unchanged as ``(None, uri)``.
     """
     try:
         server = self.__server(uri)
     except ValueError:
         return None, uri
     # The object path is the server's base path plus the URI's own path.
     object_path = server['Path'] + uritools.urisplit(uri).path
     return server['URI'], object_path
开发者ID:tkem,项目名称:mopidy-dleyna,代码行数:7,代码来源:client.py


示例10: get_images

 def get_images(self, uris):
     """Return a dict mapping each looked-up URI to a tuple of images.

     URIs are grouped by ``scheme://authority`` so that one image query is
     issued per group; the root directory URI, if requested, always maps
     to an empty tuple.
     """
     # TODO: suggest as API improvement
     uris = frozenset(uris)
     root_uri = self.root_directory.uri

     # group uris by authority (media server)
     paths_by_base = collections.defaultdict(list)
     for uri in uris.difference([root_uri]):
         parts = uritools.urisplit(uri)
         paths_by_base[parts.scheme + '://' + parts.authority].append(parts.path)

     # start searching - blocks only when iterating over results
     pending = []
     for baseuri, paths in paths_by_base.items():
         try:
             pending.append(self.__images(baseuri, paths))
         except NotImplementedError as e:
             logger.warn('Not retrieving images for %s: %s', baseuri, e)

     # merge results
     result = {
         found_uri: tuple(images)
         for found_uri, images in itertools.chain.from_iterable(pending)
     }
     if root_uri in uris:
         result[root_uri] = ()
     return result
开发者ID:anthonydahanne,项目名称:mopidy-dleyna,代码行数:25,代码来源:library.py


示例11: browse

 def browse(self, uri):
     """Return the refs one level below *uri* in the browse hierarchy.

     The URI path names the current level (``root``, ``genre``, ``artist``
     or ``album``) and the sanitized query carries the selection made so
     far. Unknown levels produce an empty list.
     """
     logger.debug(u"Browse being called for %s" % uri)
     parts = urisplit(uri)
     level = parts.path
     query = self._sanitize_query(dict(parts.getquerylist()))
     logger.debug("Got parsed to level: %s - query: %s" % (level,
                                                           query))
     if not level:
         logger.error("No level for uri %s" % uri)
         # import pdb; pdb.set_trace()

     if level == 'root':
         return [
             Ref.directory(
                 uri=uricompose('beetslocal', None, 'genre',
                                dict(genre=row[0])),
                 name=row[0] or u'No Genre')
             for row in self._browse_genre()
         ]
     if level == "genre":
         # artist refs not browsable via mpd
         return [
             Ref.directory(
                 uri=uricompose('beetslocal', None, 'artist',
                                dict(genre=query['genre'][0],
                                     artist=row[1])),
                 name=row[0] or u'No Artist')
             for row in self._browse_artist(query)
         ]
     if level == "artist":
         return [
             Ref.album(
                 uri=uricompose('beetslocal', None, 'album',
                                dict(album=album.id)),
                 name=album.album)
             for album in self._browse_album(query)
         ]
     if level == "album":
         return [
             Ref.track(
                 uri="beetslocal:track:%s:%s" % (
                     track.id,
                     uriencode(track.path, '/')),
                 name=track.title)
             for track in self._browse_track(query)
         ]

     logger.debug('Unknown URI: %s', uri)
     # logger.debug(result)
     return []
开发者ID:rawdlite,项目名称:mopidy-beets-local,代码行数:47,代码来源:library.py


示例12: test_getscheme

 def test_getscheme(self):
     """getscheme() returns the lower-cased scheme, or the default if none.

     The same expectations are checked for text and for bytes input.
     """
     text_cases = [
         ('foo', 'bar', 'bar'),
         ('foo:', 'bar', 'foo'),
         ('FOO:', 'bar', 'foo'),
         ('FOO_BAR:/', 'x', 'x'),
     ]
     bytes_cases = [
         (b'foo', 'bar', 'bar'),
         (b'foo:', 'bar', 'foo'),
         (b'FOO:', 'bar', 'foo'),
         (b'FOO_BAR:/', 'x', 'x'),
     ]
     for uri, fallback, expected in text_cases + bytes_cases:
         self.assertEqual(urisplit(uri).getscheme(default=fallback), expected)
开发者ID:tkem,项目名称:uritools,代码行数:9,代码来源:test_split.py


示例13: parse_uri

def parse_uri(uri):
    """
    Split the given URI with ``uritools.urisplit`` and return the result.

    The result offers accessors for further extraction: gethost, getpath,
    getport, getquerydict, etc.
    :param uri: the URI to split
    :return: the split URI
    :rtype: uritools.SplitResult
    """
    split_result = uritools.urisplit(uri)
    return split_result
开发者ID:KyleBenson,项目名称:SmartAmericaSensors,代码行数:9,代码来源:uri.py


示例14: test_getport

 def test_getport(self):
     """A missing or empty port falls back to the default given to getport().

     The raw ``port`` attribute is ``''`` when the authority ends with a
     colon and ``None`` when there is no colon at all.
     """
     for uri in ('foo://bar', 'foo://bar:', 'foo://bar/', 'foo://bar:/'):
         parts = urisplit(uri)
         raw_port = '' if parts.authority.endswith(':') else None
         self.assertEqual(parts.port, raw_port)
         self.assertEqual(parts.gethost(), 'bar')
         self.assertEqual(parts.getport(8000), 8000)
开发者ID:tkem,项目名称:uritools,代码行数:9,代码来源:test_split.py


示例15: __init__

 def __init__(self, uri, **kwargs):
     """Split *uri* and populate this object from its components.

     :param uri: the URI to split with ``urisplit``
     :param kwargs: extra options, stored as a copy in ``self.kwargs``
     """
     parsed_uri = urisplit(uri)
     self.scheme = parsed_uri.scheme
     # Stored as a separate dict object.
     self.kwargs = dict(kwargs)
     # Starts out unset; presumably filled in by one of the steps below -- verify.
     self.baudrate = None
     # The remaining steps run in this fixed order: parse the user and host
     # parts of the split URI, then validate, then fill in defaults.
     self.parse_userinfo(parsed_uri)
     self.parse_hostinfo(parsed_uri)
     self.validate()
     self.fill_defaults()
开发者ID:meuter,项目名称:citizenshell,代码行数:9,代码来源:parseduri.py


示例16: _filters

 def _filters(self, uri):
     """Translate a ``local:`` URI into a list of filter dicts.

     Directory URIs contribute their query parameters as a single filter;
     artist URIs match either the artist or the album artist; album URIs
     match the album. Any other URI yields no filters.
     """
     if uri.startswith('local:directory'):
         query_pairs = uritools.urisplit(uri).getquerylist()
         return [dict(query_pairs)]
     if uri.startswith('local:artist'):
         return [{field: uri} for field in ('artist', 'albumartist')]
     if uri.startswith('local:album'):
         return [dict(album=uri)]
     return []
开发者ID:mopidy,项目名称:mopidy-local-sqlite,代码行数:9,代码来源:library.py


示例17: __server

 def __server(self, uri):
     """Look up the known media server addressed by the host part of *uri*.

     :raises ValueError: if *uri* has no host component
     :raises LookupError: if no server is registered under that host
     """
     udn = uritools.urisplit(uri).gethost()
     if not udn:
         raise ValueError('Invalid URI %s' % uri)
     try:
         return self.__servers[udn]
     except KeyError:
         raise LookupError('Unknown media server UDN %s' % udn)
开发者ID:tkem,项目名称:mopidy-dleyna,代码行数:10,代码来源:client.py


示例18: test_getauthority

 def test_getauthority(self):
     """Check getauthority() results, defaults and error handling.

     Each case is ``(uri, default, expected)`` where *default* is the
     ``(userinfo, host, port)`` fallback passed to getauthority() and
     *expected* is the tuple it must return.
     """
     from ipaddress import IPv4Address, IPv6Address
     # The URIs containing userinfo are spelled out with a literal '@';
     # an e-mail obfuscation placeholder in their place would not parse to
     # the expected ('userinfo', host, port) tuples.
     cases = [
         ('urn:example:animal:ferret:nose',
          None,
          (None, None, None)),
         ('file:///',
          None,
          (None, '', None)),
         ('http://userinfo@Test.python.org:5432/foo/',
          None,
          ('userinfo', 'test.python.org', 5432)),
         ('http://userinfo@12.34.56.78:5432/foo/',
          None,
          ('userinfo', IPv4Address('12.34.56.78'), 5432)),
         ('http://userinfo@[::1]:5432/foo/',
          None,
          ('userinfo', IPv6Address('::1'), 5432)),
         ('urn:example:animal:ferret:nose',
          ('nobody', 'localhost', 42),
          ('nobody', 'localhost', 42)),
         ('file:///',
          ('nobody', 'localhost', 42),
          ('nobody', 'localhost', 42)),
         ('http://Test.python.org/foo/',
          ('nobody', 'localhost', 42),
          ('nobody', 'test.python.org', 42)),
         ('http://userinfo@Test.python.org/foo/',
          ('nobody', 'localhost', 42),
          ('userinfo', 'test.python.org', 42)),
         ('http://Test.python.org:5432/foo/',
          ('nobody', 'localhost', 42),
          ('nobody', 'test.python.org', 5432)),
         ('http://userinfo@Test.python.org:5432/foo/',
          ('nobody', 'localhost', 42),
          ('userinfo', 'test.python.org', 5432)),
     ]
     for uri, default, authority in cases:
         self.assertEqual(urisplit(uri).getauthority(default), authority)
     # Unbalanced brackets must be rejected for text and bytes alike.
     for uri in ['http://[::1/', 'http://::1]/']:
         with self.assertRaises(ValueError, msg='%r' % uri):
             urisplit(uri).getauthority()
         with self.assertRaises(ValueError, msg='%r' % uri):
             urisplit(uri.encode()).getauthority()
     # Malformed defaults: the failure message names the default under
     # test rather than a leftover loop variable.
     bad_defaults = [
         (42, TypeError),
         (('userinfo', 'test.python.org'), ValueError),
     ]
     for default, error in bad_defaults:
         with self.assertRaises(error, msg='%r' % (default,)):
             urisplit('').getauthority(default)
开发者ID:tkem,项目名称:uritools,代码行数:48,代码来源:test_split.py


示例19: is_local_url

def is_local_url(target):
    """Tell whether a URL points at this application.

    A target without a host component counts as local. Otherwise it is
    local only for the http/https schemes with a host equal to the
    configured ``SERVER_NAME``.

    :param target: The URL to check.
    :returns: ``True`` if the target is a local url.
    """
    server_name = current_app.config['SERVER_NAME']
    test_url = urisplit(target)
    if not test_url.host:
        return True
    is_web_scheme = test_url.scheme in ('http', 'https')
    return is_web_scheme and server_name == test_url.host
开发者ID:hjhsalo,项目名称:invenio-oauthclient,代码行数:10,代码来源:utils.py


示例20: browse

 def browse(self, uri):
     """Browse the root, a configured collection or a single item.

     The URI path is the identifier: empty means the root, a member of
     the configured collections means that collection, anything else is
     browsed as an item. Any error is logged and yields an empty list.
     """
     try:
         identifier = urisplit(uri).path
         if not identifier:
             refs = self._browse_root()
         elif identifier in self._config['collections']:
             refs = self._browse_collection(identifier)
         else:
             refs = self._browse_item(identifier)
     except Exception as e:
         logger.error('Error browsing %s: %s', uri, e)
         return []
     return refs
开发者ID:soujak,项目名称:mopidy-internetarchive,代码行数:12,代码来源:library.py



注:本文中的uritools.urisplit函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python url.parse函数代码示例 发布时间:2022-05-27
下一篇:
Python uritemplate.URITemplate类代码示例 发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap