本文整理汇总了Python中neutron.policy.enforce函数的典型用法代码示例。如果您正苦于以下问题：Python enforce函数的具体用法？Python enforce怎么用？Python enforce使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了enforce函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: delete
def delete(self, request, id, **kwargs):
    """Detach router ``id`` from the L3 agent given in ``kwargs['agent_id']``.

    The ``delete_<L3_ROUTER>`` policy is enforced against an empty target
    before the plugin is asked to perform the removal. The plugin's return
    value is passed straight back to the caller.
    """
    l3_plugin = self.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "delete_%s" % L3_ROUTER, {})
    remove_router = l3_plugin.remove_router_from_l3_agent
    return remove_router(request.context, kwargs['agent_id'], id)
开发者ID:rchunduru,项目名称:neutron,代码行数:7,代码来源:l3agentscheduler.py
示例2: _handle_action
def _handle_action(request, id, **kwargs):
    """Authorize and dispatch the member action ``name`` on resource ``id``.

    ``self`` and ``name`` are free variables provided by the enclosing scope.
    The resource is first loaded with authorization enabled; if that load is
    denied the caller gets a 404 so the resource's existence is not revealed.
    The action itself is then checked with ``policy.enforce`` and forwarded
    to the plugin method called ``name``.

    Raises:
        webob.exc.HTTPNotFound: when loading the resource raises
            ``oslo_policy.PolicyNotAuthorized``.

    Returns:
        Whatever the plugin method returns.
    """
    plugin_args = [request.context, id]
    # The policy engine must be initialized before any check runs.
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    # Load the resource; this also verifies that the user may access it.
    try:
        resource = self._item(request, id, do_authz=True,
                              field_list=None, parent_id=parent_id)
    except oslo_policy.PolicyNotAuthorized:
        not_found = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(not_found)
    # 'body' is removed from kwargs so it is not passed twice to the plugin.
    action_body = copy.deepcopy(kwargs.pop('body', None))
    # Compare against None explicitly: an empty dict is a valid body.
    if action_body is not None:
        plugin_args.append(action_body)
    # A 403 is acceptable here because access to the object itself was
    # already verified above.
    policy.enforce(request.context, name, resource,
                   pluralized=self._collection)
    plugin_method = getattr(self._plugin, name)
    outcome = plugin_method(*plugin_args, **kwargs)
    # There is no way to know in advance whether an action changes resource
    # usage (e.g. adding a router interface creates a tenant port), so the
    # resources whose counters may have changed are always marked dirty.
    resource_registry.set_resources_dirty(request.context)
    return outcome
开发者ID:bupthzd,项目名称:neutron,代码行数:33,代码来源:base.py
示例3: update
def update(self, request, id, body=None, **kwargs):
    """Updates the specified entity's attributes.

    Emits ``<resource>.update.start`` / ``<resource>.update.end``
    notifications around the plugin call, enforces the update policy on the
    merged (stored + requested) object and finally notifies DHCP and nova.

    Args:
        request: incoming request; ``request.context`` is used for policy
            checks, notifications and the plugin call.
        id: identifier of the entity to update.
        body: request body; must provide ``copy()`` (e.g. a dict).
        **kwargs: may carry the parent id under ``self._parent_id_name``.

    Returns:
        ``{self._resource: <view of the updated object>}``.

    Raises:
        exceptions.BadRequest: if ``body`` has no ``copy`` method.
        webob.exc.HTTPNotFound: if the policy check fails and the object
            belongs to a different tenant than the requester.
        common_policy.PolicyNotAuthorized: re-raised when the requester owns
            the object (a 403 leaks nothing in that case).
    """
    parent_id = kwargs.get(self._parent_id_name)
    try:
        payload = body.copy()
    except AttributeError:
        msg = _("Invalid format: %s") % request.body
        raise exceptions.BadRequest(resource='body', msg=msg)
    payload['id'] = id
    self._notifier.info(request.context,
                        self._resource + '.update.start',
                        payload)
    body = Controller.prepare_request_body(request.context, body, False,
                                           self._resource, self._attr_info,
                                           allow_bulk=self._allow_bulk)
    action = self._plugin_handlers[self.UPDATE]
    # Load object to check authz
    # but pass only attributes in the original body and required
    # by the policy engine to the policy 'brain'
    # NOTE: items() instead of the Python 2-only iteritems() so the
    # comprehension works on both Python 2 and Python 3.
    field_list = [name for (name, value) in self._attr_info.items()
                  if (value.get('required_by_policy') or
                      value.get('primary_key') or
                      'default' not in value)]
    # Ensure policy engine is initialized
    policy.init()
    orig_obj = self._item(request, id, field_list=field_list,
                          parent_id=parent_id)
    # Shallow snapshot taken before the requested changes are merged in;
    # it is handed to the nova notification below.
    orig_object_copy = copy.copy(orig_obj)
    orig_obj.update(body[self._resource])
    # Make a list of attributes to be updated to inform the policy engine
    # which attributes are set explicitly so that it can distinguish them
    # from the ones that are set to their default values.
    orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
    try:
        policy.enforce(request.context,
                       action,
                       orig_obj)
    except common_policy.PolicyNotAuthorized:
        with excutils.save_and_reraise_exception() as ctxt:
            # If a tenant is modifying its own object, it's safe to return
            # a 403. Otherwise, pretend that it doesn't exist to avoid
            # giving away information.
            if request.context.tenant_id != orig_obj['tenant_id']:
                ctxt.reraise = False
        # Only reached when the re-raise above was suppressed.
        msg = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(msg)
    obj_updater = getattr(self._plugin, action)
    kwargs = {self._resource: body}
    if parent_id:
        kwargs[self._parent_id_name] = parent_id
    obj = obj_updater(request.context, id, **kwargs)
    result = {self._resource: self._view(request.context, obj)}
    notifier_method = self._resource + '.update.end'
    self._notifier.info(request.context, notifier_method, result)
    self._send_dhcp_notification(request.context,
                                 result,
                                 notifier_method)
    self._send_nova_notification(action, orig_object_copy, result)
    return result
开发者ID:insequent,项目名称:neutron,代码行数:60,代码来源:base.py
示例4: delete
def delete(self, request, id, **kwargs):
    """Detach network ``id`` from the DHCP agent in ``kwargs['agent_id']``.

    The ``delete_<DHCP_NET>`` policy is enforced against an empty target
    first; the plugin's return value is handed back unchanged.
    """
    core_plugin = manager.NeutronManager.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "delete_%s" % DHCP_NET, {})
    remove_network = core_plugin.remove_network_from_dhcp_agent
    return remove_network(request.context, kwargs['agent_id'], id)
开发者ID:aaronknister,项目名称:neutron,代码行数:7,代码来源:dhcpagentscheduler.py
示例5: delete
def delete(self, request, id, **kwargs):
    """Deletes the specified entity.

    Emits ``<resource>.delete.start`` and ``<resource>.delete.end``
    notifications around the plugin call, and afterwards notifies nova and
    DHCP with a view of the object as it was loaded before deletion.

    Raises:
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    id_key = self._resource + '_id'
    # Announce that the delete operation is starting.
    self._notifier.info(request.context,
                        self._resource + '.delete.start',
                        {id_key: id})
    # Name of the plugin handler for this operation, e.g. delete_port.
    action = self._plugin_handlers[self.DELETE]
    # Check authz
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    target = self._item(request, id, parent_id=parent_id)
    try:
        # Verify that the requester is allowed to perform the action.
        policy.enforce(request.context, action, target)
    except exceptions.PolicyNotAuthorized:
        # Pretend the resource does not exist so nothing is given away.
        not_found = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(not_found)
    # Resolve the concrete plugin method (e.g. the plugin's delete_port)
    # and invoke it with the caller-supplied arguments.
    deleter = getattr(self._plugin, action)
    deleter(request.context, id, **kwargs)
    notifier_method = self._resource + '.delete.end'
    self._notifier.info(request.context,
                        notifier_method,
                        {id_key: id})
    result = {self._resource: self._view(request.context, target)}
    # Inform nova, then DHCP, about the deleted object.
    self._send_nova_notification(action, {}, result)
    self._send_dhcp_notification(request.context, result, notifier_method)
开发者ID:xiongmeng1108,项目名称:gcloud7_neutron-2014.2.2,代码行数:32,代码来源:base.py
示例6: create
def create(self, request, body, **kwargs):
    """Attach network ``body['network_id']`` to a DHCP agent.

    The agent is identified by ``kwargs['agent_id']``. The
    ``create_<DHCP_NET>`` policy is enforced against an empty target before
    the plugin is called; the plugin's return value is returned unchanged.
    """
    core_plugin = manager.NeutronManager.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "create_%s" % DHCP_NET, {})
    add_network = core_plugin.add_network_to_dhcp_agent
    return add_network(request.context,
                       kwargs['agent_id'],
                       body['network_id'])
开发者ID:aaronknister,项目名称:neutron,代码行数:7,代码来源:dhcpagentscheduler.py
示例7: delete
def delete(self, request, id, **kwargs):
    """Deletes the specified entity.

    Emits ``<resource>.delete.start`` and ``<resource>.delete.end``
    notifications around the plugin call, then notifies nova and DHCP with
    a view of the object as it was loaded before deletion.

    Raises:
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    id_key = self._resource + '_id'
    self._notifier.info(request.context,
                        self._resource + '.delete.start',
                        {id_key: id})
    action = self._plugin_handlers[self.DELETE]
    # Authorization: initialize the engine, load the target, check it.
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    target = self._item(request, id, parent_id=parent_id)
    try:
        policy.enforce(request.context, action, target,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        # Pretend the resource does not exist so nothing is given away.
        not_found = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(not_found)
    deleter = getattr(self._plugin, action)
    deleter(request.context, id, **kwargs)
    notifier_method = self._resource + '.delete.end'
    self._notifier.info(request.context,
                        notifier_method,
                        {id_key: id})
    result = {self._resource: self._view(request.context, target)}
    self._send_nova_notification(action, {}, result)
    self._send_dhcp_notification(request.context, result, notifier_method)
开发者ID:bgxavier,项目名称:neutron,代码行数:33,代码来源:base.py
示例8: delete
def delete(self, request, id, **kwargs):
    """Detach router ``id`` from an L3 agent and publish a notification.

    The agent is identified by ``kwargs['agent_id']``. After the
    ``delete_<L3_ROUTER>`` policy check passes, the plugin removes the
    router, an ``l3_agent.router.remove`` notification is sent, and the
    plugin's return value is handed back.
    """
    l3_plugin = self.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "delete_%s" % L3_ROUTER, {})
    agent_id = kwargs["agent_id"]
    removal = l3_plugin.remove_router_from_l3_agent(
        request.context, agent_id, id)
    notify(request.context, "l3_agent.router.remove", id, agent_id)
    return removal
开发者ID:asadoughi,项目名称:neutron,代码行数:7,代码来源:l3agentscheduler.py
示例9: index
def index(self, request, **kwargs):
    """List the firewalls on the L3 agent given in ``kwargs['agent_id']``.

    The ``get_<FIREWALLS>`` policy is enforced against an empty target
    first; the plugin's return value is handed back unchanged.
    """
    agent_plugin = self.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "get_%s" % FIREWALLS, {})
    list_firewalls = agent_plugin.list_firewalls_on_l3_agent
    return list_firewalls(request.context, kwargs['agent_id'])
开发者ID:rchunduru,项目名称:neutron,代码行数:7,代码来源:l3agentscheduler.py
示例10: _delete
def _delete(self, request, id, **kwargs):
    """Authorize and delete entity ``id``, then publish the outcome.

    After the plugin call the usage trackers are marked dirty, a
    ``<resource>.delete.end`` notification is emitted (payload: the id plus
    a view of the pre-deletion object) and ``events.BEFORE_RESPONSE`` is
    published through the callback registry.

    Raises:
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    action = self._plugin_handlers[self.DELETE]
    # Authorization: initialize the engine, load the target, check it.
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    target = self._item(request, id, parent_id=parent_id)
    try:
        policy.enforce(request.context,
                       action,
                       target,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        # Pretend the resource does not exist so nothing is given away.
        not_found = _("The resource could not be found.")
        raise webob.exc.HTTPNotFound(not_found)
    deleter = getattr(self._plugin, action)
    deleter(request.context, id, **kwargs)
    # Deleting usually changes resource usage, so flag the affected usage
    # trackers as dirty.
    resource_registry.set_resources_dirty(request.context)
    notifier_method = self._resource + ".delete.end"
    result = {self._resource: self._view(request.context, target)}
    # Notification payload = resource id merged with the resource view.
    payload = {self._resource + "_id": id}
    payload.update(result)
    self._notifier.info(request.context, notifier_method, payload)
    registry.notify(self._resource,
                    events.BEFORE_RESPONSE,
                    self,
                    context=request.context,
                    data=result,
                    method_name=notifier_method,
                    action=action,
                    original={})
开发者ID:electrocucaracha,项目名称:neutron,代码行数:35,代码来源:base.py
示例11: index
def index(self, request, **kwargs):
    """List the pools on the LBaaS agent given in ``kwargs['agent_id']``.

    When no LOADBALANCER service plugin is registered, an empty pool list
    is returned without any policy check. Otherwise the
    ``get_<LOADBALANCER_POOLS>`` policy is enforced against an empty target
    and the plugin's answer is returned.
    """
    service_plugins = manager.NeutronManager.get_service_plugins()
    lbaas_plugin = service_plugins.get(plugin_const.LOADBALANCER)
    if lbaas_plugin:
        policy.enforce(request.context,
                       "get_%s" % LOADBALANCER_POOLS,
                       {},
                       plugin=lbaas_plugin)
        return lbaas_plugin.list_pools_on_lbaas_agent(
            request.context, kwargs["agent_id"])
    return {"pools": []}
开发者ID:bdrich,项目名称:neutron-lbaas,代码行数:7,代码来源:lbaas_agentscheduler.py
示例12: _delete
def _delete(self, request, id, **kwargs):
    """Authorize and delete entity ``id``, then send notifications.

    After the plugin call the usage trackers are marked dirty, a
    ``<resource>.delete.end`` notification carrying the id is emitted, and
    nova and DHCP are notified with a view of the pre-deletion object.

    Raises:
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    action = self._plugin_handlers[self.DELETE]
    # Authorization: initialize the engine, load the target, check it.
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    target = self._item(request, id, parent_id=parent_id)
    try:
        policy.enforce(request.context, action, target,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        # Pretend the resource does not exist so nothing is given away.
        not_found = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(not_found)
    deleter = getattr(self._plugin, action)
    deleter(request.context, id, **kwargs)
    # Deleting usually changes resource usage, so flag the affected usage
    # trackers as dirty.
    resource_registry.set_resources_dirty(request.context)
    notifier_method = self._resource + '.delete.end'
    end_payload = {self._resource + '_id': id}
    self._notifier.info(request.context, notifier_method, end_payload)
    result = {self._resource: self._view(request.context, target)}
    self._send_nova_notification(action, {}, result)
    self._send_dhcp_notification(request.context, result, notifier_method)
开发者ID:Jackwwg,项目名称:neutron,代码行数:32,代码来源:base.py
示例13: index
def index(self, request, **kwargs):
    """List the routers on the L3 agent given in ``kwargs['agent_id']``.

    The ``get_<L3_ROUTERS>`` policy is enforced against an empty target
    first; the plugin's return value is handed back unchanged.
    """
    l3_plugin = self.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "get_%s" % L3_ROUTERS, {})
    list_routers = l3_plugin.list_routers_on_l3_agent
    return list_routers(request.context, kwargs['agent_id'])
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:7,代码来源:l3agentscheduler.py
示例14: delete
def delete(self, request, id, **kwargs):
    """Deletes the specified entity.

    Publishes ``<resource>.delete.start`` and ``<resource>.delete.end``
    through ``notifier_api`` around the plugin call and finally sends a
    DHCP notification with a view of the pre-deletion object.

    Raises:
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    id_key = self._resource + '_id'
    notifier_api.notify(request.context,
                        self._publisher_id,
                        self._resource + '.delete.start',
                        notifier_api.CONF.default_notification_level,
                        {id_key: id})
    action = self._plugin_handlers[self.DELETE]
    # Authorization: load the target object and check the action on it.
    parent_id = kwargs.get(self._parent_id_name)
    target = self._item(request, id, parent_id=parent_id)
    try:
        policy.enforce(request.context, action, target)
    except exceptions.PolicyNotAuthorized:
        # Pretend the resource does not exist so nothing is given away.
        not_found = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(not_found)
    deleter = getattr(self._plugin, action)
    deleter(request.context, id, **kwargs)
    notifier_method = self._resource + '.delete.end'
    notifier_api.notify(request.context,
                        self._publisher_id,
                        notifier_method,
                        notifier_api.CONF.default_notification_level,
                        {id_key: id})
    result = {self._resource: self._view(request.context, target)}
    self._send_dhcp_notification(request.context, result, notifier_method)
开发者ID:ChengZuo,项目名称:neutron,代码行数:34,代码来源:base.py
示例15: index
def index(self, request, **kwargs):
    """List the DHCP agents hosting network ``kwargs['network_id']``.

    The ``get_<DHCP_AGENTS>`` policy is enforced against an empty target
    first; the plugin's return value is handed back unchanged.
    """
    core_plugin = directory.get_plugin()
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "get_%s" % DHCP_AGENTS, {})
    list_agents = core_plugin.list_dhcp_agents_hosting_network
    return list_agents(request.context, kwargs['network_id'])
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:7,代码来源:dhcpagentscheduler.py
示例16: test_templatized_enforcement
def test_templatized_enforcement(self):
    """Check "example:my_file" against two targets with different tenants.

    Enforcing on the target with tenant 'fake' must not raise, while the
    target with tenant 'another' must raise PolicyNotAuthorized.
    """
    action = "example:my_file"
    allowed_target = {'tenant_id': 'fake'}
    denied_target = {'tenant_id': 'another'}
    # Must pass silently.
    policy.enforce(self.context, action, allowed_target)
    # Must be rejected.
    self.assertRaises(exceptions.PolicyNotAuthorized,
                      policy.enforce,
                      self.context,
                      action,
                      denied_target)
开发者ID:ChengZuo,项目名称:neutron,代码行数:7,代码来源:test_policy.py
示例17: _update
def _update(self, request, id, body, **kwargs):
    """Authorize and apply an update to entity ``id``.

    The request body is deep-copied and normalized, the stored object is
    loaded with the fields the policy engine needs, the requested changes
    are merged in and the update policy is enforced on the merged object.
    After the plugin call the usage trackers are marked dirty, a
    ``<resource>.update.end`` notification is emitted and
    ``events.BEFORE_RESPONSE`` is published through the callback registry.

    Returns:
        ``{self._resource: <view of the updated object>}``.

    Raises:
        webob.exc.HTTPNotFound: if the policy check fails and the object
            belongs to a different tenant than the requester.
        oslo_policy.PolicyNotAuthorized: re-raised when the requester owns
            the object.
    """
    body = Controller.prepare_request_body(request.context,
                                           copy.deepcopy(body),
                                           False,
                                           self._resource,
                                           self._attr_info,
                                           allow_bulk=self._allow_bulk)
    action = self._plugin_handlers[self.UPDATE]
    # Only the attributes the policy engine needs are loaded: those flagged
    # as required by policy, primary keys, and those without a default.
    field_list = []
    for attr_name, attr_spec in six.iteritems(self._attr_info):
        if (attr_spec.get("required_by_policy") or
                attr_spec.get("primary_key") or
                "default" not in attr_spec):
            field_list.append(attr_name)
    # The policy engine must be initialized before any check runs.
    policy.init()
    parent_id = kwargs.get(self._parent_id_name)
    orig_obj = self._item(request, id,
                          field_list=field_list,
                          parent_id=parent_id)
    # Shallow snapshot taken before the requested changes are merged in.
    orig_object_copy = copy.copy(orig_obj)
    requested_changes = body[self._resource]
    orig_obj.update(requested_changes)
    # Tell the policy engine which attributes were set explicitly so it can
    # tell them apart from attributes left at their default values.
    orig_obj[n_const.ATTRIBUTES_TO_UPDATE] = requested_changes.keys()
    try:
        policy.enforce(request.context,
                       action,
                       orig_obj,
                       pluralized=self._collection)
    except oslo_policy.PolicyNotAuthorized:
        with excutils.save_and_reraise_exception() as ctxt:
            # A tenant touching its own object may safely see a 403; for
            # anyone else the object is reported as missing so that no
            # information is given away.
            if request.context.tenant_id != orig_obj["tenant_id"]:
                ctxt.reraise = False
        # Only reached when the re-raise above was suppressed.
        not_found = _("The resource could not be found.")
        raise webob.exc.HTTPNotFound(not_found)
    obj_updater = getattr(self._plugin, action)
    updater_kwargs = {self._resource: body}
    if parent_id:
        updater_kwargs[self._parent_id_name] = parent_id
    obj = obj_updater(request.context, id, **updater_kwargs)
    # Updates rarely change resource usage, but side effects are possible
    # (e.g. a tenant port is created when a router interface is added), so
    # the usage trackers are marked dirty here as well.
    resource_registry.set_resources_dirty(request.context)
    result = {self._resource: self._view(request.context, obj)}
    notifier_method = self._resource + ".update.end"
    self._notifier.info(request.context, notifier_method, result)
    registry.notify(self._resource,
                    events.BEFORE_RESPONSE,
                    self,
                    context=request.context,
                    data=result,
                    method_name=notifier_method,
                    action=action,
                    original=orig_object_copy)
    return result
开发者ID:electrocucaracha,项目名称:neutron,代码行数:60,代码来源:base.py
示例18: before
def before(self, state):
    """Enforce the policy for every resource carried by the request.

    Requests whose HTTP method is not in ``ACTION_MAP`` are aborted with
    405. For PUT requests each item is merged over a copy of the original
    object before the check, and the explicitly updated attribute names are
    recorded on the merged object.

    Raises:
        webob.exc.HTTPNotFound: if a check fails on a PUT for an object
            owned by a different tenant.
        oslo_policy.PolicyNotAuthorized: re-raised in every other denied
            case.
    """
    http_method = state.request.method
    if http_method not in self.ACTION_MAP:
        pecan.abort(405)
    neutron_context = state.request.context.get('neutron_context')
    resource = state.request.context.get('resource')
    is_update = (state.request.method == 'PUT')
    requested_items = state.request.resources
    policy.init()
    action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource)
    for item in requested_items:
        target = item
        if is_update:
            # Check the object as it would look after the update.
            obj = copy.copy(state.request.original_object)
            obj.update(item)
            obj[const.ATTRIBUTES_TO_UPDATE] = item.keys()
            target = obj
        try:
            policy.enforce(neutron_context,
                           action,
                           target,
                           pluralized=attribute_population._plural(resource))
        except oslo_policy.PolicyNotAuthorized:
            with excutils.save_and_reraise_exception() as ctxt:
                # A tenant touching its own object may safely see a 403;
                # for anyone else the object is reported as missing so
                # that no information is given away.
                if (is_update and
                        neutron_context.tenant_id != obj['tenant_id']):
                    ctxt.reraise = False
            # Only reached when the re-raise above was suppressed.
            not_found = _('The resource could not be found.')
            raise webob.exc.HTTPNotFound(not_found)
开发者ID:apporc,项目名称:neutron,代码行数:29,代码来源:policy_enforcement.py
示例19: update
def update(self, request, id, body=None, **kwargs):
    """Updates the specified entity's attributes.

    Publishes ``<resource>.update.start`` / ``<resource>.update.end``
    through ``notifier_api`` around the plugin call, enforces the update
    policy on the merged (stored + requested) object and finally notifies
    DHCP and nova.

    Args:
        request: incoming request; ``request.context`` is used for policy
            checks, notifications and the plugin call.
        id: identifier of the entity to update.
        body: request body; must provide ``copy()`` (e.g. a dict).
        **kwargs: may carry the parent id under ``self._parent_id_name``.

    Returns:
        ``{self._resource: <view of the updated object>}``.

    Raises:
        exceptions.BadRequest: if ``body`` has no ``copy`` method.
        webob.exc.HTTPNotFound: if the policy check is denied; the denial is
            disguised as "not found" to avoid leaking information.
    """
    parent_id = kwargs.get(self._parent_id_name)
    try:
        payload = body.copy()
    except AttributeError:
        msg = _("Invalid format: %s") % request.body
        raise exceptions.BadRequest(resource='body', msg=msg)
    payload['id'] = id
    notifier_api.notify(request.context,
                        self._publisher_id,
                        self._resource + '.update.start',
                        notifier_api.CONF.default_notification_level,
                        payload)
    body = Controller.prepare_request_body(request.context, body, False,
                                           self._resource, self._attr_info,
                                           allow_bulk=self._allow_bulk)
    action = self._plugin_handlers[self.UPDATE]
    # Load object to check authz
    # but pass only attributes in the original body and required
    # by the policy engine to the policy 'brain'
    # NOTE: items() instead of the Python 2-only iteritems() so the
    # comprehension works on both Python 2 and Python 3.
    field_list = [name for (name, value) in self._attr_info.items()
                  if (value.get('required_by_policy') or
                      value.get('primary_key') or
                      'default' not in value)]
    # Ensure policy engine is initialized
    policy.init()
    orig_obj = self._item(request, id, field_list=field_list,
                          parent_id=parent_id)
    # Shallow snapshot taken before the requested changes are merged in;
    # it is handed to the nova notifier below.
    orig_object_copy = copy.copy(orig_obj)
    orig_obj.update(body[self._resource])
    try:
        policy.enforce(request.context,
                       action,
                       orig_obj)
    except exceptions.PolicyNotAuthorized:
        # To avoid giving away information, pretend that it
        # doesn't exist
        msg = _('The resource could not be found.')
        raise webob.exc.HTTPNotFound(msg)
    obj_updater = getattr(self._plugin, action)
    kwargs = {self._resource: body}
    if parent_id:
        kwargs[self._parent_id_name] = parent_id
    obj = obj_updater(request.context, id, **kwargs)
    result = {self._resource: self._view(request.context, obj)}
    notifier_method = self._resource + '.update.end'
    notifier_api.notify(request.context,
                        self._publisher_id,
                        notifier_method,
                        notifier_api.CONF.default_notification_level,
                        result)
    self._send_dhcp_notification(request.context,
                                 result,
                                 notifier_method)
    self._nova_notifier.send_network_change(
        action, orig_object_copy, result)
    return result
开发者ID:Zemeio,项目名称:neutron,代码行数:59,代码来源:base.py
示例20: index
def index(self, request, **kwargs):
    """List the L3 agents hosting router ``kwargs['router_id']``.

    The L3_ROUTER_NAT service plugin is looked up, the ``get_<L3_AGENTS>``
    policy is enforced against an empty target, and the plugin's return
    value is handed back unchanged.
    """
    service_plugins = manager.NeutronManager.get_service_plugins()
    l3_plugin = service_plugins.get(service_constants.L3_ROUTER_NAT)
    # Empty target: the check depends only on the request context.
    policy.enforce(request.context, "get_%s" % L3_AGENTS, {})
    list_agents = l3_plugin.list_l3_agents_hosting_router
    return list_agents(request.context, kwargs['router_id'])
开发者ID:aignatov,项目名称:neutron,代码行数:8,代码来源:l3agentscheduler.py
注:本文中的neutron.policy.enforce函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论