• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Subscription类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中java.util.concurrent.Flow.Subscription的典型用法代码示例。如果您正苦于以下问题:Java Subscription类的具体用法?Java Subscription怎么用?Java Subscription使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Subscription类属于java.util.concurrent.Flow包,在下文中一共展示了Subscription类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: subscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void subscribe(Subscriber<? super T> s) {
  subscriber = s;
  subscriber.onSubscribe(new Subscription() {
    
    @Override
    public void request(long count) {
      requested = true;
      if (item != null) {
        publish(item);
      }
    }
    
    @Override
    public void cancel() {
      subscriber = null;
    }
  });
  if (error != null) {
    publish(error);
  }
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:23,代码来源:SingleItemPublisher.java


示例2: andThen

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
default CompletableSubscriber<T> andThen(Runnable runnable) {
  CompletableSubscriber<T> thisSubscriber = this;
  return new AbstractCompletableSubscriber<T>() {
    
    @Override
    public void onSubscribe(Subscription subscription) {
      thisSubscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
      thisSubscriber.onNext(item);
    }

    @Override
    public void onComplete() {
      runnable.run();
    }

    @Override
    public void onError(Throwable error) {
      thisSubscriber.onError(error);
    }
  };
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:26,代码来源:CompletableSubscriber.java


示例3: exceptionally

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
default CompletableSubscriber<T> exceptionally(Consumer<Throwable> consumer) {
  CompletableSubscriber<T> thisSubscriber = this;
  return new AbstractCompletableSubscriber<T>() {
    
    @Override
    public void onSubscribe(Subscription subscription) {
      thisSubscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
      thisSubscriber.onNext(item);
    }

    @Override
    public void onComplete() {
      thisSubscriber.onComplete();
    }

    @Override
    public void onError(Throwable error) {
      consumer.accept(error);
    }
  };
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:26,代码来源:CompletableSubscriber.java


示例4: pushEach

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
public static <T> CompletableSubscriber<T> pushEach(Consumer<T> consumer) {
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
      consumer.accept(item);
    }
  };
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:15,代码来源:CompletableSubscriber.java


示例5: pullEach

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
public static <T> CompletableSubscriber<T> pullEach(BiConsumer<T, Subscription> consumer) {
  return new AbstractCompletableSubscriber<T>() {

    @Override
    public void onSubscribe(Subscription subscription) {
      super.onSubscribe(subscription);
      subscription.request(1);
    }

    @Override
    public void onNext(T item) {
      consumer.accept(item, subscription);
    }
  };
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:16,代码来源:CompletableSubscriber.java


示例6: subscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void subscribe(Subscriber<? super byte[]> subscriber) {
  byte[] buffer = new byte[1024];
  subscriber.onSubscribe(new Subscription() {
    
    @Override
    public void request(long count) {
      
      try {
        for (long i = 0; i < count; i++) {
          int read = stream.read(buffer);
          byte[] item = buffer;
          if (read != buffer.length) {
            item = new byte[read];
            System.arraycopy(buffer, 0, item, 0, read);
          }
          subscriber.onNext(item);
        }
      } catch (IOException e) {
        subscriber.onError(e);
      }
    }
    
    @Override
    public void cancel() {
    }
  });
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:29,代码来源:InputStreamPublisher.java


示例7: save

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
  AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
  AtomicLong offset = new AtomicLong(0);
  AtomicInteger resultCount = new AtomicInteger(0);
  SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
  Semaphore writeSemaphore = new Semaphore(1);
  writeSemaphore.acquireUninterruptibly();
  fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher,
      andThen((count, s) -> {
        writeSemaphore.release();
        customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
            String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}", offset.longValue() == 0 ? "" : ",",
                customer.getFirstName(), customer.getLastName());
            offset.addAndGet(count);
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher,
                andThen((size, c) -> {
                  writeSemaphore.release();
                  offset.addAndGet(size);
                  resultCount.incrementAndGet();
                  subscription.request(1);
                }));
          }).andThen(() -> {
            writeSemaphore.acquireUninterruptibly();
              fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher,
                  andThen((d, e) -> {
                    writeSemaphore.release();
                    try {
                      fileChannel.close();
                      resultPublisher.publish(resultCount.intValue());
                    } catch (IOException error) {
                      resultPublisher.publish(error);
                    }
                  }));
          }).exceptionally(error -> resultPublisher.publish(error)));
      }));
  return resultPublisher;
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:39,代码来源:CustomerRepository.java


示例8: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException();
    }
    this.subscription = subscription;
    subscription.request(1);
}
 
开发者ID:AdoptOpenJDK,项目名称:openjdk-jdk10,代码行数:9,代码来源:Stream.java


示例9: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
	log("Subscribed...");
	this.subscription = subscription;
	this.buffer = new AtomicInteger();
	requestItems();
}
 
开发者ID:CodeFX-org,项目名称:demo-java-9,代码行数:8,代码来源:LoggingRandomDelaySubscriber.java


示例10: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
  this.subscription = subscription;
}
 
开发者ID:openknowledge,项目名称:reactive-jax-rs,代码行数:5,代码来源:CompletableSubscriber.java


示例11: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
	(this.subscription = subscription).request(1);
}
 
开发者ID:PacktPublishing,项目名称:Reactive-Programming-With-Java-9,代码行数:5,代码来源:NumberSubscriber.java


示例12: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
	// Request an unbounded number of items
	subscription.request(Long.MAX_VALUE);
	// Long.MAX_VALUE is considered as unbounded 
}
 
开发者ID:PacktPublishing,项目名称:Reactive-Programming-With-Java-9,代码行数:7,代码来源:WelcomeProcessor.java


示例13: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
	this.subscription = subscription;
	System.out.printf(Thread.currentThread().getName()+" subscribed with max count %d\n", maxCount);
	subscription.request(maxCount);
}
 
开发者ID:PacktPublishing,项目名称:Reactive-Programming-With-Java-9,代码行数:7,代码来源:WelcomeSubscriber.java


示例14: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Flow.Subscription subscription) {
	System.out.printf("%s: Consumer 2: Subscription received\n", Thread.currentThread().getName());
	this.subscription=subscription;
	subscription.request(1);
}
 
开发者ID:PacktPublishing,项目名称:Java-9-Concurrency-Cookbook-Second-Edition,代码行数:7,代码来源:Consumer2.java


示例15: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public void onSubscribe(Subscription subscription) {
	this.subscription = subscription;
	subscription.request(1);
	System.out.printf("%s: Consumer - Subscription\n",Thread.currentThread().getName());
}
 
开发者ID:PacktPublishing,项目名称:Java-9-Concurrency-Cookbook-Second-Edition,代码行数:7,代码来源:Consumer.java


示例16: onSubscribe

import java.util.concurrent.Flow.Subscription; //导入依赖的package包/类
@Override
public final void onSubscribe(final Subscription subscription) {
  this.subscription = subscription;
  subscription.request(1);
}
 
开发者ID:inbravo,项目名称:java-feature-set,代码行数:6,代码来源:FlowAPITest.java



注:本文中的java.util.concurrent.Flow.Subscription类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java TaskLauncher类代码示例发布时间:2022-05-23
下一篇:
Java Particle类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap