• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.merge_dicts函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mistral.utils.merge_dicts函数的典型用法代码示例。如果您正苦于以下问题:Python merge_dicts函数的具体用法?Python merge_dicts怎么用?Python merge_dicts使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了merge_dicts函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_input

    def test_input(self):
        tests = [
            ({'input': ''}, True),
            ({'input': []}, True),
            ({'input': ['']}, True),
            ({'input': None}, True),
            ({'input': ['k1', 'k2']}, False),
            ({'input': ['k1', 12345]}, True),
            ({'input': ['k1', {'k2': 2}]}, False),
            ({'input': [{'k1': 1}, {'k2': 2}]}, False),
            ({'input': [{'k1': None}]}, False),
            ({'input': [{'k1': 1}, {'k1': 1}]}, True),
            ({'input': [{'k1': 1, 'k2': 2}]}, True)
        ]

        actions = {
            'a1': {
                'base': 'foobar'
            }
        }

        for inputs, expect_error in tests:
            overlay = {'actions': copy.deepcopy(actions)}
            utils.merge_dicts(overlay['actions']['a1'], inputs)
            self._parse_dsl_spec(changes=overlay,
                                 expect_error=expect_error)
开发者ID:kantorv,项目名称:mistral,代码行数:26,代码来源:test_actions.py


示例2: _parse_dsl_spec

    def _parse_dsl_spec(self, dsl_file=None, add_tasks=False,
                        changes=None, expect_error=False):
        if dsl_file and add_tasks:
            raise Exception('The add_tasks option is not a valid '
                            'combination with the dsl_file option.')

        if dsl_file:
            dsl_yaml = base.get_resource(self._resource_path + '/' + dsl_file)

            if changes:
                dsl_dict = yaml.safe_load(dsl_yaml)
                utils.merge_dicts(dsl_dict, changes)
                dsl_yaml = yaml.safe_dump(dsl_dict, default_flow_style=False)
        else:
            dsl_dict = copy.deepcopy(self._dsl_blank)

            if add_tasks:
                dsl_dict['test']['tasks'] = copy.deepcopy(self._dsl_tasks)

            if changes:
                utils.merge_dicts(dsl_dict, changes)

            dsl_yaml = yaml.safe_dump(dsl_dict, default_flow_style=False)

        if not expect_error:
            return self._spec_parser(dsl_yaml)
        else:
            return self.assertRaises(
                exc.DSLParsingException,
                self._spec_parser,
                dsl_yaml
            )
开发者ID:openstack,项目名称:mistral,代码行数:32,代码来源:base.py


示例3: test_output

    def test_output(self):
        tests = [
            ({'output': None}, False),
            ({'output': False}, False),
            ({'output': 12345}, False),
            ({'output': 0.12345}, False),
            ({'output': 'foobar'}, False),
            ({'output': '<% $.x %>'}, False),
            ({'output': '<% * %>'}, True),
            ({'output': '{{ _.x }}'}, False),
            ({'output': '{{ * }}'}, True),
            ({'output': ['v1']}, False),
            ({'output': {'k1': 'v1'}}, False)
        ]

        actions = {
            'a1': {
                'base': 'foobar'
            }
        }

        for outputs, expect_error in tests:
            overlay = {'actions': copy.deepcopy(actions)}

            utils.merge_dicts(overlay['actions']['a1'], outputs)

            self._parse_dsl_spec(changes=overlay, expect_error=expect_error)
开发者ID:openstack,项目名称:mistral,代码行数:27,代码来源:test_actions.py


示例4: test_keep_result

    def test_keep_result(self):
        tests = [
            ({'keep-result': ''}, True),
            ({'keep-result': []}, True),
            ({'keep-result': 'asd'}, True),
            ({'keep-result': None}, True),
            ({'keep-result': 12345}, True),
            ({'keep-result': True}, False),
            ({'keep-result': False}, False),
            ({'keep-result': "<% 'a' in $.val %>"}, False),
            ({'keep-result': '<% 1 + 2 %>'}, False),
            ({'keep-result': '<% * %>'}, True),
            ({'keep-result': "{{ 'a' in _.val }}"}, False),
            ({'keep-result': '{{ 1 + 2 }}'}, False),
            ({'keep-result': '{{ * }}'}, True)
        ]

        for keep_result, expect_error in tests:
            overlay = {'test': {'tasks': {}}}

            utils.merge_dicts(overlay['test']['tasks'], {'email': keep_result})

            self._parse_dsl_spec(
                add_tasks=True,
                changes=overlay,
                expect_error=expect_error
            )
开发者ID:openstack,项目名称:mistral,代码行数:27,代码来源:test_tasks.py


示例5: test_direct_workflow_invalid_join

    def test_direct_workflow_invalid_join(self):
        tests = [
            ({'task3': {'join': 2}}, False),
            ({'task3': {'join': 5}}, True),
            ({'task3': {'join': 1}}, False),
            ({'task3': {'join': 'one'}}, False),
            ({'task3': {'join': 'all'}}, False),
            ({'task4': {'join': 'all'}}, True),
            ({'task4': {'join': 1}}, True),
            ({'task4': {'join': 'one'}}, True)
        ]

        for test in tests:
            overlay = {
                'test': {
                    'type': 'direct',
                    'tasks': {
                        'task1': {'on-complete': 'task3'},
                        'task2': {'on-complete': 'task3'}
                    }
                }
            }

            utils.merge_dicts(overlay['test']['tasks'], test[0])

            self._parse_dsl_spec(
                add_tasks=False,
                changes=overlay,
                expect_error=test[1]
            )
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:30,代码来源:test_workflows.py


示例6: validate_input

def validate_input(definition, input, spec=None):
    input_param_names = copy.deepcopy(list((input or {}).keys()))
    missing_param_names = []

    spec_input = (spec.get_input() if spec else
                  utils.get_dict_from_string(definition.input))

    for p_name, p_value in six.iteritems(spec_input):
        if p_value is utils.NotDefined and p_name not in input_param_names:
            missing_param_names.append(p_name)
        if p_name in input_param_names:
            input_param_names.remove(p_name)

    if missing_param_names or input_param_names:
        msg = 'Invalid input [name=%s, class=%s'
        msg_props = [definition.name, spec.__class__.__name__]

        if missing_param_names:
            msg += ', missing=%s'
            msg_props.append(missing_param_names)

        if input_param_names:
            msg += ', unexpected=%s'
            msg_props.append(input_param_names)

        msg += ']'

        raise exc.InputException(
            msg % tuple(msg_props)
        )
    else:
        utils.merge_dicts(input, spec_input, overwrite=False)
开发者ID:ISCAS-VDI,项目名称:mistral-base,代码行数:32,代码来源:utils.py


示例7: evaluate_task_outbound_context

def evaluate_task_outbound_context(task_ex, include_result=True):
    """Evaluates task outbound Data Flow context.

    This method assumes that complete task output (after publisher etc.)
    has already been evaluated.
    :param task_ex: DB task.
    :param include_result: boolean argument, if True - include the
        TaskResultProxy in outbound context under <task_name> key.
    :return: Outbound task Data Flow context.
    """

    if task_ex.state != states.SUCCESS:
        return task_ex.in_context

    in_context = (copy.deepcopy(dict(task_ex.in_context))
                  if task_ex.in_context is not None else {})

    out_ctx = utils.merge_dicts(in_context, task_ex.published)

    # Add task output under key 'taskName'.
    if include_result:
        task_ex_result = TaskResultProxy(task_ex.id)

        out_ctx = utils.merge_dicts(
            out_ctx,
            {task_ex.name: task_ex_result or None}
        )

    return ProxyAwareDict(out_ctx)
开发者ID:ainkov,项目名称:mistral,代码行数:29,代码来源:data_flow.py


示例8: evaluate_upstream_context

def evaluate_upstream_context(upstream_task_execs):
    published_vars = {}
    ctx = {}

    for t_ex in upstream_task_execs:
        # TODO(rakhmerov): These two merges look confusing. So it's a
        # temporary solution.There's still the bug
        # https://bugs.launchpad.net/mistral/+bug/1424461 that needs to be
        # fixed using context variable versioning.
        published_vars = utils.merge_dicts(
            published_vars,
            t_ex.published
        )

        utils.merge_dicts(
            ctx,
            evaluate_task_outbound_context(t_ex, include_result=False)
        )

    ctx = utils.merge_dicts(ctx, published_vars)

    # TODO(rakhmerov): IMO, this method shouldn't deal with these task ids or
    # anything else related to task proxies. Need to refactor.
    return utils.merge_dicts(
        ctx,
        _get_task_identifiers_dict(upstream_task_execs)
    )
开发者ID:ainkov,项目名称:mistral,代码行数:27,代码来源:data_flow.py


示例9: test_direct_transition

    def test_direct_transition(self):
        tests = [
            ({'on-success': ['email']}, False),
            ({'on-success': [{'email': '<% 1 %>'}]}, False),
            ({'on-success': [{'email': '<% 1 %>'}, 'echo']}, False),
            ({'on-success': [{'email': '<% $.v1 in $.v2 %>'}]}, False),
            ({'on-success': [{'email': '<% * %>'}]}, True),
            ({'on-success': [{'email': '{{ 1 }}'}]}, False),
            ({'on-success': [{'email': '{{ 1 }}'}, 'echo']}, False),
            ({'on-success': [{'email': '{{ _.v1 in _.v2 }}'}]}, False),
            ({'on-success': [{'email': '{{ * }}'}]}, True),
            ({'on-success': 'email'}, False),
            ({'on-success': None}, True),
            ({'on-success': ['']}, True),
            ({'on-success': []}, True),
            ({'on-success': ['email', 'email']}, True),
            ({'on-success': ['email', 12345]}, True),
            ({'on-error': ['email']}, False),
            ({'on-error': [{'email': '<% 1 %>'}]}, False),
            ({'on-error': [{'email': '<% 1 %>'}, 'echo']}, False),
            ({'on-error': [{'email': '<% $.v1 in $.v2 %>'}]}, False),
            ({'on-error': [{'email': '<% * %>'}]}, True),
            ({'on-error': [{'email': '{{ 1 }}'}]}, False),
            ({'on-error': [{'email': '{{ 1 }}'}, 'echo']}, False),
            ({'on-error': [{'email': '{{ _.v1 in _.v2 }}'}]}, False),
            ({'on-error': [{'email': '{{ * }}'}]}, True),
            ({'on-error': 'email'}, False),
            ({'on-error': None}, True),
            ({'on-error': ['']}, True),
            ({'on-error': []}, True),
            ({'on-error': ['email', 'email']}, True),
            ({'on-error': ['email', 12345]}, True),
            ({'on-complete': ['email']}, False),
            ({'on-complete': [{'email': '<% 1 %>'}]}, False),
            ({'on-complete': [{'email': '<% 1 %>'}, 'echo']}, False),
            ({'on-complete': [{'email': '<% $.v1 in $.v2 %>'}]}, False),
            ({'on-complete': [{'email': '<% * %>'}]}, True),
            ({'on-complete': [{'email': '{{ 1 }}'}]}, False),
            ({'on-complete': [{'email': '{{ 1 }}'}, 'echo']}, False),
            ({'on-complete': [{'email': '{{ _.v1 in _.v2 }}'}]}, False),
            ({'on-complete': [{'email': '{{ * }}'}]}, True),
            ({'on-complete': 'email'}, False),
            ({'on-complete': None}, True),
            ({'on-complete': ['']}, True),
            ({'on-complete': []}, True),
            ({'on-complete': ['email', 'email']}, True),
            ({'on-complete': ['email', 12345]}, True)
        ]

        for transition, expect_error in tests:
            overlay = {'test': {'tasks': {}}}

            utils.merge_dicts(overlay['test']['tasks'], {'get': transition})

            self._parse_dsl_spec(
                add_tasks=True,
                changes=overlay,
                expect_error=expect_error
            )
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:59,代码来源:test_tasks.py


示例10: test_direct_workflow_invalid_task

    def test_direct_workflow_invalid_task(self):
        overlay = {'test': {'type': 'direct', 'tasks': {}}}
        requires = {'requires': ['echo', 'get']}

        utils.merge_dicts(overlay['test']['tasks'], {'email': requires})

        self._parse_dsl_spec(add_tasks=True,
                             changes=overlay,
                             expect_error=True)
开发者ID:kantorv,项目名称:mistral,代码行数:9,代码来源:test_workflows.py


示例11: _process_action_and_workflow

    def _process_action_and_workflow(self):
        params = {}

        if self._action:
            self._action, params = self._parse_cmd_and_input(self._action)
        elif self._workflow:
            self._workflow, params = self._parse_cmd_and_input(
                self._workflow)
        else:
            self._action = 'std.noop'

        utils.merge_dicts(self._input, params)
开发者ID:openstack,项目名称:mistral,代码行数:12,代码来源:tasks.py


示例12: test_reverse_workflow

    def test_reverse_workflow(self):
        overlay = {'test': {'type': 'reverse', 'tasks': {}}}
        require = {'requires': ['echo', 'get']}

        utils.merge_dicts(overlay['test']['tasks'], {'email': require})

        wfs_spec = self._parse_dsl_spec(add_tasks=True,
                                        changes=overlay,
                                        expect_error=False)

        self.assertEqual(1, len(wfs_spec.get_workflows()))
        self.assertEqual('test', wfs_spec.get_workflows()[0].get_name())
        self.assertEqual('reverse', wfs_spec.get_workflows()[0].get_type())
开发者ID:dennybaa,项目名称:mistral,代码行数:13,代码来源:test_workflows.py


示例13: __init__

    def __init__(self, data):
        super(ActionSpec, self).__init__(data)

        self._name = data['name']
        self._description = data.get('description')
        self._tags = data.get('tags', [])
        self._base = data['base']
        self._base_input = data.get('base-input', {})
        self._input = utils.get_input_dict(data.get('input', []))
        self._output = data.get('output')

        self._base, _input = self._parse_cmd_and_input(self._base)

        utils.merge_dicts(self._base_input, _input)
开发者ID:ainkov,项目名称:mistral,代码行数:14,代码来源:actions.py


示例14: add_workflow_variables_to_context

def add_workflow_variables_to_context(wf_ex, wf_spec):
    wf_ex.context = wf_ex.context or {}

    # The context for calculating workflow variables is workflow input
    # and other data already stored in workflow initial context.
    ctx_view = ContextView(
        get_workflow_environment_dict(wf_ex),
        wf_ex.context,
        wf_ex.input
    )

    wf_vars = expr.evaluate_recursively(wf_spec.get_vars(), ctx_view)

    utils.merge_dicts(wf_ex.context, wf_vars)
开发者ID:openstack,项目名称:mistral,代码行数:14,代码来源:data_flow.py


示例15: _get_task_inbound_context

    def _get_task_inbound_context(self, task_spec):
        upstream_task_execs = self._get_upstream_task_executions(task_spec)

        return u.merge_dicts(
            copy.deepcopy(self.wf_ex.context),
            data_flow.evaluate_upstream_context(upstream_task_execs)
        )
开发者ID:adarshkoyya,项目名称:mistral,代码行数:7,代码来源:base.py


示例16: evaluate_workflow_final_context

    def evaluate_workflow_final_context(self):
        ctx = {}

        for t_ex in self._find_end_tasks():
            ctx = utils.merge_dicts(ctx, data_flow.evaluate_task_outbound_context(t_ex))

        return ctx
开发者ID:kantorv,项目名称:mistral,代码行数:7,代码来源:direct_workflow.py


示例17: _fail_workflow

    def _fail_workflow(self, final_context, msg):
        if states.is_paused_or_completed(self.wf_ex.state):
            return

        output_on_error = {}
        try:
            output_on_error = data_flow.evaluate_workflow_output(
                self.wf_ex,
                self.wf_spec.get_output_on_error(),
                final_context
            )
        except exc.MistralException as e:
            msg = (
                "Failed to evaluate expression in output-on-error! "
                "(output-on-error: '%s', exception: '%s' Cause: '%s'"
                % (self.wf_spec.get_output_on_error(), e, msg)
            )
            LOG.error(msg)

        self.set_state(states.ERROR, state_info=msg)

        # When we set an ERROR state we should safely set output value getting
        # w/o exceptions due to field size limitations.
        msg = utils.cut_by_kb(
            msg,
            cfg.CONF.engine.execution_field_size_limit_kb
        )

        self.wf_ex.output = merge_dicts({'result': msg}, output_on_error)

        if self.wf_ex.task_execution_id:
            self._schedule_send_result_to_parent_workflow()
开发者ID:Tesora,项目名称:tesora-mistral,代码行数:32,代码来源:workflows.py


示例18: _schedule_run_action

def _schedule_run_action(task_ex, task_spec, action_input, index):
    wf_ex = task_ex.workflow_execution
    wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

    action_spec_name = task_spec.get_action_name()

    action_def = action_handler.resolve_definition(
        action_spec_name,
        task_ex,
        wf_spec
    )

    action_ex = action_handler.create_action_execution(
        action_def, action_input, task_ex, index
    )

    target = expr.evaluate_recursively(
        task_spec.get_target(),
        utils.merge_dicts(
            copy.deepcopy(action_input),
            copy.copy(task_ex.in_context)
        )
    )

    scheduler.schedule_call(
        None,
        'mistral.engine.action_handler.run_existing_action',
        0,
        action_ex_id=action_ex.id,
        target=target
    )
开发者ID:dennybaa,项目名称:mistral,代码行数:31,代码来源:task_handler.py


示例19: add_workflow_variables_to_context

def add_workflow_variables_to_context(wf_ex, wf_spec):
    wf_ex.context = wf_ex.context or {}

    return utils.merge_dicts(
        wf_ex.context,
        expr.evaluate_recursively(wf_spec.get_vars(), wf_ex.context)
    )
开发者ID:ISCAS-VDI,项目名称:mistral-base,代码行数:7,代码来源:data_flow.py


示例20: _get_target

 def _get_target(self, input_dict):
     return expr.evaluate_recursively(
         self.task_spec.get_target(),
         utils.merge_dicts(
             copy.deepcopy(input_dict),
             copy.deepcopy(self.ctx)
         )
     )
开发者ID:prabhuinbarajan,项目名称:mistral,代码行数:8,代码来源:tasks.py



注:本文中的mistral.utils.merge_dicts函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python keystone.get_endpoint_for_project函数代码示例发布时间:2022-05-27
下一篇:
Python utils.generate_unicode_uuid函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap