danielshaya/reactivejournal

Design Point: Replay as Observable

akarnokd opened this issue · 27 comments

Continuing the discussion from twitter:

I assume the journal acts as an unbounded buffer that stores items on disk, thus when replaying it can read elements from this disk buffer on demand, supporting backpressure naturally.

The problem I see with the current setup is that unless one consumes the journal directly with a Consumer, it can run into backpressure problems again caused by the replay itself. For example, applying observeOn would trigger a replay to flood the internal buffer of Observable.observeOn thus the backpressure problem is back.

Thanks for your comment appreciate the feedback.

What you say is indeed correct and by design - will respond fully when I have some time. In the meantime have a look at this example where I touch on this issue by converting the Observable to a Flowable that handles back pressure.

Hi David

I've thought a lot about your point and agree with you that Observable is not the correct construct to return from RxPlayer.play.

In addition I've had feedback to ask me to support more than RxJava.

For these reasons I've refactored the project so that it supports any Reactive framework with RxJava just being one implementation.

Please let me know if you prefer this abstraction and if you have any more comments.

I welcome the use of standard Reactive-Streams APIs and let the client wrap their rich fluent APIs of choice around a Publisher.

However, targeting reactive streams directly exposes you to the rules and requirements of the RS specification which is much harder than it seems.

For example, the reworked play() method takes a Subscriber in (odd) but there is no sign of the required onSubscribe call inside and thus no chance for handling client backpressure requests or cancellation.

The reason I used Subscriber as opposed to my own interface was just to 'borrow' an interface that had onNext(), onError(), onComplete(). Do you think that's confusing and I should just create my own interface. I'd be very interested to hear how you would handle this. Thx

actually if you go back to returning something and eliminate the subscriber, having a Flowable as the return type would work: libs like Reactor are capable of consuming any Publisher and wrapping it, and Flowable is a Publisher, unlike Observable. But by using Flowable.create you would basically alway consume the whole Chronicle and buffer it for backpressure purposes.

The other path is to create you own Publisher, but as @akarnokd hinted this will be a bit more difficult, notably because:

  • you need to react to subscription and have separate states for each subscription in order to make your Publisher cold. That would probably mean create a ChronicleQueue per subscribe? Is is possible to open the same file in ChronicleQueues several time in parallel?
  • you need to track request and serve only the requested amount of onNext from the ChronicleQueue
  • you need to accommodate cancellation

This is by no means a complete implementation but gives you an idea of how the API and internal class structure would look like. Note there's no particular request handling, which is the most difficult part (the pattern in RxJava and Reactor is a usually a complex drain loop). Note also that the cancellation evaluation could probably be more fine-grained in fastpath().

public class RxPlayer {
    private RxJournal rxJournal;

    RxPlayer(RxJournal rxJournal) {
        this.rxJournal = rxJournal;
    }

    /**
     * See documentation on {@link PlayOptions}
     * @param options Options controlling how play is executed.
     */
    public Publisher<Object> play(PlayOptions options) {
        options.validate();

        return new PlayPublisher(rxJournal, options);
    }

    static final class PlayPublisher implements Publisher<Object> {

        private final RxJournal rxJournal;
        private final PlayOptions options;

        PlayPublisher(RxJournal rxJournal, PlayOptions options) {
            this.rxJournal = rxJournal;
            this.options = options;
        }

        @Override
        public void subscribe(Subscriber<? super Object> s) {
            s.onSubscribe(new PlaySubscription(s, rxJournal, options));
        }
    }

    static final class PlaySubscription implements Subscription {

        private final Subscriber actual;
        private final RxJournal rxJournal;
        private final PlayOptions options;
        private final DataItemProcessor dim = new DataItemProcessor();

        private volatile boolean cancelled;

        PlaySubscription(Subscriber<? super Object> actual, RxJournal journal, PlayOptions options) {
            this.actual = actual;
            this.rxJournal = journal;
            this.options = options;
        }

        @Override
        public void request(long n) {
            //this ignores request, but it's a start :o
            fastPath();
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        private void fastPath() {
            try (ChronicleQueue queue = rxJournal.createQueue()) {
                ExcerptTailer tailer = queue.createTailer();
                long[] lastTime = new long[]{Long.MIN_VALUE};
                boolean[] stop = new boolean[]{false};
                while (true) {
                    boolean foundItem = tailer.readDocument(w -> {
                        if (cancelled) {
                            return;
                        }
                        ValueIn in = w.getValueIn();
                        dim.process(in, options.using());

                        if (dim.getTime() > options.playUntilTime()
                                || dim.getMessageCount() >= options.playUntilSeqNo()) {
                            actual.onComplete();
                            stop[0] = true;
                            return;
                        }

                        if (dim.getTime() > options.playFromTime() && dim.getMessageCount() >= options.playFromSeqNo()) {
                            pause(options, lastTime, dim.getTime());

                            if (options.filter().equals(dim.getFilter())) {
                                if (dim.getStatus() == RxStatus.COMPLETE) {
                                    actual.onComplete();
                                    stop[0] = true;
                                    return;
                                }

                                if (dim.getStatus() == RxStatus.ERROR) {
                                    actual.onError((Throwable) dim.getObject());
                                    stop[0] = true;
                                    return;
                                }
                                actual.onNext(dim.getObject());
                            }
                            lastTime[0] = dim.getTime();
                        }
                    });
                    if (cancelled) {
                        return;
                    }

                    if (!foundItem && !options.completeAtEndOfFile()) {
                        actual.onComplete();
                        return;
                    }
                    if (stop[0]) {
                        return;
                    }
                }
            }
        }

        private void pause(PlayOptions options, long[] lastTime, long recordedAtTime) {
            if (options.replayRate() == ReplayRate.ACTUAL_TIME && lastTime[0] != Long.MIN_VALUE) {
                DSUtil.sleep((int) (recordedAtTime - lastTime[0]));
            } else if (options.pauseStrategy() == PauseStrategy.YIELD) {
                Thread.yield();
            }
            //otherwise SPIN
        }
    }
}

Hi Simon,

Thank's that's really helpful and a lot to think about. I do like the idea of implementing the Publisher myself and then I assume being able to use for example Flowable.unsafeCreate to get back into RxJava land if need be. Is that the way you would suggest or is there a better way in RxJava?

In terms of Chronicle you would be actually be creating a new tailer for each subscription (not a new ChronicleQueue) which is absolutely fine.

Not 100% sure what you mean by request handling. But in this case I think it's valid to ignore the request because of the fact that everything is in the Journal, so effectively buffered. If the user wants to use a LATEST strategy they could play the publisher into a Flowable created with the LATEST strategy and that could handle this strategy. Does that make sense to you?

Thanks again
Dan

Yeah, in terms of switching to RxJava, that would just be a matter of using Flowable.fromPublisher(journalPublisher). Same for Reactor with Flux.from(journalPublisher).

In terms of backpressure and request handling, what I meant is that if the JournalSubscription receives a request(3) it should only read 3 elements from the journal and propagate them to the actual via 3 calls to onNext. It must also be able to handle concurrent calls to request without concurrently calling onNext.

That is usually solved in RxJava by a requested volatile and a more complex drain loop that will detect the requested amount has changed / more data was requested and in these cases continue the loop that pushes data downstream.

If you don't do that and there is backpressure, you'll end up probably buffering the whole journal in memory in the queue of the first operator you use after your publisher, which is counterproductive.

Understand what you mean I'll have a go at implementing...

So if you get request(3) and have returned one item and then get request(4) does 4 replace the 3 or does 4 kick in once the 3 is completed? And I understood you to say that calls to request can come in on different threads - is that correct?

One other quick question on your code above - should onComplete() be called when the subscription is cancelled? I noticed you didn't call it - I guess if the subscription is cancelled it's not 'complete'....

Thanks again for all your input

The requests add up and the publisher is now expected to send a total of 7 values. Of course, if the journal only contains eg. 6, the publisher will onComplete after sending 6 values.

Calls to onRequest can come in on different threads, yes. However the Publisher must ensure that downstream calls to actual.onNext do not happen concurrently.
So two request calls that are not concurrent but happen on different threads can lead to two onNext calls downstream on different threads, but if they are concurrent then one of the two threads must "win" and be the one on which all the onNext calls happen.

When the subscription is cancelled, no need to call onComplete. It means that the downstream is not interested in values anymore, and data generation can stop. At this point basically it is as if no one is looking at actual anymore, yet it is not strictly completed.

Checked in a version of code based on your code snippet. It's not complete but works for simple cases and all tests pass. Could you perhaps give me a couple of test cases that break the code and I can look to fix. As usual your expertise in this area is invaluable to this project!

Once the code has settled down I'll update the README and build a new pre-release version.

Does tailer.readDocument returning false indicate an end of data or a temporary lack of data? For the former case, you can adapt fromIterable from our libraries. The latter case is tricky as it requires repeated polling or some external stimulus to know data is available to be emitted if there is a demand available as well.

another potential problem that I just thought about: the ReplayRate.ACTUAL_TIME would lead to the thread in which reactive signals are emitted to be blocked during pause. This is bad, as it can impact more complex reactive chains and grind these to a halt.

Detecting that case in the fastpath and doing the whole processing from a single dedicated Thread or ExecutorService should do it.

(Note that fastpath in the end should only be triggered when request is called with Long.MAX_VALUE)

@akarnokd return false indicates a temporary lack of data. e.g. We are listening live to a journal... that why we poll with a while(true).

@simonbasle Replaying in ACTUAL_TIME in reality would only be done when the journal is used as a source for a test scenario. Would this be a problem - in real life (without the journal) this is the speed at which the events were produced and the system would have to deal with it. I'm not sure I understand why executing from a single dedicated Thead is the solution - do you mind explaining.

When would request be called with Long.MAX_VALUE? Would that ever happen? Usually the request would be for relatively small chunks (128).

Just a thought - given the changes over the past week what do you think about renaming the project to ReactiveJournal?

@danielshaya if renaming is still a possibility, I think it is a good idea.

writing a Publisher is hard (see the RxJava2 wiki page initiated by @akarnokd, which gives a perspective on writing intermediate operators for RxJava but most concepts also apply to a publisher)... The benefit however is that you don't depend on a particular reactive library and can become compatible with several (thanks to reactive streams).

About the request, yes request(Long.MAX_VALUE) can definitely happen and is basically the signal codifying the concept of "send all available data as fast as possible". This is actually the default (eg. if you do Flux.fromIterable(source).subscribe(System.out::println); it requests Long.MAX_VALUE, aka an unbounded amount).

About the replay rate, the problem is that by default all the processing happens on the thread on which you subscribe, which could be the main thread. If you subscribe to multiple reactive chains in your main they end up executing on that same thread. As long as the processing is truly non-blocking, this is fine (because reactive libraries like RxJava or Reactor are tailored for that kind of thing). But if you block by introducing pauses like here, it becomes a problem: you end up blocking ALL reactive chains, not just the one generated from the RxJournal being paused.

In Reactor for instance, you have operators that deal with time and introducing delays. The way these work without blocking all sequences is that they internally use a Worker thread from a Scheduler, into which they schedule delayed operations.

If you manage to split the reading from the tailer into restartable routines, you can use the same trick to schedule the next read from the queue inside such a dedicated thread, avoiding the blocking pause entirely. This will of course also help in implementing backpressure support (if you have a routine to read one element from the queue, you can hold on calling it if there is no pending request).

@simonbasle Whilst the project is still new I took the opportunity to rename - makes much more sense with this nomenclature. (I'll soon rewrite the README)

I've made a start on handling the request method and now ReactivePlayer (formerly RxPlayer) respects the request number passed in by the Subscription. See ReactiveRequestTest.

I'm going to read @akarnokd 's Wiki page. Thanks for the reference.

Any further comments much appreciated.

@simonbasle

I've taken your suggestions in mind and refactored RxPlayer accordingly.
It now runs the subscription on its own thread so should handle concurrency issues and respects the request().

Would you be able to review this please?

The only issue I have with this strategy is that it always calls back the onNext() etc methods on a different thread. I'm wondering if the user should have some option to allow them to be called back on the same thread. What do you think?

Rgds

I took the decision for now to let the user choose to go on the 'fast path' (to be called back on the same thread and to ignore calls to request) by using a setting on PlayOptions.

Before I write this up in the README I'd be very interested in your comments.

Thx

I see 4 combinations:

  1. unbounded request (fastpath) and ReplayRate.FAST
  2. unbounded request and ReplayRate.ACTUAL_TIME
  3. bounded progressive request and ReplayRate.FAST
  4. bounded progressive request and ReplayRate.ACTUAL_TIME

The only thing that makes a separate thread necessary is actually the blocking pauses induced by ACTUAL_TIME. 1) doesn't need one because you can play the whole queue without pauses in a single pass. 3) doesn't need one because you can use a drain loop pattern for the progressive handling of requests.

The idea is to trigger the fastpath by calling request(Long.MAX_VALUE), which is the convention used by reactive streams.

You could have a Subscriber implementation for the ReplayRate.FAST cases (with a fastpath and a drain loop, but no thread) and one for ReplayRate.ACTUAL_TIME cases (with a thread, and both fastpath and drain loop too).

Don't you think it might be confusing to the users to sometimes be called back on their own thread and sometimes not depending on the subscription? Shouldn't the user have an override if they really want to be called back on their own thread for 2) and 4)

on one hand it could make sense to provide some form of configuration, but on the other hand you need to be sure that the execution context maintains the invariant of onNext not being called concurrently, and that the operator should never block another operator's thread...

In RxJava or Reactor, this is usually solved by allowing the user to provide a custom Scheduler to run on, optionally. The operator would then create a single Worker out of it and schedule onNexts on it, maintaining the invariant... But outside of reactor, in order to ensure the invariant is maintained you'd be limited in accepting either a ThreadFactory or a ThreadPoolExecutor on which you'd assert the corePoolSize and maximumPoolSize are both == 1.

edit: oh yeah it would totally make sense to always run on the user-chosen thread, just as long as it is one dedicated to the operator. What is not desirable is to execute on a thread that is not dedicated to the operator, risking blocking another operator with the blocking pauses.

Thanks again for the points:

So I think this is the way to go for now (I do want to get 1.0 release out soon :) )

By default when the user asks for ACTUAL_TIME I'll create a new thread otherwise I'll use the user thread (or maybe one that they pass in).

I'd like to give the user option of always being able to run on their own thread even if they have chosen ACTUAL_TIME (I'll configure that as an override option in PlayOptions).

I'll use Long.MAX for fastPath as you suggest.

One question I still have is that even when you choose FAST_TIME you might well still have pauses waiting for the journal to get a message (e.g. if the user is listening remotely) - does that pause require having its own thread or can you use the user thread for that?

Ah I forgot that the ReactiveJournal could be reading remotely :(
Yes, latencies at the network level are long enough that this warrants protecting against. Unless there is a callback-based or otherwise asynchronous and non-blocking way of reading from the ChronicleQueue?

It's not so much the network latencies as the fact that the delays will come because the remote event might not be generated for a while. At the end of the day we're just reading from a live journal which can be populated at any time as and when events become available.

@simonbasle I've rewritten ReactivePlayer using the drain pattern so it should handle the request(Long.MAX_VALUE) use case.
In terms of threading - by default the Publisher calls back on a new thread because in most cases there will be delays e.g. If you use ACTUAL_TIME or are waiting for events on the journal.
However I've given the user an option in PlayOptions to return on the same thread.

At this point I'd like to tidy up and go for a 1.0 release.

Of course I welcome your feedback before then if you have a moment to review.