本文整理汇总了Python中swiftclient.client.head_account函数的典型用法代码示例。如果您正苦于以下问题:Python head_account函数的具体用法?Python head_account怎么用?Python head_account使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了head_account函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: wrapper
def wrapper(*args, **kw):
storage_url = args[0].session.get('storage_url', '')
auth_token = args[0].session.get('auth_token', '')
username = args[0].session.get('username', '')
password = args[0].session.get('password', '')
try:
client.head_account(storage_url, auth_token)
return fn(*args, **kw)
except:
#Attempt to get a new auth token
try:
auth_version = settings.SWIFT_AUTH_VERSION or 1
(storage_url, auth_token) = client.get_auth(
settings.SWIFT_AUTH_URL, username, password,
auth_version=auth_version)
args[0].session['auth_token'] = auth_token
args[0].session['storage_url'] = storage_url
return fn(*args, **kw)
except:
messages.error(args[0], _("Session expired."))
return {'errors': 'true'}
开发者ID:OLRC,项目名称:django-swiftbrowser,代码行数:25,代码来源:utils.py
示例2: test_server_error
def test_server_error(self):
body = "c" * 65
c.http_connection = self.fake_http_connection(500, body=body)
self.assertRaises(c.ClientException, c.head_account, "http://www.tests.com", "asdf")
try:
c.head_account("http://www.tests.com", "asdf")
except c.ClientException as e:
new_body = "[first 60 chars of response] " + body[0:60]
self.assertEqual(e.__str__()[-89:], new_body)
开发者ID:AlphaStaxLLC,项目名称:python-swiftclient,代码行数:9,代码来源:test_swiftclient.py
示例3: wrapper
def wrapper(*args, **kw):
storage_url = args[0].session.get('storage_url', '')
auth_token = args[0].session.get('auth_token', '')
username = args[0].session.get('username', '')
password = args[0].session.get('password', '')
# If the following variables are available, attempt to get an
# auth token
if (storage_url and auth_token and username and password):
# If the user has no role, head the container to valid the token
if args[0].session.get('norole'):
storage_url = args[0].session.get('default_storage_url', '')
auth_token = args[0].session.get('default_auth_token', '')
#Attempt to get a new auth token
try:
client.head_container(
storage_url, auth_token, args[0].session.get('user'))
return fn(*args, **kw)
except:
# Failure to get an auth token, tell the user the session
# has expiredself.
messages.error(args[0], _("Session expired."))
# A regular user's token is validated by heading the account.
else:
try:
client.head_account(storage_url, auth_token)
return fn(*args, **kw)
except:
#Attempt to get a new auth token
try:
auth_version = settings.SWIFT_AUTH_VERSION or 1
(storage_url, auth_token) = client.get_auth(
settings.SWIFT_AUTH_URL, username, password,
auth_version=auth_version)
args[0].session['auth_token'] = auth_token
args[0].session['storage_url'] = storage_url
return fn(*args, **kw)
except:
# Failure to get an auth token, tell the user the
# session has expired.
messages.error(args[0], _("Session expired."))
return redirect(swiftbrowser.views.login)
开发者ID:bkawula,项目名称:django-swiftbrowser,代码行数:48,代码来源:__init__.py
示例4: setUp
def setUp(self):
super(TestAccountReaper, self).setUp()
self.all_objects = []
# upload some containers
body = 'test-body'
for policy in ENABLED_POLICIES:
container = 'container-%s-%s' % (policy.name, uuid.uuid4())
client.put_container(self.url, self.token, container,
headers={'X-Storage-Policy': policy.name})
obj = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, container, obj, body)
self.all_objects.append((policy, container, obj))
policy.load_ring('/etc/swift')
Manager(['container-updater']).once()
headers = client.head_account(self.url, self.token)
self.assertEqual(int(headers['x-account-container-count']),
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-object-count']),
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-bytes-used']),
len(ENABLED_POLICIES) * len(body))
part, nodes = self.account_ring.get_nodes(self.account)
for node in nodes:
direct_delete_account(node, part, self.account)
开发者ID:larsbutler,项目名称:swift,代码行数:29,代码来源:test_account_reaper.py
示例5: get_temp_key
def get_temp_key(storage_url, auth_token):
""" Tries to get meta-temp-url key from account.
If not set, generate tempurl and save it to acocunt.
This requires at least account owner rights. """
logging.debug(' in get_temp_key: ' )
try:
account = client.head_account(storage_url, auth_token)
except client.ClientException:
return None
logging.debug(' account in get_temp_key: %s ' % account)
key = account.get('x-account-meta-temp-url-key')
logging.debug(' key in get_temp_key: %s ' % key)
if not key:
chars = string.ascii_lowercase + string.digits
key = ''.join(random.choice(chars) for x in range(32))
headers = {'x-account-meta-temp-url-key': key}
try:
client.post_account(storage_url, auth_token, headers)
except client.ClientException:
return None
return key
开发者ID:ddxgz,项目名称:swift-storage-restapi,代码行数:25,代码来源:swiftutils.py
示例6: test_ok
def test_ok(self):
c.http_connection = self.fake_http_connection(200)
value = c.head_account('http://www.tests.com', 'asdf')
# TODO: Hmm. This doesn't really test too much as it uses a fake that
# always returns the same dict. I guess it "exercises" the code, so
# I'll leave it for now.
self.assertEqual(type(value), dict)
开发者ID:mgeisler,项目名称:python-swiftclient,代码行数:7,代码来源:test_swiftclient.py
示例7: iter_accounts
def iter_accounts(ksclient):
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type='adminURL')
base_url = '%s/v1/%s' % (endpoint, cfg.CONF.reseller_prefix)
for t in ksclient.tenants.list():
yield (t.id, swift.head_account('%s%s' % (base_url, t.id),
ksclient.auth_token))
开发者ID:100PercentIT,项目名称:ceilometer,代码行数:8,代码来源:swift.py
示例8: get_swift_used
def get_swift_used(sc, tenant_id):
tenant_url, token = get_swift_tenant_connection(sc, tenant_id)
try:
swift_account = swiftclient.head_account(url=tenant_url, token=token)
except sc_exception:
print 'Project %s has no swift quota' % tenant_id
return
return swift_account.get(SWIFT_USED_KEY, -1)
开发者ID:VicNode,项目名称:scripts,代码行数:8,代码来源:check_swift_usage.py
示例9: get_default_temp_time
def get_default_temp_time(storage_url, auth_token):
"""Return in seconds the header Default-Temp-Time for the given tenant.
If an exception is caught, return 0."""
try:
cont = client.head_account(storage_url, auth_token)
return cont.get('x-account-meta-default-temp-time', '')
except:
return 0
开发者ID:OLRC,项目名称:django-swiftbrowser,代码行数:9,代码来源:utils.py
示例10: iter_accounts
def iter_accounts(ksclient):
try:
endpoint = ksclient.service_catalog.url_for(service_type="object-store", endpoint_type="adminURL")
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
return
for t in ksclient.tenants.list():
yield (t.id, swift.head_account(SwiftPollster._neaten_url(endpoint, t.id), ksclient.auth_token))
开发者ID:mygoda,项目名称:openstack,代码行数:9,代码来源:swift.py
示例11: login
def login(request):
""" Tries to login user and sets session data """
request.session.flush()
#Process the form if there is a POST request.
if (request.POST):
form = LoginForm(request.POST)
if form.is_valid():
username = form.cleaned_data['username']
password = form.cleaned_data['password']
try:
auth_version = settings.SWIFT_AUTH_VERSION or 1
(storage_url, auth_token) = client.get_auth(
settings.SWIFT_AUTH_URL, username, password,
auth_version=auth_version)
request.session['auth_token'] = auth_token
request.session['storage_url'] = storage_url
request.session['username'] = username
request.session['password'] = password
tenant_name, user = split_tenant_user_names(username)
request.session['user'] = user
request.session['tenant_name'] = tenant_name
# Upon successful retrieval of a token, if we're unable to
# head the account, then the user is not an admin or
# swiftoperator and has access to only a container.
try:
client.head_account(storage_url, auth_token)
except:
request.session['norole'] = True
return redirect(containerview)
# Specify why the login failed.
except client.ClientException, e:
messages.error(request, _("Login failed: {0}".format(
e)))
# Generic login failure message.
except Exception, e:
print(e)
messages.error(request, _("Login failed."))
开发者ID:OLRC,项目名称:django-swiftbrowser,代码行数:43,代码来源:main.py
示例12: test_sync
def test_sync(self):
all_objects = []
# upload some containers
for policy in ENABLED_POLICIES:
container = 'container-%s-%s' % (policy.name, uuid.uuid4())
client.put_container(self.url, self.token, container,
headers={'X-Storage-Policy': policy.name})
obj = 'object-%s' % uuid.uuid4()
body = 'test-body'
client.put_object(self.url, self.token, container, obj, body)
all_objects.append((policy, container, obj))
Manager(['container-updater']).once()
headers = client.head_account(self.url, self.token)
self.assertEqual(int(headers['x-account-container-count']),
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-object-count']),
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-bytes-used']),
len(ENABLED_POLICIES) * len(body))
part, nodes = self.account_ring.get_nodes(self.account)
for node in nodes:
direct_delete_account(node, part, self.account)
Manager(['account-reaper']).once()
get_to_final_state()
for policy, container, obj in all_objects:
cpart, cnodes = self.container_ring.get_nodes(
self.account, container)
for cnode in cnodes:
try:
direct_head_container(cnode, cpart, self.account,
container)
except ClientException as err:
self.assertEquals(err.http_status, 404)
else:
self.fail('Found un-reaped /%s/%s on %r' %
(self.account, container, node))
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
part, nodes = object_ring.get_nodes(self.account, container, obj)
for node in nodes:
try:
direct_get_object(node, part, self.account,
container, obj)
except ClientException as err:
self.assertEquals(err.http_status, 404)
else:
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
(self.account, container, obj, node, policy))
开发者ID:gayana06,项目名称:Thesis,代码行数:54,代码来源:test_account_reaper.py
示例13: iter_accounts
def iter_accounts():
ks = ksclient.Client(username=cfg.CONF.os_username,
password=cfg.CONF.os_password,
tenant_name=cfg.CONF.os_tenant_name,
auth_url=cfg.CONF.os_auth_url)
endpoint = ks.service_catalog.url_for(service_type='object-store',
endpoint_type='adminURL')
base_url = '%s/v1/%s' % (endpoint, cfg.CONF.reseller_prefix)
for t in ks.tenants.list():
yield (t.id, swift.head_account('%s%s' % (base_url, t.id),
ks.auth_token))
开发者ID:att-cloud,项目名称:ceilometer,代码行数:11,代码来源:swift.py
示例14: get_tempurl_key
def get_tempurl_key():
(storage_url, auth_token) = client.get_auth(settings.SWIFT_AUTH_URL, settings.SWIFT_USER, settings.SWIFT_PASSWORD)
meta = client.head_account(storage_url, auth_token)
key = meta.get("x-account-meta-temp-url-key")
if not key:
key = random_key()
headers = {"x-account-meta-temp-url-key": key}
client.post_account(storage_url, auth_token, headers)
return storage_url, key
开发者ID:redhat-cip,项目名称:openstack-swift-webapps-examples,代码行数:11,代码来源:views.py
示例15: _get_account_info
def _get_account_info(self, ksclient, cache):
try:
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type='adminURL')
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
raise StopIteration()
for t in cache['tenants']:
yield (t.id, swift.head_account(self._neaten_url(endpoint, t.id),
ksclient.auth_token))
开发者ID:tomassedovic,项目名称:ceilometer,代码行数:12,代码来源:swift.py
示例16: _get_account_info
def _get_account_info(self, ksclient, cache):
try:
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type=cfg.CONF.service_credentials.os_endpoint_type)
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
raise StopIteration()
for t in cache[self.CACHE_KEY_TENANT]:
yield (t.id, swift.head_account(self._neaten_url(endpoint, t.id),
ksclient.auth_token))
开发者ID:TheUtils,项目名称:ceilometer,代码行数:12,代码来源:swift.py
示例17: iter_accounts
def iter_accounts(ksclient):
try:
endpoint = ksclient.service_catalog.url_for(
service_type='object-store',
endpoint_type='adminURL')
except exceptions.EndpointNotFound:
LOG.debug(_("Swift endpoint not found"))
return
base_url = '%s/v1/%s' % (endpoint, cfg.CONF.reseller_prefix)
for t in ksclient.tenants.list():
yield (t.id, swift.head_account('%s%s' % (base_url, t.id),
ksclient.auth_token))
开发者ID:CiscoAS,项目名称:ceilometer,代码行数:13,代码来源:swift.py
示例18: get_swift_usage
def get_swift_usage(tenant_url, token):
usage = {}
usage['gb_allocated'] = usage['gb_used'] = 0
try:
swift_account = swiftclient.head_account(url=tenant_url, token=token)
if SWIFT_QUOTA_KEY in swift_account:
usage['gb_allocated'] = b_to_gb(swift_account[SWIFT_QUOTA_KEY])
usage['gb_used'] = b_to_gb(swift_account['x-account-bytes-used'])
except ClientException:
pass
return usage
开发者ID:VicNode,项目名称:scripts,代码行数:14,代码来源:usage-report.py
示例19: get_fine_grained_temp_key
def get_fine_grained_temp_key(storage_url, auth_token, container_name=None):
"""
Tries to get meta-temp-url key from account or container.
If not set, generate tempurl and save it.
"""
logging.debug(' in get_fine_grained_temp_key: container_name:%s, \
storage_url:%s ' %
(container_name, storage_url) )
try:
if container_name:
container = client.head_container(storage_url, auth_token,
container_name)
key = container.get('x-container-meta-temp-url-key')
logging.debug(' key in get_fine_grained_temp_key container: %s ' % key)
else:
account = client.head_account(storage_url, auth_token)
key = account.get('x-account-meta-temp-url-key')
logging.debug(' key in get_fine_grained_temp_key account: %s ' % key)
except client.ClientException:
return None
# logging.debug(' account or container in get_temp_key: %s '
# % account or container)
if not key:
chars = string.ascii_lowercase + string.digits
key = ''.join(random.choice(chars) for x in range(32))
if container_name:
headers = {'x-container-meta-temp-url-key': key}
try:
client.post_container(storage_url, auth_token, container_name,
headers)
logging.debug(' post_container')
except client.ClientException:
return None
raise ValueError('cannot get key, have no account rights to \
get account key!')
else:
headers = {'x-account-meta-temp-url-key': key}
try:
client.post_account(storage_url, auth_token, headers)
logging.debug(' post_account')
except client.ClientException:
return None
return key
开发者ID:ddxgz,项目名称:swift-storage-restapi,代码行数:49,代码来源:swiftutils.py
示例20: get_account
def get_account(self, deep=True):
if not self.http_conn:
self.connect()
account_info = swift.head_account(url=self.swift_url, token=self.token, http_conn=self.http_conn)
account_head, containers = swift.get_account(url=self.swift_url, token=self.token, http_conn=self.http_conn)
if self.debug:
print(account_info)
print(account_head)
for container in containers:
print(container)
print
if deep:
self.get_containers(containers)
开发者ID:jesusaurus,项目名称:openstack-tests,代码行数:15,代码来源:swiftTest.py
注:本文中的swiftclient.client.head_account函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论