• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python test_common.test_server函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.unit.test_common.test_server函数的典型用法代码示例。如果您正苦于以下问题:Python test_server函数的具体用法?Python test_server怎么用?Python test_server使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了test_server函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

 def setUp(self):
     self._server = test_common.test_server()
     self._server.add_generic_rpc_handlers((_GenericHandler(
         weakref.proxy(self)),))
     port = self._server.add_insecure_port('[::]:0')
     self._server.start()
     self._channel = grpc.insecure_channel('localhost:%d' % port)
开发者ID:Falco20019,项目名称:grpc,代码行数:7,代码来源:_error_message_encoding_test.py


示例2: testSecureClientCert

    def testSecureClientCert(self):
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = server.add_secure_port('[::]:0', server_cred)
        server.start()

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        channel = grpc.secure_channel(
            'localhost:{}'.format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS)

        response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        channel.close()
        server.stop(None)

        auth_data = pickle.loads(response)
        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])
开发者ID:Falco20019,项目名称:grpc,代码行数:34,代码来源:_auth_context_test.py


示例3: testSessionResumption

    def testSessionResumption(self):
        # Set up a secure server
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
        port = server.add_secure_port('[::]:0', server_cred)
        server.start()

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'true'])
        server.stop(None)
开发者ID:Falco20019,项目名称:grpc,代码行数:33,代码来源:_auth_context_test.py


示例4: testSecureNoCert

    def testSecureNoCert(self):
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
        port = server.add_secure_port('[::]:0', server_cred)
        server.start()

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel = grpc.secure_channel(
            'localhost:{}'.format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS)
        response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        channel.close()
        server.stop(None)

        auth_data = pickle.loads(response)
        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({
            'transport_security_type': [b'ssl'],
            'ssl_session_reused': [b'false'],
        }, auth_data[_AUTH_CTX])
开发者ID:Falco20019,项目名称:grpc,代码行数:28,代码来源:_auth_context_test.py


示例5: serve

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--port', type=int, required=True, help='the port on which to serve')
    parser.add_argument(
        '--use_tls',
        default=False,
        type=resources.parse_bool,
        help='require a secure connection')
    args = parser.parse_args()

    server = test_common.test_server()
    test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
                                                    server)
    if args.use_tls:
        private_key = resources.private_key()
        certificate_chain = resources.certificate_chain()
        credentials = grpc.ssl_server_credentials(((private_key,
                                                    certificate_chain),))
        server.add_secure_port('[::]:{}'.format(args.port), credentials)
    else:
        server.add_insecure_port('[::]:{}'.format(args.port))

    server.start()
    _LOGGER.info('Server serving.')
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except BaseException as e:
        _LOGGER.info('Caught exception "%s"; stopping server...', e)
        server.stop(None)
        _LOGGER.info('Server stopped; exiting.')
开发者ID:clockzhong,项目名称:grpc,代码行数:32,代码来源:server.py


示例6: _CreateService

def _CreateService():
    """Provides a servicer backend and a stub.

  Returns:
    A _Service with which to test RPCs.
  """
    servicer_methods = _ServicerMethods()

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):

        def UnaryCall(self, request, context):
            return servicer_methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return servicer_methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iter, context):
            return servicer_methods.StreamingInputCall(request_iter, context)

        def FullDuplexCall(self, request_iter, context):
            return servicer_methods.FullDuplexCall(request_iter, context)

        def HalfDuplexCall(self, request_iter, context):
            return servicer_methods.HalfDuplexCall(request_iter, context)

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
                                                                 server)
    port = server.add_insecure_port('[::]:0')
    server.start()
    channel = grpc.insecure_channel('localhost:{}'.format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(servicer_methods, server, stub)
开发者ID:wowo666,项目名称:grpc,代码行数:33,代码来源:_python_plugin_test.py


示例7: setUp

 def setUp(self):
     self.server = test_common.test_server()
     test_pb2_grpc.add_TestServiceServicer_to_server(service.TestService(),
                                                     self.server)
     port = self.server.add_insecure_port('[::]:0')
     self.server.start()
     self.stub = test_pb2_grpc.TestServiceStub(
         grpc.insecure_channel('localhost:{}'.format(port)))
开发者ID:sitianchao,项目名称:grpc,代码行数:8,代码来源:_insecure_intraop_test.py


示例8: setUp

    def setUp(self):
        self._server = test_common.test_server()
        reflection.enable_server_reflection(_SERVICE_NAMES, self._server)
        port = self._server.add_insecure_port('[::]:0')
        self._server.start()

        channel = grpc.insecure_channel('localhost:%d' % port)
        self._stub = reflection_pb2_grpc.ServerReflectionStub(channel)
开发者ID:alexinblock,项目名称:grpc,代码行数:8,代码来源:_reflection_servicer_test.py


示例9: run_worker_server

def run_worker_server(port):
    server = test_common.test_server()
    servicer = worker_server.WorkerServer()
    services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
    server.add_insecure_port('[::]:{}'.format(port))
    server.start()
    servicer.wait_for_quit()
    server.stop(0)
开发者ID:wowo666,项目名称:grpc,代码行数:8,代码来源:qps_worker.py


示例10: run_server

def run_server(port_queue):
    server = test_common.test_server()
    port = server.add_insecure_port('[::]:0')
    port_queue.put(port)
    server.add_generic_rpc_handlers((GenericHandler(),))
    server.start()
    # threading.Event.wait() does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093, sleep instead
    time.sleep(WAIT_TIME)
开发者ID:Falco20019,项目名称:grpc,代码行数:9,代码来源:_server_shutdown_scenarios.py


示例11: setUp

    def setUp(self):
        self._control = test_control.PauseFailControl()
        self._handler = _Handler(self._control)

        self._server = test_common.test_server()
        port = self._server.add_insecure_port('[::]:0')
        self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)
开发者ID:kpayson64,项目名称:grpc,代码行数:10,代码来源:_rpc_test.py


示例12: start_secure_server

def start_secure_server():
    handler = grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
        grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })
    server = test_common.test_server()
    server.add_generic_rpc_handlers((handler,))
    server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
    port = server.add_secure_port('[::]:0', server_cred)
    server.start()

    return server, port
开发者ID:Falco20019,项目名称:grpc,代码行数:12,代码来源:_session_cache_test.py


示例13: test_call

    def test_call(self):
        self._protoc()

        for services_module in self._services_modules():
            server = test_common.test_server()
            services_module.add_TestServiceServicer_to_server(
                _Servicer(self._messages_pb2.Response), server)
            port = server.add_insecure_port('[::]:0')
            server.start()
            channel = grpc.insecure_channel('localhost:{}'.format(port))
            stub = services_module.TestServiceStub(channel)
            response = stub.Call(self._messages_pb2.Request())
            self.assertEqual(self._messages_pb2.Response(), response)
开发者ID:wowo666,项目名称:grpc,代码行数:13,代码来源:_split_definitions_test.py


示例14: run_test

def run_test(args):
    if args.scenario == SERVER_RAISES_EXCEPTION:
        server = test_common.test_server()
        server.start()
        raise Exception()
    elif args.scenario == SERVER_DEALLOCATED:
        server = test_common.test_server()
        server.start()
        server.__del__()
        while server._state.stage != grpc._server._ServerStage.STOPPED:
            pass
    elif args.scenario == SERVER_FORK_CAN_EXIT:
        port_queue = queue.Queue()
        thread = threading.Thread(target=run_server, args=(port_queue,))
        thread.daemon = True
        thread.start()
        port = port_queue.get()
        channel = grpc.insecure_channel('localhost:%d' % port)
        multi_callable = channel.unary_unary(FORK_EXIT)
        result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
        os.wait()
    else:
        raise ValueError('unknown test scenario')
开发者ID:Falco20019,项目名称:grpc,代码行数:23,代码来源:_server_shutdown_scenarios.py


示例15: test_call_wait_for_ready_enabled

    def test_call_wait_for_ready_enabled(self):
        # To test the wait mechanism, Python thread is required to make
        #   client set up first without handling them case by case.
        # Also, Python thread don't pass the unhandled exceptions to
        #   main thread. So, it need another method to store the
        #   exceptions and raise them again in main thread.
        unhandled_exceptions = queue.Queue()
        tcp, addr = get_free_loopback_tcp_port()
        wg = test_common.WaitGroup(len(_ALL_CALL_CASES))

        def wait_for_transient_failure(channel_connectivity):
            if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
                wg.done()

        def test_call(perform_call):
            with grpc.insecure_channel(addr) as channel:
                try:
                    channel.subscribe(wait_for_transient_failure)
                    perform_call(channel, wait_for_ready=True)
                except BaseException as e:  # pylint: disable=broad-except
                    # If the call failed, the thread would be destroyed. The
                    # channel object can be collected before calling the
                    # callback, which will result in a deadlock.
                    wg.done()
                    unhandled_exceptions.put(e, True)

        test_threads = []
        for perform_call in _ALL_CALL_CASES:
            test_thread = threading.Thread(
                target=test_call, args=(perform_call,))
            test_thread.exception = None
            test_thread.start()
            test_threads.append(test_thread)

        # Start the server after the connections are waiting
        wg.wait()
        tcp.close()
        server = test_common.test_server()
        server.add_generic_rpc_handlers((_GenericHandler(weakref.proxy(self)),))
        server.add_insecure_port(addr)
        server.start()

        for test_thread in test_threads:
            test_thread.join()

        # Stop the server to make test end properly
        server.stop(0)

        if not unhandled_exceptions.empty():
            raise unhandled_exceptions.get(True)
开发者ID:Falco20019,项目名称:grpc,代码行数:50,代码来源:_metadata_flags_test.py


示例16: setUp

 def setUp(self):
     self.server = test_common.test_server()
     services_pb2_grpc.add_FirstServiceServicer_to_server(
         _server_application.FirstServiceServicer(), self.server)
     switch_cert_on_client_num = 10
     initial_cert_config = grpc.ssl_server_certificate_configuration(
         [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
         root_certificates=CA_2_PEM)
     self.cert_config_fetcher = CertConfigFetcher()
     server_credentials = grpc.dynamic_ssl_server_credentials(
         initial_cert_config,
         self.cert_config_fetcher,
         require_client_authentication=self.require_client_auth())
     self.port = self.server.add_secure_port('[::]:0', server_credentials)
     self.server.start()
开发者ID:CCNITSilchar,项目名称:grpc,代码行数:15,代码来源:_server_ssl_cert_config_test.py


示例17: setUp

    def setUp(self):
        servicer = health.HealthServicer()
        servicer.set('', health_pb2.HealthCheckResponse.SERVING)
        servicer.set('grpc.test.TestServiceServing',
                     health_pb2.HealthCheckResponse.SERVING)
        servicer.set('grpc.test.TestServiceUnknown',
                     health_pb2.HealthCheckResponse.UNKNOWN)
        servicer.set('grpc.test.TestServiceNotServing',
                     health_pb2.HealthCheckResponse.NOT_SERVING)
        self._server = test_common.test_server()
        port = self._server.add_insecure_port('[::]:0')
        health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server)
        self._server.start()

        channel = grpc.insecure_channel('localhost:%d' % port)
        self._stub = health_pb2_grpc.HealthStub(channel)
开发者ID:alexinblock,项目名称:grpc,代码行数:16,代码来源:_health_servicer_test.py


示例18: setUp

 def setUp(self):
     self.server = test_common.test_server()
     test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
                                                     self.server)
     port = self.server.add_secure_port(
         '[::]:0',
         grpc.ssl_server_credentials([(resources.private_key(),
                                       resources.certificate_chain())]))
     self.server.start()
     self.stub = test_pb2_grpc.TestServiceStub(
         grpc.secure_channel('localhost:{}'.format(port),
                             grpc.ssl_channel_credentials(
                                 resources.test_root_certificates()), ((
                                     'grpc.ssl_target_name_override',
                                     _SERVER_HOST_OVERRIDE,
                                 ),)))
开发者ID:CCNITSilchar,项目名称:grpc,代码行数:16,代码来源:_secure_intraop_test.py


示例19: testInsecure

    def testInsecure(self):
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        port = server.add_insecure_port('[::]:0')
        server.start()

        with grpc.insecure_channel('localhost:%d' % port) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        server.stop(None)

        auth_data = pickle.loads(response)
        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])
开发者ID:Falco20019,项目名称:grpc,代码行数:18,代码来源:_auth_context_test.py


示例20: _CreateIncompleteService

def _CreateIncompleteService():
    """Provides a servicer backend that fails to implement methods and its stub.

  Returns:
    A _Service with which to test RPCs. The returned _Service's
      servicer_methods implements none of the methods required of it.
  """

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        pass

    server = test_common.test_server()
    getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
                                                                 server)
    port = server.add_insecure_port('[::]:0')
    server.start()
    channel = grpc.insecure_channel('localhost:{}'.format(port))
    stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
    return _Service(None, server, stub)
开发者ID:wowo666,项目名称:grpc,代码行数:19,代码来源:_python_plugin_test.py



注:本文中的tests.unit.test_common.test_server函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python unittest.main函数代码示例发布时间:2022-05-27
下一篇:
Python test_common.metadata_transmitted函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap