This repo is the home of a small presentation about RxJava, which was given at JEEConf 2016 and JavaDay Kyiv 2016. It contains the slides, the presented code samples, and some useful links. The presentation description is here.
Date | What happened |
---|---|
Nov 17, 2009 | Rx for .NET v.1.0 (shipped with .NET 4.0) |
Mar 17, 2010 | Reactive Extensions for JS released |
Aug 15, 2012 | Rx for .NET v.2.0 |
Feb 2013 | Ben Christensen starts porting the library to the JVM |
Nov 18, 2014 | RxJava v. 1.0.0 |
Oct 5, 2016 | RxJava v. 1.2.1 (latest at the time of writing) |
Given a stream of new tweets (filtered by keywords):
- Track and report the most followed tweet author in the stream
- Track and report the most retweeted tweet of the most popular user
Twitter Stream API (WebSocket-like):
- Doc: https://dev.twitter.com/streaming/overview
- Library: com.twitter:hbc-core:2.2.0
Twitter REST API (Documentation):
GET https://api.twitter.com/1.1/users/show.json?screen_name=neposuda
GET https://api.twitter.com/1.1/search/tweets.json?q=from:neposuda
class Tweet {
    String text;
    int favorite_count;
    int retweet_count; // compared below to pick the most retweeted tweet
    String author;
    int author_followers;
}
class Profile {
    String screen_name;
    String name;
    String location;
    int statuses_count;
    int followers_count;
}
class UserWithTweet {
    Profile profile;
    Tweet tweet;
}
// Blocking version: the HTTP request runs on the calling thread.
Profile getUserProfile(String screenName) throws Exception {
    ObjectMapper om = new ObjectMapper();
    return (Profile) om.readValue(om.readTree(
        Unirest.get(API_BASE_URL + "users/show.json")
            .queryString("screen_name", screenName)
            .header("Authorization", bearerAuth(authToken.get()))
            .asString()
            .getBody()),
        Profile.class);
}
// Reactive version: fromCallable defers the HTTP request until subscription.
Observable<Profile> getUserProfile(String screenName) {
    if (authToken.isPresent()) {
        return Observable.fromCallable(() -> {
            ObjectMapper om = new ObjectMapper();
            return (Profile) om.readValue(om.readTree(
                Unirest.get(API_BASE_URL + "users/show.json")
                    .queryString("screen_name", screenName)
                    .header("Authorization", bearerAuth(authToken.get()))
                    .asString()
                    .getBody()),
                Profile.class);
        }).doOnCompleted(() -> log("getUserProfile completed for: " + screenName));
    } else {
        return Observable.error(new RuntimeException("Can not connect to twitter"));
    }
}
Observable<UserWithTweet> getUserAndPopularTweet(String author) {
    return Observable.just(author)
        .flatMap(u -> {
            // Each request is subscribed on the IO scheduler, so the two can run concurrently.
            Observable<Profile> profile = client.getUserProfile(u)
                .subscribeOn(Schedulers.io());
            Observable<Tweet> tweet = client.getUserRecentTweets(u)
                // Fall back to null when the user has no recent tweets.
                .defaultIfEmpty(null)
                // Keep the tweet with the higher retweet count.
                .reduce((t1, t2) ->
                    t1.retweet_count > t2.retweet_count ? t1 : t2)
                .subscribeOn(Schedulers.io());
            return Observable.zip(profile, tweet, UserWithTweet::new);
        });
}
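For readers more at home with the JDK's CompletableFuture, the combination step above is loosely comparable to thenCombine on two asynchronous computations. The sketch below is a plain-JDK analogue, not RxJava code: the class names, the fetchProfileName and fetchRecentTweets stubs, and their canned data are hypothetical stand-ins for the two HTTP calls.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineSketch {
    // Hypothetical, trimmed-down stand-ins for the Tweet and UserWithTweet classes.
    static final class Tweet {
        final String text;
        final int retweetCount;
        Tweet(String text, int retweetCount) {
            this.text = text;
            this.retweetCount = retweetCount;
        }
    }

    static final class UserWithTweet {
        final String profileName;
        final Tweet tweet;
        UserWithTweet(String profileName, Tweet tweet) {
            this.profileName = profileName;
            this.tweet = tweet;
        }
    }

    // Hypothetical stubs replacing the two HTTP requests.
    static String fetchProfileName(String screenName) {
        return "Name of " + screenName;
    }

    static List<Tweet> fetchRecentTweets(String screenName) {
        return Arrays.asList(new Tweet("text-1", 20), new Tweet("text-2", 50));
    }

    static CompletableFuture<UserWithTweet> userAndPopularTweet(String author) {
        // Both tasks are submitted to the common pool and may run concurrently.
        CompletableFuture<String> profile =
            CompletableFuture.supplyAsync(() -> fetchProfileName(author));
        CompletableFuture<Tweet> tweet =
            CompletableFuture.supplyAsync(() -> fetchRecentTweets(author).stream()
                .max(Comparator.comparingInt((Tweet t) -> t.retweetCount))
                .orElse(null)); // null when there are no recent tweets
        // Completes once both results are available, pairing them into one value.
        return profile.thenCombine(tweet, UserWithTweet::new);
    }

    public static void main(String[] args) {
        UserWithTweet result = userAndPopularTweet("neposuda").join();
        System.out.println(result.profileName + ": " + result.tweet.text);
    }
}
```

The analogy only holds for sources that produce a single value each; Observable.zip also pairs up multi-item streams, which CompletableFuture cannot express.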
streamClient.getStream("RxJava", "JEEConf", "JavaDay", "Java", "Trump")
    .scan((u1, u2) -> u1.author_followers > u2.author_followers ? u1 : u2)
    .distinctUntilChanged()
    .map(p -> p.author)
    .flatMap(name -> getUserAndPopularTweet(name))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe(p -> log.info("The most popular tweet of user "
        + p.profile.name + ": " + p.tweet));
The same solution with the getUserAndPopularTweet(name) method inlined:
streamClient.getStream("RxJava", "JEEConf", "JavaDay", "Java", "Trump")
    .scan((u1, u2) -> u1.author_followers > u2.author_followers ? u1 : u2)
    .distinctUntilChanged()
    .map(p -> p.author)
    .flatMap(name -> {
        Observable<Profile> profile = client.getUserProfile(name)
            .subscribeOn(Schedulers.io());
        Observable<Tweet> tweet = client.getUserRecentTweets(name)
            .defaultIfEmpty(null)
            .reduce((t1, t2) ->
                t1.retweet_count > t2.retweet_count ? t1 : t2)
            .subscribeOn(Schedulers.io());
        return Observable.zip(profile, tweet, UserWithTweet::new);
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe(p -> log.info("The most popular tweet of user "
        + p.profile.name + ": " + p.tweet));
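The first two operators of the pipeline, scan followed by distinctUntilChanged, amount to a running maximum that is reported only when the leader changes. The sketch below spells that logic out as an ordinary loop, assuming a trimmed-down Tweet class and sample data invented for illustration; it is a plain-JDK illustration of the idea, not how RxJava implements these operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LeaderTracker {
    // Hypothetical stand-in for the Tweet class, reduced to the fields used here.
    static final class Tweet {
        final String author;
        final int authorFollowers;
        Tweet(String author, int authorFollowers) {
            this.author = author;
            this.authorFollowers = authorFollowers;
        }
    }

    // Returns the authors that would be passed downstream:
    // one entry each time the running leader changes.
    static List<String> leaders(List<Tweet> stream) {
        List<String> reported = new ArrayList<>();
        Tweet leader = null;      // running result of the scan step
        Tweet lastEmitted = null; // last value let through by distinctUntilChanged
        for (Tweet next : stream) {
            // scan: the first item passes through; afterwards the current leader
            // is kept only while it has strictly more followers than the new item.
            if (leader == null || leader.authorFollowers <= next.authorFollowers) {
                leader = next;
            }
            // distinctUntilChanged: skip a value equal to the previous emission.
            if (leader != lastEmitted) {
                reported.add(leader.author);
                lastEmitted = leader;
            }
        }
        return reported;
    }

    public static void main(String[] args) {
        List<Tweet> stream = Arrays.asList(
            new Tweet("alice", 100),
            new Tweet("bob", 50),
            new Tweet("carol", 300),
            new Tweet("dave", 200));
        System.out.println(leaders(stream)); // prints [alice, carol]
    }
}
```

Only "alice" and "carol" are reported because "bob" and "dave" never overtake the current leader, so the expensive profile and tweet lookups in flatMap run just twice for these four tweets.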
Pitfalls:
- The API is big (150+ methods to remember)
- Requires understanding the underlying magic
- Hard to debug
- Don’t forget about backpressure
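One way to picture the backpressure pitfall: a fast producer and a slow consumer are connected by a bounded buffer, and something has to give once the buffer is full. The sketch below uses only the JDK's ArrayBlockingQueue and simply drops the overflow; the class and method names are made up for illustration, and dropping is just one possible strategy (buffering or blocking the producer are others).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferSketch {
    // Offers `produced` items to a buffer of the given capacity while nothing
    // is consuming, and returns how many items did not fit and were dropped.
    static int countDropped(int produced, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int lost = 0;
        for (int i = 0; i < produced; i++) {
            // offer() returns false instead of blocking when the buffer is full.
            if (!buffer.offer(i)) {
                lost++;
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println(countDropped(10, 4)); // prints 6
    }
}
```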
Strengths:
- It is functional, it is reactive*
- Good for integration scenarios
- Lets you control execution threads
- Easy to compose workflows
- Easy to integrate into existing solutions
- Easy to test
@Test
public void correctlyJoinsHttpResults() throws Exception {
    String testUser = "testUser";
    Profile profile = new Profile("u1", "Name", "USA", 10, 20, 30);
    Tweet tweet1 = new Tweet("text-1", 10, 20, testUser, 30);
    Tweet tweet2 = new Tweet("text-2", 40, 50, testUser, 30);
    TwitterClient client = mock(TwitterClient.class);
    when(client.getUserProfile(testUser)).thenReturn(Observable.just(profile));
    when(client.getUserRecentTweets(testUser)).thenReturn(Observable.just(tweet1, tweet2));
    TestSubscriber<UserWithTweet> testSubscriber = new TestSubscriber<>();
    new Solutions().getUserAndPopularTweet(client, testUser).subscribe(testSubscriber);
    testSubscriber.awaitTerminalEvent();
    assertEquals(singletonList(new UserWithTweet(profile, tweet2)),
        testSubscriber.getOnNextEvents());
}
Good starting points:
Recommended tutorials and courses:
Recommended videos:
- José Paumard: Java 8 Stream and RxJava comparison: patterns and performances
- Artur Glier: Learn you some Rx for the greater good
- Ross Hambrick: RxJava and Retrolambda Making Android | Devnexus 2015
- Li Haoyi: Fun Functional-Reactive Programming with Scala.Rx
- Ben Christensen: Functional Reactive Programming with RxJava, Netflix
- Erik Meijer: A Playful Introduction to Rx
- Chris Richardson: Futures and Rx Observables: powerful abstractions for consuming web services asynchronously
- Roland Kuhn: Reactive Design Patterns
- Dan Lew: Common RxJava Mistakes
Recommended articles:
Good Presentations:
- RxJava - introduction & design
- Building Scalable Stateless Applications with RxJava
- Reactive Programming with RxJava for Efficient Data Access
- Simon Baslé: Practical RxJava
- Java 8 Streaming API vs RxJava
Reactive programming for Scala:
- RxScala - RxJava binding
- Scientific paper: Ingo Maier, Martin Odersky: Deprecating the Observer Pattern with Scala.React
- Scala.Rx: an experimental change-propagation library for Scala
Reactive programming for other platforms:
Java 8 Stream API related libraries: