本文整理汇总了Java中io.reactivex.observables.ConnectableObservable类的典型用法代码示例。如果您正苦于以下问题:Java ConnectableObservable类的具体用法?Java ConnectableObservable怎么用?Java ConnectableObservable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ConnectableObservable类属于io.reactivex.observables包,在下文中一共展示了ConnectableObservable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: doSomeWork
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void doSomeWork() {
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
*/
connectableObservable.subscribe(getSecondObserver());
}
开发者ID:weiwenqiang,项目名称:GitHub,代码行数:21,代码来源:ReplayExampleActivity.java
示例2: doSomeWork
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void doSomeWork() {
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6);
//使用publish操作符将普通Observable转换为可连接的Observable
ConnectableObservable<Long> connectableObservable = observable
// .publish();
.replay(5);
//第一个订阅者订阅,不会开始发射数据
connectableObservable
.compose(Utils.<Long>ioToMain())
.subscribe(getFirstObserver());
//如果不调用connect方法,connectableObservable则不会发射数据
//即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。
//这样,你可以将一个”冷”的Observable变为”热”的。
connectableObservable.connect();
//第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据
connectableObservable.delaySubscription(2, TimeUnit.SECONDS)//0,1数据丢失
.compose(Utils.<Long>ioToMain())
.subscribe(getSecondObserver());
}
开发者ID:changjiashuai,项目名称:RxJava2-Android-Sample,代码行数:22,代码来源:ConnectableExampleActivity.java
示例3: postTextOnlyTweet
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void postTextOnlyTweet(String status) {
mProgressDialog.show();
ConnectableObservable<StatusUpdate> observable =
mTwitterApi.postTweet(status, null, mLatitude, mLongitude)
.subscribeOn(Schedulers.io())
.publish();
Disposable postingDisposable = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting);
mCompositeDisposable.add(postingDisposable);
Disposable crossPostingDisposable = observable
.flatMap(this::pushTweetToLoklak)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
push -> Log.e(LOG_TAG, push.getStatus()),
t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString())
);
mCompositeDisposable.add(crossPostingDisposable);
Disposable publishDisposable = observable.connect();
mCompositeDisposable.add(publishDisposable);
}
开发者ID:fossasia,项目名称:loklak_wok_android,代码行数:27,代码来源:TweetPostingFragment.java
示例4: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms =
Observable.range(1, 3)
.map(i -> randomInt()).publish();
//Observer 1 - print each random integer
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
//Observer 2 - sum the random integers, then print
threeRandoms.reduce(0, (total, next) -> total + next)
.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:15,代码来源:Ch5_6.java
示例5: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms =
Observable.range(1, 3)
.map(i -> randomInt()).publish();
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:9,代码来源:Ch5_5.java
示例6: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeIntegers =
Observable.range(1, 3).publish();
threeIntegers.subscribe(i -> System.out.println("Observer One:" + i));
threeIntegers.subscribe(i -> System.out.println("Observer Two:" + i));
threeIntegers.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:8,代码来源:Ch5_2.java
示例7: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms =
Observable.range(1, 3)
.map(i -> randomInt()).publish();
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:12,代码来源:Ch5_11.java
示例8: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeInts =
Observable.range(1, 3).publish();
Observable<Integer> threeRandoms = threeInts.map(i ->
randomInt());
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeInts.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:10,代码来源:Ch5_4.java
示例9: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms =
Observable.range(1, 3)
.map(i -> randomInt()).publish();
//Observer 1 - print each random integer
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
//Observer 2 - sum the random integers, then print
threeRandoms.reduce(0, (total, next) -> total + next)
.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:16,代码来源:Ch5_12.java
示例10: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS).publish();
//observer 1
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
seconds.connect();
//sleep 5 seconds
sleep(5000);
//observer 2
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//sleep 5 seconds
sleep(5000);
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:14,代码来源:Ch2_19.java
示例11: main
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
public static void main(String[] args) {
ConnectableObservable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.publish();
//Set up observer 1
source.subscribe(s -> System.out.println("Observer 1: " + s));
//Set up observer 2
source.map(String::length)
.subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
source.connect();
}
开发者ID:PacktPublishing,项目名称:Learning-RxJava,代码行数:13,代码来源:Ch2_14.java
示例12: freeFlowEmps
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
@Override
public ConnectableObservable<String> freeFlowEmps() {
List<String> rosterNames = new ArrayList<>();
Function<Employee, String> familyNames = (emp) -> emp.getLastName().toUpperCase();
ConnectableObservable<String> flowyNames = Observable.fromIterable(employeeDaoImpl.getEmployees()).map(familyNames).cache().publish();
flowyNames.subscribe(System.out::println);
flowyNames.subscribe((name) ->{
rosterNames.add(name);
});
System.out.println(rosterNames);
return flowyNames;
}
开发者ID:PacktPublishing,项目名称:Spring-5.0-Cookbook,代码行数:14,代码来源:EmployeeRxJavaServiceImpl.java
示例13: execute
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
/**
* If the receiver is enabled, this method will:
* <p>
* 1. Invoke the `func` given at the time of creation.
* 2. Multicast the returned observable.
* 3. Send the multicasted observable on {@link #executionObservables()}.
* 4. Subscribe (connect) to the original observable on the main thread.
*
* @param input The input value to pass to the receiver's `func`. This may be null.
* @return the multicasted observable, after subscription. If the receiver is not
* enabled, returns a observable that will send an error.
*/
@MainThread
public final Observable<T> execute(@Nullable Object input) {
boolean enabled = mImmediateEnabled.blockingFirst();
if (!enabled) {
return Observable.error(new IllegalStateException("The command is disabled and cannot be executed"));
}
try {
Observable<T> observable = mFunc.apply(input);
if (observable == null) {
throw new RuntimeException(String.format("null Observable returned from observable func for value %s", input));
}
// This means that `executing` and `enabled` will send updated values before
// the observable actually starts performing work.
final ConnectableObservable<T> connection = observable
.subscribeOn(AndroidSchedulers.mainThread())
.replay();
mAddedExecutionObservableSubject.onNext(connection);
connection.connect();
return connection;
} catch (Exception e) {
e.printStackTrace();
return Observable.error(e);
}
}
开发者ID:listenzz,项目名称:RxCommand,代码行数:39,代码来源:RxCommand.java
示例14: testRefCount
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void testRefCount() {
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6);
ConnectableObservable<Long> connectableObservable = observable.publish();
connectableObservable.connect();
Observable<Long> longObservable = connectableObservable.refCount();
longObservable.delaySubscription(2, TimeUnit.SECONDS)
.compose(Utils.<Long>ioToMain())
.subscribe(getFirstObserver());
longObservable.compose(Utils.<Long>ioToMain()).subscribe(getSecondObserver());
}
开发者ID:changjiashuai,项目名称:RxJava2-Android-Sample,代码行数:11,代码来源:ConnectableExampleActivity.java
示例15: checkSyncStartAndFinishEvents
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void checkSyncStartAndFinishEvents(ConnectableObservable<BaseEvent> syncEventBus) {
syncEventBus
.test()
.awaitCount(2, () -> {
}, MAX_TIMEOUT_MILLIS)
.assertNoErrors()
.assertValueAt(0, event -> {
assertThat(event).isInstanceOf(SyncStartEvent.class);
return true;
})
.assertValueAt(1, event -> {
assertThat(event).isInstanceOf(SyncFinishEvent.class);
return true;
});
}
开发者ID:andreybgm,项目名称:gigreminder,代码行数:16,代码来源:SyncRepositoryTest.java
示例16: postImageAndTextTweet
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void postImageAndTextTweet(List<Observable<String>> imageIdObservables, String status) {
mProgressDialog.show();
ConnectableObservable<StatusUpdate> observable = Observable.zip(
imageIdObservables,
mediaIdArray -> {
String mediaIds = "";
for (Object mediaId : mediaIdArray) {
mediaIds = mediaIds + String.valueOf(mediaId) + ",";
}
return mediaIds.substring(0, mediaIds.length() - 1);
})
.flatMap(imageIds -> mTwitterApi.postTweet(status, imageIds, mLatitude, mLongitude))
.subscribeOn(Schedulers.io())
.publish();
Disposable postingDisposable = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting);
mCompositeDisposable.add(postingDisposable);
Disposable crossPostingDisposable = observable
.flatMap(this::pushTweetToLoklak)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
push -> {},
t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString())
);
mCompositeDisposable.add(crossPostingDisposable);
Disposable publishDisposable = observable.connect();
mCompositeDisposable.add(publishDisposable);
}
开发者ID:fossasia,项目名称:loklak_wok_android,代码行数:35,代码来源:TweetPostingFragment.java
示例17: displayAndPostScrapedData
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
private void displayAndPostScrapedData() {
progressBar.setVisibility(View.VISIBLE);
ConnectableObservable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS)
.flatMap(this::getSuggestionsPeriodically)
.flatMap(query -> {
mSuggestionQuerries.add(query);
return getScrapedTweets(query);
})
.retry(2)
.publish();
Disposable viewDisposable = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
this::displayScrapedData,
this::setNetworkErrorView
);
mCompositeDisposable.add(viewDisposable);
Disposable pushDisposable = observable
.flatMap(this::pushScrapedData)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
push -> {
mHarvestedTweets += push.getRecords();
harvestedTweetsCountTextView.setText(String.valueOf(mHarvestedTweets));
},
throwable -> Log.e(LOG_TAG, throwable.toString())
);
mCompositeDisposable.add(pushDisposable);
Disposable publishDisposable = observable.connect();
mCompositeDisposable.add(publishDisposable);
}
开发者ID:fossasia,项目名称:loklak_wok_android,代码行数:37,代码来源:TweetHarvestingFragment.java
示例18: subscribeToDataStoreInternal
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
@Override
public void subscribeToDataStoreInternal(@NonNull final CompositeDisposable compositeDisposable) {
checkNotNull(compositeDisposable);
Log.v(TAG, "subscribeToDataStoreInternal");
ConnectableObservable<DataStreamNotification<GitHubRepositorySearch>> repositorySearchSource =
searchString
.debounce(SEARCH_INPUT_DELAY, TimeUnit.MILLISECONDS)
.distinctUntilChanged()
.filter(value -> value.length() > 2)
.doOnNext(value -> Log.d(TAG, "Searching with: " + value))
.switchMap(getGitHubRepositorySearch::call)
.publish();
compositeDisposable.add(repositorySearchSource
.map(toProgressStatus())
.doOnNext(progressStatus -> Log.d(TAG, "Progress status: " + progressStatus.name()))
.subscribe(this::setNetworkStatusText));
compositeDisposable.add(repositorySearchSource
.filter(DataStreamNotification::isOnNext)
.map(DataStreamNotification::getValue)
.map(GitHubRepositorySearch::getItems)
.flatMap(toGitHubRepositoryList())
.doOnNext(list -> Log.d(TAG, "Publishing " + list.size() + " repositories from the ViewModel"))
.subscribe(repositories::onNext));
compositeDisposable.add(repositorySearchSource.connect());
}
开发者ID:reark,项目名称:reark,代码行数:30,代码来源:RepositoriesViewModel.java
示例19: ObservableOnAssemblyConnectable
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) {
this.source = source;
this.assembled = new RxJavaAssemblyException();
}
开发者ID:akaita,项目名称:RxJava2Debug,代码行数:5,代码来源:ObservableOnAssemblyConnectable.java
示例20: testHotObservable
import io.reactivex.observables.ConnectableObservable; //导入依赖的package包/类
@Test
public void testHotObservable() throws Exception {
ConnectableObservable<Long> connectableObservable = Observable.interval(1,
TimeUnit.SECONDS,
Schedulers.from(executorService)).publish();
connectableObservable.subscribe(x -> System.out.println("L1:" + x));
Thread.sleep(2000);
Disposable disposable = connectableObservable.connect(); //Let us begin
connectableObservable.subscribe(x -> System.out.println("L2:" + x));
Thread.sleep(2000);
connectableObservable.subscribe(x -> System.out.println("L3:" + x));
disposable.dispose();
Thread.sleep(10000);
}
开发者ID:dhinojosa,项目名称:rxjavatraining,代码行数:24,代码来源:HotVsColdObservableTest.java
注:本文中的io.reactivex.observables.ConnectableObservable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论