I want to synchronize heavy shared object with RxJava, but I don't know how to do it properly.
Unfortunately, I was unable to include the RxJava tag which requires 1500 reputation.
example:
class Service {
HeavyObject heavyObject;
HeavyObjectRepository heavyObjectRepository; // reactive repository
Single<ComputationResult> doHeavyComputation(Object params) {
// here I want to apply synchronization to heavy object
return (heavyObject == null ? heavyObjectRepository.fetch().doOnSuccess(ho -> {
heavyObject = ho;
}) : Single.just(heavyObject))
.flatMap(ho -> compute(ho, params));
}
Single<ComputationResult> compute(HeavyObject heavyObject, Object params) {
// ...
}
}
More details:
Service instance is shared, so many threads could access and possibly mutate the shared object so I need to provide some sort of synchronization.
This is what I came up with just right now, but I still don't know if it's correct:
class Service {
private final HeavyObjectRepository heavyObjectRepository = new HeavyObjectRepository();
HeavyObject heavyObject;
public Single<ComputationResult> compute(Object... args) {
return Single.fromCallable(() -> {
synchronized (this) {
if (heavyObject == null) {
return heavyObjectRepository.findOne().map(ho -> {
synchronized (this) {
if (heavyObject == null) {
heavyObject = ho;
}
return heavyObject;
}
});
}
return Single.just(heavyObject);
}
}).flatMap(o -> o).flatMap(ho -> compute(ho, args));
}
private Single<ComputationResult> compute(HeavyObject heavyObject, Object... args) {
return Single.fromCallable(() -> {
synchronized (this) {
for (int i = 0; i < 10; i++) {
heavyObject.counter += 1;
}
}
return new ComputationResult();
});
}
public static class HeavyObject {
long counter = 0;
}
public static class ComputationResult {
}
private static class HeavyObjectRepository {
public Single<HeavyObject> findOne() {
return Single.just(new HeavyObject());
}
}
public static void main(String[] args) {
Service service = new Service();
Single.mergeArray(
service.compute(new Object()).subscribeOn(Schedulers.computation()),
service.compute(new Object()).subscribeOn(Schedulers.computation()),
service.compute(new Object()).subscribeOn(Schedulers.computation()),
service.compute(new Object()).subscribeOn(Schedulers.computation()),
service.compute(new Object()).subscribeOn(Schedulers.computation())
).blockingSubscribe();
if (service.heavyObject.counter != 50) {
throw new AssertionError(String.format("%d != %d", service.heavyObject.counter, 50));
}
}
}
If I understand correctly, this is what @akarnokd suggested:
public Single<ComputationResult> compute(Object... args) {
return Single
.fromCallable(() -> {
if (heavyObject == null) {
return heavyObjectRepository.findOne()
.observeOn(Schedulers.single())
.map(ho -> {
if (heavyObject == null) {
heavyObject = ho;
}
return heavyObject;
});
}
return Single.just(heavyObject);
})
.flatMap(o -> o)
.flatMap(ho -> compute(ho, args))
.subscribeOn(Schedulers.single());
}
private Single<ComputationResult> compute(HeavyObject heavyObject, Object... args) {
return Single.fromCallable(() -> {
for (int i = 0; i < 100; i++) {
heavyObject.counter += 1;
}
return new ComputationResult();
});
}
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…