本文整理汇总了Python中swift.common.utils.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlparse函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf, log_route='liteauth')
self.provider = load_oauth_provider(
conf.get('oauth_provider', 'google_oauth'))
self.auth_endpoint = conf.get('auth_endpoint', '')
if not self.auth_endpoint:
raise ValueError('auth_endpoint not set in config file')
if isinstance(self.auth_endpoint, unicode):
self.auth_endpoint = self.auth_endpoint.encode('utf-8')
parsed_path = urlparse(self.auth_endpoint)
if not parsed_path.netloc:
raise ValueError('auth_endpoint is invalid in config file')
self.auth_domain = parsed_path.netloc
self.login_path = parsed_path.path
self.scheme = parsed_path.scheme
if self.scheme != 'https':
raise ValueError('auth_endpoint must have https:// scheme')
# by default service_domain can be extracted from the endpoint
# in case where auth domain is different from service domain
# you need to set up the service domain separately
# Example:
# auth_endpoint = https://auth.example.com/login
# service_domain = https://www.example.com
self.service_domain = conf.get('service_domain',
'%s://%s'
% (self.scheme, self.auth_domain))
self.storage_driver = None
self.oauth_login_timeout = 3600
开发者ID:pkit,项目名称:liteauth,代码行数:30,代码来源:oauth_login.py
示例2: referrer_allowed
def referrer_allowed(referrer, referrer_acl):
"""
Returns True if the referrer should be allowed based on the referrer_acl
list (as returned by :func:`parse_acl`).
See :func:`clean_acl` for documentation of the standard Swift ACL format.
:param referrer: The value of the HTTP Referer header.
:param referrer_acl: The list of referrer designations as returned by
:func:`parse_acl`.
:returns: True if the referrer should be allowed; False if not.
"""
allow = False
if referrer_acl:
rhost = urlparse(referrer or '').hostname or 'unknown'
for mhost in referrer_acl:
if mhost[0] == '-':
mhost = mhost[1:]
if mhost == rhost or \
(mhost[0] == '.' and rhost.endswith(mhost)):
allow = False
elif mhost == '*' or mhost == rhost or \
(mhost[0] == '.' and rhost.endswith(mhost)):
allow = True
return allow
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:25,代码来源:acl.py
示例3: _update_sync_to_headers
def _update_sync_to_headers(self, name, sync_to, user_key,
realm, realm_key, method, headers):
"""
Updates container sync headers
:param name: The name of the object
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:param method: HTTP method to create sig with
:param headers: headers to update with container sync headers
"""
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(name)
sig = self.realms_conf.get_sig(method, path,
headers.get('x-timestamp', 0),
nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (realm,
nonce,
sig)
else:
headers['x-container-sync-key'] = user_key
开发者ID:harrisonfeng,项目名称:swift,代码行数:30,代码来源:sync.py
示例4: cookie_resp
def cookie_resp(status, response_headers, exc_info=None):
resp_headers = HeaderKeyDict(response_headers)
if 'x-auth-token' in resp_headers:
auth_token = resp_headers['x-auth-token']
expires_in = int(resp_headers.get('x-auth-token-expires', 0))
storage_url = resp_headers.get('x-storage-url', '')
path_parts = urlparse(storage_url)
domain = path_parts.hostname
secure = False
if path_parts.scheme == 'https':
secure = True
if auth_token and domain:
new_cookie = create_auth_cookie('session',
domain,
token=auth_token,
expires_in=expires_in,
secure=secure,
httponly=True)
response_headers.append(('Set-Cookie', new_cookie))
new_cookie = create_auth_cookie('storage',
domain,
token=quote(storage_url,
safe=''),
expires_in=expires_in,
secure=secure)
response_headers.append(('Set-Cookie', new_cookie))
return start_response(status, response_headers, exc_info)
开发者ID:pkit,项目名称:liteauth,代码行数:27,代码来源:liteauth_token.py
示例5: __init__
def __init__(self, app, conf):
#: The next WSGI application/filter in the paste.deploy pipeline.
self.app = app
#: The filter configuration dict. Only used in tests.
self.conf = conf
self.logger = get_logger(conf, log_route='staticweb')
# We expose a more general "url_base" parameter in case we want
# to incorporate the path prefix later. Currently it is discarded.
url_base = conf.get('url_base', None)
self.url_scheme = None
self.url_host = None
if url_base:
parsed = urlparse(url_base)
self.url_scheme = parsed.scheme
self.url_host = parsed.netloc
开发者ID:clayg,项目名称:swift,代码行数:16,代码来源:staticweb.py
示例6: __init__
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.version = 'v1'
self.auth_endpoint = conf.get('auth_endpoint', '')
if not self.auth_endpoint:
raise ValueError('auth_endpoint not set in config file')
if isinstance(self.auth_endpoint, unicode):
self.auth_endpoint = self.auth_endpoint.encode('utf-8')
parsed_path = urlparse(self.auth_endpoint)
if not parsed_path.netloc:
raise ValueError('auth_endpoint is invalid in config file')
self.auth_domain = parsed_path.netloc
self.login_path = parsed_path.path
self.scheme = parsed_path.scheme
if self.scheme != 'https':
raise ValueError('auth_endpoint must have https:// scheme')
# by default service_domain can be extracted from the endpoint
# in case where auth domain is different from service domain
# you need to set up the service domain separately
# Example:
# auth_endpoint = https://auth.example.com/login
# service_domain = https://www.example.com
self.service_domain = conf.get('service_domain',
'%s://%s'
% (self.scheme, self.auth_domain))
self.logger = get_logger(conf, log_route='lite-auth')
# try to refresh token
# when less than this amount of seconds left
self.refresh_before = conf.get('token_refresh_before', 60 * 29)
# url for whitelist objects
# Example: /v1/liteauth/whitelist
self.whitelist_url = conf.get('whitelist_url', '').lower().rstrip('/')
# url for invite objects
# Example: /v1/liteauth/invites
self.invite_url = conf.get('invite_url', '').lower().rstrip('/')
self.storage_driver = None
self.metadata_key = conf.get('metadata_key', 'userdata').lower()
self.provider = load_oauth_provider(conf.get('oauth_provider', 'google_oauth'))
self.oauth_login_timeout = 3600
开发者ID:pkit,项目名称:liteauth,代码行数:40,代码来源:liteauth.py
示例7: test_urlparse
def test_urlparse(self):
parsed = utils.urlparse('http://127.0.0.1/')
self.assertEquals(parsed.scheme, 'http')
self.assertEquals(parsed.hostname, '127.0.0.1')
self.assertEquals(parsed.path, '/')
parsed = utils.urlparse('http://127.0.0.1:8080/')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('https://127.0.0.1/')
self.assertEquals(parsed.scheme, 'https')
parsed = utils.urlparse('http://[::1]/')
self.assertEquals(parsed.hostname, '::1')
parsed = utils.urlparse('http://[::1]:8080/')
self.assertEquals(parsed.hostname, '::1')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('www.example.com')
self.assertEquals(parsed.hostname, '')
开发者ID:AsylumCorp,项目名称:swift,代码行数:21,代码来源:test_utils.py
示例8: test_urlparse
def test_urlparse(self):
parsed = utils.urlparse("http://127.0.0.1/")
self.assertEquals(parsed.scheme, "http")
self.assertEquals(parsed.hostname, "127.0.0.1")
self.assertEquals(parsed.path, "/")
parsed = utils.urlparse("http://127.0.0.1:8080/")
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse("https://127.0.0.1/")
self.assertEquals(parsed.scheme, "https")
parsed = utils.urlparse("http://[::1]/")
self.assertEquals(parsed.hostname, "::1")
parsed = utils.urlparse("http://[::1]:8080/")
self.assertEquals(parsed.hostname, "::1")
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse("www.example.com")
self.assertEquals(parsed.hostname, "")
开发者ID:colecrawford,项目名称:swift,代码行数:21,代码来源:test_utils.py
示例9: origin_get
def origin_get(url, token, parsed, conn, cdn_url, obj, headers={}):
cdn_parsed = urlparse(cdn_url)
conn.request('GET',
quote(cdn_parsed.path + '/' + obj), '',
self._origin_headers(headers, cdn_url))
return check_response(conn)
开发者ID:aerwin3,项目名称:sos,代码行数:6,代码来源:origin_testing.py
示例10: _origin_headers
def _origin_headers(self, cur_headers, url):
parsed = urlparse(url)
cur_headers['Host'] = parsed.hostname
return cur_headers
开发者ID:aerwin3,项目名称:sos,代码行数:4,代码来源:origin_testing.py
示例11: origin_options
def origin_options(url, token, parsed, conn, cdn_url, obj, headers={}):
cdn_parsed = urlparse(cdn_url)
conn.request('OPTIONS',
quote(cdn_parsed.path + '/' + obj), '',
self._origin_headers(headers, cdn_url))
return check_response(conn, allow_401=True)
开发者ID:aerwin3,项目名称:sos,代码行数:6,代码来源:origin_testing.py
示例12: container_sync_row
def container_sync_row(self, row, sync_to, user_key, broker, info,
realm, realm_key):
"""
Sends the update the row indicates to the sync_to container.
:param row: The updated row in the local database triggering the sync
update.
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param broker: The local container database broker.
:param info: The get_info result from the local container database
broker.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:returns: True on success
"""
try:
start_time = time()
if row['deleted']:
try:
headers = {'x-timestamp': row['created_at']}
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(
row['name'])
sig = self.realms_conf.get_sig(
'DELETE', path, headers['x-timestamp'], nonce,
realm_key, user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
delete_object(sync_to, name=row['name'], headers=headers,
proxy=self.select_http_proxy(),
logger=self.logger,
timeout=self.conn_timeout)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
self.container_deletes += 1
self.logger.increment('deletes')
self.logger.timing_since('deletes.timing', start_time)
else:
part, nodes = \
self.get_object_ring(info['storage_policy_index']). \
get_nodes(info['account'], info['container'],
row['name'])
shuffle(nodes)
exc = None
looking_for_timestamp = Timestamp(row['created_at'])
timestamp = -1
headers = body = None
# look up for the newest one
headers_out = {'X-Newest': True,
'X-Backend-Storage-Policy-Index':
str(info['storage_policy_index'])}
try:
source_obj_status, source_obj_info, source_obj_iter = \
self.swift.get_object(info['account'],
info['container'], row['name'],
headers=headers_out,
acceptable_statuses=(2, 4))
except (Exception, UnexpectedResponse, Timeout) as err:
source_obj_info = {}
source_obj_iter = None
exc = err
timestamp = Timestamp(source_obj_info.get(
'x-timestamp', 0))
headers = source_obj_info
body = source_obj_iter
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(
_('Unknown exception trying to GET: '
'%(account)r %(container)r %(object)r'),
{'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):
if key in headers:
del headers[key]
if 'etag' in headers:
headers['etag'] = headers['etag'].strip('"')
if 'content-type' in headers:
headers['content-type'] = clean_content_type(
headers['content-type'])
headers['x-timestamp'] = row['created_at']
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(row['name'])
sig = self.realms_conf.get_sig(
'PUT', path, headers['x-timestamp'], nonce, realm_key,
user_key)
#.........这里部分代码省略.........
开发者ID:BjoernT,项目名称:swift,代码行数:101,代码来源:sync.py
示例13: container_sync_row
def container_sync_row(self, row, sync_to, user_key, broker, info,
realm, realm_key):
"""
Sends the update the row indicates to the sync_to container.
:param row: The updated row in the local database triggering the sync
update.
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param broker: The local container database broker.
:param info: The get_info result from the local container database
broker.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:returns: True on success
"""
try:
start_time = time()
if row['deleted']:
try:
headers = {'x-timestamp': row['created_at']}
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(
row['name'])
sig = self.realms_conf.get_sig(
'DELETE', path, headers['x-timestamp'], nonce,
realm_key, user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
delete_object(sync_to, name=row['name'], headers=headers,
proxy=self.select_http_proxy())
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
self.container_deletes += 1
self.logger.increment('deletes')
self.logger.timing_since('deletes.timing', start_time)
else:
part, nodes = self.object_ring.get_nodes(
info['account'], info['container'],
row['name'])
shuffle(nodes)
exc = None
looking_for_timestamp = float(row['created_at'])
timestamp = -1
headers = body = None
for node in nodes:
try:
these_headers, this_body = direct_get_object(
node, part, info['account'], info['container'],
row['name'], resp_chunk_size=65536)
this_timestamp = float(these_headers['x-timestamp'])
if this_timestamp > timestamp:
timestamp = this_timestamp
headers = these_headers
body = this_body
except ClientException as err:
# If any errors are not 404, make sure we report the
# non-404 one. We don't want to mistakenly assume the
# object no longer exists just because one says so and
# the others errored for some other reason.
if not exc or exc.http_status == HTTP_NOT_FOUND:
exc = err
except (Exception, Timeout) as err:
exc = err
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(
_('Unknown exception trying to GET: %(node)r '
'%(account)r %(container)r %(object)r'),
{'node': node, 'part': part,
'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):
if key in headers:
del headers[key]
if 'etag' in headers:
headers['etag'] = headers['etag'].strip('"')
headers['x-timestamp'] = row['created_at']
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(row['name'])
sig = self.realms_conf.get_sig(
'PUT', path, headers['x-timestamp'], nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
put_object(sync_to, name=row['name'], headers=headers,
#.........这里部分代码省略.........
开发者ID:10389030,项目名称:swift,代码行数:101,代码来源:sync.py
示例14: opt
def opt(command):
gettext.install('swauth', unicode=1)
parser = OptionParser(usage='Usage: %prog [options] <account> <user> <password>')
parser.add_option('-a', '--admin', dest='admin', action='store_true',
default=False, help='Give the user administrator access; otherwise '
'the user will only have access to containers specifically allowed '
'with ACLs.')
parser.add_option('-r', '--reseller-admin', dest='reseller_admin',
action='store_true', default=False, help='Give the user full reseller '
'administrator access, giving them full access to all accounts within '
'the reseller, including the ability to create new accounts. Creating '
'a new reseller admin requires super_admin rights.')
parser.add_option('-s', '--suffix', dest='suffix',
default='', help='The suffix to use with the reseller prefix as the '
'storage account name (default: <randomly-generated-uuid4>) Note: If '
'the account already exists, this will have no effect on existing '
'service URLs. Those will need to be updated with '
'swauth-set-account-service')
parser.add_option('-A', '--admin-url', dest='admin_url',
default='http://127.0.0.1:8080/auth/', help='The URL to the auth '
'subsystem (default: http://127.0.0.1:8080/auth/')
parser.add_option('-U', '--admin-user', dest='admin_user',
default='.super_admin', help='The user with admin rights to add users '
'(default: .super_admin).')
parser.add_option('-K', '--admin-key', dest='admin_key',
help='The key for the user with admin rights to add users.')
#########################
# modified by wuzebang 2014/2/27
#args = argv[1:]
args = command
if not args:
args.append('-h')
(options, args) = parser.parse_args(args)
if len(args) < 3:
parser.parse_args(['-h'])
elif len(args) == 3:
attr = []
else:
attr = args[3:]
account, user, password = args[:3]
############################
parsed = urlparse(options.admin_url)
if parsed.scheme not in ('http', 'https'):
raise Exception('Cannot handle protocol scheme %s for url %s' %
(parsed.scheme, repr(options.admin_url)))
parsed_path = parsed.path
if not parsed_path:
parsed_path = '/'
elif parsed_path[-1] != '/':
parsed_path += '/'
# Ensure the account exists if user is NOT trying to change his password
if not options.admin_user == (account + ':' + user):
path = '%sv2/%s' % (parsed_path, account)
headers = {'X-Auth-Admin-User': options.admin_user,
'X-Auth-Admin-Key': options.admin_key}
if options.suffix:
headers['X-Account-Suffix'] = options.suffix
conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers,
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
headers['Content-Length'] = '0'
conn = http_connect(parsed.hostname, parsed.port, 'PUT', path, headers,
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'Account creation failed: %s %s' % (resp.status, resp.reason)
# Add the user
path = '%sv2/%s/%s' % (parsed_path, account, user)
headers = {'X-Auth-Admin-User': options.admin_user,
'X-Auth-Admin-Key': options.admin_key,
'X-Auth-User-Key': password,
'X-Auth-User-Attr': attr, # added by wuzbeang 2014/2/27
'Content-Length': '0'}
print 'the header is :\n' , repr(headers)
if options.admin:
headers['X-Auth-User-Admin'] = 'true'
if options.reseller_admin:
headers['X-Auth-User-Reseller-Admin'] = 'true'
conn = http_connect(parsed.hostname, parsed.port, 'PUT', path, headers,
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
#exit('User creation failed: %s %s' % (resp.status, resp.reason))
print 'User added failed!'
开发者ID:liuyueyi,项目名称:SwiftProject,代码行数:89,代码来源:swauth_add_user.py
示例15: opt
def opt(command):
gettext.install('swauth', unicode=1)
parser = OptionParser(usage='''
Usage: %prog [options] [account] [user]
If [account] and [user] are omitted, a list of accounts will be output.
If [account] is included but not [user], an account's information will be
output, including a list of users within the account.
If [account] and [user] are included, the user's information will be output,
including a list of groups the user belongs to.
If the [user] is '.groups', the active groups for the account will be listed.
'''.strip())
parser.add_option('-p', '--plain-text', dest='plain_text',
action='store_true', default=False, help='Changes the output from '
'JSON to plain text. This will cause an account to list only the '
'users and a user to list only the groups.')
parser.add_option('-A', '--admin-url', dest='admin_url',
default='http://127.0.0.1:8080/auth/', help='The URL to the auth '
'subsystem (default: http://127.0.0.1:8080/auth/')
parser.add_option('-U', '--admin-user', dest='admin_user',
default='.super_admin', help='The user with admin rights to add users '
'(default: .super_admin).')
parser.add_option('-K', '--admin-key', dest='admin_key',
help='The key for the user with admin rights to add users.')
#########################
# modified by wuzebang 2014/2/27
#args = argv[1:]
args = command
## if not args:
## args.append('-h')
## (options, args) = parser.parse_args(args)
## if len(args) < 3:
## parser.parse_args(['-h'])
## elif len(args) == 3:
## attr = []
## else:
## attr = args[3:]
## account, user, password = args[:3]
## ############################
if not args:
args.append('-h')
(options, args) = parser.parse_args(args)
if len(args) > 2:
parser.parse_args(['-h'])
parsed = urlparse(options.admin_url)
if parsed.scheme not in ('http', 'https'):
raise Exception('Cannot handle protocol scheme %s for url %s' %
(parsed.scheme, repr(options.admin_url)))
parsed_path = parsed.path
if not parsed_path:
parsed_path = '/'
elif parsed_path[-1] != '/':
parsed_path += '/'
path = '%sv2/%s' % (parsed_path, '/'.join(args))
headers = {'X-Auth-Admin-User': options.admin_user,
'X-Auth-Admin-Key': options.admin_key}
conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers,
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
body = resp.read()
if resp.status // 100 != 2:
print('List failed: %s %s' % (resp.status, resp.reason))
if options.plain_text:
info = json.loads(body)
for group in info[['accounts', 'users', 'groups'][len(args)]]:
print group['name']
else:
print body
return json.loads(body)
开发者ID:liuyueyi,项目名称:SwiftProject,代码行数:76,代码来源:swauthlist.py
注:本文中的swift.common.utils.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论