• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python context.ctx函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 mistral.context.ctx 函数的典型用法代码示例。如果您正苦于以下问题:Python ctx 函数的具体用法是什么?Python ctx 怎么用?有哪些 Python ctx 的使用示例?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 ctx 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_workflow_definition_public

    def test_workflow_definition_public(self):
        """A public workflow definition is visible to other projects."""
        # Store the definition while acting as the initial project; the
        # final assertion expects its scope to be 'public'.
        wf_def = db_api.create_workflow_definition(WF_DEFINITIONS[0])

        visible = db_api.get_workflow_definitions()

        self.assertEqual(1, len(visible))
        self.assertEqual(wf_def, visible[0])

        # The persisted project_id must come from the security context
        # rather than from the values that were passed in.
        self.assertEqual(wf_def.project_id, auth_context.ctx().project_id)
        self.assertNotEqual(
            WF_DEFINITIONS[0]['project_id'],
            auth_context.ctx().project_id
        )

        # Switch to a non-default context and list the definitions again.
        other_ctx = test_base.get_context(default=False)
        auth_context.set_ctx(other_ctx)

        visible = db_api.get_workflow_definitions()

        self.assertEqual(1, len(visible))
        self.assertEqual(wf_def, visible[0])
        self.assertEqual('public', wf_def.scope)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:26,代码来源:test_sqlalchemy_db_api.py


示例2: post

    def post(self):
        """Create one or more actions from the request body.

        NOTE: The request text may define several actions; in that case
            every one of them is created.
        """
        acl.enforce('actions:create', context.ctx())

        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        resources.Action.validate_scope(scope)
        if scope == 'public':
            # Publishing an action requires an additional permission.
            acl.enforce('actions:publicize', context.ctx())

        LOG.debug("Create action(s) [definition=%s]", definition)

        def _create_action_definitions():
            with db_api.transaction():
                return actions.create_actions(definition, scope=scope)

        # Explicit wrapping, equivalent to decorating the helper above.
        create_with_retry = rest_utils.rest_retry_on_db_error(
            _create_action_definitions
        )

        action_list = []
        for db_act in create_with_retry():
            action_list.append(resources.Action.from_db_model(db_act))

        return resources.Actions(actions=action_list).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action.py


示例3: post

    def post(self, event_trigger):
        """Create an event trigger from the given resource.

        :param event_trigger: Resource object; its ``to_dict()`` output
            supplies the values of the new trigger.
        :raises exc.EventTriggerException: If any key listed in
            CREATE_MANDATORY has no truthy value.
        """
        acl.enforce('event_triggers:create', auth_ctx.ctx())

        values = event_trigger.to_dict()

        # Only keys carrying a truthy value count as provided.
        provided = {key for key in values if values[key]}
        missing = CREATE_MANDATORY - provided

        if missing:
            raise exc.EventTriggerException(
                "Params %s must be provided for creating event trigger." %
                CREATE_MANDATORY
            )

        scope = values.get('scope')

        if scope == 'public':
            acl.enforce('event_triggers:create:public', auth_ctx.ctx())

        LOG.debug('Create event trigger: %s', values)

        create_with_retry = rest_utils.rest_retry_on_db_error(
            triggers.create_event_trigger
        )

        db_model = create_with_retry(
            name=values.get('name', ''),
            exchange=values.get('exchange'),
            topic=values.get('topic'),
            event=values.get('event'),
            workflow_id=values.get('workflow_id'),
            scope=scope,
            workflow_input=values.get('workflow_input'),
            workflow_params=values.get('workflow_params'),
        )

        return resources.EventTrigger.from_db_model(db_model)
开发者ID:openstack,项目名称:mistral,代码行数:32,代码来源:event_trigger.py


示例4: post

    def post(self, namespace=''):
        """Create one or more workflows from the request body.

        The request text may hold several workflow definitions; in that
        case all of them are created.

        :param namespace: Optional. Namespace in which the workflows are
            created. Workflows sharing a name can be added to a given
            project as long as their namespaces differ.
        """
        acl.enforce('workflows:create', context.ctx())

        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        resources.Workflow.validate_scope(scope)
        if scope == 'public':
            # Publishing a workflow requires an additional permission.
            acl.enforce('workflows:publicize', context.ctx())

        LOG.debug("Create workflow(s) [definition=%s]", definition)

        create_with_retry = rest_utils.rest_retry_on_db_error(
            workflows.create_workflows
        )
        db_wfs = create_with_retry(
            definition,
            scope=scope,
            namespace=namespace
        )

        workflow_list = []
        for db_wf in db_wfs:
            workflow_list.append(resources.Workflow.from_db_model(db_wf))

        return resources.Workflows(workflows=workflow_list).to_json()
开发者ID:openstack,项目名称:mistral,代码行数:33,代码来源:workflow.py


示例5: get_all

    def get_all(self, marker=None, limit=None, sort_keys='created_at',
                sort_dirs='asc', fields='', all_projects=False, **filters):
        """Return all event triggers.

        All arguments are logged and then handed unchanged to
        ``rest_utils.get_all``; ``all_projects`` additionally requires
        the 'event_triggers:list:all_projects' permission.
        """
        acl.enforce('event_triggers:list', auth_ctx.ctx())

        if all_projects:
            acl.enforce('event_triggers:list:all_projects', auth_ctx.ctx())

        LOG.debug(
            "Fetch event triggers. marker=%s, limit=%s, sort_keys=%s, "
            "sort_dirs=%s, fields=%s, all_projects=%s, filters=%s",
            marker,
            limit,
            sort_keys,
            sort_dirs,
            fields,
            all_projects,
            filters
        )

        triggers_resource = rest_utils.get_all(
            resources.EventTriggers,
            resources.EventTrigger,
            db_api.get_event_triggers,
            db_api.get_event_trigger,
            resource_function=None,
            marker=marker,
            limit=limit,
            sort_keys=sort_keys,
            sort_dirs=sort_dirs,
            fields=fields,
            all_projects=all_projects,
            **filters
        )

        return triggers_resource
开发者ID:openstack,项目名称:mistral,代码行数:28,代码来源:event_trigger.py


示例6: test_workbook_public

    def test_workbook_public(self):
        """A public workbook is visible to a user of another project."""
        # Store the workbook while acting as the initial project; the
        # final assertion expects its scope to be 'public'.
        workbook = db_api.workbook_create(WORKBOOKS[0])

        listed = db_api.workbooks_get_all()

        self.assertEqual(1, len(listed))
        self.assertDictEqual(workbook, listed[0])

        # The persisted project_id must come from the security context
        # rather than from the values that were passed in.
        self.assertEqual(
            workbook['project_id'],
            auth_context.ctx().project_id
        )
        self.assertNotEqual(
            WORKBOOKS[0]['project_id'],
            auth_context.ctx().project_id
        )

        # Switch to a context describing a different user and project.
        other_user = auth_context.MistralContext(
            user_id='9-0-44-5',
            project_id='99-88-33',
            user_name='test-user',
            project_name='test-another',
            is_admin=False
        )
        auth_context.set_ctx(other_user)

        listed = db_api.workbooks_get_all()

        self.assertEqual(1, len(listed))
        self.assertDictEqual(workbook, listed[0])
        self.assertEqual('public', workbook['scope'])
开发者ID:dshulyak,项目名称:mistral,代码行数:28,代码来源:test_sqlalchemy_db_api.py


示例7: test_workflow_definition_public

    def test_workflow_definition_public(self):
        """A public workflow definition is visible to another user."""
        # Store the definition while acting as the initial project; the
        # final assertion expects its scope to be 'public'.
        wf_def = db_api.create_workflow_definition(WF_DEFINITIONS[0])

        visible = db_api.get_workflow_definitions()

        self.assertEqual(1, len(visible))
        self.assertEqual(wf_def, visible[0])

        # The persisted project_id must come from the security context
        # rather than from the values that were passed in.
        self.assertEqual(wf_def.project_id, auth_context.ctx().project_id)
        self.assertNotEqual(
            WF_DEFINITIONS[0]['project_id'],
            auth_context.ctx().project_id
        )

        # Switch to a context describing a different user and project.
        other_user = auth_context.MistralContext(
            user_id='9-0-44-5',
            project_id='99-88-33',
            user_name='test-user',
            project_name='test-another',
            is_admin=False
        )
        auth_context.set_ctx(other_user)

        visible = db_api.get_workflow_definitions()

        self.assertEqual(1, len(visible))
        self.assertEqual(wf_def, visible[0])
        self.assertEqual('public', wf_def.scope)
开发者ID:ainkov,项目名称:mistral,代码行数:34,代码来源:test_sqlalchemy_db_api.py


示例8: delete

    def delete(self, identifier):
        """Delete a workflow.

        :param identifier: Identifier of the workflow definition; it is
            passed unchanged to ``db_api.delete_workflow_definition``.
        """
        acl.enforce('workflows:delete', context.ctx())

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Delete workflow [identifier=%s]", identifier)

        with db_api.transaction():
            db_api.delete_workflow_definition(identifier)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:7,代码来源:workflow.py


示例9: put

    def put(self, identifier=None):
        """Update one or more workflows.

        :param identifier: Optional. If provided, it's UUID of a workflow.
            Only one workflow can be updated with identifier param.
        :return: JSON of the first updated workflow when ``identifier``
            is given, otherwise JSON of the whole list of workflows.
        :raises exc.InvalidModelException: If the 'scope' query parameter
            is not one of SCOPE_TYPES.values.

        The text is allowed to have definitions of multiple workflows. In this
        case they all will be updated.
        """
        acl.enforce('workflows:update', context.ctx())
        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')

        if scope not in SCOPE_TYPES.values:
            raise exc.InvalidModelException(
                "Scope must be one of the following: %s; actual: "
                "%s" % (SCOPE_TYPES.values, scope)
            )

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Update workflow(s) [definition=%s]", definition)

        db_wfs = workflows.update_workflows(
            definition,
            scope=scope,
            identifier=identifier
        )

        models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
        workflow_list = [Workflow.from_dict(wf) for wf in models_dicts]

        return (workflow_list[0].to_json() if identifier
                else Workflows(workflows=workflow_list).to_json())
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:32,代码来源:workflow.py


示例10: delete

    def delete(self, name):
        """Delete the named workbook.

        :param name: Workbook name, passed unchanged to
            ``db_api.delete_workbook``.
        """
        acl.enforce('workbooks:delete', context.ctx())

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Delete workbook [name=%s]", name)

        db_api.delete_workbook(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:workbook.py


示例11: post

    def post(self, wf_ex):
        """Create a new Execution.

        :param wf_ex: Execution object with input content.
        :return: Execution built from the result of the engine's
            ``start_workflow`` call.
        :raises exc.WorkflowException: If neither a workflow ID nor a
            workflow name is provided.
        """
        acl.enforce('executions:create', context.ctx())

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info('Create execution [execution=%s]', wf_ex)

        engine = rpc.get_engine_client()
        exec_dict = wf_ex.to_dict()

        # Use "or" rather than a dict.get() default: a 'workflow_id' key
        # that is present but empty must still fall back to the name,
        # matching the validation below.
        wf_identifier = (
            exec_dict.get('workflow_id') or exec_dict.get('workflow_name')
        )

        if not wf_identifier:
            raise exc.WorkflowException(
                "Workflow ID or workflow name must be provided. Workflow ID is"
                " recommended."
            )

        result = engine.start_workflow(
            wf_identifier,
            exec_dict.get('input'),
            exec_dict.get('description', ''),
            **exec_dict.get('params') or {}
        )

        return Execution.from_dict(result)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:26,代码来源:execution.py


示例12: on_action_complete

    def on_action_complete(self, action_ex_id, result, wf_action=False,
                           async_=False):
        """Deliver an action result to the Mistral Engine.

        Clients of the Mistral Engine use this method to update the state
        of an action execution after the action has run. One such client
        is the Mistral REST API server, which receives action results
        from outside action handlers.

        Note: a call to this method acts as an event telling Mistral
        that the workflow may need to advance, i.e. run further workflow
        tasks whose dependencies are all satisfied.

        :param action_ex_id: Action execution id.
        :param result: Action execution result.
        :param wf_action: If True, the given id refers to a workflow
            execution instead of an action execution. This is the case
            when a nested workflow execution reports its result to its
            parent workflow.
        :param async_: If True, the call is made through the client's
            asynchronous method (without waiting for completion).
        :return: Action(or workflow if wf_action=True) execution object.
        """
        if async_:
            rpc_call = self._client.async_call
        else:
            rpc_call = self._client.sync_call

        security_ctx = auth_ctx.ctx()

        return rpc_call(
            security_ctx,
            'on_action_complete',
            action_ex_id=action_ex_id,
            result=result,
            wf_action=wf_action
        )
开发者ID:openstack,项目名称:mistral,代码行数:33,代码来源:clients.py


示例13: _get_client

    def _get_client(self):
        """Build a client from ``self._client_class``.

        The auth token and project id come from the current security
        context; the region and auth URL come from the Keystone v2
        endpoint. The client's management URL is then set from the
        'nova' endpoint URL formatted with the context's project id.

        :return: The configured client instance.
        """
        ctx = context.ctx()

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.debug("Nova action security context: %s", ctx)

        keystone_endpoint = keystone_utils.get_keystone_endpoint_v2()
        nova_endpoint = keystone_utils.get_endpoint_for_project('nova')

        client = self._client_class(
            username=None,
            api_key=None,
            endpoint_type='publicURL',
            service_type='compute',
            auth_token=ctx.auth_token,
            tenant_id=ctx.project_id,
            region_name=keystone_endpoint.region,
            auth_url=keystone_endpoint.url
        )

        client.client.management_url = keystone_utils.format_url(
            nova_endpoint.url,
            {'tenant_id': ctx.project_id}
        )

        return client
开发者ID:kantorv,项目名称:mistral,代码行数:25,代码来源:actions.py


示例14: update_event_trigger

 def update_event_trigger(self, trigger):
     """Issue an async 'update_event_trigger' call with fanout=True.

     :param trigger: Passed to the call as the ``trigger`` argument.
     :return: Whatever the client's ``async_call`` returns.
     """
     call_kwargs = {'trigger': trigger, 'fanout': True}

     return self._client.async_call(
         auth_ctx.ctx(),
         'update_event_trigger',
         **call_kwargs
     )
开发者ID:openstack,项目名称:mistral,代码行数:7,代码来源:clients.py


示例15: post

    def post(self):
        """Create a new action.

        NOTE: This text is allowed to have definitions
            of multiple actions. In this case they all will be created.

        :return: JSON of the list of created actions.
        :raises exc.InvalidModelException: If the 'scope' query parameter
            is not one of SCOPE_TYPES.values.
        """
        acl.enforce('actions:create', context.ctx())
        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        if scope not in SCOPE_TYPES.values:
            raise exc.InvalidModelException(
                "Scope must be one of the following: %s; actual: "
                "%s" % (SCOPE_TYPES.values, scope)
            )

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Create action(s) [definition=%s]", definition)

        db_acts = actions.create_actions(definition, scope=scope)
        models_dicts = [db_act.to_dict() for db_act in db_acts]

        action_list = [Action.from_dict(act) for act in models_dicts]

        return Actions(actions=action_list).to_json()
开发者ID:xavierhardy,项目名称:mistral,代码行数:25,代码来源:action.py


示例16: delete

    def delete(self, id):
        """Remove the given action execution.

        :param id: UUID of action execution to delete
        :raises exc.NotAllowedException: If deletion is disabled in the
            configuration, if the execution has a task_execution_id, or
            if its state is not a completed one.
        """
        acl.enforce('action_executions:delete', context.ctx())

        LOG.debug("Delete action_execution [id=%s]", id)

        deletion_allowed = cfg.CONF.api.allow_action_execution_deletion

        if not deletion_allowed:
            raise exc.NotAllowedException(
                "Action execution deletion is not allowed."
            )

        with db_api.transaction():
            target = db_api.get_action_execution(id)

            # An execution with a task_execution_id is not ad-hoc.
            if target.task_execution_id:
                raise exc.NotAllowedException(
                    "Only ad-hoc action execution can be deleted."
                )

            completed = states.is_completed(target.state)

            if not completed:
                raise exc.NotAllowedException(
                    "Only completed action execution can be deleted."
                )

            return db_api.delete_action_execution(id)
开发者ID:openstack,项目名称:mistral,代码行数:27,代码来源:action_execution.py


示例17: post

    def post(self, action_ex):
        """Start a new action execution.

        :param action_ex: Action to execute
        :return: ActionExecution resource built from the values returned
            by the engine's ``start_action`` call.
        :raises exc.InputException: If the action has no name.
        """
        acl.enforce('action_executions:create', context.ctx())

        LOG.debug(
            "Create action_execution [action_execution=%s]", action_ex
        )

        # Falsy fields are normalised before being sent to the engine.
        action_name = action_ex.name
        action_description = action_ex.description or None
        input_values = action_ex.input or {}
        extra_params = action_ex.params or {}

        if not action_name:
            raise exc.InputException(
                "Please provide at least action name to run action."
            )

        engine = rpc.get_engine_client()

        values = engine.start_action(
            action_name,
            input_values,
            description=action_description,
            **extra_params
        )

        return resources.ActionExecution.from_dict(values)
开发者ID:openstack,项目名称:mistral,代码行数:30,代码来源:action_execution.py


示例18: schedule_call

def schedule_call(factory_method_path, target_method_name,
                  run_after, serializers=None, **method_args):
    """Store a delayed call so that target_method is invoked later.

    The call specification is added to the DB; the CallScheduler service
    is then expected to invoke the target method once run_after seconds
    have passed.

    :param factory_method_path: Full python-specific path to the
        factory method that constructs the target object.
    :param target_method_name: Name of the target object method to be
        invoked.
    :param run_after: Value in seconds.
    :param serializers: Map of argument names to serializer class paths.
        Use it when an argument is an object of a specific type that
        needs to be serialized. Example:
        { "result": "mistral.utils.serializer.ResultSerializer"}
        The serializer for the object type must implement the serializer
        interface in mistral/utils/serializer.py
    :param method_args: Target method keyword arguments.
    """
    ctx_serializer = context.RpcContextSerializer(
        context.JsonPayloadSerializer()
    )

    # Without a security context an empty dict is stored instead.
    if context.has_ctx():
        ctx = ctx_serializer.serialize_context(context.ctx())
    else:
        ctx = {}

    now = datetime.datetime.now()
    execution_time = now + datetime.timedelta(seconds=run_after)

    for arg_name, serializer_path in (serializers or {}).items():
        if arg_name not in method_args:
            raise exc.MistralException(
                "Serializable method argument %s"
                " not found in method_args=%s"
                % (arg_name, method_args))

        try:
            serializer_cls = importutils.import_class(serializer_path)
            serializer = serializer_cls()
        except ImportError as e:
            raise ImportError("Cannot import class %s: %s"
                              % (serializer_path, e))

        # Replace the raw argument with its serialized form.
        raw_value = method_args[arg_name]
        method_args[arg_name] = serializer.serialize(raw_value)

    db_api.create_delayed_call({
        'factory_method_path': factory_method_path,
        'target_method_name': target_method_name,
        'execution_time': execution_time,
        'auth_context': ctx,
        'serializers': serializers,
        'method_arguments': method_args,
        'processing': False
    })
开发者ID:prabhuinbarajan,项目名称:mistral,代码行数:60,代码来源:scheduler.py


示例19: delete

    def delete(self, name):
        """Delete the named environment.

        :param name: Environment name, passed unchanged to
            ``db_api.delete_environment``.
        """
        acl.enforce('environments:delete', context.ctx())

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Delete environment [name=%s]", name)

        db_api.delete_environment(name)
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:7,代码来源:environment.py


示例20: post

    def post(self):
        """Create a new workflow.

        NOTE: The text is allowed to have definitions
            of multiple workflows. In this case they all will be created.

        :return: JSON of the list of created workflows.
        :raises exc.InvalidModelException: If the 'scope' query parameter
            is not one of resources.SCOPE_TYPES.values.
        """
        acl.enforce('workflows:create', context.ctx())

        definition = pecan.request.text
        scope = pecan.request.GET.get('scope', 'private')
        pecan.response.status = 201

        if scope not in resources.SCOPE_TYPES.values:
            raise exc.InvalidModelException(
                "Scope must be one of the following: %s; actual: "
                "%s" % (resources.SCOPE_TYPES.values, scope)
            )

        # Hand the argument to the logger instead of pre-formatting the
        # message, so interpolation only happens if the record is emitted.
        LOG.info("Create workflow(s) [definition=%s]", definition)

        db_wfs = workflows.create_workflows(definition, scope=scope)
        models_dicts = [db_wf.to_dict() for db_wf in db_wfs]

        workflow_list = [
            resources.Workflow.from_dict(wf) for wf in models_dicts
        ]

        return resources.Workflows(workflows=workflow_list).to_json()
开发者ID:luckysamadhiya93,项目名称:mistral,代码行数:28,代码来源:workflow.py



注:本文中的 mistral.context.ctx 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python context.set_ctx函数代码示例发布时间:2022-05-27
下一篇:
Python access_control.enforce函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap