davidmoten/rxjava2-jdbc

Transacted part of the stream seem to affect non transacted parts

Crystark opened this issue · 0 comments

Hi @davidmoten

Being pleased with rxjava-jdbc I've been doing my latest app using rxjava2-jdbc. Thanks for bringing this to us! As always it's really helpful.

However, I've hit some issue when using transactions. I have the following code:

	@Override
	public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
		return db.update(INSERT_NEW_ENTRY)
			.parameters(id1, id2, id3, active)
			.transaction()
			.flatMapCompletable(tx -> tx.update(UPDATE_ITEM_REF)
				.parameters(id2, id1)
				.countsOnly()
				.ignoreElements());
	}

	@Override
	public Completable updateEntry(int id1, Date lastSyncDate, boolean active) {
		return db.update(UPDATE_ENTRY)
			.parameters(lastSyncDate, active, id1)
			.complete();
	}

Those methods are chained using compose and flatMaps in processing "pipeline" which goal is to keep data synced between multiple services.

My stream ends with a retry/repeat to make "daemon-like".

	/**
	 * Handles repeat and retry
	 */
	Flowable<?> handleRe(Flowable<?> f) {
		AtomicLong lastSubscribe = new AtomicLong();

		Function<? super Flowable<?>, ? extends Publisher<?>> re = o -> o
			.map(c -> {
				long sinceLasteRun = System.currentTimeMillis() - lastSubscribe.get();
				long nextRunIn = config.delayBetweenRunsMs() - sinceLasteRun;
				return nextRunIn;
			})
			.delay(l -> {
				Flowable<Object> s = Flowable.just(1);
				return l > 0 ? s.delay(l, TimeUnit.MILLISECONDS) : s;
			});

		return f
			.doOnSubscribe(s -> lastSubscribe.set(System.currentTimeMillis()))
			.doOnError(t -> L.warn("It went south!", t))
			.repeatWhen(re)
			.retryWhen(re);
	}

The thing is, updateEntry never affects the DB when insertNewEntry has been run. However if the entry is already inserted (after a repeat) and we only need to update, it works fine.

So I tried removing the transaction:

	@Override
	public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
		return db.update(INSERT_NEW_ENTRY)
			.parameters(id1, id2, id3, active)
			.counts()
			.flatMapCompletable(tx -> db.update(UPDATE_ITEM_REF)
				.parameters(id2, id1)
				.complete());
	}

And now updateEntry works one the first run when insertNewEntry is ran before.
Any idea what may be going on ?

Thanks alot !