Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
654 views
in Technique[技术] by (71.8m points)

rx java2 - Process one PublishSubject emission at a time with a chain of operators in RxJava2

I have a PublishSubject<Event>.

On each new Event I trigger a database query (some locally cached data), then take the result and try to POST it via HTTP request to a server, when this server replies 200 I make another database query to delete the rows that were just sent.

This is done with chaining roughly as so:

subject
  .toSerialized()
  .flatMapMaybe { getCachedData() }
  .flatMap { uploadData() }
  .flatMapCompletable { cleanCache() }
  .subscribe()

The subject may emit two quick Events under certain conditions, let's say with a 10 ms interval.

The problem is getCachedData() for the second emission goes off immediately after getCachedData() for the first emission completed, i.e. before cleanCache() had a chance to clean the database before second emission.

I would like to combine those flatMaps into one observer somehow so that the subject does no new emissions until the whole chain has finished, preferably without any kind of handmade semaphores.

I do subscribeOn() on a single thread pool scheduler, that only orders the calls inside each flatMap.

I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work.

I saw there are also lift() and compose() operators. I tried to just put all flatMaps inside the latter, that did not change the behavior. The former I am still wondering about.

question from:https://stackoverflow.com/questions/66060643/process-one-publishsubject-emission-at-a-time-with-a-chain-of-operators-in-rxjav

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Put them inside a concatMapX subflow:

subject
.concatMapCompletable {
    getCachedData()
    .flatMap { uploadData() }
    .flatMapCompletable { cleanCache() }
}

I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work

It has no practical effect on your flow unless you actually drive the Subject returned by it and from multiple threads.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...