/demo-java-reactive-streams

'Reactive Streams specification in Java' article and source code.

Primary LanguageJava

Reactive Streams specification in Java

Introduction

Reactive Streams is a cross-platform specification for processing a potentially infinite sequence of events across asynchronous boundaries (threads, processes, or network-connected computers) with non-blocking backpressure. A reactive stream contains a publisher that sends forward data, error, completion events, and subscribers that send backward request and cancel backpressure events. There can also be intermediate processors between the publisher and the subscriber that filter or transform events.

Backpressure is application-level flow control from the subscriber to the publisher to control the sending rate.

The Reactive Streams specification is designed to efficiently process (in terms of CPU and memory usage) time-ordered sequences of events. For efficient CPU usage, the specification describes the contracts for asynchronous and non-blocking events processing in different stages (producers, processors, consumers). For efficient memory usage, the specification describes the contracts for switching between push and pull communication models based on the events processing rate, which avoids using unbounded buffers.

Problems and solutions

When designing systems to transfer items from a producer to a consumer, the goal is to send them with minimal latency and maximum throughput.

Latency is the time between sending an item from the producer and its receiving by the consumer. Throughput is the number of items sent from producer to consumer per unit of time.

However, the producer and the consumer may have limitations that can prevent the system from achieving the best performance:

  • The consumer can be slower than the producer.
  • The consumer may not be able to skip items that it does not have time to process.
  • The producer may not be able to slow or stop sending items that the consumer does not have time to process.
  • The producer and consumer may have a limited number of CPU cores to process items asynchronously and memory to buffer items.
  • The communication channel between the producer and the consumer may have limited bandwidth.

There are several patterns for sequential item processing, that solve some or most of the above limitations:

  • Iterator
  • Observer
  • Reactive Extensions
  • Reactive Streams

These patterns fall into two groups: synchronous pull communication models (in which the consumer determines when to receive items from the producer) and asynchronous push communication models (in which the producer determines when to send items to the consumer).

Iterator

In the Iterator pattern, the consumer synchronously pulls items from the producer one by one. The producer sends an item only when the consumer requests it. If the producer does not have an item at the time of the request, he sends an empty response.

Iterator

Pros:

  • The consumer can start the exchange at any time.
  • The consumer cannot request the next item if he has not yet processed the previous one.
  • The consumer can stop the exchange at any time.

Cons:

  • The latency may not be optimal due to an incorrectly chosen pulling period (too long pulling period leads to high latency; too short pulling period wastes CPU and I/O resources).
  • The throughput is not optimal because it takes one request-response to send each item.
  • The consumer cannot determine if the producer has finished generating items.

When using the Iterator pattern, which transfers items one at a time, latency and throughput are often unsatisfactory. To improve these parameters with minimal changes, the same Iterator pattern can transfer items in batches rather than one at a time.

Iterator with batching

Pros:

  • The consumer can start the exchange at any time.
  • The consumer cannot request the next item if he has not yet processed the previous one.
  • The consumer can stop the exchange at any time.
  • Throughput increases as the number of requests/responses decreases from one for each item to one for all items in a batch.

Cons:

  • The latency increases because the producer needs more time to send more items.
  • If the batch size is too large, it may not fit in the memory of the producer or the consumer.
  • If the consumer wants to stop processing, he can do so no sooner than he receives the entire batch.

Observer

In the Observer pattern, one or many consumers subscribe to the producer's events. The producer asynchronously pushes events to all subscribed consumers as soon as it generates them. The consumer can unsubscribe from the producer if it does not need further events.

Observer

Pros:

  • The consumer can start the exchange at any time.
  • The consumer can stop the exchange at any time.
  • The latency is lower than in synchronous pull communication models because the producer sends events to the consumer as soon as they become available.

Cons:

  • A slower consumer may be overwhelmed by events from a faster producer.
  • The consumer cannot determine when the producer has finished generating items.
  • Implementing concurrent producers and consumers may be non-trivial.

Reactive Extensions

Reactive Extensions (ReactiveX) is a family of multi-platform frameworks for handling synchronous or asynchronous event streams, originally created by Erik Meijer at Microsoft. The implementation of Reactive Extensions for Java is the Netflix RxJava framework.

In simplified terms, Reactive Extensions are a combination of the Observer and Iterator patterns and functional programming. From the Observer pattern, they took the consumer’s ability to subscribe to the producer’s events. From the Iterator pattern, they took the ability to handle event streams of three types (data, error, completion). From functional programming, they took the ability to handle event streams with chained methods (filter, transform, combine, etc.).

Reactive Extensions

Pros:

  • The consumer can start the exchange at any time.
  • The consumer can stop the exchange at any time.
  • The consumer can determine when the producer has finished generating events.
  • The latency is lower than in synchronous pull communication models because the producer sends events to the consumer as soon as they become available.
  • The consumer can uniformly handle event streams of three types (data, error, completion).
  • Handling event streams with chained methods can be easier than with many nested event handlers.

Cons:

  • A slower consumer may be overwhelmed by events from a faster producer.
  • Implementing concurrent producers and consumers may be non-trivial.

Reactive Streams

Reactive Streams are a further development of Reactive Extensions, which use backpressure to match producer and consumer performance. In simplified terms, Reactive Streams are a combination of Reactive Extensions and batching.

The main difference between them is who is the initiator of the exchange. In Reactive Extensions, a publisher sends events to a subscriber as soon as they become available and in any number. In Reactive Streams, a publisher must send events to a subscriber only after they have been requested and no more than the requested number.

Reactive Streams

Pros:

  • The consumer can start the exchange at any time.
  • The consumer can stop the exchange at any time.
  • The consumer can determine when the producer has finished generating events.
  • The latency is lower than in synchronous pull communication models because the producer sends events to the consumer as soon as they become available.
  • The consumer can uniformly handle event streams of three types (data, error, completion).
  • Handling event streams with chained methods can be easier than with many nested event handlers.
  • The consumer can request events from the producer depending on the need.

Cons:

  • Implementing concurrent producers and consumers may be non-trivial.

Backpressure

There are several solutions for the problem where a producer generates events faster than a consumer processes them. This does not happen in pull communication models because the consumer initiates the exchange. In push communication models, the producer cannot usually determine the sending rate, so the consumer may eventually receive more events than it can process. Backpressure is a solution to this problem by informing the producer about the processing rate of its consumers.

Without the use of backpressure, the consumer has a few solutions to deal with excessive events:

  • buffer events
  • drop events
  • drop events and request the producer to resend them by their identifiers

Any solution that includes dropping events on the consumer may be inefficient because these events still require I/O operations to send them from the producer.

The backpressure in Reactive Streams is implemented as follows. To start receiving events from the producer, the consumer pulls the number of items it wants to receive. Only then does the producer push events to the consumer; the producer never sends them on its initiative. After the consumer has processed all the requested events, the whole cycle repeats. In a particular case, if the consumer is known to be faster than the producer, it can work in the push communication model and request all items immediately after subscribing. Or vice versa, if the consumer is known to be slower than the producer, it can work in the pull communication model and request the next items only after the previous ones have been processed. Thus, the model in which reactive streams operate can be described as a dynamic push/pull communication model. It works effectively if the producer is faster or slower than the consumer or even when that ratio can change over time.

With the use of backpressure, the producer has much more solutions to deal with excessive events:

  • buffer events
  • drop events
  • pause generation events
  • block the producer
  • cancel the event stream

Which solutions to use for a particular reactive stream depends on the nature of the events. But backpressure is not a silver bullet. It simply shifts the problem of performance mismatch to the producer's side, where it is supposed to be easier to solve. However, in some cases, there are better solutions than backpressure, such as simply dropping excessive events on the consumer's side.

The Reactive Streams specification

Reactive Streams is a specification to provide a standard for asynchronous stream processing with non-blocking backpressure for various runtime environments (JVM, .NET, and JavaScript) and network protocols. The Reactive Streams specification was created by engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter, and others.

The specification describes the concept of reactive streams that have the following features:

  • reactive streams can be unicast and multicast: a publisher can send events to one or many consumers.
  • reactive streams are potentially infinite: they can handle zero, one, many, or an infinite number of events.
  • reactive streams are sequential: a consumer processes events in the same order in which a producer sends them.
  • reactive streams can be synchronous or asynchronous: they can use computing resources for parallel processing in separate stages.
  • reactive streams are non-blocking: they do not waste computing resources if the performance of a producer and a consumer are different.
  • reactive streams use mandatory backpressure: a consumer can request events from a producer according to their processing rate.
  • reactive streams use bounded buffers: they can be implemented without unbounded buffers, avoiding out-of-memory errors.

The Reactive Streams specification for the JVM (the latest version 1.0.4 was released on May 26th, 2022) contains the textual specification and the Java API, which contains four interfaces that must be implemented according to this specification. It also includes the Technology Compatibility Kit (TCK), a standard test suite for conformance testing of implementations.

It is important to note that the Reactive Streams specification was created after several mature but incompatible implementations of Reactive Streams already existed. Therefore, the specification is currently limited and contains only low-level APIs. Application developers should use this specification to provide interoperability between existing implementations. To have high-level functional APIs (filter, transform, combine, etc.), application developers should use implementations of this specification (Lightbend Akka Streams, Pivotal Project Reactor, Netflix RxJava, etc.) through their native APIs.

The Reactive Streams API

The Reactive Streams API consists of four interfaces, which are located in the org.reactivestreams package:

  • The Publisher<T> interface represents a producer of data and control events.
  • The Subscriber<T> interface represents a consumer of events.
  • The Subscription interface represents a connection between a Publisher and a Subscriber.
  • The Processor<T,R> interface represents a processor of events that acts as both a Subscriber and a Publisher.

Reactive Streams API

Publisher

The Publisher interface represents a producer of potentially infinite sequenced data and control events. A Publisher produces events according to the demand received from one or many Subscribers.

Demand is the aggregated number of items requested by a Subscriber that have not yet been delivered by the Publisher.

Publishers may vary about whether Subscribers receive events that were produced before they subscribed. Cold publishers can be repeated and do not start until they are subscribed (in-memory iterators, file readings, database queries, etc.). Hot publishers cannot be repeated and start immediately, regardless of the presence of subscribers (keyboard and mouse events, sensor events, network requests, etc.).

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

This interface has the following method:

  • The subscribe(Subscriber) method requests the Publisher to start sending events to a Subscriber.

Subscriber

The Subscriber interface represents a consumer of events. Multiple Subscribers can subscribe to and unsubscribe from a Producer at different times.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T item);
    public void onError(Throwable t);
    public void onComplete();
}

This interface has the following methods:

  • The onSubscribe(Subscription) method is invoked when the Producer accepts a new Subscription.
  • The onNext(T) method is invoked on each received item.
  • The onError(Throwable) method is invoked on erroneous completion.
  • The onComplete() method is invoked on successful completion.

Subscription

The Subscription interface represents a connection between a Publisher and a Subscriber. Through a Subscription, the Subscriber can request items from the Publisher or cancel the connection.

public interface Subscription {
    public void request(long n);
    public void cancel();
}

This interface has the following methods:

  • The request(long) method adds the given number of items to the unfulfilled demand for this Subscription.
  • The cancel() method requests the Publisher to eventually stop sending items.

Processor

The Processor interface represents a processing stage that extends the Subscriber and Publisher interfaces and is subject to the contracts of both. It acts as a Subscriber for the previous stage of a reactive stream and as a publisher for the next one.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The Reactive Streams workflow

The Reactive Streams workflow consists of three steps: establishing a connection, exchanging data and control events, and successfully or exceptionally terminating the connection.

Reactive Streams workflow

When a Subscriber wants to start receiving events from a Publisher, it calls the Publisher.subscribe(Subscriber) method. If the Publisher accepts the request, it creates a new Subscription instance and invokes the Subscriber.onSubscribe(Subscription) method. If the Publisher rejects the request or otherwise fails, it invokes the Subscriber.onError(Throwable) method.

Once the Publisher and the Subscriber establish a connection with each other through the Subscription instance, the Subscriber can request events, and the Publisher can send them. When the Subscriber wants to receive events, it calls the Subscription#request(long) method with the number of items requested. Typically, the first such call occurs in the Subscriber.onSubscribe(Subscription) method. The Publisher sends each requested item by calling the Subscriber.onNext(T) method only in response to a previous request. A Publisher can send fewer events than requested if the reactive stream ends, but then must call either the Subscriber.onComplete() or Subscriber.onError(Throwable) methods.

If the Subscriber wants to stop receiving events, it calls the Subscription.cancel() method. After calling this method, the Subscriber can continue to receive events to meet the previously requested demand. A canceled Subscription does not receive Subscriber.onComplete() or Subscriber.onError(Throwable) events.

When there are no more events, the Publisher completes the Subscription successfully by calling the Subscriber.onCompleted() method. When an unrecoverable exception occurs in the Publisher, it completes the Subscription exceptionally by calling the Subscriber.onError(Throwable) method. After invocation of Subscriber.onComplete() or Subscriber.onError(Throwable) events, the current Subscription will not send any other events to the Subscriber.

The JDK Flow API

The JDK has supported the Reactive Streams specification since version 9 in the form of the Flow API. The Flow class contains nested static interfaces Publisher, Subscriber, Subscription, Processor, which are 100% semantically equivalent to their respective Reactive Streams counterparts. The Reactive Streams specification contains the FlowAdapters class, which is a bridge between the Reactive Streams API (the org.reactivestreams package) and the JDK Flow API (the java.util.concurrent.Flow class). The only implementation of the Reactive Streams specification that JDK provides so far is the SubmissionPublisher class that implements the Publisher interface.

Code examples

Cold synchronous reactive stream

This document describes the implementation of a synchronous Producer, a synchronous Consumer, and a cold reactive stream created from them.

Cold asynchronous reactive stream

This document describes the implementation of an asynchronous Producer, an asynchronous Consumer, and a cold reactive stream created from them.

Hot asynchronous reactive stream

This document describes the implementation of an asynchronous Producer and an asynchronous Processor extending the SubmissionPublisher class and a hot reactive stream created from them.

Conclusion

Before Reactive Streams appeared in the JDK, there were related CompletableFuture and Stream APIs. The CompletableFuture API uses the push communication model but supports asynchronous computations of a single value. The Stream API supports synchronous or asynchronous computations of multiple values but uses the pull communication model. Reactive Streams have taken a vacant place and support synchronous or asynchronous computations of multiple values and can also dynamically switch between the push and pull computations models. Therefore, Reactive Streams are suitable for processing sequences of events with unpredictable rates, such as mouse and keyboard events, sensor events, and latency-bound I/O events from a file or network.

Crucially, application developers should not implement the interfaces of the Reactive Streams specification themselves. First, the specification is complex enough, especially in asynchronous contracts, and cannot be easily implemented correctly. Second, the specification does not contain APIs for intermediate stream operations. Instead, application developers should implement the reactive stream stages (producers, processors, consumers) using existing frameworks (Lightbend Akka Streams, Pivotal Project Reactor, Netflix RxJava) with their much richer native APIs. They should use the Reactive Streams API only to combine heterogeneous stages into a single reactive stream.

Complete code examples are available in the GitHub repository.