本文整理汇总了Python中tests.unit.test_common.test_server函数的典型用法代码示例。如果您正苦于以下问题：Python test_server函数的具体用法？Python test_server怎么用？Python test_server使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了test_server函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """Start a test server with the generic handler and open a channel to it.

    The server is stored on ``self._server`` and the insecure client channel
    on ``self._channel``.
    """
    self._server = test_common.test_server()
    # The handler only receives a weak proxy of this test case.
    generic_handler = _GenericHandler(weakref.proxy(self))
    self._server.add_generic_rpc_handlers((generic_handler,))
    # Port 0 is requested; the port number actually returned is used below.
    bound_port = self._server.add_insecure_port('[::]:0')
    self._server.start()

    self._channel = grpc.insecure_channel('localhost:%d' % bound_port)
开发者ID:Falco20019,项目名称:grpc,代码行数:7,代码来源:_error_message_encoding_test.py
示例2: testSecureClientCert
def testSecureClientCert(self):
    """Verify the auth data reported when the client presents a certificate.

    A server requiring client authentication is started on a secure port, a
    single unary-unary RPC is issued over a channel configured with a client
    key and certificate chain, and the unpickled response is checked for the
    expected peer identities and auth-context entries.
    """
    handler = grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })
    server = test_common.test_server()
    server.add_generic_rpc_handlers((handler,))
    server_cred = grpc.ssl_server_credentials(
        _SERVER_CERTS,
        root_certificates=_TEST_ROOT_CERTIFICATES,
        require_client_auth=True)
    port = server.add_secure_port('[::]:0', server_cred)
    server.start()

    # Make sure the channel is closed and the server stopped even when the
    # RPC (or credential setup) raises, so a failure does not leak them.
    try:
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        with grpc.secure_channel(
                'localhost:{}'.format(port),
                channel_creds,
                options=_PROPERTY_OPTIONS) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
    finally:
        server.stop(None)

    # NOTE(review): pickle.loads is only acceptable here because the payload
    # comes from the server started by this very test; never unpickle
    # untrusted data.
    auth_data = pickle.loads(response)
    auth_ctx = auth_data[_AUTH_CTX]
    six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
    self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
    self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
    self.assertSequenceEqual([b'*.test.google.com'],
                             auth_ctx['x509_common_name'])
开发者ID:Falco20019,项目名称:grpc,代码行数:34,代码来源:_auth_context_test.py
示例3: testSessionResumption
def testSessionResumption(self):
    """Verify that a second connection reuses the cached TLS session.

    Two one-shot client RPCs are made against the same secure server using a
    shared single-entry session cache: the first is expected to report
    ``ssl_session_reused`` as false, the second as true.
    """
    # Set up a secure server
    handler = grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })
    server = test_common.test_server()
    server.add_generic_rpc_handlers((handler,))
    server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
    port = server.add_secure_port('[::]:0', server_cred)
    server.start()

    # Stop the server even if one of the client checks below raises, so a
    # failing assertion does not leave the server running.
    try:
        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'true'])
    finally:
        server.stop(None)
开发者ID:Falco20019,项目名称:grpc,代码行数:33,代码来源:_auth_context_test.py
示例4: testSecureNoCert
def testSecureNoCert(self):
    """Verify the auth data reported when the client presents no certificate.

    A secure server is started, one unary-unary RPC is issued over a channel
    that only carries root certificates, and the unpickled response is
    expected to have no peer identity and an auth context containing only
    the transport security type and the session-reuse flag.
    """
    handler = grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })
    server = test_common.test_server()
    server.add_generic_rpc_handlers((handler,))
    server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
    port = server.add_secure_port('[::]:0', server_cred)
    server.start()

    # Make sure the channel is closed and the server stopped even when the
    # RPC (or credential setup) raises, so a failure does not leak them.
    try:
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        with grpc.secure_channel(
                'localhost:{}'.format(port),
                channel_creds,
                options=_PROPERTY_OPTIONS) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
    finally:
        server.stop(None)

    # NOTE(review): pickle.loads is only acceptable here because the payload
    # comes from the server started by this very test; never unpickle
    # untrusted data.
    auth_data = pickle.loads(response)
    self.assertIsNone(auth_data[_ID])
    self.assertIsNone(auth_data[_ID_KEY])
    self.assertDictEqual({
        'transport_security_type': [b'ssl'],
        'ssl_session_reused': [b'false'],
    }, auth_data[_AUTH_CTX])
开发者ID:Falco20019,项目名称:grpc,代码行数:28,代码来源:_auth_context_test.py
示例5: serve
def serve():
    """Parse command-line flags, start the test service and block until interrupted.

    Flags:
      --port: required integer port on which to serve.
      --use_tls: parsed with resources.parse_bool; when true a secure port is
        added using the key and certificate chain from ``resources``,
        otherwise an insecure port is added.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--port', type=int, required=True, help='the port on which to serve')
    arg_parser.add_argument(
        '--use_tls',
        default=False,
        type=resources.parse_bool,
        help='require a secure connection')
    args = arg_parser.parse_args()

    server = test_common.test_server()
    test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
                                                    server)
    if not args.use_tls:
        server.add_insecure_port('[::]:{}'.format(args.port))
    else:
        key_and_chain = (resources.private_key(),
                         resources.certificate_chain())
        credentials = grpc.ssl_server_credentials((key_and_chain,))
        server.add_secure_port('[::]:{}'.format(args.port), credentials)

    server.start()
    _LOGGER.info('Server serving.')
    try:
        # Sleep forever; the loop only ends when an exception (for example
        # KeyboardInterrupt) is raised in this thread.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except BaseException as exc:
        _LOGGER.info('Caught exception "%s"; stopping server...', exc)
        server.stop(None)
        _LOGGER.info('Server stopped; exiting.')
开发者ID:clockzhong,项目名称:grpc,代码行数:32,代码来源:server.py
示例6: _CreateService
def _CreateService():
    """Build a running servicer backend together with a client stub.

    Returns:
      A _Service holding the servicer methods object, the started server and
      a stub connected to that server, with which to test RPCs.
    """
    servicer_methods = _ServicerMethods()

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        # Every RPC simply forwards to the shared servicer_methods object.

        def UnaryCall(self, request, context):
            return servicer_methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return servicer_methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iter, context):
            return servicer_methods.StreamingInputCall(request_iter, context)

        def FullDuplexCall(self, request_iter, context):
            return servicer_methods.FullDuplexCall(request_iter, context)

        def HalfDuplexCall(self, request_iter, context):
            return servicer_methods.HalfDuplexCall(request_iter, context)

    server = test_common.test_server()
    # The registration function and stub class are looked up by name on the
    # generated module.
    register_servicer = getattr(service_pb2_grpc,
                                ADD_SERVICER_TO_SERVER_IDENTIFIER)
    register_servicer(Servicer(), server)
    bound_port = server.add_insecure_port('[::]:0')
    server.start()

    channel = grpc.insecure_channel('localhost:{}'.format(bound_port))
    stub_factory = getattr(service_pb2_grpc, STUB_IDENTIFIER)
    return _Service(servicer_methods, server, stub_factory(channel))
开发者ID:wowo666,项目名称:grpc,代码行数:33,代码来源:_python_plugin_test.py
示例7: setUp
def setUp(self):
    """Start an insecure test server and create a TestService stub for it.

    The server is stored on ``self.server`` and the stub on ``self.stub``.
    """
    self.server = test_common.test_server()
    servicer = service.TestService()
    test_pb2_grpc.add_TestServiceServicer_to_server(servicer, self.server)
    bound_port = self.server.add_insecure_port('[::]:0')
    self.server.start()

    target = 'localhost:{}'.format(bound_port)
    self.stub = test_pb2_grpc.TestServiceStub(grpc.insecure_channel(target))
开发者ID:sitianchao,项目名称:grpc,代码行数:8,代码来源:_insecure_intraop_test.py
示例8: setUp
def setUp(self):
    """Start a server with reflection enabled and create a reflection stub.

    The server is stored on ``self._server`` and the ServerReflection stub
    on ``self._stub``.
    """
    self._server = test_common.test_server()
    reflection.enable_server_reflection(_SERVICE_NAMES, self._server)
    bound_port = self._server.add_insecure_port('[::]:0')
    self._server.start()

    client_channel = grpc.insecure_channel('localhost:%d' % bound_port)
    self._stub = reflection_pb2_grpc.ServerReflectionStub(client_channel)
开发者ID:alexinblock,项目名称:grpc,代码行数:8,代码来源:_reflection_servicer_test.py
示例9: run_worker_server
def run_worker_server(port):
    """Serve the worker service on ``port`` until the servicer signals quit.

    Args:
      port: Port number on which the insecure listening address is opened.
    """
    grpc_server = test_common.test_server()
    worker = worker_server.WorkerServer()
    services_pb2_grpc.add_WorkerServiceServicer_to_server(worker, grpc_server)

    listen_address = '[::]:{}'.format(port)
    grpc_server.add_insecure_port(listen_address)
    grpc_server.start()

    # Block until the servicer reports that it should quit, then stop the
    # server with a grace value of 0.
    worker.wait_for_quit()
    grpc_server.stop(0)
开发者ID:wowo666,项目名称:grpc,代码行数:8,代码来源:qps_worker.py
示例10: run_server
def run_server(port_queue):
    """Start a test server, publish its port on ``port_queue`` and sleep.

    Args:
      port_queue: Queue on which the bound port number is put so that the
        caller can connect to the server.
    """
    srv = test_common.test_server()
    # Publish the port before registering handlers and starting the server.
    port_queue.put(srv.add_insecure_port('[::]:0'))
    srv.add_generic_rpc_handlers((GenericHandler(),))
    srv.start()
    # threading.Event.wait() does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093, sleep instead
    time.sleep(WAIT_TIME)
开发者ID:Falco20019,项目名称:grpc,代码行数:9,代码来源:_server_shutdown_scenarios.py
示例11: setUp
def setUp(self):
    """Create the control, handler, server and channel used by the RPC tests.

    Stores a PauseFailControl on ``self._control``, a handler wrapping it on
    ``self._handler``, the started server on ``self._server`` and an
    insecure channel to it on ``self._channel``.
    """
    self._control = test_control.PauseFailControl()
    self._handler = _Handler(self._control)

    self._server = test_common.test_server()
    bound_port = self._server.add_insecure_port('[::]:0')
    generic_handler = _GenericHandler(self._handler)
    self._server.add_generic_rpc_handlers((generic_handler,))
    self._server.start()

    self._channel = grpc.insecure_channel('localhost:%d' % bound_port)
开发者ID:kpayson64,项目名称:grpc,代码行数:10,代码来源:_rpc_test.py
示例12: start_secure_server
def start_secure_server():
    """Start a secure test server exposing a single unary-unary method.

    Returns:
      A ``(server, port)`` tuple: the started server and the port number
      returned when the secure port was added.
    """
    method_handlers = {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'test', method_handlers)

    secure_server = test_common.test_server()
    secure_server.add_generic_rpc_handlers((generic_handler,))
    credentials = grpc.ssl_server_credentials(_SERVER_CERTS)
    bound_port = secure_server.add_secure_port('[::]:0', credentials)
    secure_server.start()
    return secure_server, bound_port
开发者ID:Falco20019,项目名称:grpc,代码行数:12,代码来源:_session_cache_test.py
示例13: test_call
def test_call(self):
    """Run protoc, then issue one ``Call`` RPC per generated services module.

    For every module returned by ``self._services_modules()`` a fresh server
    is started with a ``_Servicer``, a stub calls ``Call`` with a default
    request, and the response is expected to equal a default ``Response``.
    """
    self._protoc()

    for services_module in self._services_modules():
        server = test_common.test_server()
        services_module.add_TestServiceServicer_to_server(
            _Servicer(self._messages_pb2.Response), server)
        port = server.add_insecure_port('[::]:0')
        server.start()
        # A new server and channel are created on every iteration; close the
        # channel and stop the server before moving on so they do not pile
        # up (or leak when the RPC raises).
        try:
            with grpc.insecure_channel(
                    'localhost:{}'.format(port)) as channel:
                stub = services_module.TestServiceStub(channel)
                response = stub.Call(self._messages_pb2.Request())
        finally:
            server.stop(None)
        self.assertEqual(self._messages_pb2.Response(), response)
开发者ID:wowo666,项目名称:grpc,代码行数:13,代码来源:_split_definitions_test.py
示例14: run_test
def run_test(args):
    """Run the server-shutdown scenario selected by ``args.scenario``.

    Args:
      args: Object whose ``scenario`` attribute names the scenario to run.

    Raises:
      Exception: Deliberately, in the SERVER_RAISES_EXCEPTION scenario.
      ValueError: If the scenario is not one of the known scenarios.
    """
    scenario = args.scenario

    if scenario == SERVER_RAISES_EXCEPTION:
        # Raise while a started server is still referenced.
        server = test_common.test_server()
        server.start()
        raise Exception()

    if scenario == SERVER_DEALLOCATED:
        server = test_common.test_server()
        server.start()
        server.__del__()
        # Spin until the server's internal stage reads STOPPED.
        while server._state.stage != grpc._server._ServerStage.STOPPED:
            pass
        return

    if scenario == SERVER_FORK_CAN_EXIT:
        port_queue = queue.Queue()
        server_thread = threading.Thread(target=run_server, args=(port_queue,))
        server_thread.daemon = True
        server_thread.start()
        # run_server publishes the bound port on the queue.
        port = port_queue.get()
        channel = grpc.insecure_channel('localhost:%d' % port)
        multi_callable = channel.unary_unary(FORK_EXIT)
        # The results stay bound to locals until os.wait() returns.
        result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
        os.wait()
        return

    raise ValueError('unknown test scenario')
开发者ID:Falco20019,项目名称:grpc,代码行数:23,代码来源:_server_shutdown_scenarios.py
示例15: test_call_wait_for_ready_enabled
def test_call_wait_for_ready_enabled(self):
    """Check that wait_for_ready calls complete once the server comes up.

    Every call case in ``_ALL_CALL_CASES`` is started on its own thread with
    ``wait_for_ready=True`` before any server is listening. After all of
    them have signalled the wait group, the server is started on the same
    address and the threads are joined. Any exception captured in a worker
    thread is re-raised in the main thread.
    """
    # Client threads are used so that every call case can be set up before
    # the server exists, without handling each case individually. Python
    # threads do not propagate unhandled exceptions to the main thread, so
    # they are collected in a queue and re-raised at the end.
    unhandled_exceptions = queue.Queue()
    # NOTE(review): presumably `tcp` holds the port reserved until it is
    # closed below - confirm against get_free_loopback_tcp_port.
    tcp, addr = get_free_loopback_tcp_port()
    wait_group = test_common.WaitGroup(len(_ALL_CALL_CASES))

    def wait_for_transient_failure(channel_connectivity):
        if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
            wait_group.done()

    def test_call(perform_call):
        with grpc.insecure_channel(addr) as channel:
            try:
                channel.subscribe(wait_for_transient_failure)
                perform_call(channel, wait_for_ready=True)
            except BaseException as exc:  # pylint: disable=broad-except
                # If the call failed, the thread would be destroyed. The
                # channel object can be collected before calling the
                # callback, which will result in a deadlock.
                wait_group.done()
                unhandled_exceptions.put(exc, True)

    client_threads = []
    for perform_call in _ALL_CALL_CASES:
        client_thread = threading.Thread(
            target=test_call, args=(perform_call,))
        client_thread.exception = None
        client_thread.start()
        client_threads.append(client_thread)

    # Start the server only after the connections are waiting.
    wait_group.wait()
    tcp.close()
    server = test_common.test_server()
    server.add_generic_rpc_handlers((_GenericHandler(weakref.proxy(self)),))
    server.add_insecure_port(addr)
    server.start()

    for client_thread in client_threads:
        client_thread.join()

    # Stop the server so the test ends properly.
    server.stop(0)

    if not unhandled_exceptions.empty():
        raise unhandled_exceptions.get(True)
开发者ID:Falco20019,项目名称:grpc,代码行数:50,代码来源:_metadata_flags_test.py
示例16: setUp
def setUp(self):
    """Start a server whose SSL credentials come from a dynamic config fetcher.

    Registers FirstServiceServicer, builds the initial certificate
    configuration from SERVER_KEY_1_PEM / SERVER_CERT_CHAIN_1_PEM with
    CA_2_PEM as root certificates, and starts the server on a secure port.
    The server, the fetcher and the bound port are stored on ``self.server``,
    ``self.cert_config_fetcher`` and ``self.port``.
    """
    self.server = test_common.test_server()
    services_pb2_grpc.add_FirstServiceServicer_to_server(
        _server_application.FirstServiceServicer(), self.server)
    initial_cert_config = grpc.ssl_server_certificate_configuration(
        [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
        root_certificates=CA_2_PEM)
    self.cert_config_fetcher = CertConfigFetcher()
    # Whether client authentication is required is decided by
    # self.require_client_auth().
    server_credentials = grpc.dynamic_ssl_server_credentials(
        initial_cert_config,
        self.cert_config_fetcher,
        require_client_authentication=self.require_client_auth())
    self.port = self.server.add_secure_port('[::]:0', server_credentials)
    self.server.start()
开发者ID:CCNITSilchar,项目名称:grpc,代码行数:15,代码来源:_server_ssl_cert_config_test.py
示例17: setUp
def setUp(self):
    """Start a server with a pre-populated HealthServicer and create a stub.

    The servicer is given a status for the empty service name and for three
    named test services. The server is stored on ``self._server`` and the
    Health stub on ``self._stub``.
    """
    servicer = health.HealthServicer()
    initial_statuses = (
        ('', health_pb2.HealthCheckResponse.SERVING),
        ('grpc.test.TestServiceServing',
         health_pb2.HealthCheckResponse.SERVING),
        ('grpc.test.TestServiceUnknown',
         health_pb2.HealthCheckResponse.UNKNOWN),
        ('grpc.test.TestServiceNotServing',
         health_pb2.HealthCheckResponse.NOT_SERVING),
    )
    for service_name, status in initial_statuses:
        servicer.set(service_name, status)

    self._server = test_common.test_server()
    bound_port = self._server.add_insecure_port('[::]:0')
    health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server)
    self._server.start()

    client_channel = grpc.insecure_channel('localhost:%d' % bound_port)
    self._stub = health_pb2_grpc.HealthStub(client_channel)
开发者ID:alexinblock,项目名称:grpc,代码行数:16,代码来源:_health_servicer_test.py
示例18: setUp
def setUp(self):
    """Start a TLS test server and create a TestService stub connected to it.

    The server is stored on ``self.server`` and the stub on ``self.stub``.
    The client channel overrides the SSL target name with
    ``_SERVER_HOST_OVERRIDE``.
    """
    self.server = test_common.test_server()
    test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
                                                    self.server)
    server_credentials = grpc.ssl_server_credentials(
        [(resources.private_key(), resources.certificate_chain())])
    bound_port = self.server.add_secure_port('[::]:0', server_credentials)
    self.server.start()

    channel_credentials = grpc.ssl_channel_credentials(
        resources.test_root_certificates())
    channel_options = ((
        'grpc.ssl_target_name_override',
        _SERVER_HOST_OVERRIDE,
    ),)
    secure_channel = grpc.secure_channel('localhost:{}'.format(bound_port),
                                         channel_credentials, channel_options)
    self.stub = test_pb2_grpc.TestServiceStub(secure_channel)
开发者ID:CCNITSilchar,项目名称:grpc,代码行数:16,代码来源:_secure_intraop_test.py
示例19: testInsecure
def testInsecure(self):
    """Verify that an insecure connection reports no identity or auth context.

    One unary-unary RPC is issued over an insecure channel and the unpickled
    response is expected to carry ``None`` for the peer identities and
    identity key, and an empty auth context.
    """
    handler = grpc.method_handlers_generic_handler('test', {
        'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(handle_unary_unary)
    })
    server = test_common.test_server()
    server.add_generic_rpc_handlers((handler,))
    port = server.add_insecure_port('[::]:0')
    server.start()

    # Stop the server even when the RPC raises, so a failure does not leave
    # it running.
    try:
        with grpc.insecure_channel('localhost:%d' % port) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
    finally:
        server.stop(None)

    # NOTE(review): pickle.loads is only acceptable here because the payload
    # comes from the server started by this very test; never unpickle
    # untrusted data.
    auth_data = pickle.loads(response)
    self.assertIsNone(auth_data[_ID])
    self.assertIsNone(auth_data[_ID_KEY])
    self.assertDictEqual({}, auth_data[_AUTH_CTX])
开发者ID:Falco20019,项目名称:grpc,代码行数:18,代码来源:_auth_context_test.py
示例20: _CreateIncompleteService
def _CreateIncompleteService():
    """Build a servicer backend that implements no methods, plus its stub.

    Returns:
      A _Service with which to test RPCs. Its servicer_methods is None
      because the registered servicer overrides none of the methods
      required of it.
    """

    class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
        pass

    server = test_common.test_server()
    # The registration function and stub class are looked up by name on the
    # generated module.
    register_servicer = getattr(service_pb2_grpc,
                                ADD_SERVICER_TO_SERVER_IDENTIFIER)
    register_servicer(Servicer(), server)
    bound_port = server.add_insecure_port('[::]:0')
    server.start()

    channel = grpc.insecure_channel('localhost:{}'.format(bound_port))
    stub_factory = getattr(service_pb2_grpc, STUB_IDENTIFIER)
    return _Service(None, server, stub_factory(channel))
开发者ID:wowo666,项目名称:grpc,代码行数:19,代码来源:_python_plugin_test.py
注：本文中的tests.unit.test_common.test_server函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论