Transacted part of the stream seems to affect non-transacted parts
Crystark opened this issue · 0 comments
Hi @davidmoten
Being pleased with rxjava-jdbc I've been doing my latest app using rxjava2-jdbc. Thanks for bringing this to us! As always it's really helpful.
However, I've hit an issue when using transactions. I have the following code:
@Override
public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
    return db.update(INSERT_NEW_ENTRY)
        .parameters(id1, id2, id3, active)
        .transaction()
        .flatMapCompletable(tx -> tx.update(UPDATE_ITEM_REF)
            .parameters(id2, id1)
            .countsOnly()
            .ignoreElements());
}

@Override
public Completable updateEntry(int id1, Date lastSyncDate, boolean active) {
    return db.update(UPDATE_ENTRY)
        .parameters(lastSyncDate, active, id1)
        .complete();
}
These methods are chained using compose and flatMap in a processing "pipeline" whose goal is to keep data synced between multiple services.
My stream ends with a retry/repeat to make it "daemon-like".
/**
 * Handles repeat and retry
 */
Flowable<?> handleRe(Flowable<?> f) {
    AtomicLong lastSubscribe = new AtomicLong();
    Function<? super Flowable<?>, ? extends Publisher<?>> re = o -> o
        .map(c -> {
            long sinceLastRun = System.currentTimeMillis() - lastSubscribe.get();
            long nextRunIn = config.delayBetweenRunsMs() - sinceLastRun;
            return nextRunIn;
        })
        .delay(l -> {
            Flowable<Object> s = Flowable.just(1);
            return l > 0 ? s.delay(l, TimeUnit.MILLISECONDS) : s;
        });
    return f
        .doOnSubscribe(s -> lastSubscribe.set(System.currentTimeMillis()))
        .doOnError(t -> L.warn("It went south!", t))
        .repeatWhen(re)
        .retryWhen(re);
}
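To make the timing in handleRe easier to follow, here is a minimal sketch of just the delay arithmetic, with no RxJava or rxjava2-jdbc involved. The class RunDelay and the method nextRunInMs are hypothetical names made up for this illustration. Unlike the code above, which passes a possibly negative value along and checks l > 0 later, this sketch clamps the result to zero.

```java
// Hypothetical helper for illustration only; not part of rxjava2-jdbc or my app.
final class RunDelay {
    private RunDelay() {}

    /**
     * Returns how many milliseconds remain before the next run may start,
     * or 0 when the configured delay has already elapsed.
     */
    static long nextRunInMs(long delayBetweenRunsMs, long lastSubscribeMs, long nowMs) {
        long sinceLastRun = nowMs - lastSubscribeMs;
        return Math.max(0L, delayBetweenRunsMs - sinceLastRun);
    }

    public static void main(String[] args) {
        // Last run started 400 ms ago with a 1000 ms delay: wait 600 ms more.
        System.out.println(nextRunInMs(1_000, 10_000, 10_400));
        // Last run started 2500 ms ago: the delay is already over, so no wait.
        System.out.println(nextRunInMs(1_000, 10_000, 12_500));
    }
}
```

In handleRe the same value decides whether the repeat/retry signal is delayed or emitted right away.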
The thing is, updateEntry never affects the DB when insertNewEntry has been run earlier in the same pass. However, if the entry is already inserted (after a repeat) and we only need to update, it works fine.
So I tried removing the transaction:
@Override
public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
    return db.update(INSERT_NEW_ENTRY)
        .parameters(id1, id2, id3, active)
        .counts()
        .flatMapCompletable(tx -> db.update(UPDATE_ITEM_REF)
            .parameters(id2, id1)
            .complete());
}
And now updateEntry works on the first run, even when insertNewEntry has been run before it.
Any idea what may be going on?
Thanks a lot!