• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.get_container函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中nameko.testing.utils.get_container函数的典型用法代码示例。如果您正苦于以下问题:Python get_container函数的具体用法?Python get_container怎么用?Python get_container使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_container函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_graceful_stop_on_one_container_error

def test_graceful_stop_on_one_container_error(runner_factory, rabbit_config):
    """An error in one container must also stop the sibling container.

    ``handle_result`` on the ``RpcConsumer`` of the ``ExampleService``
    container is patched to raise. The test then checks that the very same
    exception surfaces from ``runner.wait()`` and that ``stop`` was called
    exactly once, without arguments, on the ``SecondService`` container.
    """
    runner = runner_factory(rabbit_config, ExampleService, SecondService)
    runner.start()

    failing_container = get_container(runner, ExampleService)
    sibling_container = get_container(runner, SecondService)

    # wrap the real ``stop`` so the call is recorded and still executed
    real_stop = sibling_container.stop
    stop_patcher = patch.object(
        sibling_container, 'stop', autospec=True, wraps=real_stop)

    with stop_patcher as stop:
        rpc_consumer = get_extension(failing_container, RpcConsumer)
        result_patcher = patch.object(
            rpc_consumer, 'handle_result', autospec=True)

        with result_patcher as handle_result:
            boom = Exception("error")
            handle_result.side_effect = boom

            # call exampleservice.task() through a standalone rpc proxy
            with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
                # a blocking proxy.task() would hang forever: the remote
                # container errors and therefore never sends a response
                proxy.task.call_async()

            # the error must bubble up to runner.wait()
            with pytest.raises(Exception) as raised:
                runner.wait()
            assert raised.value == boom

            # the second service was stopped because the first one was
            # killed
            stop.assert_called_once_with()
开发者ID:evilino,项目名称:nameko,代码行数:30,代码来源:test_errors.py


示例2: test_shop_checkout_integration

def test_shop_checkout_integration(
    rabbit_config, runner_factory, rpc_proxy_factory
):
    """Drive a complete checkout flow as an integration test.

    ``AcmeShopService``, ``StockService`` and ``InvoiceService`` are hosted
    by the runner. ``PaymentService`` is not hosted; the shop's rpc proxy to
    it (``payment_rpc``) is explicitly replaced instead.

    The shop's event dispatcher dependency is replaced as well, and the
    timer entrypoint on ``StockService`` is disabled. Limiting the services
    like this reduces the scope of the integration test and eliminates
    undesirable side-effects (e.g. processing events unnecessarily).
    """
    shop_proxy = rpc_proxy_factory(
        'acmeshopservice', context_data={'user_id': 'wile_e_coyote'})

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService)

    # swap the ``fire_event`` and ``payment_rpc`` dependencies on
    # AcmeShopService for mocks
    fire_event, payment_rpc = replace_dependencies(
        get_container(runner, AcmeShopService), "fire_event", "payment_rpc")

    # only these two entrypoints stay active on StockService
    restrict_entrypoints(
        get_container(runner, StockService), "check_price", "check_stock")

    runner.start()

    # put some items in the basket
    for item in ("anvil", "invisible_paint"):
        assert shop_proxy.add_to_basket(item) == item

    # an out-of-stock item is rejected
    with pytest.raises(RemoteError) as out_of_stock:
        shop_proxy.add_to_basket("toothpicks")
    assert out_of_stock.value.exc_type == "ItemOutOfStockError"

    # canned response from the mocked payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    expected_total = 100 + 10
    assert shop_proxy.checkout() == expected_total

    # the mocked payment service received the expected request
    expected_payment = {
        'customer': "wile_e_coyote",
        'address': "12 Long Road, High Cliffs, Utah",
        'amount': expected_total,
        'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."
    }
    payment_rpc.take_payment.assert_called_once_with(expected_payment)

    # events fired as expected
    assert fire_event.call_count == 3
开发者ID:davidszotten,项目名称:nameko,代码行数:60,代码来源:large_integration_test.py


示例3: test_get_container

def test_get_container(runner_factory, rabbit_config):
    """``get_container`` returns the container hosting a given service class.

    For a class the runner does not host, ``None`` is expected.
    """

    class ServiceX(object):
        name = "service_x"

    class ServiceY(object):
        name = "service_y"

    runner = runner_factory(rabbit_config, ServiceX, ServiceY)

    # each hosted class maps to the container built for exactly that class
    for hosted_cls in (ServiceX, ServiceY):
        assert get_container(runner, hosted_cls).service_cls is hosted_cls

    # ``object`` was never added to the runner
    assert get_container(runner, object) is None
开发者ID:SivagnanamCiena,项目名称:nameko,代码行数:13,代码来源:test_utils.py


示例4: test_call_id_over_events

def test_call_id_over_events(rabbit_config, predictable_call_ids,
                             runner_factory):
    """The call id stack is passed on to event handlers.

    ``say_hello`` on the ``event_raiser`` service dispatches a ``hello``
    event that two listener services handle. The test expects three call
    ids to be requested and ``stack_request`` to be called once with
    ``None`` and twice with ``['event_raiser.say_hello.0']``.
    """
    first_listener_called = Mock()
    second_listener_called = Mock()

    stack_request = Mock()
    # presumably the returned worker context reports each call id stack it
    # sees to ``stack_request`` -- confirm against the helper's definition
    LoggingWorkerContext = get_logging_worker_context(stack_request)

    class EventListeningServiceOne(object):
        name = "listener_one"

        @event_handler('event_raiser', 'hello')
        def hello(self, name):
            first_listener_called()

    class EventListeningServiceTwo(object):
        name = "listener_two"

        @event_handler('event_raiser', 'hello')
        def hello(self, name):
            second_listener_called()

    class EventRaisingService(object):
        name = "event_raiser"
        dispatch = EventDispatcher()

        @rpc
        def say_hello(self):
            self.dispatch('hello', self.name)

    runner = runner_factory(rabbit_config)
    hosted_services = (
        EventListeningServiceOne,
        EventListeningServiceTwo,
        EventRaisingService,
    )
    for hosted_cls in hosted_services:
        runner.add_service(hosted_cls, LoggingWorkerContext)
    runner.start()

    raiser_container = get_container(runner, EventRaisingService)
    first_listener = get_container(runner, EventListeningServiceOne)
    second_listener = get_container(runner, EventListeningServiceTwo)

    with entrypoint_hook(raiser_container, "say_hello") as say_hello:
        # build both waiters first, then enter them together around the call
        first_waiter = entrypoint_waiter(first_listener, 'hello')
        second_waiter = entrypoint_waiter(second_listener, 'hello')
        with first_waiter, second_waiter:
            say_hello()

    assert predictable_call_ids.call_count == 3

    expected_stacks = [
        call(None),
        call(['event_raiser.say_hello.0']),
        call(['event_raiser.say_hello.0']),
    ]
    stack_request.assert_has_calls(expected_stacks)
开发者ID:gwongz,项目名称:nameko,代码行数:51,代码来源:test_call_id_stack.py


示例5: test_runner_catches_container_errors

def test_runner_catches_container_errors(runner_factory, rabbit_config):
    """An error raised inside a container surfaces from ``runner.wait()``.

    ``handle_result`` on the container's ``RpcConsumer`` is patched to raise;
    the same exception object is then expected from ``runner.wait()``.
    """
    runner = runner_factory(rabbit_config, ExampleService)
    runner.start()

    rpc_consumer = get_dependency(
        get_container(runner, ExampleService), RpcConsumer)

    result_patcher = patch.object(
        rpc_consumer, 'handle_result', autospec=True)
    with result_patcher as handle_result:
        boom = Exception("error")
        handle_result.side_effect = boom

        # call exampleservice.task() through a standalone rpc proxy
        with RpcProxy("exampleservice", rabbit_config) as proxy:
            # proxy.task() would hang forever: the remote container errors
            # and so never sends a response. A zero-length timeout is
            # raised as soon as the thread yields, and swallowed here.
            try:
                with eventlet.Timeout(0):
                    proxy.task()
            except eventlet.Timeout:
                pass

        # the error must bubble up to runner.wait()
        with pytest.raises(Exception) as raised:
            runner.wait()
        assert raised.value == boom
开发者ID:ahmb,项目名称:nameko,代码行数:28,代码来源:test_errors.py


示例6: test_config_key

    def test_config_key(self, service_cls, container_cls):
        """A runner configured with ``SERVICE_CONTAINER_CLS`` builds
        containers that are instances of ``container_cls``.
        """
        runner = ServiceRunner(
            {'SERVICE_CONTAINER_CLS': "fake_module.ServiceContainerX"})
        runner.add_service(service_cls)

        built_container = get_container(runner, service_cls)
        assert isinstance(built_container, container_cls)
开发者ID:gwongz,项目名称:nameko,代码行数:9,代码来源:test_service_runner.py


示例7: service_container

def service_container(patched_db):
    """Yield the container of a started ``NamekoCollectionService``.

    The runner is stopped once the generator is resumed after the yield.
    """
    runner = ServiceRunner({'AMQP_URI': settings.NAMEKO_AMQP_URI})
    runner.add_service(NamekoCollectionService)
    runner.start()

    yield get_container(runner, NamekoCollectionService)

    runner.stop()
开发者ID:turc42,项目名称:pylytics,代码行数:10,代码来源:test_nameko.py


示例8: test_call_id_stack

def test_call_id_stack(rabbit_config, predictable_call_ids, runner_factory):
    """The call id stack grows along a chain of nested RPC calls.

    ``grandparent_do`` calls ``parent_do``, which calls ``child_do``. The
    test expects three call ids to be requested and ``stack_request`` to be
    called with ``None``, then with the grandparent's id, then with the
    grandparent's and parent's ids.
    """
    child_spy = Mock()

    stack_request = Mock()
    # presumably the returned worker context reports each call id stack it
    # sees to ``stack_request`` -- confirm against the helper's definition
    LoggingWorkerContext = get_logging_worker_context(stack_request)

    class Child(object):
        name = 'child'

        @rpc
        def child_do(self):
            child_spy()
            return 1

    class Parent(object):
        name = "parent"

        child_service = RpcProxy('child')

        @rpc
        def parent_do(self):
            return self.child_service.child_do()

    class Grandparent(object):
        name = "grandparent"

        parent_service = RpcProxy('parent')

        @rpc
        def grandparent_do(self):
            return self.parent_service.parent_do()

    runner = runner_factory(rabbit_config)
    for hosted_cls in (Child, Parent, Grandparent):
        runner.add_service(hosted_cls, LoggingWorkerContext)
    runner.start()

    top_container = get_container(runner, Grandparent)
    with entrypoint_hook(top_container, "grandparent_do") as grandparent_do:
        assert grandparent_do() == 1

    # the child was reached exactly once, without arguments
    child_spy.assert_called_with()
    assert child_spy.call_count == 1

    # one call id per hop
    assert predictable_call_ids.call_count == 3

    # the stack persisted over RPC
    expected_stacks = [
        call(None),
        call(['grandparent.grandparent_do.0']),
        call(['grandparent.grandparent_do.0', 'parent.parent_do.1']),
    ]
    stack_request.assert_has_calls(expected_stacks)
开发者ID:gwongz,项目名称:nameko,代码行数:55,代码来源:test_call_id_stack.py


示例9: test_mail_service_integration

    def test_mail_service_integration(self):
        """Fire ``emit_event`` on PaymentService and wait for MailService.

        The test passes once ``entrypoint_waiter`` sees the
        ``on_payment_received`` entrypoint of ``MailService`` complete; no
        further assertion is needed. The runner is always stopped, even when
        the body raises, so a failing run does not leave services running.
        """
        # NOTE(review): the listing's URI was mangled by e-mail obfuscation
        # ("[email protected]"); restored to the conventional local RabbitMQ
        # guest URI -- confirm against the original project.
        config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672/'}
        runner = ServiceRunner(config)
        runner.add_service(PaymentService)
        runner.add_service(MailService)

        payment_container = get_container(runner, PaymentService)
        mail_container = get_container(runner, MailService)

        # NOTE(review): the original carried a disabled
        # ``restrict_entrypoints(payment_container, *[])`` call meant to turn
        # off the timer event; every entrypoint is left active here.

        runner.start()
        try:
            with entrypoint_hook(payment_container, 'emit_event') as emit:
                with entrypoint_waiter(mail_container, 'on_payment_received'):
                    emit()
        finally:
            runner.stop()
开发者ID:gyk,项目名称:nameko-mailer,代码行数:20,代码来源:test_mail_service.py


示例10: test_service_x_y_integration

def test_service_x_y_integration(runner_factory, rabbit_config):
    """Host ServiceX and ServiceY and check ``remote_method`` on ServiceX.

    Calling the entrypoint with ``"value"`` is expected to return
    ``"value-x-y"``.
    """
    # host both services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX and
    # verify the response
    x_container = get_container(runner, ServiceX)
    with entrypoint_hook(x_container, "remote_method") as remote_method:
        response = remote_method("value")
        assert response == "value-x-y"
开发者ID:Costeijn,项目名称:nameko,代码行数:11,代码来源:integration_test.py


示例11: test_kwarg_deprecation_warning

    def test_kwarg_deprecation_warning(
        self, warnings, service_cls, container_cls
    ):
        """Passing ``container_cls`` as a keyword still works but warns.

        The container built by the runner must be an instance of
        ``container_cls``, and ``warnings.warn`` must have been called exactly
        once with a ``DeprecationWarning`` category.
        """
        runner = ServiceRunner({}, container_cls=container_cls)
        runner.add_service(service_cls)

        built_container = get_container(runner, service_cls)
        assert isinstance(built_container, container_cls)

        # TODO: replace with pytest.warns when eventlet >= 0.19.0 is released
        expected_warnings = [call(ANY, DeprecationWarning)]
        assert warnings.warn.call_args_list == expected_warnings
开发者ID:gwongz,项目名称:nameko,代码行数:12,代码来源:test_service_runner.py


示例12: test_call_id_stack

def test_call_id_stack(
    rabbit_config, predictable_call_ids, runner_factory, stack_logger, tracker
):
    """The call id stack grows along a chain of nested RPC calls.

    Grandparent calls Parent, which calls Child, each through an rpc
    entrypoint named ``method``. The test expects three call ids to be
    requested and ``tracker`` to record the stack at each hop.
    """
    # alias the fixture so the class bodies below can declare an attribute
    # that is itself called ``stack_logger``
    StackLoggerCls = stack_logger

    class Child(object):
        name = 'child'

        stack_logger = StackLoggerCls()

        @rpc
        def method(self):
            return 1

    class Parent(object):
        name = "parent"

        stack_logger = StackLoggerCls()
        child_service = RpcProxy('child')

        @rpc
        def method(self):
            return self.child_service.method()

    class Grandparent(object):
        name = "grandparent"

        stack_logger = StackLoggerCls()
        parent_service = RpcProxy('parent')

        @rpc
        def method(self):
            return self.parent_service.method()

    runner = runner_factory(rabbit_config)
    for hosted_cls in (Child, Parent, Grandparent):
        runner.add_service(hosted_cls)
    runner.start()

    top_container = get_container(runner, Grandparent)
    with entrypoint_hook(top_container, "method") as grandparent_method:
        assert grandparent_method() == 1

    # one call id per hop
    assert predictable_call_ids.call_count == 3

    # the stack persisted over RPC
    expected_stacks = [
        call(['grandparent.method.0']),
        call(['grandparent.method.0', 'parent.method.1']),
        call(['grandparent.method.0', 'parent.method.1', 'child.method.2']),
    ]
    assert tracker.call_args_list == expected_stacks
开发者ID:davidszotten,项目名称:nameko,代码行数:53,代码来源:test_call_id_stack.py


示例13: test_crawler_triggers_webhook

def test_crawler_triggers_webhook(runner_factory, web_container_config):
    """Is crawler_container dispatching to webhook_container?

    A ``url_to_check`` event is dispatched on behalf of ``http_server``; the
    test waits for ``send_response`` on the webhook container and expects its
    replaced ``storage`` dependency to be asked for webhooks once.
    """
    runner = runner_factory(
        web_container_config, CrawlerService, WebhookService)
    webhook_container = get_container(runner, WebhookService)
    webhook_storage = replace_dependencies(webhook_container, 'storage')
    dispatch = event_dispatcher(web_container_config)
    runner.start()

    event_payload = [
        'http://example.org/test_crawling_group',
        'datagouvfr',
        None,
    ]
    with entrypoint_waiter(webhook_container, 'send_response'):
        dispatch('http_server', 'url_to_check', event_payload)

    assert webhook_storage.get_webhooks_for_url.call_count == 1
开发者ID:opendatateam,项目名称:croquemort,代码行数:13,代码来源:test_integrations.py


示例14: test_entrypoint_hook

def test_entrypoint_hook(runner_factory, rabbit_config):
    """Fire the ``handle`` entrypoint through ``entrypoint_hook``.

    The module-level ``handle_event`` mock is expected to have been called
    exactly once with the payload passed to the hook.
    """
    runner = runner_factory(
        rabbit_config, Service, ServiceA, ServiceB, ServiceC)
    runner.start()

    hooked_container = get_container(runner, Service)
    payload = "msg"

    with entrypoint_hook(hooked_container, "handle") as handle:
        # wait for the entrypoint to finish before checking the mock
        with entrypoint_waiter(hooked_container, "handle"):
            handle(payload)

    handle_event.assert_called_once_with(payload)
开发者ID:koenvo,项目名称:nameko,代码行数:13,代码来源:test_services.py


示例15: test_service_integration

def test_service_integration():
    """Host ServiceX and ServiceY and check ``remote_method`` on ServiceX.

    Calling the entrypoint with ``"value"`` is expected to return
    ``"value-x-y"``. The runner is stopped in a ``finally`` clause so a
    failing assertion does not leave the services running.
    """
    # NOTE(review): the listing's URI was mangled by e-mail obfuscation
    # ("[email protected]"); restored to the conventional local RabbitMQ guest
    # URI -- confirm against the original project.
    config = {"AMQP_URI": "amqp://guest:guest@localhost:5672/"}
    runner = ServiceRunner(config)
    runner.add_service(ServiceX)
    runner.add_service(ServiceY)
    runner.start()

    try:
        container = get_container(runner, ServiceX)

        with entrypoint_hook(container, "remote_method") as entrypoint:
            assert entrypoint("value") == "value-x-y"
    finally:
        runner.stop()
开发者ID:pombredanne,项目名称:nameko,代码行数:13,代码来源:integration_test.py


示例16: test_entrypoint_hook_with_return

def test_entrypoint_hook_with_return(runner_factory, rabbit_config):
    """``entrypoint_hook`` hands back return values and re-raises errors.

    ``working("value")`` is expected to return ``"value-a-b-c"``, and
    ``broken("value")`` is expected to raise ``ExampleError``.
    """
    runner = runner_factory(
        rabbit_config, Service, ServiceA, ServiceB, ServiceC)
    runner.start()

    hooked_container = get_container(runner, Service)

    # success path: the result is returned to the caller
    with entrypoint_hook(hooked_container, "working") as working:
        result = working("value")
        assert result == "value-a-b-c"

    # failure path: the exception reaches the caller
    with entrypoint_hook(hooked_container, "broken") as broken:
        with pytest.raises(ExampleError):
            broken("value")
开发者ID:koenvo,项目名称:nameko,代码行数:14,代码来源:test_services.py


示例17: test_runner_catches_managed_thread_errors

def test_runner_catches_managed_thread_errors(runner_factory, rabbit_config):
    """An error raised in a managed thread surfaces from ``runner.wait()``.

    A function that raises ``Broken`` is handed to the container's
    ``spawn_managed_thread``; ``runner.wait()`` must then raise ``Broken``.
    """

    class Broken(Exception):
        pass

    def raises():
        raise Broken('error')

    runner = runner_factory(rabbit_config, Service)

    # spawn the failing function as a managed thread of the container
    get_container(runner, Service).spawn_managed_thread(raises)

    with pytest.raises(Broken):
        runner.wait()
开发者ID:ahmb,项目名称:nameko,代码行数:15,代码来源:test_service_runner.py


示例18: test_service_x_y_integration

def test_service_x_y_integration():
    """Host ServiceX and ServiceY and check ``remote_method`` on ServiceX.

    Calling the entrypoint with ``"value"`` is expected to return
    ``"value-x-y"``. The runner is stopped in a ``finally`` clause so a
    failing assertion does not leave the services running.
    """
    # run services in the normal manner
    # NOTE(review): the listing's URI was mangled by e-mail obfuscation
    # ("[email protected]"); restored to the conventional local RabbitMQ guest
    # URI -- confirm against the original project.
    config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672/'}
    runner = ServiceRunner(config)
    runner.add_service(ServiceX)
    runner.add_service(ServiceY)
    runner.start()

    try:
        # artificially fire the "remote_method" entrypoint on ServiceX
        # and verify response
        container = get_container(runner, ServiceX)
        with entrypoint_hook(container, "remote_method") as entrypoint:
            assert entrypoint("value") == "value-x-y"
    finally:
        runner.stop()
开发者ID:davinirjr,项目名称:nameko,代码行数:16,代码来源:integration_test.py


示例19: test_call_id_over_events

def test_call_id_over_events(rabbit_config, predictable_call_ids,
                             runner_factory):
    """Check the call id stack requested when an RPC call dispatches an event.

    ``say_hello`` on the ``event_raiser`` service dispatches a ``HelloEvent``
    that two listener services handle. The test expects three call ids to be
    requested and ``stack_request`` to be called once with ``None`` and twice
    with ``['event_raiser.say_hello.0']``.
    """
    # one mock per listener, called from the matching event handler
    one_called = Mock()
    two_called = Mock()

    stack_request = Mock()
    # presumably the returned worker context reports each call id stack it
    # sees to ``stack_request`` -- confirm against the helper's definition
    LoggingWorkerContext = get_logging_worker_context(stack_request)

    class HelloEvent(NamekoEvent):
        type = "hello"

    # NOTE(review): the listener classes declare no ``name`` attribute --
    # verify how the service name is derived in this nameko version
    class EventListeningServiceOne(object):
        @event_handler('event_raiser', 'hello')
        def hello(self, name):
            one_called()

    class EventListeningServiceTwo(object):
        @event_handler('event_raiser', 'hello')
        def hello(self, name):
            two_called()

    class EventRaisingService(object):
        name = "event_raiser"
        dispatch = event_dispatcher()

        @rpc
        def say_hello(self):
            self.dispatch(HelloEvent(self.name))

    runner = runner_factory(rabbit_config)
    runner.add_service(EventListeningServiceOne, LoggingWorkerContext)
    runner.add_service(EventListeningServiceTwo, LoggingWorkerContext)
    runner.add_service(EventRaisingService, LoggingWorkerContext)
    runner.start()

    # fire the rpc entrypoint that dispatches the event
    container = get_container(runner, EventRaisingService)
    with entrypoint_hook(container, "say_hello") as say_hello:
        say_hello()

    # NOTE(review): the assertions run inside the ``wait_for_call`` blocks;
    # presumably each block waits (timeout argument 5) for its mock to be
    # called -- confirm whether the wait happens on entry or on exit
    with wait_for_call(5, one_called), wait_for_call(5, two_called):

        assert predictable_call_ids.call_count == 3
        stack_request.assert_has_calls([
            call(None),
            call(['event_raiser.say_hello.0']),
            call(['event_raiser.say_hello.0']),
        ])
开发者ID:juanginzo,项目名称:nameko,代码行数:47,代码来源:test_call_id_stack.py


示例20: test_runner_catches_container_errors

def test_runner_catches_container_errors(runner_factory, rabbit_config):
    """An error raised inside a container surfaces from ``runner.wait()``.

    ``handle_result`` on the container's ``RpcConsumer`` is patched to raise;
    the same exception object is then expected from ``runner.wait()``.
    """
    runner = runner_factory(rabbit_config, ExampleService)
    runner.start()

    rpc_consumer = get_extension(
        get_container(runner, ExampleService), RpcConsumer)

    result_patcher = patch.object(
        rpc_consumer, 'handle_result', autospec=True)
    with result_patcher as handle_result:
        boom = Exception("error")
        handle_result.side_effect = boom

        # call exampleservice.task() through a standalone rpc proxy
        with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
            # a blocking proxy.task() would hang forever: the remote
            # container errors and therefore never sends a response
            proxy.task.call_async()

        # the error must bubble up to runner.wait()
        with pytest.raises(Exception) as raised:
            runner.wait()
        assert raised.value == boom
开发者ID:evilino,项目名称:nameko,代码行数:23,代码来源:test_errors.py



注:本文中的nameko.testing.utils.get_container函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.get_extension函数代码示例发布时间:2022-05-27
下一篇:
Python services.entrypoint_waiter函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap