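This article collects typical usage examples of java.util.concurrent.Flow.Subscription in Java. If you are looking for concrete ways to use Subscription, the selected examples below may help.
Subscription is a nested interface of the java.util.concurrent.Flow class. A Subscriber receives a Subscription in onSubscribe and uses it to signal demand with request(n) or to stop delivery with cancel(). The 16 code examples below are sorted by popularity by default.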
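Before the collected snippets, here is a minimal, self-contained sketch of the request/cancel contract, assuming the JDK's SubmissionPublisher as the source. The names OneByOneDemo, CollectingSubscriber and collect are hypothetical and do not come from any of the projects quoted below; the 5-second timeout is likewise an arbitrary choice for the demo.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneByOneDemo {

    /** Hypothetical subscriber that pulls one item at a time and records what it receives. */
    static final class CollectingSubscriber<T> implements Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: exactly one item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns them in the order the subscriber saw them. */
    static List<String> collect(List<String> items) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

Requesting one item per onNext call keeps at most one item in flight for this subscriber; several of the onSubscribe examples below start the same way with request(1).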
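Example 1: subscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void subscribe(Subscriber<? super T> s) {
    subscriber = s;
    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long count) {
            // Record that demand was signaled, then publish the item if it is already available
            requested = true;
            if (item != null) {
                publish(item);
            }
        }
        @Override
        public void cancel() {
            // Drop the reference to the subscriber
            subscriber = null;
        }
    });
    // An error recorded before subscription is published right away
    if (error != null) {
        publish(error);
    }
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 23, Source: SingleItemPublisher.java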
Example 2: andThen
import java.util.concurrent.Flow.Subscription; // import the required package/class
default CompletableSubscriber<T> andThen(Runnable runnable) {
    CompletableSubscriber<T> thisSubscriber = this;
    return new AbstractCompletableSubscriber<T>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            thisSubscriber.onSubscribe(subscription);
        }
        @Override
        public void onNext(T item) {
            thisSubscriber.onNext(item);
        }
        @Override
        public void onComplete() {
            // Run the callback on completion; the wrapped subscriber's onComplete is not invoked here
            runnable.run();
        }
        @Override
        public void onError(Throwable error) {
            thisSubscriber.onError(error);
        }
    };
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 26, Source: CompletableSubscriber.java
Example 3: exceptionally
import java.util.concurrent.Flow.Subscription; // import the required package/class
default CompletableSubscriber<T> exceptionally(Consumer<Throwable> consumer) {
    CompletableSubscriber<T> thisSubscriber = this;
    return new AbstractCompletableSubscriber<T>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            thisSubscriber.onSubscribe(subscription);
        }
        @Override
        public void onNext(T item) {
            thisSubscriber.onNext(item);
        }
        @Override
        public void onComplete() {
            thisSubscriber.onComplete();
        }
        @Override
        public void onError(Throwable error) {
            // Hand the error to the consumer instead of the wrapped subscriber
            consumer.accept(error);
        }
    };
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 26, Source: CompletableSubscriber.java
Example 4: pushEach
import java.util.concurrent.Flow.Subscription; // import the required package/class
public static <T> CompletableSubscriber<T> pushEach(Consumer<T> consumer) {
    return new AbstractCompletableSubscriber<T>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            // Unbounded demand: the publisher may push items as fast as it can
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(T item) {
            consumer.accept(item);
        }
    };
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 15, Source: CompletableSubscriber.java
Example 5: pullEach
import java.util.concurrent.Flow.Subscription; // import the required package/class
public static <T> CompletableSubscriber<T> pullEach(BiConsumer<T, Subscription> consumer) {
    return new AbstractCompletableSubscriber<T>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            super.onSubscribe(subscription);
            // Request only the first item
            subscription.request(1);
        }
        @Override
        public void onNext(T item) {
            // The consumer also receives the subscription, so it can request the next item itself
            consumer.accept(item, subscription);
        }
    };
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 16, Source: CompletableSubscriber.java
Example 6: subscribe
import java.util.Arrays; // needed for Arrays.copyOf
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
    byte[] buffer = new byte[1024];
    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long count) {
            try {
                for (long i = 0; i < count; i++) {
                    int read = stream.read(buffer);
                    if (read < 0) {
                        // End of stream: read() returns -1, so complete instead of allocating new byte[-1]
                        subscriber.onComplete();
                        return;
                    }
                    // Always emit a copy so a later read cannot overwrite an item already delivered
                    subscriber.onNext(Arrays.copyOf(buffer, read));
                }
            } catch (IOException e) {
                subscriber.onError(e);
            }
        }
        @Override
        public void cancel() {
            // Intentionally empty in the original source
        }
    });
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 29, Source: InputStreamPublisher.java
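The stream-backed publisher above can be tried out in isolation with a sketch like the following. It is not the reactive-jax-rs implementation: ChunkPublisherDemo, chunks and chunkSizes are hypothetical names, and the code assumes a single subscriber that is called synchronously on one thread. Beyond the end-of-stream check, it adds two things the snippet leaves out: cancel() stops further delivery, and a non-positive request is reported through onError.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class ChunkPublisherDemo {

    /** Hypothetical synchronous publisher that emits chunks of at most chunkSize bytes. */
    static Publisher<byte[]> chunks(InputStream stream, int chunkSize) {
        return subscriber -> subscriber.onSubscribe(new Subscription() {
            private boolean finished; // set on cancel, completion, or error

            @Override
            public void request(long count) {
                if (finished) {
                    return;
                }
                if (count <= 0) {
                    finished = true;
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                byte[] buffer = new byte[chunkSize];
                try {
                    for (long i = 0; i < count && !finished; i++) {
                        int read = stream.read(buffer);
                        if (read < 0) { // end of stream
                            finished = true;
                            subscriber.onComplete();
                            return;
                        }
                        subscriber.onNext(Arrays.copyOf(buffer, read));
                    }
                } catch (IOException e) {
                    finished = true;
                    subscriber.onError(e);
                }
            }

            @Override
            public void cancel() {
                finished = true; // checked by the loop in request()
            }
        });
    }

    /** Reads all of data through the publisher and returns the size of each emitted chunk. */
    static List<Integer> chunkSizes(byte[] data, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        chunks(new ByteArrayInputStream(data), chunkSize).subscribe(new Subscriber<byte[]>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(byte[] item) {
                sizes.add(item.length);
            }

            @Override
            public void onError(Throwable error) {
                throw new IllegalStateException(error);
            }

            @Override
            public void onComplete() {
                // nothing to do: sizes already holds every chunk length
            }
        });
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(new byte[5], 2)); // prints [2, 2, 1]
    }
}
```

Because delivery happens inside request(), a subscriber that calls request(1) from onNext re-enters the method recursively; that is acceptable for short streams in a demo, but a production publisher would normally queue demand instead.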
Example 7: save
import java.util.concurrent.Flow.Subscription; // import the required package/class
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
    AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
    AtomicLong offset = new AtomicLong(0);
    AtomicInteger resultCount = new AtomicInteger(0);
    SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
    // The semaphore allows only one asynchronous write at a time
    Semaphore writeSemaphore = new Semaphore(1);
    writeSemaphore.acquireUninterruptibly();
    fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
        andThen((count, s) -> {
            writeSemaphore.release();
            customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
                String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
                    customer.getFirstName(), customer.getLastName());
                offset.addAndGet(count);
                writeSemaphore.acquireUninterruptibly();
                fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
                    andThen((size, c) -> {
                        writeSemaphore.release();
                        offset.addAndGet(size);
                        resultCount.incrementAndGet();
                        // Request the next customer only after this write has finished
                        subscription.request(1);
                    }));
            }).andThen(() -> {
                writeSemaphore.acquireUninterruptibly();
                fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
                    andThen((d, e) -> {
                        writeSemaphore.release();
                        try {
                            fileChannel.close();
                            resultPublisher.publish(resultCount.intValue());
                        } catch (IOException error) {
                            resultPublisher.publish(error);
                        }
                    }));
            }).exceptionally(error -> resultPublisher.publish(error)));
        }));
    return resultPublisher;
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 39, Source: CustomerRepository.java
Example 8: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Flow.Subscription subscription) {
    // Reject a second subscription
    if (this.subscription != null) {
        throw new IllegalStateException();
    }
    this.subscription = subscription;
    subscription.request(1);
}
Developer: AdoptOpenJDK, Project: openjdk-jdk10, Lines of code: 9, Source: Stream.java
Example 9: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    log("Subscribed...");
    this.subscription = subscription;
    this.buffer = new AtomicInteger();
    requestItems();
}
Developer: CodeFX-org, Project: demo-java-9, Lines of code: 8, Source: LoggingRandomDelaySubscriber.java
Example 10: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    // Only stores the subscription; no items are requested here
    this.subscription = subscription;
}
Developer: openknowledge, Project: reactive-jax-rs, Lines of code: 5, Source: CompletableSubscriber.java
Example 11: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    // Store the subscription and request the first item in one statement
    (this.subscription = subscription).request(1);
}
Developer: PacktPublishing, Project: Reactive-Programming-With-Java-9, Lines of code: 5, Source: NumberSubscriber.java
Example 12: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    // Request an unbounded number of items; Long.MAX_VALUE is treated as unbounded
    subscription.request(Long.MAX_VALUE);
}
Developer: PacktPublishing, Project: Reactive-Programming-With-Java-9, Lines of code: 7, Source: WelcomeProcessor.java
Example 13: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    // Pass the thread name as an argument so a '%' in the name cannot break the format string
    System.out.printf("%s subscribed with max count %d%n", Thread.currentThread().getName(), maxCount);
    subscription.request(maxCount);
}
Developer: PacktPublishing, Project: Reactive-Programming-With-Java-9, Lines of code: 7, Source: WelcomeSubscriber.java
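A bounded request such as request(maxCount) caps how many items the publisher may deliver. The following sketch shows this with the JDK's SubmissionPublisher and adds a cancel() once the quota is reached. BoundedDemandDemo and takeFirst are hypothetical names, not taken from the quoted project, and the sketch assumes maxCount is positive and total stays below the publisher's default buffer capacity so that submit() never blocks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedDemandDemo {

    /** Submits 1..total and returns the items seen by a subscriber that asked for maxCount. */
    static List<Integer> takeFirst(int maxCount, int total) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Subscriber<Integer>() {
                private Subscription subscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(maxCount); // bounded demand
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    if (received.size() == maxCount) {
                        subscription.cancel(); // quota reached: stop the stream
                        done.countDown();
                    }
                }

                @Override
                public void onError(Throwable error) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown(); // reached only when fewer than maxCount items arrived
                }
            });
            for (int i = 1; i <= total; i++) {
                publisher.submit(i);
            }
        } // close() completes subscribers that are still subscribed
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the subscriber");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(2, 5)); // prints [1, 2]
    }
}
```

Without the cancel() call the subscriber would still receive only maxCount items, but it would stay subscribed until the publisher closes.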
Example 14: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Flow.Subscription subscription) {
    System.out.printf("%s: Consumer 2: Subscription received\n", Thread.currentThread().getName());
    this.subscription = subscription;
    subscription.request(1);
}
Developer: PacktPublishing, Project: Java-9-Concurrency-Cookbook-Second-Edition, Lines of code: 7, Source: Consumer2.java
Example 15: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
    System.out.printf("%s: Consumer - Subscription\n", Thread.currentThread().getName());
}
Developer: PacktPublishing, Project: Java-9-Concurrency-Cookbook-Second-Edition, Lines of code: 7, Source: Consumer.java
Example 16: onSubscribe
import java.util.concurrent.Flow.Subscription; // import the required package/class
@Override
public final void onSubscribe(final Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
}
Developer: inbravo, Project: java-feature-set, Lines of code: 6, Source: FlowAPITest.java
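Note: The java.util.concurrent.Flow.Subscription examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. Distribution and use are subject to each project's license. Do not reproduce without permission.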