• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python engines.run函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中taskflow.engines.run函数的典型用法代码示例。如果您正苦于以下问题:Python run函数的具体用法?Python run怎么用?Python run使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了run函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_purge_flow_normal

    def test_purge_flow_normal(self, mock_creds, mock_dns_client):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(service_id),
            'provider_details': json.dumps(
                dict([(k, v.to_dict())
                      for k, v in service_obj.provider_details.items()])),
            'purge_url': 'cdn.poppy.org',
            'hard': json.dumps(True),
            'service_obj': json.dumps(service_obj.to_dict()),
            'context_dict': context_utils.RequestContext().to_dict()
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_purge_flow(service_controller, storage_controller,
                                  dns_controller)
            engines.run(purge_service.purge_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:33,代码来源:test_flows.py


示例2: test_delete_flow_normal

    def test_delete_flow_normal(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(service_id),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'provider_details': json.dumps(
                dict([(k, v.to_dict())
                      for k, v in service_obj.provider_details.items()]))
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_delete_flow(service_controller, storage_controller,
                                   dns_controller)
            engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:jc7998,项目名称:poppy,代码行数:31,代码来源:test_flows.py


示例3: test_service_state_flow_normal

    def test_service_state_flow_normal(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'state': 'enable',
            'service_obj': json.dumps(service_obj.to_dict())
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_service_state_flow(service_controller,
                                          storage_controller,
                                          dns_controller)
            engines.run(update_service_state.enable_service(), store=kwargs)
            engines.run(update_service_state.disable_service(), store=kwargs)
开发者ID:jc7998,项目名称:poppy,代码行数:29,代码来源:test_flows.py


示例4: test_delete_flow_dns_exception_with_retry

    def test_delete_flow_dns_exception_with_retry(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain="cdn.poppy.org")
        current_origin = origin.Origin(origin="poppy.org")
        service_obj = service.Service(
            service_id=service_id,
            name="poppy cdn service",
            domains=[domains_old],
            origins=[current_origin],
            flavor_id="cdn",
        )

        kwargs = {
            "project_id": json.dumps(str(uuid.uuid4())),
            "service_id": json.dumps(service_id),
            "time_seconds": [i * self.time_factor for i in range(self.total_retries)],
            "provider_details": json.dumps(dict([(k, v.to_dict()) for k, v in service_obj.provider_details.items()])),
        }

        service_controller, storage_controller, dns_controller = self.all_controllers()

        with MonkeyPatchControllers(
            service_controller, dns_controller, storage_controller, memoized_controllers.task_controllers
        ):

            self.patch_delete_flow(service_controller, storage_controller, dns_controller)
            dns_controller.delete = mock.Mock()
            dns_responder_returns = self.dns_exceptions()

            dns_controller.delete._mock_side_effect = (dns_responder for dns_responder in dns_responder_returns)
            engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:31,代码来源:test_flows.py


示例5: test_create_flow_dns_exception

    def test_create_flow_dns_exception(self, mock_creds,
                                       mock_dns_client):
        providers = ['cdn_provider']
        kwargs = {
            'providers_list_json': json.dumps(providers),
            'project_id': json.dumps(str(uuid.uuid4())),
            'auth_token': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(str(uuid.uuid4())),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'context_dict': context_utils.RequestContext().to_dict()
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_create_flow(service_controller, storage_controller,
                                   dns_controller)
            dns_controller.create = mock.Mock()
            dns_controller.create._mock_return_value = {
                'cdn_provider': {
                    'error': 'Whoops!',
                    'error_class': 'tests.unit.distributed_task'
                                   '.taskflow.test_flows.DNSException'
                }
            }
            engines.run(create_service.create_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:32,代码来源:test_flows.py


示例6: test_create_flow_dns_exception

    def test_create_flow_dns_exception(self, mock_creds):
        providers = ["cdn_provider"]
        kwargs = {
            "providers_list_json": json.dumps(providers),
            "project_id": json.dumps(str(uuid.uuid4())),
            "auth_token": json.dumps(str(uuid.uuid4())),
            "service_id": json.dumps(str(uuid.uuid4())),
            "time_seconds": [i * self.time_factor for i in range(self.total_retries)],
        }

        service_controller, storage_controller, dns_controller = self.all_controllers()

        with MonkeyPatchControllers(
            service_controller, dns_controller, storage_controller, memoized_controllers.task_controllers
        ):

            self.patch_create_flow(service_controller, storage_controller, dns_controller)
            dns_controller.create = mock.Mock()
            dns_controller.create._mock_return_value = {
                "cdn_provider": {
                    "error": "Whoops!",
                    "error_class": "tests.unit.distributed_task" ".taskflow.test_flows.DNSException",
                }
            }
            engines.run(create_service.create_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:25,代码来源:test_flows.py


示例7: test_delete_flow_dns_exception

    def test_delete_flow_dns_exception(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain="cdn.poppy.org")
        current_origin = origin.Origin(origin="poppy.org")
        service_obj = service.Service(
            service_id=service_id,
            name="poppy cdn service",
            domains=[domains_old],
            origins=[current_origin],
            flavor_id="cdn",
        )

        kwargs = {
            "project_id": json.dumps(str(uuid.uuid4())),
            "service_id": json.dumps(service_id),
            "time_seconds": [i * self.time_factor for i in range(self.total_retries)],
            "provider_details": json.dumps(dict([(k, v.to_dict()) for k, v in service_obj.provider_details.items()])),
        }

        service_controller, storage_controller, dns_controller = self.all_controllers()

        with MonkeyPatchControllers(
            service_controller, dns_controller, storage_controller, memoized_controllers.task_controllers
        ):

            self.patch_delete_flow(service_controller, storage_controller, dns_controller)
            dns_controller.delete = mock.Mock()
            dns_controller.delete._mock_return_value = {
                "cdn_provider": {
                    "error": "Whoops!",
                    "error_class": "tests.unit.distributed_task" ".taskflow.test_flows.DNSException",
                }
            }

            engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:35,代码来源:test_flows.py


示例8: test_purge_flow_normal

    def test_purge_flow_normal(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain="cdn.poppy.org")
        current_origin = origin.Origin(origin="poppy.org")
        service_obj = service.Service(
            service_id=service_id,
            name="poppy cdn service",
            domains=[domains_old],
            origins=[current_origin],
            flavor_id="cdn",
        )

        kwargs = {
            "project_id": json.dumps(str(uuid.uuid4())),
            "service_id": json.dumps(service_id),
            "provider_details": json.dumps(dict([(k, v.to_dict()) for k, v in service_obj.provider_details.items()])),
            "purge_url": "cdn.poppy.org",
            "hard": json.dumps(True),
            "service_obj": json.dumps(service_obj.to_dict()),
        }

        service_controller, storage_controller, dns_controller = self.all_controllers()

        with MonkeyPatchControllers(
            service_controller, dns_controller, storage_controller, memoized_controllers.task_controllers
        ):

            self.patch_purge_flow(service_controller, storage_controller, dns_controller)
            engines.run(purge_service.purge_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:29,代码来源:test_flows.py


示例9: test_service_state_flow_normal

    def test_service_state_flow_normal(self, mock_creds):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain="cdn.poppy.org")
        current_origin = origin.Origin(origin="poppy.org")
        service_obj = service.Service(
            service_id=service_id,
            name="poppy cdn service",
            domains=[domains_old],
            origins=[current_origin],
            flavor_id="cdn",
        )

        kwargs = {
            "project_id": json.dumps(str(uuid.uuid4())),
            "state": "enable",
            "service_obj": json.dumps(service_obj.to_dict()),
        }

        service_controller, storage_controller, dns_controller = self.all_controllers()

        with MonkeyPatchControllers(
            service_controller, dns_controller, storage_controller, memoized_controllers.task_controllers
        ):

            self.patch_service_state_flow(service_controller, storage_controller, dns_controller)
            engines.run(update_service_state.enable_service(), store=kwargs)
            engines.run(update_service_state.disable_service(), store=kwargs)
开发者ID:obulpathi,项目名称:poppy,代码行数:27,代码来源:test_flows.py


示例10: test_create_flow_dns_exception_with_retry_and_fail

    def test_create_flow_dns_exception_with_retry_and_fail(self, mock_creds):
        providers = ['cdn_provider']
        kwargs = {
            'providers_list_json': json.dumps(providers),
            'project_id': json.dumps(str(uuid.uuid4())),
            'auth_token': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(str(uuid.uuid4())),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'context_dict': context_utils.RequestContext().to_dict()
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_create_flow(service_controller, storage_controller,
                                   dns_controller)
            dns_controller.create = mock.Mock()

            dns_responder_returns = self.dns_exceptions_only()

            dns_controller.create._mock_side_effect = (dns_responder for
                                                       dns_responder in
                                                       dns_responder_returns)

            engines.run(create_service.create_service(), store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:31,代码来源:test_flows.py


示例11: test_recreate_ssl_certificate

    def test_recreate_ssl_certificate(self, mock_creds, mock_dns_client):
        providers = ['cdn_provider']
        cert_obj_json = ssl_certificate.SSLCertificate('cdn',
                                                       'mytestsite.com',
                                                       'san')
        kwargs = {
            'providers_list_json': json.dumps(providers),
            'project_id': json.dumps(str(uuid.uuid4())),
            'domain_name': 'mytestsite.com',
            'cert_type': 'san',
            'cert_obj_json': json.dumps(cert_obj_json.to_dict()),
        }

        service_controller, storage_controller, dns_controller = \
            self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    memoized_controllers.task_controllers):

            self.patch_recreate_ssl_certificate_flow(service_controller,
                                                     storage_controller,
                                                     dns_controller)
            engines.run(recreate_ssl_certificate.recreate_ssl_certificate(),
                        store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:26,代码来源:test_flows.py


示例12: test_create_flow_normal

    def test_create_flow_normal(self):
        providers = ['cdn_provider']
        kwargs = {
            'providers_list_json': json.dumps(providers),
            'project_id': json.dumps(str(uuid.uuid4())),
            'auth_token': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(str(uuid.uuid4())),
            'time_seconds': [i * self.time_factor
                             for i in range(self.total_retries)],
            'context_dict': context_utils.RequestContext().to_dict()
        }

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_create_flow(service_controller,
                                   storage_controller,
                                   dns_controller)
            engines.run(create_service.create_service(), store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:29,代码来源:test_flows.py


示例13: test_delete_ssl_certificate_normal

    def test_delete_ssl_certificate_normal(self):

        kwargs = {
            'cert_type': "san",
            'project_id': json.dumps(str(uuid.uuid4())),
            'domain_name': "san.san.com",
            'context_dict': context_utils.RequestContext().to_dict()
        }

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_create_ssl_certificate_flow(
                service_controller,
                storage_controller,
                dns_controller
            )
            engines.run(
                delete_ssl_certificate.delete_ssl_certificate(),
                store=kwargs
            )
开发者ID:yunhaia,项目名称:poppy,代码行数:31,代码来源:test_flows.py


示例14: test_create_ssl_certificate_normal

    def test_create_ssl_certificate_normal(self):

        providers = ['cdn_provider']
        cert_obj_json = ssl_certificate.SSLCertificate('cdn',
                                                       'mytestsite.com',
                                                       'san')
        kwargs = {
            'providers_list_json': json.dumps(providers),
            'project_id': json.dumps(str(uuid.uuid4())),
            'cert_obj_json': json.dumps(cert_obj_json.to_dict()),
            'context_dict': context_utils.RequestContext().to_dict()
        }

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_create_ssl_certificate_flow(service_controller,
                                                   storage_controller,
                                                   dns_controller)
            engines.run(create_ssl_certificate.create_ssl_certificate(),
                        store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:31,代码来源:test_flows.py


示例15: test_delete_stack

    def test_delete_stack(self):
        heat_client = mock.MagicMock(name='heat_client')
        stack_id = 'stack_id'
        flow_store = {'stack_id': stack_id}
        flow = self._get_delete_stack_flow(heat_client)

        engines.run(flow, store=flow_store)
        heat_client.stacks.delete.assert_called_once_with(stack_id)
开发者ID:Fangfenghua,项目名称:magnum,代码行数:8,代码来源:test_heat_tasks.py


示例16: test_service_state_flow_dns_exception

    def test_service_state_flow_dns_exception(self):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        enable_kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'state': 'enable',
            'service_obj': json.dumps(service_obj.to_dict()),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'context_dict': context_utils.RequestContext().to_dict()
        }

        disable_kwargs = enable_kwargs.copy()
        disable_kwargs['state'] = 'disable'

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_service_state_flow(service_controller,
                                          storage_controller,
                                          dns_controller)
            dns_controller.enable = mock.Mock()
            dns_controller.enable._mock_return_value = {
                'cdn_provider': {
                    'error': 'Whoops!',
                    'error_class': 'tests.unit.distributed_task'
                                   '.taskflow.test_flows.DNSException'
                }
            }
            dns_controller.disable = mock.Mock()
            dns_controller.disable._mock_return_value = {
                'cdn_provider': {
                    'error': 'Whoops!',
                    'error_class': 'tests.unit.distributed_task'
                                   '.taskflow.test_flows.DNSException'
                }
            }
            engines.run(update_service_state.enable_service(),
                        store=enable_kwargs)
            engines.run(update_service_state.disable_service(),
                        store=disable_kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:58,代码来源:test_flows.py


示例17: test_check_cert_status_and_update_flow

 def test_check_cert_status_and_update_flow(self):
     kwargs = {
         'domain_name': "blog.testabc.com",
         'cert_type': "san",
         'flavor_id': "premium",
         'project_id': "000"
     }
     engines.run(check_cert_status_and_update_flow.
                 check_cert_status_and_update_flow(),
                 store=kwargs)
开发者ID:BenjamenMeyer,项目名称:poppy,代码行数:10,代码来源:test_flows.py


示例18: test_service_state_flow_dns_exception_retry_and_succeed

    def test_service_state_flow_dns_exception_retry_and_succeed(self):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        enable_kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'state': 'enable',
            'service_obj': json.dumps(service_obj.to_dict()),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'context_dict': context_utils.RequestContext().to_dict()
        }

        disable_kwargs = enable_kwargs.copy()
        disable_kwargs['state'] = 'disable'

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_service_state_flow(service_controller,
                                          storage_controller,
                                          dns_controller)
            dns_responder_returns = self.dns_exceptions_and_succeed()

            dns_controller.enable._mock_side_effect = (dns_responder for
                                                       dns_responder in
                                                       dns_responder_returns)
            dns_controller.disable._mock_side_effect = (dns_responder for
                                                        dns_responder in
                                                        dns_responder_returns)

            engines.run(update_service_state.enable_service(),
                        store=enable_kwargs)
            engines.run(update_service_state.disable_service(),
                        store=disable_kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:51,代码来源:test_flows.py


示例19: test_update_flow_dns_exception_with_retry_and_fail

    def test_update_flow_dns_exception_with_retry_and_fail(self):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        domains_new = domain.Domain(domain='mycdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_old = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')
        service_new = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_new],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'auth_token': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(service_id),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'service_old': json.dumps(service_old.to_dict()),
            'service_obj': json.dumps(service_new.to_dict()),
            'context_dict': context_utils.RequestContext().to_dict()
        }

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_update_flow(service_controller, storage_controller,
                                   dns_controller)
            dns_controller.update = mock.Mock()
            dns_responder_returns = self.dns_exceptions_only()

            dns_controller.update._mock_side_effect = (dns_responder for
                                                       dns_responder in
                                                       dns_responder_returns)

            engines.run(update_service.update_service(), store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:50,代码来源:test_flows.py


示例20: test_delete_flow_dns_exception

    def test_delete_flow_dns_exception(self):
        service_id = str(uuid.uuid4())
        domains_old = domain.Domain(domain='cdn.poppy.org')
        current_origin = origin.Origin(origin='poppy.org')
        service_obj = service.Service(service_id=service_id,
                                      name='poppy cdn service',
                                      domains=[domains_old],
                                      origins=[current_origin],
                                      flavor_id='cdn')

        kwargs = {
            'project_id': json.dumps(str(uuid.uuid4())),
            'service_id': json.dumps(service_id),
            'time_seconds': [i * self.time_factor for
                             i in range(self.total_retries)],
            'provider_details': json.dumps(
                dict([(k, v.to_dict())
                      for k, v in service_obj.provider_details.items()])),
            'context_dict': context_utils.RequestContext().to_dict()
        }

        (
            service_controller,
            storage_controller,
            dns_controller,
            ssl_cert_controller
        ) = self.all_controllers()

        with MonkeyPatchControllers(service_controller,
                                    dns_controller,
                                    storage_controller,
                                    ssl_cert_controller,
                                    memoized_controllers.task_controllers):

            self.patch_delete_flow(service_controller, storage_controller,
                                   dns_controller)

            service_mock = mock.Mock()
            type(service_mock).domains = []
            storage_controller.get_service.return_value = service_mock
            dns_controller.delete = mock.Mock()
            dns_controller.delete._mock_return_value = {
                'cdn_provider': {
                    'error': 'Whoops!',
                    'error_class': 'tests.unit.distributed_task'
                                   '.taskflow.test_flows.DNSException'
                }
            }

            engines.run(delete_service.delete_service(), store=kwargs)
开发者ID:openstack,项目名称:poppy,代码行数:50,代码来源:test_flows.py



注:本文中的taskflow.engines.run函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python exceptions.raise_with_cause函数代码示例发布时间:2022-05-27
下一篇:
Python engines.load函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap