本文整理汇总了Python中swiftclient.get_auth函数的典型用法代码示例。如果您正苦于以下问题:Python get_auth函数的具体用法?Python get_auth怎么用?Python get_auth使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_auth函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: authenticate
def authenticate(self, clone_conn=None):
if clone_conn:
self.conn_class = clone_conn.conn_class
self.storage_host = clone_conn.storage_host
self.storage_url = clone_conn.storage_url
self.storage_port = clone_conn.storage_port
self.storage_token = clone_conn.storage_token
return
if self.auth_version == "1":
auth_path = '%sv1.0' % (self.auth_prefix)
if self.account:
auth_user = '%s:%s' % (self.account, self.username)
else:
auth_user = self.username
else:
auth_user = self.username
auth_path = self.auth_prefix
auth_scheme = 'https://' if self.auth_ssl else 'http://'
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
(storage_url, storage_token) = get_auth(
auth_url, auth_user, self.password, snet=False,
tenant_name=self.account, auth_version=self.auth_version,
os_options={})
if not (storage_url and storage_token):
raise AuthenticationFailed()
x = storage_url.split('/')
if x[0] == 'http:':
self.conn_class = httplib.HTTPConnection
self.storage_port = 80
elif x[0] == 'https:':
self.conn_class = httplib.HTTPSConnection
self.storage_port = 443
else:
raise ValueError('unexpected protocol %s' % (x[0]))
self.storage_host = x[2].split(':')[0]
if ':' in x[2]:
self.storage_port = int(x[2].split(':')[1])
# Make sure storage_url is a string and not unicode, since
# keystoneclient (called by swiftclient) returns them in
# unicode and this would cause troubles when doing
# no_safe_quote query.
self.storage_url = str('/%s/%s' % (x[3], x[4]))
self.account_name = str(x[4])
self.auth_user = auth_user
# With v2 keystone, storage_token is unicode.
# We want it to be string otherwise this would cause
# troubles when doing query with already encoded
# non ascii characters in its headers.
self.storage_token = str(storage_token)
self.user_acl = '%s:%s' % (self.account, self.username)
self.http_connect()
return self.storage_url, self.storage_token
开发者ID:heemanshu,项目名称:swift_juno,代码行数:60,代码来源:swift_test_client.py
示例2: main
def main():
options, commands = parser.parse_args()
commands.remove('split-brain')
if not commands:
parser.print_help()
return 'ERROR: must specify at least one command'
for cmd_args in commands:
cmd = cmd_args.split(':', 1)[0]
if cmd not in BrainSplitter.__commands__:
parser.print_help()
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
brain = BrainSplitter(url, token, options.container, options.object)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
if len(parts) > 1:
args = utils.list_from_csv(parts[1])
else:
args = ()
try:
brain.run(command, *args)
except ClientException as e:
print '**WARNING**: %s raised %s' % (command, e)
print 'STATUS'.join(['*' * 25] * 2)
brain.servers.status()
sys.exit()
开发者ID:SamuelXu,项目名称:swift,代码行数:28,代码来源:test_container_merge_policy_index.py
示例3: reset_environment
def reset_environment():
call(['resetswift'])
pids = {}
try:
port2server = {}
for s, p in (('account', 6002), ('container', 6001), ('object', 6000)):
for n in xrange(1, 5):
pids['%s%d' % (s, n)] = \
Popen(['swift-%s-server' % s,
'/etc/swift/%s-server/%d.conf' % (s, n)]).pid
port2server[p + (n * 10)] = '%s%d' % (s, n)
pids['proxy'] = Popen(['swift-proxy-server',
'/etc/swift/proxy-server.conf']).pid
account_ring = Ring('/etc/swift/account.ring.gz')
container_ring = Ring('/etc/swift/container.ring.gz')
object_ring = Ring('/etc/swift/object.ring.gz')
attempt = 0
while True:
attempt += 1
try:
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
account = url.split('/')[-1]
break
except Exception, err:
if attempt > 9:
print err
print 'Giving up after %s retries.' % attempt
raise err
print err
print 'Retrying in 2 seconds...'
sleep(2)
except BaseException, err:
kill_pids(pids)
raise err
开发者ID:diamrem,项目名称:swift,代码行数:35,代码来源:common.py
示例4: setUp
def setUp(self):
resetswift()
try:
self.ipport2server = {}
self.configs = defaultdict(dict)
self.account_ring = get_ring(
'account',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.container_ring = get_ring(
'container',
self.acct_cont_required_replicas,
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.policy = get_policy(**self.policy_requirements)
self.object_ring = get_ring(
self.policy.ring_name,
self.obj_required_replicas,
self.obj_required_devices,
server='object',
ipport2server=self.ipport2server,
config_paths=self.configs)
self.servers_per_port = any(
int(readconf(c, section_name='object-replicator').get(
'servers_per_port', '0'))
for c in self.configs['object-replicator'].values())
Manager(['main']).start(wait=False)
for ipport in self.ipport2server:
check_server(ipport, self.ipport2server)
proxy_ipport = ('127.0.0.1', 8080)
self.ipport2server[proxy_ipport] = 'proxy'
self.url, self.token, self.account = check_server(
proxy_ipport, self.ipport2server)
self.account_1 = {
'url': self.url, 'token': self.token, 'account': self.account}
url2, token2 = get_auth(
'http://%s:%d/auth/v1.0' % proxy_ipport,
'test2:tester2', 'testing2')
self.account_2 = {
'url': url2, 'token': token2, 'account': url2.split('/')[-1]}
head_account(url2, token2) # sanity check
self.replicators = Manager(
['account-replicator', 'container-replicator',
'object-replicator'])
self.updaters = Manager(['container-updater', 'object-updater'])
except BaseException:
try:
raise
finally:
try:
Manager(['all']).kill()
except Exception:
pass
开发者ID:prashanthpai,项目名称:swift,代码行数:60,代码来源:common.py
示例5: page_login
def page_login(self, req):
""" create login page """
if req.method == 'POST':
try:
username = req.params_alt().get('username')
password = req.params_alt().get('password')
(storage_url, token) = get_auth(self.auth_url,
username, password,
auth_version=self.auth_version)
if self.token_bank.get(token, None):
self.token_bank[token].update({'url': storage_url,
'last': int(time())})
else:
self.token_bank[token] = {'url': storage_url,
'last': int(time())}
resp = HTTPFound(location=self.add_prefix(storage_url) + \
'?limit=%s' % self.items_per_page)
resp.set_cookie('_token', token, path=self.page_path,
max_age=self.cookie_max_age,
secure=self.secure)
self.memcache_update(token)
return resp
except Exception, err:
lang = self.get_lang(req)
resp = Response(charset='utf8')
resp.app_iter = self.tmpl({'ptype': 'login',
'top': self.page_path,
'title': self.title, 'lang': lang,
'message': 'Login Failed'})
return resp
开发者ID:famao,项目名称:Taylor,代码行数:30,代码来源:taylor.py
示例6: authenticate
def authenticate(self, clone_conn=None):
if clone_conn:
self.conn_class = clone_conn.conn_class
self.storage_host = clone_conn.storage_host
self.storage_url = clone_conn.storage_url
self.storage_port = clone_conn.storage_port
self.storage_token = clone_conn.storage_token
return
if self.auth_version == "1":
auth_path = "%sv1.0" % (self.auth_prefix)
if self.account:
auth_user = "%s:%s" % (self.account, self.username)
else:
auth_user = self.username
else:
auth_user = self.username
auth_path = self.auth_prefix
auth_scheme = "https://" if self.auth_ssl else "http://"
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
(storage_url, storage_token) = get_auth(
auth_url,
auth_user,
self.password,
snet=False,
tenant_name=self.account,
auth_version=self.auth_version,
os_options={},
)
if not (storage_url and storage_token):
raise AuthenticationFailed()
x = storage_url.split("/")
if x[0] == "http:":
self.conn_class = httplib.HTTPConnection
self.storage_port = 80
elif x[0] == "https:":
self.conn_class = httplib.HTTPSConnection
self.storage_port = 443
else:
raise ValueError("unexpected protocol %s" % (x[0]))
self.storage_host = x[2].split(":")[0]
if ":" in x[2]:
self.storage_port = int(x[2].split(":")[1])
# Make sure storage_url is a string and not unicode, since
# keystoneclient (called by swiftclient) returns them in
# unicode and this would cause troubles when doing
# no_safe_quote query.
self.storage_url = str("/%s/%s" % (x[3], x[4]))
self.storage_token = storage_token
self.http_connect()
return self.storage_url, self.storage_token
开发者ID:674009287,项目名称:swift,代码行数:59,代码来源:swift_test_client.py
示例7: __init__
def __init__(self):
# get URL and token to use for requests
self.auth_info = SWIFT.get_auth('http://192.168.52.2:8080/auth/v1.0/','test:tester','testing')
self.swift_url = self.auth_info[0]
self.auth_token = self.auth_info[1]
self.user = getpass.getuser()
self.container_name = 'testdir'
开发者ID:slupers,项目名称:swift_browser,代码行数:8,代码来源:swift.py
示例8: get_url_token
def get_url_token(user_index, os_options):
authargs = dict(
snet=False,
tenant_name=swift_test_tenant[user_index],
auth_version=swift_test_auth_version,
os_options=os_options,
insecure=insecure,
)
return get_auth(swift_test_auth, swift_test_user[user_index], swift_test_key[user_index], **authargs)
开发者ID:rtblife97,项目名称:swift,代码行数:9,代码来源:__init__.py
示例9: __init__
def __init__(self, logger, conf, names):
self.logger = logger
self.aborted = False
self.user = conf.user
self.key = conf.key
self.auth_url = conf.auth
self.use_proxy = config_true_value(conf.use_proxy)
if not self.use_proxy and direct_client is None:
self.logger.critical("You need to have swift installed if you are "
"not using the proxy")
sys.exit(1)
self.auth_version = conf.auth_version
self.logger.info("Auth version: %s" % self.auth_version)
if self.use_proxy:
if using_http_proxy(self.auth_url):
logger.warn("Auth is going through HTTP proxy server. This "
"could affect test result")
url, token = client.get_auth(self.auth_url, self.user, self.key,
auth_version=self.auth_version)
self.token = token
self.account = url.split('/')[-1]
if conf.url == '':
self.url = url
else:
self.url = conf.url
else:
self.token = 'SlapChop!'
self.account = conf.account
self.url = conf.url
self.ip, self.port = self.url.split('/')[2].split(':')
if using_http_proxy(self.url):
logger.warn("Communication with Swift server is going through "
"HTTP proxy server. This could affect test result")
self.object_size = int(conf.object_size)
self.object_sources = conf.object_sources
self.lower_object_size = int(conf.lower_object_size)
self.upper_object_size = int(conf.upper_object_size)
self.files = []
if self.object_sources:
self.object_sources = self.object_sources.split()
self.files = [file(f, 'rb').read() for f in self.object_sources]
self.put_concurrency = int(conf.put_concurrency)
self.get_concurrency = int(conf.get_concurrency)
self.del_concurrency = int(conf.del_concurrency)
self.total_objects = int(conf.num_objects)
self.total_gets = int(conf.num_gets)
self.timeout = int(conf.timeout)
self.devices = conf.devices.split()
self.names = names
self.conn_pool = ConnectionPool(self.url,
max(self.put_concurrency,
self.get_concurrency,
self.del_concurrency))
开发者ID:NozomiNetworks,项目名称:swift-bench,代码行数:56,代码来源:bench.py
示例10: retry
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override:
'retries' (default: 5)
'use_account' (default: 1) - which user's token to pass
'url_account' (default: matches 'use_account') - which user's storage URL
'resource' (default: url[url_account] - URL to connect to; retry()
will interpolate the variable :storage_url: if present
"""
global url, token, parsed, conn
retries = kwargs.get('retries', 5)
attempts, backoff = 0, 1
# use account #1 by default; turn user's 1-indexed account into 0-indexed
use_account = kwargs.pop('use_account', 1) - 1
# access our own account by default
url_account = kwargs.pop('url_account', use_account + 1) - 1
os_options = {'user_domain_name': swift_test_domain[use_account],
'project_domain_name': swift_test_domain[use_account]}
while attempts <= retries:
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options=os_options)
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
connection(url[use_account])
# default resource is the account url[url_account]
resource = kwargs.pop('resource', '%(storage_url)s')
template_vars = {'storage_url': url[url_account]}
parsed_result = urlparse(resource % template_vars)
return func(url[url_account], token[use_account],
parsed_result, conn[url_account],
*args, **kwargs)
except (socket.error, HTTPException):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
except InternalServerError:
pass
if attempts <= retries:
sleep(backoff)
backoff *= 2
raise Exception('No result after %s retries.' % retries)
开发者ID:heemanshu,项目名称:swift_juno,代码行数:56,代码来源:__init__.py
示例11: get_storage_token
def get_storage_token(self):
""" Get a storage token for the swift cluster """
account = self.get_account_str()
try:
url, token = swiftclient.get_auth(self.authurl, account, self.key)
except swiftclient.client.ClientException as e:
raise SageFSException('HTTP Error: %s - %s'
% (e.http_status, e.http_reason))
self.storeurl = url
self.storetoken = token
开发者ID:stredger,项目名称:sagefs,代码行数:10,代码来源:sagefs.py
示例12: get_token
def get_token(self):
if time() - self._token_creation_time >= self.auth_token_duration:
new_token = swiftclient.get_auth(
self.api_auth_url,
self.api_username,
self.api_key,
auth_version=self.auth_version,
os_options={"tenant_name": self.tenant_name})[1]
self.token = new_token
return self._token
开发者ID:tossmilestone,项目名称:django-storage-swift,代码行数:10,代码来源:storage.py
示例13: __init__
def __init__(self):
self.last_headers_name = None
self.last_headers_value = None
# Get authentication token
self.storage_url, self.token = swiftclient.get_auth(
self.api_auth_url,
self.api_username,
self.api_key,
auth_version=self.auth_version,
os_options=dict({"tenant_name": self.tenant_name}.items() +
self.os_extra_options.items()),
cacert=self.custom_ca,
insecure=self.ssl_insecure
)
self.http_conn = swiftclient.http_connection(self.storage_url)
# Check container
try:
swiftclient.head_container(self.storage_url, self.token,
self.container_name,
http_conn=self.http_conn)
except swiftclient.ClientException:
headers = {}
if self.auto_create_container:
if self.auto_create_container_public:
headers['X-Container-Read'] = '.r:*'
swiftclient.put_container(self.storage_url, self.token,
self.container_name,
http_conn=self.http_conn,
headers=headers)
else:
raise ImproperlyConfigured(
"Container %s does not exist." % self.container_name)
if self.auto_base_url:
# Derive a base URL based on the authentication information from
# the server, optionally overriding the protocol, host/port and
# potentially adding a path fragment before the auth information.
self.base_url = self.storage_url + '/'
if self.override_base_url is not None:
# override the protocol and host, append any path fragments
split_derived = urlparse.urlsplit(self.base_url)
split_override = urlparse.urlsplit(self.override_base_url)
split_result = [''] * 5
split_result[0:2] = split_override[0:2]
split_result[2] = (split_override[2] +
split_derived[2]).replace('//', '/')
self.base_url = urlparse.urlunsplit(split_result)
self.base_url = urlparse.urljoin(self.base_url,
self.container_name)
self.base_url += '/'
else:
self.base_url = self.override_base_url
开发者ID:morpheu,项目名称:django-storage-swift,代码行数:55,代码来源:storage.py
示例14: __init__
def __init__(self, **settings):
# check if some of the settings provided as class attributes
# should be overwritten
for name, value in settings.items():
if hasattr(self, name):
setattr(self, name, value)
self.last_headers_name = None
self.last_headers_value = None
# Get authentication token
self.storage_url, self.token = swiftclient.get_auth(
self.api_auth_url,
self.api_username,
self.api_key,
auth_version=self.auth_version,
os_options={"tenant_name": self.tenant_name},
)
self.http_conn = swiftclient.http_connection(self.storage_url)
# Check container
try:
swiftclient.head_container(self.storage_url, self.token,
self.container_name,
http_conn=self.http_conn)
except swiftclient.ClientException:
if self.auto_create_container:
swiftclient.put_container(self.storage_url, self.token,
self.container_name,
http_conn=self.http_conn)
else:
raise ImproperlyConfigured(
"Container %s does not exist." % self.container_name)
if self.auto_base_url:
# Derive a base URL based on the authentication information from
# the server, optionally overriding the protocol, host/port and
# potentially adding a path fragment before the auth information.
self.base_url = self.storage_url + '/'
if self.override_base_url is not None:
# override the protocol and host, append any path fragments
split_derived = urlparse.urlsplit(self.base_url)
split_override = urlparse.urlsplit(self.override_base_url)
split_result = [''] * 5
split_result[0:2] = split_override[0:2]
split_result[2] = (split_override[2] +
split_derived[2]).replace('//', '/')
self.base_url = urlparse.urlunsplit(split_result)
self.base_url = urlparse.urljoin(self.base_url,
self.container_name)
self.base_url += '/'
else:
self.base_url = self.override_base_url
开发者ID:HiddenData,项目名称:django-storage-swift,代码行数:54,代码来源:storage.py
示例15: get_auth_token
def get_auth_token():
"""
Get Authenticate token and Storage URL
Returns:
(token, storage_url)
"""
auth_url = "http://%s:8080/auth/v1.0" % PROXY_IP
url, token = client.get_auth(auth_url,
':'.join((ACCOUNT, USER)),
KEY)
return (token, url)
开发者ID:tangyi1989,项目名称:swift-op,代码行数:12,代码来源:utils.py
示例16: authenticate
def authenticate(self, clone_conn=None):
if clone_conn:
self.conn_class = clone_conn.conn_class
self.storage_host = clone_conn.storage_host
self.storage_url = clone_conn.storage_url
self.storage_port = clone_conn.storage_port
self.storage_token = clone_conn.storage_token
return
if self.auth_version == "1":
auth_path = '%sv1.0' % (self.auth_prefix)
if self.account:
auth_user = '%s:%s' % (self.account, self.username)
else:
auth_user = self.username
else:
auth_user = self.username
auth_path = self.auth_prefix
auth_scheme = 'https://' if self.auth_ssl else 'http://'
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
(storage_url, storage_token) = get_auth(auth_url,
auth_user, self.password,
snet=False,
tenant_name=self.account,
auth_version=self.auth_version,
os_options={})
if not (storage_url and storage_token):
raise AuthenticationFailed()
x = storage_url.split('/')
if x[0] == 'http:':
self.conn_class = httplib.HTTPConnection
self.storage_port = 80
elif x[0] == 'https:':
self.conn_class = httplib.HTTPSConnection
self.storage_port = 443
else:
raise ValueError('unexpected protocol %s' % (x[0]))
self.storage_host = x[2].split(':')[0]
if ':' in x[2]:
self.storage_port = int(x[2].split(':')[1])
self.storage_url = '/%s/%s' % (x[3], x[4])
self.storage_token = storage_token
self.http_connect()
return self.storage_url, self.storage_token
开发者ID:Guoweiwei1130,项目名称:openstack,代码行数:52,代码来源:swift_test_client.py
示例17: _get_auth
def _get_auth(self, user_name):
info = self.users.get(user_name)
if info is None:
return None, None
os_options = {"user_domain_name": info["domain"], "project_domain_name": info["domain"]}
authargs = dict(
snet=False,
tenant_name=info["account"],
auth_version=self.auth_version,
os_options=os_options,
insecure=self.insecure,
)
storage_url, token = get_auth(self.auth_url, user_name, info["password"], **authargs)
return storage_url, token
开发者ID:buptUnixGuys,项目名称:swift,代码行数:16,代码来源:test_access_control.py
示例18: __init__
def __init__(self, logger, conf, names):
self.logger = logger
self.aborted = False
self.user = conf.user
self.key = conf.key
self.auth_url = conf.auth
self.use_proxy = conf.use_proxy.lower() in TRUE_VALUES
self.auth_version = conf.auth_version
self.logger.info("Auth version: %s" % self.auth_version)
if self.use_proxy:
url, token = client.get_auth(self.auth_url, self.user, self.key,
auth_version=self.auth_version)
self.token = token
self.account = url.split('/')[-1]
if conf.url == '':
self.url = url
else:
self.url = conf.url
else:
self.token = 'SlapChop!'
self.account = conf.account
self.url = conf.url
self.ip, self.port = self.url.split('/')[2].split(':')
self.containers = ['%s_%d' % (conf.container_name, i)
for i in xrange(int(conf.num_containers))]
self.object_size = int(conf.object_size)
self.object_sources = conf.object_sources
self.lower_object_size = int(conf.lower_object_size)
self.upper_object_size = int(conf.upper_object_size)
self.files = []
if self.object_sources:
self.object_sources = self.object_sources.split()
self.files = [file(f, 'rb').read() for f in self.object_sources]
self.put_concurrency = int(conf.put_concurrency)
self.get_concurrency = int(conf.get_concurrency)
self.del_concurrency = int(conf.del_concurrency)
self.total_objects = int(conf.num_objects)
self.total_gets = int(conf.num_gets)
self.timeout = int(conf.timeout)
self.devices = conf.devices.split()
self.names = names
self.conn_pool = ConnectionPool(self.url,
max(self.put_concurrency, self.get_concurrency,
self.del_concurrency))
开发者ID:DmitryMezhensky,项目名称:Hadoop-and-Swift-integration,代码行数:46,代码来源:bench.py
示例19: retry
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override the 'retries' (default: 5) and
'use_account' (default: 1).
"""
global url, token, parsed, conn
retries = kwargs.get('retries', 5)
use_account = 1
if 'use_account' in kwargs:
use_account = kwargs['use_account']
del kwargs['use_account']
use_account -= 1
attempts = 0
backoff = 1
while attempts <= retries:
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options={})
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
http_connection(url[use_account])
return func(url[use_account], token[use_account],
parsed[use_account], conn[use_account],
*args, **kwargs)
except (socket.error, HTTPException):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
except InternalServerError:
pass
if attempts <= retries:
sleep(backoff)
backoff *= 2
raise Exception('No result after %s retries.' % retries)
开发者ID:674009287,项目名称:swift,代码行数:45,代码来源:swift_testing.py
示例20: check_server
def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
server = port2server[port]
if server[:-1] in ('account', 'container', 'object'):
if int(server[-1]) > 4:
return None
path = '/connect/1/2'
if server[:-1] == 'container':
path += '/3'
elif server[:-1] == 'object':
path += '/3/4'
try_until = time() + timeout
while True:
try:
conn = HTTPConnection('127.0.0.1', port)
conn.request('GET', path)
resp = conn.getresponse()
# 404 because it's a nonsense path (and mount_check is false)
# 507 in case the test target is a VM using mount_check
if resp.status not in (404, 507):
raise Exception(
'Unexpected status %s' % resp.status)
break
except Exception as err:
if time() > try_until:
print err
print 'Giving up on %s:%s after %s seconds.' % (
server, port, timeout)
raise err
sleep(0.1)
else:
try_until = time() + timeout
while True:
try:
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
account = url.split('/')[-1]
head_account(url, token)
return url, token, account
except Exception as err:
if time() > try_until:
print err
print 'Giving up on proxy:8080 after 30 seconds.'
raise err
sleep(0.1)
return None
开发者ID:anishnarang,项目名称:gswift,代码行数:45,代码来源:common.py
注:本文中的swiftclient.get_auth函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论