本文整理汇总了Python中swift.common.middleware.acl.parse_acl函数的典型用法代码示例。如果您正苦于以下问题:Python parse_acl函数的具体用法?Python parse_acl怎么用?Python parse_acl使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_acl函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_s3_acl
def get_s3_acl(headers, acl_headers, resource='container'):
    """
    Build an S3 ``AccessControlPolicy`` XML response from Swift ACL headers.

    :param headers: mapping of header names to values; names are compared
                    case-insensitively (they are lower-cased here).
    :param acl_headers: iterable of lower-case ACL header names to translate
                        into S3 grants.
    :param resource: 'container' or 'object'; selects which header prefix is
                     stripped to obtain the S3 permission name. Any other
                     value produces no grants.
    :returns: a Response carrying the XML document as its body.
    """
    container_acl_prefix = 'x-container-acl-'
    container_prefix = 'x-container-'
    object_acl_prefix = 'x-object-acl-'

    # items() exists on both Python 2 and Python 3 mappings; iteritems()
    # is Python 2 only.
    headers = dict((k.lower(), v) for k, v in headers.items())

    out = ['<AccessControlPolicy>']
    owner_header = 'x-%s-owner' % resource
    if owner_header in headers:
        owner = xml_escape(headers[owner_header])
        out.append('<Owner><ID>%s</ID><DisplayName>%s</DisplayName></Owner>' %
                   (owner, owner))
    out.append('<AccessControlList>')

    for header in acl_headers:
        if header not in headers:
            continue
        # The permission name is whatever follows the resource prefix,
        # upper-cased and with dashes turned into underscores.
        if resource == 'container':
            if header.startswith(container_acl_prefix):
                skip = len(container_acl_prefix)
            else:
                skip = len(container_prefix)
        elif resource == 'object':
            skip = len(object_acl_prefix)
        else:
            continue
        permission = header[skip:].upper().replace('-', '_')
        if not permission:
            continue
        referrers, groups = parse_acl(headers[header])
        for ref in referrers:
            # A '*' referrer is mapped to AMZ_ALL_USERS.
            uri = AMZ_ALL_USERS if ref == '*' else ref
            out.append(amz_group_grant(uri, permission))
        for group in groups:
            out.append(amz_user_grant(group, group, permission))

    out.append('</AccessControlList></AccessControlPolicy>')
    body = ''.join(out)
    return Response(body=body, content_type='application/xml',
                    headers={'Content-Length': str(len(body))})
开发者ID:Nexenta,项目名称:swift3,代码行数:32,代码来源:utils.py
示例2: test_parse_v2_acl
def test_parse_v2_acl(self):
    """Check parse_acl(version=2, ...) against good, empty and bad input."""
    # For all these tests, the header name will be "hdr".
    tests = [
        # Simple case: all ACL data in one header line
        ({'hdr': '{"a":1,"b":"foo"}'}, {'a': 1, 'b': 'foo'}),

        # No header "hdr" exists -- should return None
        ({}, None),
        ({'junk': 'junk'}, None),

        # Empty ACLs should return empty dict
        ({'hdr': ''}, {}),
        ({'hdr': '{}'}, {}),
        ({'hdr': '{ }'}, {}),

        # Bad input -- should return None
        ({'hdr': '["array"]'}, None),
        ({'hdr': 'null'}, None),
        ({'hdr': '"some_string"'}, None),
        ({'hdr': '123'}, None),
    ]

    for hdrs_in, expected in tests:
        result = acl.parse_acl(version=2, data=hdrs_in.get('hdr'))
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(expected, result,
                         '%r: %r != %r' % (hdrs_in, result, expected))
开发者ID:bouncestorage,项目名称:swift,代码行数:26,代码来源:test_acl.py
示例3: extract_acl_and_report_errors
def extract_acl_and_report_errors(self, req):
    """
    Validate the account ACL carried in the request headers.

    :returns: a user-readable error string describing the first problem
              found, or None when the ACL is acceptable. On success the
              'x-account-access-control' header is re-keyed under
              get_sys_meta_prefix('account') + 'core-access-control'.
    """
    external_hdr = 'x-account-access-control'
    raw_acl = req.headers.get(external_hdr)
    parsed = parse_acl(version=2, data=raw_acl)
    if parsed is None:
        return 'Syntax error in input (%r)' % raw_acl

    known_keys = ['admin', 'read-write', 'read-only']

    # While it is possible to construct auth systems that collaborate
    # on ACLs, TempAuth is not such an auth system. At this point,
    # it thinks it is authoritative.
    for acl_key in parsed:
        if acl_key not in known_keys:
            return 'Key %r not recognized' % acl_key

    for acl_key in known_keys:
        if acl_key in parsed:
            if not isinstance(parsed[acl_key], list):
                return 'Value for key %r must be a list' % acl_key
            for member in parsed[acl_key]:
                if not isinstance(member, str):
                    return 'Elements of %r list must be strings' % acl_key

    # No problems found: move the header to its internal name.
    sysmeta_hdr = get_sys_meta_prefix('account') + 'core-access-control'
    req.headers[sysmeta_hdr] = req.headers.pop(external_hdr)
    return None
开发者ID:hbhdytf,项目名称:mac,代码行数:32,代码来源:tempauth.py
示例4: authorize
def authorize(self, req):
"""
Authorization callback added for colony (adapted from swauth).

Each return statement yields one of: None, HTTPNotFound(request=req)
when the request path cannot be split, or self.denied_response(req).

NOTE(review): the indentation of this listing has been lost, so the
nesting of the checks that follow the hasattr(req, 'acl') test cannot
be read from the text; confirm against the original project file.
"""
try:
version, account, container, obj = split_path(req.path, 1, 4, True)
except ValueError:
return HTTPNotFound(request=req)
if not account:
return self.denied_response(req)
# remote_user is treated as a comma-separated list of group names.
user_groups = (req.remote_user or '').split(',')
# Account admin: the account name is one of the user's groups and the
# request is not a DELETE/PUT without a container; flag as swift_owner.
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
req.environ['swift_owner'] = True
return None
# Ordinary users: evaluate the ACL attached to the request, if any.
if hasattr(req, 'acl'):
referrers, groups = parse_acl(req.acl)
if referrer_allowed(req.referer, referrers):
if obj or '.rlistings' in groups:
return None
return self.denied_response(req)
if not req.remote_user:
return self.denied_response(req)
for user_group in user_groups:
if user_group in groups:
return None
return self.denied_response(req)
开发者ID:AsherBond,项目名称:colony,代码行数:30,代码来源:auth_token_for_colony.py
示例5: authorize_anonymous
def authorize_anonymous(self, req):
    """
    Decide whether a request without a confirmed identity may proceed.

    :returns: None when the request is allowed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    try:
        version, account, container, obj = req.split_path(1, 4, True)
    except ValueError:
        return HTTPNotFound(request=req)

    # allow OPTIONS requests to proceed as normal
    if req.method == 'OPTIONS':
        return

    # Only accounts carrying our reseller prefix are handled here.
    if not (account and account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None))
    if not self._authorize_unconfirmed_identity(req, obj, referrers, roles):
        return self.denied_response(req)
开发者ID:Manasa7,项目名称:swift-encrypt,代码行数:26,代码来源:keystoneauth.py
示例6: authorize
def authorize(self, req):
    """
    Authorization callback.

    :returns: None if the request may continue, otherwise a WSGI response
              callable (HTTPNotFound or self.denied_response(req)).
    """
    try:
        version, account, container, obj = split_path(req.path, 1, 4, True)
    except ValueError:
        return HTTPNotFound(request=req)

    if not (account and account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    user_groups = (req.remote_user or '').split(',')
    if '.reseller_admin' in user_groups:
        return None

    # The account admin may do anything except an account-level
    # DELETE or PUT (i.e. one without a container in the path).
    if account in user_groups and \
            not (req.method in ('DELETE', 'PUT') and not container):
        return None

    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        return None
    if not req.remote_user:
        return self.denied_response(req)
    if any(member in groups for member in user_groups):
        return None
    return self.denied_response(req)
开发者ID:edwardt,项目名称:swift,代码行数:28,代码来源:auth.py
示例7: authorize
def authorize(self, req):
    """
    Authorize a request using the Keystone identity stored in the environ.

    ``req.environ['keystone.identity']`` is read for the keys 'tenant',
    'roles' and 'user'. The tenant value is indexed with [0] and [1]
    (presumably an (id, name) pair -- confirm against the code that
    populates the identity).

    :returns: None if the request may proceed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    env = req.environ
    env_identity = env.get('keystone.identity', {})
    tenant = env_identity.get('tenant')

    try:
        version, account, container, obj = split_path(req.path, 1, 4, True)
    except ValueError:
        return HTTPNotFound(request=req)

    # A missing tenant (e.g. no identity in the environ) used to raise
    # TypeError on tenant[0]; treat it as a mismatch and deny instead.
    if not tenant or account != '%s_%s' % (self.reseller_prefix, tenant[0]):
        self.logger.debug('tenant mismatch')
        return self.denied_response(req)

    # If user is in the swift operator group then make the owner of it.
    user_groups = env_identity.get('roles', [])
    for _group in self.keystone_swift_operator_roles.split(','):
        _group = _group.strip()
        if _group in user_groups:
            self.logger.debug(
                "User is in group: %s allow him to do whatever it wants" %
                (_group))
            req.environ['swift_owner'] = True
            return None

    # If user is of the same name of the tenant then make owner of it.
    user = env_identity.get('user', '')
    if self.keystone_tenant_user_admin and user == tenant[1]:
        self.logger.debug("user: %s == %s tenant and option "
                          "keystone_tenant_user_admin is set" %
                          (user, tenant))
        req.environ['swift_owner'] = True
        return None

    # Allow container sync
    if (req.environ.get('swift_sync_key') and
        req.environ['swift_sync_key'] ==
            req.headers.get('x-container-sync-key', None) and
        'x-timestamp' in req.headers and
        (req.remote_addr in self.allowed_sync_hosts or
         get_remote_client(req) in self.allowed_sync_hosts)):
        self.logger.debug('allowing container-sync')
        return None

    # Check if Referrer allow it
    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        if obj or '.rlistings' in groups:
            self.logger.debug('authorizing via ACL')
            return None
        return self.denied_response(req)

    # Check if we have the group in the usergroups and allow it
    for user_group in user_groups:
        if user_group in groups:
            # The original message mixed quote styles across a line
            # continuation, leaving stray '"' characters in the log text.
            self.logger.debug('user in group which is allowed in '
                              'ACL: %s authorizing' % (user_group))
            return None

    # last but not least return deny
    return self.denied_response(req)
开发者ID:mdegerne,项目名称:swift-keystone2,代码行数:60,代码来源:middleware.py
示例8: add_acls_from_sys_metadata
def add_acls_from_sys_metadata(self, resp):
    """
    Expose the internally stored account ACL as 'x-account-access-control'.

    For HEAD/GET/PUT/POST responses the internal header is popped from
    resp.headers, parsed, and (when non-empty) re-formatted into the
    external header. Other methods are left untouched.
    """
    if resp.environ["REQUEST_METHOD"] not in ("HEAD", "GET", "PUT", "POST"):
        return
    internal_name = get_sys_meta_prefix("account") + "core-access-control"
    external_name = "x-account-access-control"
    parsed = parse_acl(version=2, data=resp.headers.pop(internal_name))
    # treat empty dict as empty header
    if parsed:
        resp.headers[external_name] = format_acl(version=2, acl_dict=parsed)
开发者ID:anishnarang,项目名称:gswift-multinode,代码行数:8,代码来源:account.py
示例9: add_acls_from_sys_metadata
def add_acls_from_sys_metadata(self, resp):
    """
    Translate the internal account ACL header into its external form.

    Only applies to HEAD, GET, PUT and POST responses: the internal header
    is removed from resp.headers and, if it parses to a non-empty ACL,
    'x-account-access-control' is set to the formatted value.
    """
    handled_methods = ('HEAD', 'GET', 'PUT', 'POST')
    if resp.environ['REQUEST_METHOD'] in handled_methods:
        suffix = 'access-control'
        internal_hdr = get_sys_meta_prefix('account') + 'core-' + suffix
        stored = resp.headers.pop(internal_hdr)
        acl_dict = parse_acl(version=2, data=stored)
        if not acl_dict:
            # treat empty dict as empty header
            return
        resp.headers['x-account-' + suffix] = format_acl(
            version=2, acl_dict=acl_dict)
开发者ID:Ahiknsr,项目名称:swift,代码行数:9,代码来源:account.py
示例10: get_acl
def get_acl(account_name, headers):
    """
    Build an S3 AccessControlPolicy document from Swift container headers.

    The account always receives FULL_CONTROL. READ and WRITE grants for the
    AllUsers group are added when the 'x-container-read' /
    'x-container-write' ACL allows the referrer 'unknown'.

    :returns: an HTTPOk response whose body is the serialized XML.
    """
    all_users_uri = 'http://acs.amazonaws.com/groups/global/AllUsers'

    root = Element('AccessControlPolicy')
    owner_el = SubElement(root, 'Owner')
    SubElement(owner_el, 'ID').text = account_name
    SubElement(owner_el, 'DisplayName').text = account_name
    acl_list = SubElement(root, 'AccessControlList')

    def _append_grant(grantee_type, grantee_fields, permission):
        # Append one <Grant> with a typed <Grantee> and its <Permission>.
        grant_el = SubElement(acl_list, 'Grant')
        grantee_el = SubElement(grant_el, 'Grantee',
                                nsmap={'xsi': XMLNS_XSI})
        grantee_el.set('{%s}type' % XMLNS_XSI, grantee_type)
        for tag, text in grantee_fields:
            SubElement(grantee_el, tag).text = text
        SubElement(grant_el, 'Permission').text = permission

    # grant FULL_CONTROL to myself by default
    _append_grant('CanonicalUser',
                  [('ID', account_name), ('DisplayName', account_name)],
                  'FULL_CONTROL')

    # grant public-read / public-write access where the ACL allows it
    for header_name, permission in (('x-container-read', 'READ'),
                                    ('x-container-write', 'WRITE')):
        referrers, _ = parse_acl(headers.get(header_name))
        if referrer_allowed('unknown', referrers):
            _append_grant('Group', [('URI', all_users_uri)], permission)

    return HTTPOk(body=tostring(root), content_type="text/plain")
开发者ID:notmyname,项目名称:swift3,代码行数:42,代码来源:acl.py
示例11: authorize
def authorize(self, req):
    """
    Authorize a request using the CloudStack identity in the environ.

    :returns: None if the request may proceed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    identity = req.environ.get('cloudstack.identity', {})

    try:
        version, _account, container, obj = split_path(
            req.path, minsegs=1, maxsegs=4, rest_with_last=True)
    except ValueError:
        return HTTPNotFound(request=req)

    if not (_account and _account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    # Remove the reseller_prefix (plus one separator char) from the account.
    account = (_account[len(self.reseller_prefix) + 1:]
               if self.reseller_prefix != '' else _account)

    user_roles = identity.get('roles', [])

    # If this user is part of this account or is the global admin, give access.
    if account == identity.get('account') or self.cs_roles[1] in user_roles:
        req.environ['swift_owner'] = True
        self.logger.debug("User %s is global admin or owner, authorizing" % identity.get('username'))
        return None

    # Allow container sync
    sync_key = req.environ.get('swift_sync_key')
    if (sync_key
            and sync_key == req.headers.get('x-container-sync-key', None)
            and 'x-timestamp' in req.headers
            and (req.remote_addr in self.allowed_sync_hosts
                 or get_remote_client(req) in self.allowed_sync_hosts)):
        self.logger.debug('Allowing container-sync')
        return None

    if req.method == 'OPTIONS':
        # allow OPTIONS requests to proceed as normal
        self.logger.debug("Allow OPTIONS request.")
        return None

    # Check if Referrer allow it
    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or '.rlistings' in groups):
            return self.denied_response(req)
        self.logger.debug('Authorizing via ACL')
        return None

    # Allow when any of the user's roles is listed in the ACL groups.
    for role in user_roles:
        if role not in groups:
            continue
        self.logger.debug('User has role %s, allowing via ACL' % (role))
        return None

    # This user is not authorized, deny request.
    return self.denied_response(req)
开发者ID:cldmnky,项目名称:cs_auth,代码行数:53,代码来源:middleware.py
示例12: test_parse_acl
def test_parse_acl(self):
    """parse_acl splits an ACL string into (referrers, groups)."""
    cases = [
        (None, ([], [])),
        ('', ([], [])),
        ('.r:ref1', (['ref1'], [])),
        ('.r:-ref1', (['-ref1'], [])),
        ('account:user', ([], ['account:user'])),
        ('account', ([], ['account'])),
        ('acc1,acc2:usr2,.r:ref3,.r:-ref4',
         (['ref3', '-ref4'], ['acc1', 'acc2:usr2'])),
        ('acc1,acc2:usr2,.r:ref3,acc3,acc4:usr4,.r:ref5,.r:-ref6',
         (['ref3', 'ref5', '-ref6'],
          ['acc1', 'acc2:usr2', 'acc3', 'acc4:usr4'])),
    ]
    for acl_string, expected in cases:
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(acl.parse_acl(acl_string), expected)
开发者ID:674009287,项目名称:swift,代码行数:14,代码来源:test_acl.py
示例13: authorize
def authorize(self, req):
    """
    Authorize a request against the CloudStack identity and container ACL.

    :returns: None if the request may proceed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    identity = req.environ.get("cloudstack.identity", {})

    try:
        path_parts = split_path(req.path, minsegs=1, maxsegs=4, rest_with_last=True)
        version, _account, container, obj = path_parts
    except ValueError:
        return HTTPNotFound(request=req)

    if not (_account and _account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    # Remove the reseller_prefix (plus one separator char) from the account.
    account = _account
    if self.reseller_prefix != "":
        account = _account[len(self.reseller_prefix) + 1:]

    user_roles = identity.get("roles", [])

    # If this user is part of this account or is the global admin, give access.
    if account == identity.get("account") or self.cs_roles[1] in user_roles:
        req.environ["swift_owner"] = True
        return None

    # Allow container sync
    sync_key = req.environ.get("swift_sync_key")
    if sync_key and sync_key == req.headers.get("x-container-sync-key", None):
        if "x-timestamp" in req.headers and (
            req.remote_addr in self.allowed_sync_hosts
            or get_remote_client(req) in self.allowed_sync_hosts
        ):
            self.logger.debug("Allowing container-sync")
            return None

    # Check if Referrer allow it
    referrers, groups = parse_acl(getattr(req, "acl", None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or ".rlistings" in groups):
            return self.denied_response(req)
        self.logger.debug("Authorizing via ACL")
        return None

    # Allow when any of the user's roles is listed in the ACL groups.
    for role in user_roles:
        if role not in groups:
            continue
        self.logger.debug("User has role %s, allowing via ACL" % (role))
        return None

    # This user is not authorized, deny request.
    return self.denied_response(req)
开发者ID:cloudops,项目名称:cs_auth,代码行数:51,代码来源:middleware.py
示例14: authorize_colony
def authorize_colony(self, req):
"""
Authorization callback added for colony.

Policy as described by the original authors:
1. Any user may GET or HEAD an account.
2. Any user may create a container.
3. Any user may read or write objects when there is no container ACL.
4. When a container ACL exists, access is limited by it.

NOTE(review): the indentation of this listing has been lost, so the
exact nesting of the checks inside the hasattr(req, 'acl') section
cannot be read from the text; confirm against the original project file.
"""
try:
version, account, container, obj = split_path(req.path, 1, 4, True)
except ValueError:
return HTTPNotFound(request=req)
if not account:
self.logger.info('no account')
return self.denied_response(req)
# remote_user is treated as a comma-separated list of group names.
user_groups = (req.remote_user or '').split(',')
self.logger.info('request_remote_user: %s' % req.remote_user)
self.logger.info('request_method: %s' % req.method)
# Every request that reaches this point is flagged as swift_owner.
req.environ['swift_owner'] = True
# Any user may HEAD or GET when the path has no container.
if req.method in ['HEAD', 'GET'] and not container:
self.logger.info('HEAD or GET account all ok')
return None
# Any user may PUT, POST or DELETE a container (path has no object).
if req.method in ['PUT', 'POST', 'DELETE'] and container and not obj:
self.logger.info('Any user create container')
return None
# Remaining requests are evaluated against the ACL on the request, if any.
if hasattr(req, 'acl'):
self.logger.info('container acl: %s' % req.acl)
referrers, groups = parse_acl(req.acl)
self.logger.info('referrers: %s' % referrers)
self.logger.info('group: %s' % groups)
if referrer_allowed(req.referer, referrers):
if obj or '.rlistings' in groups:
self.logger.info('referer_allowed')
return None
if not req.remote_user:
return self.denied_response(req)
for user_group in user_groups:
if user_group in groups:
self.logger.info('group_allowed: %s' % user_group)
return None
# An ACL with neither referrers nor groups grants default access.
if not referrers and not groups:
self.logger.info('no acl allow default access')
return None
self.logger.info('group not allowed.')
return self.denied_response(req)
self.logger.info('request forbidden')
return self.denied_response(req)
开发者ID:dais,项目名称:colony,代码行数:50,代码来源:auth_token_for_colony.py
示例15: authorize
def authorize(self, req):
    """
    Authorization callback.

    :returns: None if the request may continue, otherwise a WSGI response
              callable (HTTPNotFound or self.denied_response(req)).
    """
    try:
        version, account, container, obj = req.split_path(1, 4, True)
    except ValueError:
        self.logger.increment("errors")
        return HTTPNotFound(request=req)

    if not (account and account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    user_groups = (req.remote_user or "").split(",")

    # Owner access: either a reseller admin on a real (non-".") account
    # under the prefix, or the account's own admin doing anything but an
    # account-level DELETE or PUT.
    if (
        ".reseller_admin" in user_groups
        and account != self.reseller_prefix
        and account[len(self.reseller_prefix)] != "."
    ) or (
        account in user_groups
        and (req.method not in ("DELETE", "PUT") or container)
    ):
        req.environ["swift_owner"] = True
        return None

    # Container sync requests carrying the matching key are let through.
    sync_key = req.environ.get("swift_sync_key")
    if (
        sync_key
        and sync_key == req.headers.get("x-container-sync-key", None)
        and "x-timestamp" in req.headers
    ):
        return None

    # allow OPTIONS requests to proceed as normal
    if req.method == "OPTIONS":
        return None

    referrers, groups = parse_acl(getattr(req, "acl", None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or ".rlistings" in groups):
            return self.denied_response(req)
        return None

    if not req.remote_user:
        return self.denied_response(req)
    if any(member in groups for member in user_groups):
        return None
    return self.denied_response(req)
示例16: authorize
def authorize(self, req):
    """
    Authorization callback.

    :returns: None if the request may continue, otherwise a WSGI response
              callable (HTTPNotFound or self.denied_response(req)).
    """
    try:
        version, account, container, obj = split_path(req.path, 1, 4, True)
    except ValueError:
        self.logger.increment('errors')
        return HTTPNotFound(request=req)

    if not (account and account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    user_groups = (req.remote_user or '').split(',')

    # Owner access: either a reseller admin on a real (non-'.') account
    # under the prefix, or the account's own admin doing anything but an
    # account-level DELETE or PUT.
    if ('.reseller_admin' in user_groups and
            account != self.reseller_prefix and
            account[len(self.reseller_prefix)] != '.') or \
            (account in user_groups and
             (req.method not in ('DELETE', 'PUT') or container)):
        req.environ['swift_owner'] = True
        return None

    # Container sync: matching key, a timestamp header, and a permitted host.
    sync_key = req.environ.get('swift_sync_key')
    if sync_key and \
            sync_key == req.headers.get('x-container-sync-key', None) and \
            'x-timestamp' in req.headers and \
            (req.remote_addr in self.allowed_sync_hosts or
             get_remote_client(req) in self.allowed_sync_hosts):
        return None

    # allow OPTIONS requests to proceed as normal
    if req.method == 'OPTIONS':
        return None

    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or '.rlistings' in groups):
            return self.denied_response(req)
        return None

    if not req.remote_user:
        return self.denied_response(req)
    if any(member in groups for member in user_groups):
        return None
    return self.denied_response(req)
示例17: authorize
def authorize(self, req):
    """
    Authorize a request using the Keystone identity stored in the environ.

    :returns: None if the request may proceed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    env_identity = req.environ.get('keystone.identity', {})
    tenant = env_identity.get('tenant')

    try:
        version, account, container, obj = split_path(req.path, 1, 4, True)
    except ValueError:
        return HTTPNotFound(request=req)

    # The account in the path must be "<reseller_prefix>_<tenant>".
    if account != '%s_%s' % (self.reseller_prefix, tenant):
        self.logger.debug('tenant mismatch: %s != %s_%s' %
                          (account, self.reseller_prefix, tenant))
        return self.denied_response(req)

    user_groups = env_identity.get('roles', [])

    #TODO: setting?
    if self.keystone_admin_group in user_groups:
        req.environ['swift_owner'] = True
        return None

    # Container sync: matching key, a timestamp header, and a permitted host.
    sync_key = req.environ.get('swift_sync_key')
    if sync_key and \
            sync_key == req.headers.get('x-container-sync-key', None) and \
            'x-timestamp' in req.headers and \
            (req.remote_addr in self.allowed_sync_hosts or
             get_remote_client(req) in self.allowed_sync_hosts):
        self.logger.debug('allowing container-sync')
        return None

    # Check if Referrer allow it #TODO: check if it works
    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or '.rlistings' in groups):
            return self.denied_response(req)
        self.logger.debug('authorizing via ACL')
        return None

    # Allow when any of the user's roles is listed in the ACL groups.
    for user_group in user_groups:
        if user_group not in groups:
            continue
        self.logger.debug('user in group: %s authorizing' %
                          (user_group))
        return None

    return self.denied_response(req)
示例18: authorize_anonymous
def authorize_anonymous(self, req):
    """
    Decide whether a request without a confirmed identity may proceed.

    :returns: None when the request is allowed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    try:
        version, account, container, obj = swift_utils.split_path(
            req.path, 1, 4, True)
    except ValueError:
        return HTTPNotFound(request=req)

    # Only accounts carrying our reseller prefix are handled here.
    if not (account and account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    referrers, roles = swift_acl.parse_acl(getattr(req, "acl", None))
    if not self._authorize_unconfirmed_identity(req, obj, referrers, roles):
        return self.denied_response(req)
示例19: get_target
def get_target(self, environ):
    """
    Describe the request in ``environ`` as a dict.

    The dict holds the Request object, the lower-cased method, the four
    path components (all None when the path cannot be split) and the
    referrers/groups parsed from the request's ACL.
    """
    req = Request(environ)
    try:
        version, account, container, obj = req.split_path(1, 4, True)
    except ValueError:
        version, account, container, obj = None, None, None, None

    referrers, acls = swift_acl.parse_acl(getattr(req, 'acl', None))
    return dict(
        req=req,
        method=req.method.lower(),
        version=version,
        account=account,
        container=container,
        object=obj,
        acls=acls,
        referrers=referrers,
    )
示例20: authorize
def authorize(self, req):
    """
    Authorize a request using the mauth identity stored in the environ.

    :returns: None if the request may proceed, HTTPNotFound when the path
              cannot be split, otherwise self.denied_response(req).
    """
    identity = req.environ.get('mauth.identity', {})

    try:
        version, _account, container, obj = split_path(
            req.path, minsegs=1, maxsegs=4, rest_with_last=True)
    except ValueError:
        return HTTPNotFound(request=req)

    if not (_account and _account.startswith(self.reseller_prefix)):
        return self.denied_response(req)

    user_roles = identity.get('roles', [])

    # If this user is part of this account, give access.
    if _account == identity.get('account_part'):
        req.environ['swift_owner'] = True
        return None

    # Allow container sync
    sync_key = req.environ.get('swift_sync_key')
    if (sync_key
            and sync_key == req.headers.get('x-container-sync-key', None)
            and 'x-timestamp' in req.headers
            and (req.remote_addr in self.allowed_sync_hosts
                 or get_remote_client(req) in self.allowed_sync_hosts)):
        self.logger.debug('Allowing container-sync')
        return None

    # Check if Referrer allow it
    referrers, groups = parse_acl(getattr(req, 'acl', None))
    if referrer_allowed(req.referer, referrers):
        if not (obj or '.rlistings' in groups):
            return self.denied_response(req)
        self.logger.debug('Authorizing via ACL')
        return None

    # Allow when any of the user's roles is listed in the ACL groups.
    for role in user_roles:
        if role not in groups:
            continue
        self.logger.debug('User has role %s, allowing via ACL' % (role))
        return None

    # This user is not authorized, deny request.
    return self.denied_response(req)
注:本文中的swift.common.middleware.acl.parse_acl函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论