本文整理汇总了Python中uritools.urisplit函数的典型用法代码示例。如果您正苦于以下问题:Python urisplit函数的具体用法?Python urisplit怎么用?Python urisplit使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urisplit函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_getquery
def test_getquery(self):
    """Check query parsing into both list and dict form.

    Each case is ``(uri, expected_querylist, expected_querydict)``; the URI
    is split once and both accessors are compared against the table.
    """
    cases = [
        ("?", [], {}),
        ("?&", [], {}),
        ("?&&", [], {}),
        ("?=",
         [('', '')],
         {'': ['']}),
        ("?=a",
         [('', 'a')],
         {'': ['a']}),
        ("?a",
         [('a', None)],
         {'a': [None]}),
        ("?a=",
         [('a', '')],
         {'a': ['']}),
        ("?&a=b",
         [('a', 'b')],
         {'a': ['b']}),
        ("?a=a+b&b=b+c",
         [('a', 'a+b'), ('b', 'b+c')],
         {'a': ['a+b'], 'b': ['b+c']}),
        ("?a=a%20b&b=b%20c",
         [('a', 'a b'), ('b', 'b c')],
         {'a': ['a b'], 'b': ['b c']}),
        ("?a=1&a=2",
         [('a', '1'), ('a', '2')],
         {'a': ['1', '2']}),
    ]
    for query, querylist, querydict in cases:
        # The failure messages name the accessor that is actually being
        # checked (they were swapped before).
        self.assertEqual(urisplit(query).getquerylist(), querylist,
                         'Error parsing query list for %r' % query)
        self.assertEqual(urisplit(query).getquerydict(), querydict,
                         'Error parsing query dict for %r' % query)
开发者ID:solidsnack,项目名称:uritools,代码行数:35,代码来源:test_split.py
示例2: fileref
def fileref(path_or_handle_or_url, pfx='file:///'):
    """Turn a path, an open handle or an already-split URI into a split URI.

    :param path_or_handle_or_url: a ``uritools.SplitResult`` (returned
        unchanged), a ``py.path.local``, any object with a ``name``
        attribute, or anything else that can be concatenated to *pfx*.
    :param pfx: prefix put in front of the path before splitting.
    :returns: the result of ``uritools.urisplit`` (or the input itself when
        it already is a ``SplitResult``).
    """
    target = path_or_handle_or_url

    # Already a split URI: nothing to do.
    if isinstance(target, uritools.SplitResult):
        return target

    if isinstance(target, py.path.local):
        # Path objects are rendered through str().
        tail = str(target)
    elif hasattr(target, 'name'):
        # Open handles: use the name they carry.
        tail = target.name
    else:
        # Anything else is used as-is (probably a plain string).
        tail = target

    return uritools.urisplit(pfx + tail)
开发者ID:drcloud,项目名称:arx,代码行数:11,代码来源:files.py
示例3: test_invalid_ip_literal
def test_invalid_ip_literal(self):
    """``gethost()`` must raise ``ValueError`` for malformed IP literals.

    Every URI is exercised twice: as text and as ASCII-encoded bytes.
    """
    uris = [
        'http://::12.34.56.78]/',
        'http://[::1/foo/',
        'ftp://[::1/foo/bad]/bad',
        'http://[::1/foo/bad]/bad',
        'http://[foo]/',
        'http://[v7.future]'
    ]
    for uri in uris:
        for candidate in (uri, uri.encode('ascii')):
            with self.assertRaises(ValueError, msg='%r' % candidate):
                urisplit(candidate).gethost()
开发者ID:tkem,项目名称:uritools,代码行数:14,代码来源:test_split.py
示例4: browse
def browse(self, uri):
    """List the children of the directory addressed by *uri*.

    The path component of *uri* is either the literal ``'root'`` (the
    configured media directories are returned) or a filesystem directory
    whose entries are turned into directory/track refs.

    :param uri: URI whose path names the directory to browse.
    :returns: list of refs sorted by their ``name`` attribute.
    """
    logger.debug(u'browse called with uri %s' % uri)
    localpath = urisplit(uri).path

    if localpath == 'root':
        result = self._media_dirs()
    else:
        result = []
        directory = localpath
        logger.debug(u'directory is %s' % directory)
        for name in os.listdir(directory):
            # os.listdir() yields bytes or text depending on the type of
            # the argument; compare against a prefix of the same type so
            # the check works for both.
            hidden_prefix = b'.' if isinstance(name, bytes) else u'.'
            # Skip hidden entries before touching the filesystem so they
            # are never stat'ed (e.g. a dangling hidden symlink).
            if not self.backend._show_hidden and name.startswith(hidden_prefix):
                continue

            child = os.path.join(directory, name)
            # Use a dedicated name instead of rebinding the ``uri`` argument.
            child_uri = uricompose(self.URI_PREFIX, None, child)
            if self.backend._follow_symlinks:
                st = os.stat(child)
            else:
                st = os.lstat(child)

            if stat.S_ISDIR(st.st_mode):
                result.append(models.Ref.directory(name=name, uri=child_uri))
            elif stat.S_ISREG(st.st_mode):
                result.append(models.Ref.track(name=name, uri=child_uri))
            else:
                logger.warning(u'Strange file encountered %s' % child)

    result.sort(key=operator.attrgetter('name'))
    return result
开发者ID:rawdlite,项目名称:mopidy-filebrowser,代码行数:28,代码来源:library.py
示例5: get_con_pool
def get_con_pool(host,
                 key_file=None,
                 cert_file=None,
                 socket_timeout=15.0,
                 max_pool_size=3,
                 verify_https=True):
    """Build a connection pool for *host* via ``connection_from_url``.

    :param host: URL of the host; its scheme decides whether TLS options
        are added.
    :param key_file: client key file; only used together with *cert_file*.
    :param cert_file: client certificate file; only used together with
        *key_file*.
    :param socket_timeout: socket timeout for each connection in seconds.
    :param max_pool_size: value passed as the pool's ``maxsize``.
    :param verify_https: for https hosts, require certificate verification
        against the CA bundle named by ``settings.RESTCLIENTS_CA_BUNDLE``
        (default ``/etc/ssl/certs/ca-bundle.crt``).
    :returns: whatever ``connection_from_url`` returns for these options.
    """
    pool_options = dict(timeout=socket_timeout,
                        maxsize=max_pool_size,
                        block=True)

    # Client certificates are only passed along when both halves are given.
    if not (key_file is None or cert_file is None):
        pool_options["key_file"] = key_file
        pool_options["cert_file"] = cert_file

    if urisplit(host).scheme == "https":
        # NOTE(review): this pins the protocol to TLS 1.0, which current
        # servers commonly refuse -- confirm whether this is still wanted.
        pool_options["ssl_version"] = ssl.PROTOCOL_TLSv1
        if verify_https:
            ca_bundle = getattr(settings, "RESTCLIENTS_CA_BUNDLE",
                                "/etc/ssl/certs/ca-bundle.crt")
            pool_options["cert_reqs"] = "CERT_REQUIRED"
            pool_options["ca_certs"] = ca_bundle

    return connection_from_url(host, **pool_options)
开发者ID:uwwebservices,项目名称:uw-registry,代码行数:29,代码来源:url_lib_wrapper.py
示例6: _browse_directory
def _browse_directory(self, uri, order=('type', 'name')):
    """Browse a ``local:directory`` URI whose query string holds the filter.

    The query parameters ``type`` and ``role`` are removed from the query
    and interpreted here; the remaining parameters are forwarded to
    ``schema.browse`` as keyword filters.

    :param uri: directory URI to browse.
    :param order: sort order passed to ``schema.browse``; replaced by
        ``('disc_no', 'track_no', 'name')`` when browsing the tracks of an
        album.
    :returns: a list of refs (always a real list, never a lazy ``map``).
    """
    query = dict(uritools.urisplit(uri).getquerylist())
    # Local names avoid shadowing the ``type`` and ``format`` builtins.
    ref_type = query.pop('type', None)
    role = query.pop('role', None)

    # TODO: handle these in schema (generically)?
    if ref_type == 'date':
        date_format = query.get('format', '%Y-%m-%d')
        return [
            _dateref(date)
            for date in schema.dates(self._connect(), format=date_format)
        ]
    if ref_type == 'genre':
        return [
            _genreref(genre)
            for genre in schema.list_distinct(self._connect(), 'genre')
        ]

    # Fix #38: keep sort order of album tracks; this also applies
    # to composers and performers
    if ref_type == Ref.TRACK and 'album' in query:
        order = ('disc_no', 'track_no', 'name')
    roles = role or ('artist', 'albumartist')  # FIXME: re-think 'roles'...

    refs = []
    for ref in schema.browse(self._connect(), ref_type, order, role=roles, **query):  # noqa
        if ref.type == Ref.TRACK or (not query and not role):
            refs.append(ref)
        elif ref.type == Ref.ALBUM:
            # Albums become directories listing their tracks.
            refs.append(Ref.directory(uri=uritools.uricompose(
                'local', None, 'directory', dict(query, type=Ref.TRACK, album=ref.uri)  # noqa
            ), name=ref.name))
        elif ref.type == Ref.ARTIST:
            # Artists become directories filtered by the requested role.
            refs.append(Ref.directory(uri=uritools.uricompose(
                'local', None, 'directory', dict(query, **{role: ref.uri})
            ), name=ref.name))
        else:
            logger.warning('Unexpected SQLite browse result: %r', ref)
    return refs
开发者ID:siam28,项目名称:mopidy-local-sqlite,代码行数:34,代码来源:library.py
示例7: __loop
def __loop(self):
    """Run the client state machine until a step asks to stop.

    Each pass dispatches on ``self.state``: OFFLINE ends the loop, CONNECT
    (re)opens the socket, READFRAME / READFRAMESIZE read pending data and
    then parse. A ``socket.error`` schedules a one second pause and moves
    the client back to CONNECT unless it is OFFLINE.
    """
    # NOTE(review): the source had lost its indentation; the nesting below
    # (notably that the URI is split after, not inside, the discovery
    # branch) is reconstructed -- confirm against the original file.
    keep_running = True
    pause_before_retry = False
    while keep_running:
        try:
            if pause_before_retry:
                time.sleep(1)
                pause_before_retry = False

            if self.state == ClientState.OFFLINE:
                keep_running = False
            elif self.state == ClientState.CONNECT:
                if self.discover:
                    self.uri = self.discoverURI(self.id, 1000, self.strategy)
                target = urisplit(self.uri)
                keep_running = self.__initsocket(target.host, int(target.port))
            elif self.state == ClientState.READFRAME:
                self.__readAll()
                keep_running = self.__readFrame()
            elif self.state == ClientState.READFRAMESIZE:
                self.__readAll()
                keep_running = self.__readFrameSize()
        except socket.error as err:
            # Connection trouble: keep looping, but wait before retrying.
            pause_before_retry = True
            keep_running = True
            print ("Lost Connection. Reconnect. ",err)
            if self.state != ClientState.OFFLINE:
                self.state = ClientState.CONNECT
开发者ID:FScherzinger,项目名称:glowing-chainsaw,代码行数:28,代码来源:PSClient.py
示例8: _notify_metadata
def _notify_metadata(self, iv, metadata, source_peer):
    """Write one ``metadata`` measurement for *iv*.

    :param iv: object providing ``uri`` and ``static_tags``.
    :param metadata: dict of fields; any other value is wrapped as
        ``{'metadata': value}``.
    :param source_peer: mapping whose keys are added as ``d_``-prefixed
        tags.
    """
    # We are already in a green thread here
    if not isinstance(metadata, dict):
        metadata = {'metadata': metadata}

    uri = urisplit(iv.uri)

    tags = self._prefix_keys(iv.static_tags, 's_')
    tags.update(self._prefix_keys(source_peer, 'd_'))
    tags['authority'] = uri.authority
    tags['path'] = uri.path
    tags['scheme'] = uri.scheme

    known_types = InfluxDBArchiver._types_from_py_to_db.keys()
    fields = {}
    for key, value in metadata.items():
        if type(value) in known_types:
            fields[key] = value
        else:
            # Values of other types are stored as JSON text under a
            # ``json_``-prefixed field name.
            fields['json_' + key] = json.dumps(value)

    point = {
        'measurement': 'metadata',
        'fields': fields,
        'tags': tags
    }
    logger.info('Writing metadata for %s: %s', uri, metadata)
    self._write_data([point])
开发者ID:Alidron,项目名称:alidron-archiver-influxdb,代码行数:28,代码来源:alidron_archiver.py
示例9: __parseuri
def __parseuri(self, uri):
    """Map *uri* to a ``(server URI, object path)`` pair.

    :returns: ``(None, uri)`` when the server lookup raises ``ValueError``;
        otherwise the server's ``'URI'`` entry and its ``'Path'`` entry
        joined with the path component of *uri*.
    """
    try:
        server = self.__server(uri)
    except ValueError:
        # Lookup rejected the URI: hand it back untouched.
        return None, uri

    object_path = server['Path'] + uritools.urisplit(uri).path
    return server['URI'], object_path
开发者ID:tkem,项目名称:mopidy-dleyna,代码行数:7,代码来源:client.py
示例10: get_images
def get_images(self, uris):
    """Return a dict mapping each requested URI to a tuple of images.

    URIs are grouped by ``scheme://authority`` and each group is looked up
    with ``self.__images``; groups for which that raises
    ``NotImplementedError`` are logged and skipped. The root directory URI,
    when requested, maps to an empty tuple.
    """
    # TODO: suggest as API improvement
    requested = frozenset(uris)

    # group uris by authority (media server)
    paths_by_server = collections.defaultdict(list)
    for uri in requested.difference([self.root_directory.uri]):
        parts = uritools.urisplit(uri)
        server_uri = parts.scheme + '://' + parts.authority
        paths_by_server[server_uri].append(parts.path)

    # start searching - blocks only when iterating over results
    pending = []
    for server_uri, paths in paths_by_server.items():
        try:
            found = self.__images(server_uri, paths)
        except NotImplementedError as e:
            logger.warn('Not retrieving images for %s: %s', server_uri, e)
            continue
        pending.append(found)

    # merge results
    merged = {
        uri: tuple(images)
        for uri, images in itertools.chain.from_iterable(pending)
    }
    if self.root_directory.uri in requested:
        merged[self.root_directory.uri] = tuple()
    return merged
开发者ID:anthonydahanne,项目名称:mopidy-dleyna,代码行数:25,代码来源:library.py
示例11: browse
def browse(self, uri):
    """Return the refs one level below *uri* in the beetslocal hierarchy.

    The path of *uri* selects the level: ``root`` lists genres, ``genre``
    lists artists, ``artist`` lists albums and ``album`` lists tracks. Any
    other level is logged and yields an empty list.
    """
    logger.debug(u"Browse being called for %s" % uri)
    parts = urisplit(uri)
    level = parts.path
    query = self._sanitize_query(dict(parts.getquerylist()))
    logger.debug("Got parsed to level: %s - query: %s" % (level,
                                                          query))
    if not level:
        logger.error("No level for uri %s" % uri)

    if level == 'root':
        return [
            Ref.directory(
                uri=uricompose('beetslocal', None, 'genre',
                               dict(genre=row[0])),
                name=row[0] or u'No Genre')
            for row in self._browse_genre()
        ]

    if level == "genre":
        # artist refs not browsable via mpd
        return [
            Ref.directory(
                uri=uricompose('beetslocal', None, 'artist',
                               dict(genre=query['genre'][0],
                                    artist=row[1])),
                name=row[0] or u'No Artist')
            for row in self._browse_artist(query)
        ]

    if level == "artist":
        return [
            Ref.album(
                uri=uricompose('beetslocal', None, 'album',
                               dict(album=album.id)),
                name=album.album)
            for album in self._browse_album(query)
        ]

    if level == "album":
        return [
            Ref.track(
                uri="beetslocal:track:%s:%s" % (
                    track.id,
                    uriencode(track.path, '/')),
                name=track.title)
            for track in self._browse_track(query)
        ]

    logger.debug('Unknown URI: %s', uri)
    return []
开发者ID:rawdlite,项目名称:mopidy-beets-local,代码行数:47,代码来源:library.py
示例12: test_getscheme
def test_getscheme(self):
    """``getscheme()`` lower-cases a scheme and falls back to the default.

    The same table is run first with text input, then with bytes input.
    """
    # (uri, default, expected scheme)
    cases = [
        ('foo', 'bar', 'bar'),
        ('foo:', 'bar', 'foo'),
        ('FOO:', 'bar', 'foo'),
        ('FOO_BAR:/', 'x', 'x'),
    ]
    for as_input in (lambda s: s, lambda s: s.encode('ascii')):
        for uri, default, expected in cases:
            self.assertEqual(
                urisplit(as_input(uri)).getscheme(default=default),
                expected)
开发者ID:tkem,项目名称:uritools,代码行数:9,代码来源:test_split.py
示例13: parse_uri
def parse_uri(uri):
    """
    Split the given URI with ``uritools.urisplit``.

    The returned object offers accessors such as gethost, getpath, getport
    and getquerydict for pulling individual pieces out of the URI.

    :param uri: the URI to split
    :return: the split URI
    :rtype: uritools.SplitResult
    """
    split_result = uritools.urisplit(uri)
    return split_result
开发者ID:KyleBenson,项目名称:SmartAmericaSensors,代码行数:9,代码来源:uri.py
示例14: test_getport
def test_getport(self):
    """Check port handling for URIs that carry no port number.

    A trailing ``:`` in the authority gives an empty ``port`` string, no
    colon gives ``None``; either way ``getport()`` returns its default.
    """
    for uri in ['foo://bar', 'foo://bar:', 'foo://bar/', 'foo://bar:/']:
        result = urisplit(uri)
        expected_port = '' if result.authority.endswith(':') else None
        self.assertEqual(result.port, expected_port)
        self.assertEqual(result.gethost(), 'bar')
        self.assertEqual(result.getport(8000), 8000)
开发者ID:tkem,项目名称:uritools,代码行数:9,代码来源:test_split.py
示例15: __init__
def __init__(self, uri, **kwargs):
    """Split *uri* and populate this object from it.

    :param uri: URI to parse.
    :param kwargs: extra options, stored as a copy in ``self.kwargs``.
    """
    parsed_uri = urisplit(uri)

    # Plain attributes first, so the parse/validate steps see them.
    self.baudrate = None
    self.kwargs = dict(kwargs)
    self.scheme = parsed_uri.scheme

    self.parse_userinfo(parsed_uri)
    self.parse_hostinfo(parsed_uri)

    self.validate()
    self.fill_defaults()
开发者ID:meuter,项目名称:citizenshell,代码行数:9,代码来源:parseduri.py
示例16: _filters
def _filters(self, uri):
if uri.startswith('local:directory'):
return [dict(uritools.urisplit(uri).getquerylist())]
elif uri.startswith('local:artist'):
return [{'artist': uri}, {'albumartist': uri}]
elif uri.startswith('local:album'):
return [{'album': uri}]
else:
return []
开发者ID:mopidy,项目名称:mopidy-local-sqlite,代码行数:9,代码来源:library.py
示例17: __server
def __server(self, uri):
    """Look up the media server addressed by the host part of *uri*.

    :raises ValueError: if *uri* has no host component.
    :raises LookupError: if no server is registered under that host (UDN).
    :returns: the matching entry of ``self.__servers``.
    """
    udn = uritools.urisplit(uri).gethost()
    if not udn:
        raise ValueError('Invalid URI %s' % uri)

    try:
        return self.__servers[udn]
    except KeyError:
        raise LookupError('Unknown media server UDN %s' % udn)
开发者ID:tkem,项目名称:mopidy-dleyna,代码行数:10,代码来源:client.py
示例18: test_getauthority
def test_getauthority(self):
    """Check ``getauthority()`` with and without a default authority.

    Each case is ``(uri, default, expected)`` where *expected* is the
    ``(userinfo, host, port)`` tuple. Malformed IP literals and invalid
    default arguments must raise.
    """
    from ipaddress import IPv4Address, IPv6Address
    # The ``userinfo@host`` URIs below were restored from their expected
    # tuples; the scraped text had them mangled by e-mail obfuscation.
    cases = [
        ('urn:example:animal:ferret:nose',
         None,
         (None, None, None)),
        ('file:///',
         None,
         (None, '', None)),
        ('http://userinfo@Test.python.org:5432/foo/',
         None,
         ('userinfo', 'test.python.org', 5432)),
        ('http://userinfo@12.34.56.78:5432/foo/',
         None,
         ('userinfo', IPv4Address('12.34.56.78'), 5432)),
        ('http://userinfo@[::1]:5432/foo/',
         None,
         ('userinfo', IPv6Address('::1'), 5432)),
        ('urn:example:animal:ferret:nose',
         ('nobody', 'localhost', 42),
         ('nobody', 'localhost', 42)),
        ('file:///',
         ('nobody', 'localhost', 42),
         ('nobody', 'localhost', 42)),
        ('http://Test.python.org/foo/',
         ('nobody', 'localhost', 42),
         ('nobody', 'test.python.org', 42)),
        ('http://userinfo@Test.python.org/foo/',
         ('nobody', 'localhost', 42),
         ('userinfo', 'test.python.org', 42)),
        ('http://Test.python.org:5432/foo/',
         ('nobody', 'localhost', 42),
         ('nobody', 'test.python.org', 5432)),
        ('http://userinfo@Test.python.org:5432/foo/',
         ('nobody', 'localhost', 42),
         ('userinfo', 'test.python.org', 5432)),
    ]
    for uri, default, authority in cases:
        self.assertEqual(urisplit(uri).getauthority(default), authority)

    # Unbalanced brackets around an IP literal are rejected for both text
    # and bytes input.
    for uri in ['http://[::1/', 'http://::1]/']:
        with self.assertRaises(ValueError, msg='%r' % uri):
            urisplit(uri).getauthority()
        with self.assertRaises(ValueError, msg='%r' % uri):
            urisplit(uri.encode()).getauthority()

    # A default that is not a 3-item sequence is rejected.
    with self.assertRaises(TypeError, msg='%r' % uri):
        urisplit('').getauthority(42)
    with self.assertRaises(ValueError, msg='%r' % uri):
        urisplit('').getauthority(('userinfo', 'test.python.org'))
开发者ID:tkem,项目名称:uritools,代码行数:48,代码来源:test_split.py
示例19: is_local_url
def is_local_url(target):
    """Determine if URL is a local.

    A target is local when it is a host-less (relative) reference, or an
    ``http``/``https`` URL whose host equals the configured
    ``SERVER_NAME``. Host-less targets that carry any other scheme (for
    example ``javascript:`` or ``data:``) are rejected, since they would
    not resolve to this server.

    :param target: The URL to check.
    :returns: ``True`` if the target is a local url.
    """
    server_name = current_app.config['SERVER_NAME']
    test_url = urisplit(target)

    if not test_url.host:
        # Relative reference: only acceptable without a scheme, or with a
        # web scheme.
        return test_url.scheme in (None, 'http', 'https')

    return test_url.scheme in ('http', 'https') and \
        server_name == test_url.host
开发者ID:hjhsalo,项目名称:invenio-oauthclient,代码行数:10,代码来源:utils.py
示例20: browse
def browse(self, uri):
    """Browse the root, a configured collection, or a single item.

    The path of *uri* is the identifier: empty means the root, a value
    listed in ``self._config['collections']`` means a collection, anything
    else is treated as an item. Any exception is logged and turned into an
    empty list.
    """
    try:
        identifier = urisplit(uri).path
        if identifier:
            if identifier in self._config['collections']:
                handler = self._browse_collection
            else:
                handler = self._browse_item
            return handler(identifier)
        return self._browse_root()
    except Exception as e:
        logger.error('Error browsing %s: %s', uri, e)
        return []
开发者ID:soujak,项目名称:mopidy-internetarchive,代码行数:12,代码来源:library.py
注:本文中的uritools.urisplit函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论