本文整理汇总了Python中neutron.policy.check函数的典型用法代码示例。如果您正苦于以下问题：Python check函数的具体用法？Python check怎么用？Python check使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了check函数的19个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_check_non_existent_action
def test_check_non_existent_action(self):
    """An action with no rule is denied unless ``might_not_exist`` is set."""
    unknown_action = "example:idonotexist"

    # Plain check: the test expects a falsy result for the unknown action.
    denied = policy.check(self.context, unknown_action, self.target)
    self.assertFalse(denied)

    # Same call with might_not_exist=True: the test expects a truthy result.
    allowed = policy.check(
        self.context, unknown_action, self.target, might_not_exist=True)
    self.assertTrue(allowed)
开发者ID:21atlas,项目名称:neutron,代码行数:7,代码来源:test_policy.py
示例2: _items
def _items(self, request, do_authz=False, parent_id=None):
    """Retrieves and formats a list of elements of the requested entity.

    :param request: API request; its context and query arguments are used
        for filtering, sorting, pagination and policy checks.
    :param do_authz: when true, items for which the SHOW policy check
        fails are omitted from the result.
    :param parent_id: optional parent identifier; when truthy it is passed
        to the plugin under ``self._parent_id_name``.
    :returns: dict mapping ``self._collection`` to the filtered items,
        plus a ``<collection>_links`` entry when pagination links exist.
    """
    # NOTE(salvatore-orlando): The following ensures that fields which
    # are needed for authZ policy validation are not stripped away by the
    # plugin before returning.
    original_fields, fields_to_add = self._do_field_list(
        api_common.list_args(request, 'fields'))
    filters = api_common.get_filters(
        request, self._attr_info,
        ['fields', 'sort_key', 'sort_dir',
         'limit', 'marker', 'page_reverse'],
        is_filter_validation_supported=self._filter_validation)
    kwargs = {'filters': filters,
              'fields': original_fields}
    sorting_helper = self._get_sorting_helper(request)
    pagination_helper = self._get_pagination_helper(request)
    sorting_helper.update_args(kwargs)
    sorting_helper.update_fields(original_fields, fields_to_add)
    pagination_helper.update_args(kwargs)
    pagination_helper.update_fields(original_fields, fields_to_add)
    if parent_id:
        kwargs[self._parent_id_name] = parent_id
    obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
    obj_list = obj_getter(request.context, **kwargs)
    obj_list = sorting_helper.sort(obj_list)
    obj_list = pagination_helper.paginate(obj_list)
    # Check authz
    if do_authz:
        # FIXME(salvatore-orlando): obj_getter might return references to
        # other resources. Must check authZ on them too.
        # Omit items from list that should not be visible
        visible = []
        for obj in obj_list:
            self._set_parent_id_into_ext_resources_request(
                request, obj, parent_id, is_get=True)
            if policy.check(
                    request.context, self._plugin_handlers[self.SHOW],
                    obj, plugin=self._plugin, pluralized=self._collection):
                visible.append(obj)
        obj_list = visible
    # Use the first element in the list for discriminating which attributes
    # should be filtered out because of authZ policies
    # fields_to_add contains a list of attributes added for request policy
    # checks but that were not required by the user. They should be
    # therefore stripped
    # Work on a copy: ``+=`` on a list extends it in place, which would
    # otherwise grow the very list object bound to ``fields_to_add``.
    fields_to_strip = list(fields_to_add or [])
    if obj_list:
        fields_to_strip += self._exclude_attributes_by_policy(
            request.context, obj_list[0])
    collection = {self._collection:
                  [self._filter_attributes(obj,
                                           fields_to_strip=fields_to_strip)
                   for obj in obj_list]}
    pagination_links = pagination_helper.get_links(obj_list)
    if pagination_links:
        collection[self._collection + "_links"] = pagination_links
    # Synchronize usage trackers, if needed
    resource_registry.resync_resource(
        request.context, self._resource, request.context.tenant_id)
    return collection
开发者ID:noironetworks,项目名称:neutron,代码行数:60,代码来源:base.py
示例3: _exclude_attributes_by_policy
def _exclude_attributes_by_policy(self, context, resource, data):
    """Identifies attributes to exclude according to authZ policies.

    Returns the names of the keys of ``data`` that should be stripped from
    the response: keys with no (or an empty) attribute spec for
    ``resource``, keys whose spec has a falsy ``is_visible``, and keys for
    which the ``get_<resource>:<attr>`` policy check fails.
    """
    hidden = []
    for attr_name in data.keys():
        attr_spec = attribute_population._attributes_for_resource(
            resource).get(attr_name)
        # NOTE(kevinbenton): this used to reference a
        # _plugin_handlers dict, why?
        if (attr_spec and attr_spec['is_visible'] and
                policy.check(
                    context,
                    'get_%s:%s' % (resource, attr_name),
                    data,
                    might_not_exist=True,
                    pluralized=attribute_population._plural(resource))):
            # Visible and allowed by policy: keep it in the response.
            continue
        # Either not visible in the first place, or rejected by policy.
        hidden.append(attr_name)
    return hidden
开发者ID:apporc,项目名称:neutron,代码行数:26,代码来源:policy_enforcement.py
示例4: _update
def _update(self, request, id, body, **kwargs):
    """Update one resource after enforcing the UPDATE policy on it.

    Returns ``{self._resource: <view of the updated object>}``. When the
    UPDATE policy is not authorized and the SHOW policy check also fails,
    ``webob.exc.HTTPNotFound`` is raised instead of the authorization
    error, so the existence of the resource is not disclosed.
    """
    body = Controller.prepare_request_body(
        request.context, body, False,
        self._resource, self._attr_info,
        allow_bulk=self._allow_bulk)
    action = self._plugin_handlers[self.UPDATE]

    # Load the object for the authz check, asking only for the attributes
    # the policy engine needs: those required by policy, primary keys, and
    # those that declare no default.
    field_list = []
    for name, value in self._attr_info.items():
        if (value.get('required_by_policy') or
                value.get('primary_key') or
                'default' not in value):
            field_list.append(name)

    # Ensure policy engine is initialized
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    orig_obj = self._item(request, id, field_list=field_list,
                          parent_id=parent_id)
    orig_object_copy = copy.copy(orig_obj)
    requested_changes = body[self._resource]
    orig_obj.update(requested_changes)
    # Tell the policy engine which attributes were set explicitly so it can
    # distinguish them from the ones left at their default values.
    orig_obj[n_const.ATTRIBUTES_TO_UPDATE] = requested_changes.keys()
    try:
        policy.enforce(request.context, action, orig_obj,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        # To avoid giving away information, pretend that it
        # doesn't exist if policy does not authorize SHOW
        with excutils.save_and_reraise_exception() as ctxt:
            can_show = policy.check(request.context,
                                    self._plugin_handlers[self.SHOW],
                                    orig_obj,
                                    pluralized=self._collection)
            if not can_show:
                ctxt.reraise = False
        # Only reached when the context manager did not re-raise.
        msg = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(msg)

    obj_updater = getattr(self._plugin, action)
    updater_kwargs = {self._resource: body}
    if parent_id:
        updater_kwargs[self._parent_id_name] = parent_id
    obj = obj_updater(request.context, id, **updater_kwargs)
    # Usually an update operation does not alter resource usage, but as
    # there might be side effects it might be worth checking for changes
    # in resource usage here as well (e.g: a tenant port is created when a
    # router interface is added)
    resource_registry.set_resources_dirty(request.context)

    result = {self._resource: self._view(request.context, obj)}
    notifier_method = self._resource + '.update.end'
    self._notifier.info(request.context, notifier_method, result)
    registry.notify(self._resource, events.BEFORE_RESPONSE, self,
                    context=request.context, data=result,
                    method_name=notifier_method, action=action,
                    original=orig_object_copy)
    return result
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:60,代码来源:base.py
示例5: _get_items_by_filter_and_order_and_page
def _get_items_by_filter_and_order_and_page(self, request, kwargs,
                                            original_fields=None,
                                            fields_to_add=None,
                                            do_authz=False, parent_id=None):
    """Fetch resource items applying filters, ordering and paging.

    ``kwargs`` is updated in place by the sorting and pagination helpers
    and, when ``parent_id`` is truthy, receives the parent id under
    ``self._parent_id_name``.

    :returns: tuple ``(obj_list, pagination_helper)``.
    """
    sorter = self._get_sorting_helper(request)
    pager = self._get_pagination_helper(request)
    # Let each helper add its arguments to kwargs and see the field lists.
    sorter.update_args(kwargs)
    sorter.update_fields(original_fields, fields_to_add)
    pager.update_args(kwargs)
    pager.update_fields(original_fields, fields_to_add)
    if parent_id:
        kwargs[self._parent_id_name] = parent_id

    list_handler = getattr(self._plugin, self._plugin_handlers[self.LIST])
    obj_list = pager.paginate(
        sorter.sort(list_handler(request.context, **kwargs)))

    # Check authz
    if do_authz and obj_list:
        # FIXME(salvatore-orlando): obj_getter might return references to
        # other resources. Must check authZ on them too.
        # Omit items from list that should not be visible
        show_action = self._plugin_handlers[self.SHOW]
        visible = []
        for obj in obj_list:
            if policy.check(request.context, show_action, obj,
                            plugin=self._plugin):
                visible.append(obj)
        obj_list = visible
    return obj_list, pager
开发者ID:xiongmeng1108,项目名称:gcloud7_neutron-2014.2.2,代码行数:29,代码来源:base.py
示例6: _items
def _items(self, request, do_authz=False, parent_id=None):
    """Retrieves and formats a list of elements of the requested entity.

    :param request: API request; its context and query arguments are used
        for filtering, sorting, pagination and policy checks.
    :param do_authz: when true, items for which the SHOW policy check
        fails are omitted from the result.
    :param parent_id: optional parent identifier; when truthy it is passed
        to the plugin under ``self._parent_id_name``.
    :returns: dict mapping ``self._collection`` to the filtered items,
        plus a ``<collection>_links`` entry when pagination links exist.
    """
    # NOTE(salvatore-orlando): The following ensures that fields which
    # are needed for authZ policy validation are not stripped away by the
    # plugin before returning.
    original_fields, fields_to_add = self._do_field_list(
        api_common.list_args(request, "fields"))
    filters = api_common.get_filters(
        request, self._attr_info,
        ["fields", "sort_key", "sort_dir", "limit", "marker", "page_reverse"]
    )
    kwargs = {"filters": filters, "fields": original_fields}
    sorting_helper = self._get_sorting_helper(request)
    pagination_helper = self._get_pagination_helper(request)
    sorting_helper.update_args(kwargs)
    sorting_helper.update_fields(original_fields, fields_to_add)
    pagination_helper.update_args(kwargs)
    pagination_helper.update_fields(original_fields, fields_to_add)
    if parent_id:
        kwargs[self._parent_id_name] = parent_id
    obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
    obj_list = obj_getter(request.context, **kwargs)
    obj_list = sorting_helper.sort(obj_list)
    obj_list = pagination_helper.paginate(obj_list)
    # Check authz
    if do_authz:
        # FIXME(salvatore-orlando): obj_getter might return references to
        # other resources. Must check authZ on them too.
        # Omit items from list that should not be visible
        obj_list = [
            obj
            for obj in obj_list
            if policy.check(request.context,
                            self._plugin_handlers[self.SHOW],
                            obj, plugin=self._plugin)
        ]
    # Use the first element in the list for discriminating which attributes
    # should be filtered out because of authZ policies
    # fields_to_add contains a list of attributes added for request policy
    # checks but that were not required by the user. They should be
    # therefore stripped
    # Work on a copy: ``+=`` on a list extends it in place, which would
    # otherwise grow the very list object bound to ``fields_to_add``.
    fields_to_strip = list(fields_to_add or [])
    if obj_list:
        fields_to_strip += self._exclude_attributes_by_policy(
            request.context, obj_list[0])
    collection = {
        self._collection: [
            self._filter_attributes(request.context, obj,
                                    fields_to_strip=fields_to_strip)
            for obj in obj_list
        ]
    }
    pagination_links = pagination_helper.get_links(obj_list)
    if pagination_links:
        collection[self._collection + "_links"] = pagination_links
    return collection
开发者ID:nash-x,项目名称:hws,代码行数:49,代码来源:base.py
示例7: _items
def _items(self, request, do_authz=False, parent_id=None):
    """Retrieves and formats a list of elements of the requested entity.

    When ``do_authz`` is true, items failing the SHOW policy check are
    dropped. Returns a dict keyed by ``self._collection`` with an extra
    ``<collection>_links`` entry when pagination links exist.
    """
    # NOTE(salvatore-orlando): The following ensures that fields which
    # are needed for authZ policy validation are not stripped away by the
    # plugin before returning.
    requested_fields = api_common.list_args(request, 'fields')
    original_fields, fields_to_add = self._do_field_list(requested_fields)
    reserved_args = ['fields', 'sort_key', 'sort_dir',
                     'limit', 'marker', 'page_reverse']
    filters = api_common.get_filters(request, self._attr_info,
                                     reserved_args)
    kwargs = {'filters': filters, 'fields': original_fields}

    sorter = self._get_sorting_helper(request)
    pager = self._get_pagination_helper(request)
    sorter.update_args(kwargs)
    sorter.update_fields(original_fields, fields_to_add)
    pager.update_args(kwargs)
    pager.update_fields(original_fields, fields_to_add)
    if parent_id:
        kwargs[self._parent_id_name] = parent_id

    list_handler = getattr(self._plugin, self._plugin_handlers[self.LIST])
    obj_list = list_handler(request.context, **kwargs)
    obj_list = sorter.sort(obj_list)
    obj_list = pager.paginate(obj_list)

    # Check authz
    if do_authz:
        # FIXME(salvatore-orlando): obj_getter might return references to
        # other resources. Must check authZ on them too.
        # Omit items from list that should not be visible
        visible = []
        for obj in obj_list:
            if policy.check(request.context,
                            self._plugin_handlers[self.SHOW],
                            obj,
                            plugin=self._plugin):
                visible.append(obj)
        obj_list = visible

    views = [self._view(request.context, obj,
                        fields_to_strip=fields_to_add)
             for obj in obj_list]
    collection = {self._collection: views}
    pagination_links = pager.get_links(obj_list)
    if pagination_links:
        collection[self._collection + "_links"] = pagination_links
    return collection
开发者ID:ChengZuo,项目名称:neutron,代码行数:43,代码来源:base.py
示例8: _delete
def _delete(self, request, id, **kwargs):
    """Delete one resource after enforcing the DELETE policy on it.

    When the DELETE policy is not authorized and the SHOW policy check
    also fails, ``webob.exc.HTTPNotFound`` is raised instead of the
    authorization error. Returns nothing; a ``<resource>.delete.end``
    notification and a BEFORE_RESPONSE event are emitted after deletion.
    """
    action = self._plugin_handlers[self.DELETE]

    # Check authz
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    obj = self._item(request, id, parent_id=parent_id)
    try:
        policy.enforce(request.context, action, obj,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        # To avoid giving away information, pretend that it
        # doesn't exist if policy does not authorize SHOW
        with excutils.save_and_reraise_exception() as ctxt:
            can_show = policy.check(request.context,
                                    self._plugin_handlers[self.SHOW],
                                    obj,
                                    pluralized=self._collection)
            if not can_show:
                ctxt.reraise = False
        # Only reached when the context manager did not re-raise.
        msg = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(msg)

    getattr(self._plugin, action)(request.context, id, **kwargs)
    # A delete operation usually alters resource usage, so mark affected
    # usage trackers as dirty
    resource_registry.set_resources_dirty(request.context)

    notifier_method = self._resource + '.delete.end'
    result = {self._resource: self._view(request.context, obj)}
    # The payload carries the id plus the view of the deleted object.
    notifier_payload = {self._resource + '_id': id}
    notifier_payload.update(result)
    self._notifier.info(request.context, notifier_method, notifier_payload)

    payload = events.APIEventPayload(
        request.context, notifier_method, action,
        states=({}, obj, result,),
        collection_name=self._collection)
    registry.publish(self._resource, events.BEFORE_RESPONSE, self,
                     payload=payload)
开发者ID:noironetworks,项目名称:neutron,代码行数:42,代码来源:base.py
示例9: _exclude_attributes_by_policy
def _exclude_attributes_by_policy(self, context, data):
    """Identifies attributes to exclude according to authZ policies.

    Returns the names of the keys of ``data`` that should be stripped from
    the response: keys with no (or an empty) entry in ``self._attr_info``,
    keys whose entry has a falsy ``is_visible``, and keys for which the
    ``<show action>:<attr>`` policy check fails.
    """
    hidden = []
    for attr_name in data.keys():
        attr_spec = self._attr_info.get(attr_name)
        if (attr_spec and attr_spec["is_visible"] and
                policy.check(
                    context,
                    "%s:%s" % (self._plugin_handlers[self.SHOW], attr_name),
                    data,
                    might_not_exist=True)):
            # Visible and allowed by policy: keep it in the response.
            continue
        # Either not visible in the first place, or rejected by policy.
        hidden.append(attr_name)
    return hidden
开发者ID:nash-x,项目名称:hws,代码行数:20,代码来源:base.py
示例10: _exclude_attributes_by_policy
def _exclude_attributes_by_policy(self, context, controller, resource,
                                  collection, data):
    """Identifies attributes to exclude according to authZ policies.

    Returns the names of the keys of ``data`` that should be stripped from
    the response. ``project_id`` is never evaluated on its own; it is
    excluded exactly when ``tenant_id`` is excluded.
    """
    hidden = []
    for attr_name in data.keys():
        # TODO(amotoki): All attribute maps have tenant_id and
        # it determines excluded attributes based on tenant_id.
        # We need to migrate tenant_id to project_id later
        # as attr_info is referred to in various places and we need
        # to check all logs carefully.
        if attr_name == 'project_id':
            continue
        attr_spec = controller.resource_info.get(attr_name)
        # NOTE(kevinbenton): this used to reference a
        # _plugin_handlers dict, why?
        if (attr_spec and attr_spec['is_visible'] and
                policy.check(
                    context,
                    'get_%s:%s' % (resource, attr_name),
                    data,
                    might_not_exist=True,
                    pluralized=collection)):
            # Visible and allowed by policy: keep it in the response.
            continue
        # Either not visible in the first place, or rejected by policy.
        hidden.append(attr_name)
        # TODO(amotoki): As mentioned in the above TODO,
        # we treat project_id and tenant_id equivalently.
        # This should be migrated to project_id later.
        if attr_name == 'tenant_id':
            hidden.append('project_id')
    if hidden:
        LOG.debug("Attributes excluded by policy engine: %s", hidden)
    return hidden
开发者ID:eayunstack,项目名称:neutron,代码行数:41,代码来源:policy_enforcement.py
示例11: _exclude_attributes_by_policy
def _exclude_attributes_by_policy(self, context, data):
    """Identifies attributes to exclude according to authZ policies.

    Returns the names of the keys of ``data`` that should be stripped from
    the response. ``project_id`` is never evaluated on its own; it is
    excluded exactly when ``tenant_id`` is excluded.
    """
    hidden = []
    for attr_name in data.keys():
        # TODO(amotoki): At now, all attribute maps have tenant_id and
        # determine excluded attributes based on tenant_id.
        # We need to migrate tenant_id to project_id later
        # as attr_info is referred to in various places and we need
        # to check all logic carefully.
        if attr_name == 'project_id':
            continue
        attr_spec = self._attr_info.get(attr_name)
        if (attr_spec and attr_spec['is_visible'] and
                policy.check(
                    context,
                    '%s:%s' % (self._plugin_handlers[self.SHOW], attr_name),
                    data,
                    might_not_exist=True,
                    pluralized=self._collection)):
            # Visible and allowed by policy: keep it in the response.
            continue
        # Either not visible in the first place, or rejected by policy.
        hidden.append(attr_name)
        # TODO(amotoki): As mentioned in the above TODO,
        # we treat project_id and tenant_id equivalently.
        # This should be migrated to project_id in Ocata.
        if attr_name == 'tenant_id':
            hidden.append('project_id')
    return hidden
开发者ID:noironetworks,项目名称:neutron,代码行数:37,代码来源:base.py
示例12: test_firewall_policy_insert_rule_with_owner
def test_firewall_policy_insert_rule_with_owner(self):
    """``insert_rule`` is expected to pass when context and target share a tenant."""
    owner_context = context.Context('', "own_tenant", roles=['user'])
    owned_target = {"tenant_id": "own_tenant"}
    self.assertTrue(
        policy.check(owner_context, "insert_rule", owned_target))
开发者ID:21atlas,项目名称:neutron,代码行数:6,代码来源:test_policy.py
示例13: _check_view_auth
def _check_view_auth(self, context, resource, action):
    """Return the result of ``policy.check`` for ``action`` on ``resource``.

    Note the argument order: this method takes ``resource`` before
    ``action``, while ``policy.check`` takes the action first.
    """
    is_allowed = policy.check(context, action, resource)
    return is_allowed
开发者ID:nitinnain,项目名称:neutron,代码行数:2,代码来源:plumgrid_plugin.py
示例14: _check_external_router_policy
def _check_external_router_policy(self, context):
    """Check the ``get_network`` policy against ``self._network_definition()``."""
    target_network = self._network_definition()
    return policy.check(context, "get_network", target_network)
开发者ID:glove747,项目名称:liberty-neutron,代码行数:2,代码来源:test_policies.py
示例15: test_check_bad_action_noraise
def test_check_bad_action_noraise(self):
    """``policy.check`` is expected to return False for ``example:denied``."""
    outcome = policy.check(self.context, "example:denied", self.target)
    # Kept as an equality check against False rather than assertFalse.
    self.assertEqual(outcome, False)
开发者ID:ChengZuo,项目名称:neutron,代码行数:4,代码来源:test_policy.py
示例16: test_firewall_policy_remove_rule_without_admin_or_owner
def test_firewall_policy_remove_rule_without_admin_or_owner(self):
    """``remove_rule`` is expected to fail for a user of a different tenant."""
    foreign_context = context.Context('', "another_tenant", roles=['user'])
    rule_target = {"firewall_rule_id": "rule_id", "tenant_id": "tenantA"}
    self.assertFalse(
        policy.check(foreign_context, "remove_rule", rule_target))
开发者ID:21atlas,项目名称:neutron,代码行数:6,代码来源:test_policy.py
示例17: _check_provider_view_auth
def _check_provider_view_auth(self, context, network):
    """Check the ``extension:provider_network:view`` policy for ``network``."""
    view_action = "extension:provider_network:view"
    return policy.check(context, view_action, network)
开发者ID:abhirajbutala,项目名称:neutron,代码行数:4,代码来源:n1kv_neutron_plugin.py
示例18: test_firewall_policy_insert_rule_with_admin_context
def test_firewall_policy_insert_rule_with_admin_context(self):
    """``insert_rule`` is expected to pass for an admin context and empty target."""
    admin_context = context.get_admin_context()
    self.assertTrue(policy.check(admin_context, "insert_rule", {}))
开发者ID:21atlas,项目名称:neutron,代码行数:5,代码来源:test_policy.py
示例19: before
def before(self, state):
    """Enforce authZ policies on the resources of a write request.

    Returns without doing anything when the method is not POST, PUT or
    DELETE, when the request context has no ``resource``, or when no
    controller is found or the controller is a member action. Otherwise
    the resources to check are enforced one by one, and the prefetched
    originals are stored under ``original_resources`` in the request
    context. A denied item for which the SHOW check also fails is
    reported as ``webob.exc.HTTPNotFound``.
    """
    # This hook should be run only for PUT,POST and DELETE methods and for
    # requests targeting a neutron resource
    resources = state.request.context.get('resources', [])
    if state.request.method not in ('POST', 'PUT', 'DELETE'):
        return
    # As this routine will likely alter the resources, do a shallow copy
    pending = resources[:]
    neutron_context = state.request.context.get('neutron_context')
    resource = state.request.context.get('resource')
    # If there is no resource for this request, don't bother running authZ
    # policies
    if not resource:
        return

    controller = utils.get_controller(state)
    if not controller or utils.is_member_action(controller):
        return
    collection = state.request.context.get('collection')
    needs_prefetch = (state.request.method == 'PUT' or
                      state.request.method == 'DELETE')
    policy.init()
    action_key = pecan_constants.ACTION_MAP[state.request.method]
    action = controller.plugin_handlers[action_key]

    # NOTE(salv-orlando): As bulk updates are not supported, in case of PUT
    # requests there will be only a single item to process, and its
    # identifier would have been already retrieved by the lookup process;
    # in the case of DELETE requests there won't be any item to process in
    # the request body
    original_resources = []
    if needs_prefetch:
        try:
            changes = pending.pop()
        except IndexError:
            # Ops... this was a delete after all!
            changes = {}
        resource_id = state.request.context.get('resource_id')
        parent_id = state.request.context.get('parent_id')
        method = state.request.method
        stored = fetch_resource(method, neutron_context, controller,
                                collection, resource, resource_id,
                                parent_id=parent_id)
        if stored:
            original_resources.append(stored)
            merged = copy.copy(stored)
            merged.update(changes)
            merged[const.ATTRIBUTES_TO_UPDATE] = changes.keys()
            # Put back the item in the list so that policies could be
            # enforced
            pending.append(merged)
    # TODO(salv-orlando): as other hooks might need to prefetch resources,
    # store them in the request context. However, this should be done in a
    # separate hook which is conveniently called before all other hooks
    state.request.context['original_resources'] = original_resources

    for item in pending:
        try:
            policy.enforce(neutron_context, action, item,
                           pluralized=collection)
        except oslo_policy.PolicyNotAuthorized:
            with excutils.save_and_reraise_exception() as ctxt:
                # If a tenant is modifying it's own object, it's safe to
                # return a 403. Otherwise, pretend that it doesn't exist
                # to avoid giving away information.
                show_controller = utils.get_controller(state)
                s_action = show_controller.plugin_handlers[
                    show_controller.SHOW]
                can_show = policy.check(neutron_context, s_action, item,
                                        pluralized=collection)
                if not can_show:
                    ctxt.reraise = False
            # Only reached when the context manager did not re-raise.
            msg = _('The resource could not be found.')
            raise webob.exc.HTTPNotFound(msg)
开发者ID:eayunstack,项目名称:neutron,代码行数:72,代码来源:policy_enforcement.py
注：本文中的neutron.policy.check函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论