本文整理汇总了Java中io.grpc.stub.ServerCallStreamObserver类的典型用法代码示例。如果您正苦于以下问题:Java ServerCallStreamObserver类的具体用法?Java ServerCallStreamObserver怎么用?Java ServerCallStreamObserver使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ServerCallStreamObserver类属于io.grpc.stub包,在下文中一共展示了ServerCallStreamObserver类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: oneToMany
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
/**
* Implements a unary -> stream call as {@link Single} -> {@link Flowable}, where the server responds with a
* stream of messages.
*/
public static <TRequest, TResponse> void oneToMany(
TRequest request, StreamObserver<TResponse> responseObserver,
Function<Single<TRequest>, Flowable<TResponse>> delegate) {
try {
Single<TRequest> rxRequest = Single.just(request);
Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
rxResponse.subscribe(new ReactivePublisherBackpressureOnReadyHandler<TResponse>(
(ServerCallStreamObserver<TResponse>) responseObserver));
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
}
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:18,代码来源:ServerCalls.java
示例2: ReactivePublisherBackpressureOnReadyHandler
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
public ReactivePublisherBackpressureOnReadyHandler(ServerCallStreamObserver<T> requestStream) {
this.requestStream = Preconditions.checkNotNull(requestStream);
requestStream.setOnReadyHandler(this);
requestStream.setOnCancelHandler(new Runnable() {
@Override
public void run() {
subscription.cancel();
}
});
}
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:11,代码来源:ReactivePublisherBackpressureOnReadyHandler.java
示例3: oneToMany
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
/**
* Implements a unary -> stream call as {@link Mono} -> {@link Flux}, where the server responds with a
* stream of messages.
*/
public static <TRequest, TResponse> void oneToMany(
TRequest request, StreamObserver<TResponse> responseObserver,
Function<Mono<TRequest>, Flux<TResponse>> delegate) {
try {
Mono<TRequest> rxRequest = Mono.just(request);
Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
rxResponse.subscribe(new ReactivePublisherBackpressureOnReadyHandler<>(
(ServerCallStreamObserver<TResponse>) responseObserver));
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
}
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:18,代码来源:ServerCalls.java
示例4: BlockingStreamObserver
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
BlockingStreamObserver(String id, ServerCallStreamObserver<T> delegate) {
this.id = id;
this.delegate = delegate;
this.delegate.setOnReadyHandler(this::wakeup);
this.delegate.setOnCancelHandler(this::wakeup);
delegate.setCompression("gzip");
delegate.setMessageCompression(true);
}
开发者ID:uweschaefer,项目名称:factcast,代码行数:9,代码来源:BlockingStreamObserver.java
示例5: subscribe
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public void subscribe(@NonNull MSG_SubscriptionRequest request,
@NonNull StreamObserver<MSG_Notification> responseObserver) {
SubscriptionRequestTO req = converter.fromProto(request);
resetDebugInfo(req);
BlockingStreamObserver<MSG_Notification> resp = new BlockingStreamObserver<>(req.toString(),
(ServerCallStreamObserver) responseObserver);
final boolean idOnly = req.idOnly();
store.subscribe(req, new GrpcObserverAdapter(req.toString(), resp, f -> idOnly ? converter
.createNotificationFor(f.id()) : converter.createNotificationFor(f)));
}
开发者ID:uweschaefer,项目名称:factcast,代码行数:15,代码来源:FactStoreGrpcService.java
示例6: testFetchById
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Test
public void testFetchById() throws Exception {
UUID id = UUID.randomUUID();
uut.fetchById(protoConverter.toProto(id), mock(ServerCallStreamObserver.class));
verify(backend).fetchById(eq(id));
}
开发者ID:uweschaefer,项目名称:factcast,代码行数:8,代码来源:FactStoreGrpcService0Test.java
示例7: testSubscribeFacts
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Test
public void testSubscribeFacts() throws Exception {
SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn();
when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null);
uut.subscribe(new ProtoConverter().toProto(SubscriptionRequestTO.forFacts(req)), mock(
ServerCallStreamObserver.class));
verify(backend).subscribe(any(), any());
assertFalse(reqCaptor.getValue().idOnly());
}
开发者ID:uweschaefer,项目名称:factcast,代码行数:12,代码来源:FactStoreGrpcService0Test.java
示例8: testSubscribeIds
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Test
public void testSubscribeIds() throws Exception {
SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn();
when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null);
uut.subscribe(new ProtoConverter().toProto(SubscriptionRequestTO.forIds(req)), mock(
ServerCallStreamObserver.class));
verify(backend).subscribe(any(), any());
assertTrue(reqCaptor.getValue().idOnly());
}
开发者ID:uweschaefer,项目名称:factcast,代码行数:12,代码来源:FactStoreGrpcService0Test.java
示例9: watch
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public void watch(Request req, StreamObserver<Result> responseObserver) {
logger.info("Start watching: " + req.getQuery());
int responseNo = 0;
final ServerCallStreamObserver responseObserver2 = (ServerCallStreamObserver) responseObserver;
while (!responseObserver2.isCancelled()) {
sleepUpToMiilis(1000);
responseObserver.onNext(Result.newBuilder().setTitle(format("result %d for [%s] from backend %d", responseNo++, req.getQuery(), id)).build());
}
responseObserver2.setOnCancelHandler(() -> logger.warning("Request canceled!"));
}
开发者ID:mateuszdyminski,项目名称:grpc,代码行数:14,代码来源:Backend.java
示例10: staticUnaryCallSetsMessageCompression
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public void staticUnaryCallSetsMessageCompression(SimpleRequest request,
StreamObserver<SimpleResponse> responseObserver) {
if (!request.equals(REQUEST_MESSAGE)) {
responseObserver.onError(new IllegalArgumentException("Unexpected request: " + request));
return;
}
ServerCallStreamObserver<SimpleResponse> callObserver =
(ServerCallStreamObserver<SimpleResponse>) responseObserver;
callObserver.setCompression("gzip");
callObserver.setMessageCompression(true);
responseObserver.onNext(RESPONSE_MESSAGE);
responseObserver.onCompleted();
}
开发者ID:line,项目名称:armeria,代码行数:15,代码来源:GrpcServiceServerTest.java
示例11: GrpcSink
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@VisibleForTesting
GrpcSink(
final String rpcCommandName,
ServerCallStreamObserver<RunResponse> observer,
ExecutorService executor) {
// This queue is intentionally unbounded: we always act on it fairly quickly so filling up
// RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers.
this.actionQueue = new LinkedBlockingQueue<>();
this.exchanger = new Exchanger<>();
this.observer = observer;
this.observer.setOnCancelHandler(
() -> {
Thread commandThread = GrpcSink.this.commandThread.get();
if (commandThread != null) {
logger.info(
String.format(
"Interrupting thread %s due to the streaming %s call being cancelled "
+ "(likely client hang up or explicit gRPC-level cancellation)",
commandThread.getName(), rpcCommandName));
commandThread.interrupt();
}
actionQueue.offer(SinkThreadAction.DISCONNECT);
});
this.observer.setOnReadyHandler(() -> actionQueue.offer(SinkThreadAction.READY));
this.future = executor.submit(GrpcSink.this::call);
}
开发者ID:bazelbuild,项目名称:bazel,代码行数:28,代码来源:GrpcServerImpl.java
示例12: serverReflectionInfo
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
final StreamObserver<ServerReflectionResponse> responseObserver) {
final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
(ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.request(1);
return requestObserver;
}
开发者ID:grpc,项目名称:grpc-java,代码行数:13,代码来源:ProtoReflectionService.java
示例13: streamingCall
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
final StreamObserver<Messages.SimpleResponse> observer) {
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
responseObserver.onNext(Utils.makeResponse(value));
}
@Override
public void onError(Throwable t) {
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
// other side closed with OK
responseObserver.onCompleted();
}
};
}
开发者ID:grpc,项目名称:grpc-java,代码行数:30,代码来源:AsyncServer.java
示例14: streamingFromServer
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public void streamingFromServer(
final Messages.SimpleRequest request,
final StreamObserver<Messages.SimpleResponse> observer) {
// send forever, until the client cancels or we shut down
final Messages.SimpleResponse response = Utils.makeResponse(request);
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return response;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver);
}
开发者ID:grpc,项目名称:grpc-java,代码行数:30,代码来源:AsyncServer.java
示例15: subscribe
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public void subscribe(Subscriber<? super T> subscriber) {
Preconditions.checkNotNull(subscriber);
subscriber.onSubscribe(new Subscription() {
private static final int MAX_REQUEST_RETRIES = 20;
@Override
public void request(long l) {
// Some Reactive Streams implementations use Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE.
int i = (int) Math.min(l, Integer.MAX_VALUE);
// Very rarely, request() gets called before the client has finished setting up its stream. If this
// happens, wait momentarily and try again.
for (int j = 0; j < MAX_REQUEST_RETRIES; j++) {
try {
callStreamObserver.request(i);
break;
} catch (IllegalStateException ex) {
if (j == MAX_REQUEST_RETRIES - 1) {
throw ex;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// no-op
}
}
}
}
@Override
public void cancel() {
// Don't cancel twice if the server is already canceled
if (callStreamObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) callStreamObserver).isCancelled()) {
return;
}
isCanceled = true;
if (callStreamObserver instanceof ClientCallStreamObserver) {
((ClientCallStreamObserver) callStreamObserver).cancel("Client canceled request", null);
} else {
callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException());
}
}
});
this.subscriber = subscriber;
subscribed.countDown();
}
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:50,代码来源:ReactiveStreamObserverPublisher.java
示例16: unaryCall
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
/**
* Immediately responds with a payload of the type and size specified in the request.
*/
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
ServerCallStreamObserver<SimpleResponse> obs =
(ServerCallStreamObserver<SimpleResponse>) responseObserver;
SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
try {
switch (req.getResponseCompression()) {
case DEFLATE:
// fallthrough, just use gzip
case GZIP:
obs.setCompression("gzip");
break;
case NONE:
obs.setCompression("identity");
break;
case UNRECOGNIZED:
// fallthrough
default:
obs.onError(Status.INVALID_ARGUMENT
.withDescription("Unknown: " + req.getResponseCompression())
.asRuntimeException());
return;
}
} catch (IllegalArgumentException e) {
obs.onError(Status.UNIMPLEMENTED
.withDescription("compression not supported.")
.withCause(e)
.asRuntimeException());
return;
}
if (req.getResponseSize() != 0) {
boolean compressable = compressableResponse(req.getResponseType());
ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
// TODO(wonderfly): whether or not this is a good approach needs further discussion.
int offset = random.nextInt(
compressable ? compressableBuffer.size() : uncompressableBuffer.size());
ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
responseBuilder.getPayloadBuilder()
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
.setBody(payload);
}
if (req.hasResponseStatus()) {
obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
.withDescription(req.getResponseStatus().getMessage())
.asRuntimeException());
return;
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
开发者ID:line,项目名称:armeria,代码行数:58,代码来源:TestServiceImpl.java
示例17: unaryCall
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
/**
* Immediately responds with a payload of the type and size specified in the request.
*/
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
ServerCallStreamObserver<SimpleResponse> obs =
(ServerCallStreamObserver<SimpleResponse>) responseObserver;
SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
try {
if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
obs.setCompression("gzip");
} else {
obs.setCompression("identity");
}
} catch (IllegalArgumentException e) {
obs.onError(Status.UNIMPLEMENTED
.withDescription("compression not supported.")
.withCause(e)
.asRuntimeException());
return;
}
if (req.getResponseSize() != 0) {
boolean compressable = compressableResponse(req.getResponseType());
ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
// TODO(wonderfly): whether or not this is a good approach needs further discussion.
int offset = random.nextInt(
compressable ? compressableBuffer.size() : uncompressableBuffer.size());
ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
responseBuilder.getPayloadBuilder()
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
.setBody(payload);
}
if (req.hasResponseStatus()) {
obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
.withDescription(req.getResponseStatus().getMessage())
.asRuntimeException());
return;
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
开发者ID:grpc,项目名称:grpc-java,代码行数:46,代码来源:TestServiceImpl.java
示例18: startChainingServer
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
/**
* Create a chain of client to server calls which can be cancelled top down.
*
* @return a Future that completes when call chain is created
*/
private Future<?> startChainingServer(final int depthThreshold) throws IOException {
final AtomicInteger serversReady = new AtomicInteger();
final SettableFuture<Void> chainReady = SettableFuture.create();
class ChainingService extends TestServiceGrpc.TestServiceImplBase {
@Override
public void unaryCall(final SimpleRequest request,
final StreamObserver<SimpleResponse> responseObserver) {
((ServerCallStreamObserver) responseObserver).setOnCancelHandler(new Runnable() {
@Override
public void run() {
receivedCancellations.countDown();
}
});
if (serversReady.incrementAndGet() == depthThreshold) {
// Stop recursion
chainReady.set(null);
return;
}
Context.currentContextExecutor(otherWork).execute(new Runnable() {
@Override
public void run() {
try {
blockingStub.unaryCall(request);
} catch (StatusRuntimeException e) {
Status status = e.getStatus();
if (status.getCode() == Status.Code.CANCELLED) {
observedCancellations.countDown();
} else {
responseObserver.onError(e);
}
}
}
});
}
}
server = InProcessServerBuilder.forName("channel").executor(otherWork)
.addService(new ChainingService())
.build().start();
return chainReady;
}
开发者ID:grpc,项目名称:grpc-java,代码行数:48,代码来源:CascadingTest.java
示例19: ProtoReflectionStreamObserver
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
ProtoReflectionStreamObserver(
ServerReflectionIndex serverReflectionIndex,
ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) {
this.serverReflectionIndex = serverReflectionIndex;
this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer");
}
开发者ID:grpc,项目名称:grpc-java,代码行数:7,代码来源:ProtoReflectionService.java
示例20: streamingBothWays
import io.grpc.stub.ServerCallStreamObserver; //导入依赖的package包/类
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(
final StreamObserver<Messages.SimpleResponse> observer) {
// receive data forever and send data forever until client cancels or we shut down.
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return BIDI_RESPONSE;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver
);
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(final Messages.SimpleRequest request) {
// noop
}
@Override
public void onError(Throwable t) {
// other side cancelled
}
@Override
public void onCompleted() {
// Should never happen, because clients should cancel this call in order to stop
// the operation. Also because copyWithFlowControl hogs the inbound network thread
// via the handler for onReady, we would never expect this callback to be able to
// run anyways.
log.severe("clients should CANCEL the call to stop bidi streaming");
}
};
}
开发者ID:grpc,项目名称:grpc-java,代码行数:50,代码来源:AsyncServer.java
注:本文中的io.grpc.stub.ServerCallStreamObserver类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论