This HTTP Client wraps the excellent AsyncHttpClient so that Observables are returned, and a number of best practices in RESTful integration are enforced.
The intent is that your application uses one RxHttpClient
instance for each integration point (usually a REST service). Because creating
an RxHttpClient
is expensive, you should do this only once in your application.
As RxHttpClients
are limited to one service, we have natural bulkheads between integration points: errors and failures with
respect to one integration point will have no direct effect on other integration points (at least if following the recommendations below).
An RxHttpClient
is created using the RxHttpClient.Builder
as in this example for Java:
RxHttpClient client = new RxHttpClient.Builder()
.setRequestTimeout(REQUEST_TIME_OUT)
.setMaxConnections(MAX_CONNECTIONS)
.setConnectionTTL(60000)
.setConnectionTimeout(1000)
.setAccept("application/json")
.setBaseUrl("http://example.com/api")
.build();
and for Scala:
import be.wegenenverkeer.rxhttp.scala.ImplicitConversions._
val client = new RxHttpClient.Builder()
.setRequestTimeout(REQUEST_TIME_OUT)
.setMaxConnections(MAX_CONNECTIONS)
.setConnectionTTL(60000)
.setConnectionTimeout(1000)
.setAccept("application/json")
.setBaseUrl("http://example.com/api")
.build
.asScala
REST Requests can be created using ClientRequestBuilders
which in turn can be got from RxHttpClient
instances, like so:
ClientRequest request = client.requestBuilder()
.setMethod("GET")
.setUrlRelativetoBase(path)
.addQueryParam("q", "test")
.build();
ClientRequest
s are immutable so can be freely shared across threads or (Akka) Actors.
RxHttpClient
has several methods for executing ClientRequests
:
executeToCompletion(ClientRequest, Function<ServerResponse, F>)
returns anObservable<F>
. The second parameter is a function that decodes theServerResponse
to a value of typeF
. The returnedObservable
emits exactly oneF
before completing. In case of an error, it emits either anHttpClientError
orHttpServerError
.execute(ClientRequest, Function<ServerResponse, F>)
returns aCompletableFuture<F>
with the response after being decoded by the function in the argumentexecuteOservably(ClientRequest, Function<byte[], F>)
returns anObservable<F>
which returns anF
for each HTTP response part or chunk received. This is especially useful for processing HTTP responses that use chunked transfer encoding. Each chunk will be transformed to a value ofF
and directly emitted by the Observable.
All Observables returned by these methods are "Cold" Observables. This means that the ClientRequest
is executed only when some Observer subscribes
to the Observable. In fact, whenever an Observer subscribes to the Observable, the request is executed.
To allow proper bulkheading between integration points and the rest of your application, you should follow these recommendations:
- Set the maximum number of Connections using method
RxHttpClient.Builder().setMaxConnections(int)
so that one misbehaving integration doesn't start exhausting server resources - Set an explicit Connection Time-To-Live (TTL) using method
RxHttpClient.builder().setConnectionTTL(int)
. This ensures that connections are regularly recreated which is a good thing in dynamic (clooud) environments - Ensure you have an appropriate Connection Timeout set using
RxHttpClient.Builder().setConnectionTimeout(int)
. The default is set to 5 seconds. - Ensure you have appropriate Request Timeouts and Read Timeouts set. The default for both is 1 min. These time outs ensures your application doesn't get stuck waiting for very slow or non-responsive servers.
- Before discarding an
RxHttpClient
explicitly invoke theRxHttpClient.close()
method itsExecutorService
is closed and I/O threads are destroyed