Akka-Play-Spark

An implementation of two NLP classifiers using Play, Spark, Akka and Scala

Apache Spark, MLlib, Scala, Akka and Play Framework

The main focus is the orchestration of these technologies, demonstrated by an example that uses MLlib to classify the sentiment of Twitter messages.

The fundamental idea of sentiment classification used in this template is based on the paper by Alec Go et al.

Setup Instructions

Make sure that you have Java 8, either sbt or Typesafe Activator, and Node.js installed on your machine. The machine should have at least two cores available, since Spark Streaming (used by the OnlineTrainer) occupies one core. The application therefore needs at least one more core to process the data.

  1. Clone this repository: git clone etc.
  2. Change into the newly created directory: cd spark-mlib-scala-play
  3. Insert your Twitter access token and consumer key/secret pairs in application.conf. To generate a token, see dev.twitter.com. By default the application runs in single-user mode, which means that the access tokens configured in your application.conf (or local.conf) are also used for querying Twitter by keyword. This is fine when you run the application locally. Note: to run the application in production mode, turn single-user mode off so that OAuth per user is used instead. To do so, change the line in your conf/application.conf to twitter.single-user-mode = no. Also make sure to provide an application secret.
  4. Launch SBT: sbt run.
  5. Navigate your browser to: http://localhost:9000
  6. If necessary, change twitter.redirect.url in application.conf to the URL the application actually uses.
  7. If necessary (for example, if Twitter changes the URL of its tweet-fetching service), change twitter.fetch.url in application.conf to the new one. Ensure that the last URL parameter is the query string, because the application appends the keyword to the end of the URL.
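The requirement in step 7 (query parameter last, keyword appended) can be sketched as follows. This is a minimal illustration, not the application's actual code: the object name, the method name and the example URL are assumptions, and whether the application URL-encodes the keyword is not stated in this README.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object FetchUrlSketch {
  // Illustrative value only; use whatever twitter.fetch.url holds in your application.conf.
  // Note that the query parameter comes last and is left empty.
  val exampleFetchUrl = "https://api.twitter.com/1.1/search/tweets.json?count=100&q="

  /** Appends the URL-encoded keyword to a fetch URL whose last parameter is the query. */
  def withKeyword(fetchUrl: String, keyword: String): String =
    fetchUrl + URLEncoder.encode(keyword, StandardCharsets.UTF_8.name())
}
```

If the query parameter were not the last one, the appended keyword would end up in the value of a different parameter, which is why the order matters.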

If starting the application takes a very long time or even times out, this may be due to a known Activator issue. In that case, do the following before starting the application with sbt run.

  1. Delete the project/sbt-fork-run.sbt file
  2. Remove the line fork in run := true (added automatically when you start activator) from the bottom of build.sbt

Without the fork option, which is needed only by Activator, the application should start within a few seconds.

The Classification Workflow

The following outline shows what the actor communication workflow for classification looks like:

The Application controller serves HTTP requests from the client/browser and obtains ActorRefs for EventServer, StatisticsServer and Director.

The Director is the root of the actor hierarchy and creates all other durable (long-lived) actors except StatisticsServer and EventServer. Besides supervising its child actors, it bridges Play Framework and Akka by handing the Classifier ActorRefs over to the controller. Moreover, when training of the estimators within BatchTrainer and OnlineTrainer has finished, this actor passes the latest machine learning models to the StatisticsServer (see Figure below). For the OnlineTrainer, statistics generation is scheduled every 5 seconds.

The Classifier creates a FetchResponseHandler actor and sends the TwitterHandler a Fetch message (containing the ActorRef of the FetchResponseHandler) to get the latest Tweets for a given keyword or query.

Once the TwitterHandler has fetched some Tweets, the FetchResponse is sent to the FetchResponseHandler.

The FetchResponseHandler creates a TrainingModelResponseHandler actor and tells the BatchTrainer and OnlineTrainer to pass their latest models to the TrainingModelResponseHandler. It registers itself as a monitor for the TrainingModelResponseHandler and, when that actor terminates, stops itself as well.

The TrainingModelResponseHandler collects the models and the vectorized Tweets, makes predictions, and sends the results to the original sender (the Application controller). The original sender is passed through the ephemeral (short-lived) actors, indicated by the yellow dotted line in the figure above.
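The last two steps (an ephemeral handler that is watched by its creator and replies to the original sender) can be sketched with classic Akka actors as follows. This is a simplified illustration under stated assumptions, not the project's code: all class names ending in Sketch, the message types and their fields are hypothetical, and the prediction is a placeholder.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

// Hypothetical messages; the project's real message types are not shown in this README.
case object GetLatestModel
final case class BatchModel(name: String)
final case class OnlineModel(name: String)
final case class FetchResponseSketch(tweets: Seq[String])
final case class Predictions(labels: Seq[Double])

/** Short-lived actor: waits for both models, replies to the original sender, then stops. */
class ModelResponseHandlerSketch(originalSender: ActorRef, tweets: Seq[String]) extends Actor {
  private var batch: Option[BatchModel] = None
  private var online: Option[OnlineModel] = None

  def receive: Receive = {
    case m: BatchModel  => batch = Some(m); replyIfComplete()
    case m: OnlineModel => online = Some(m); replyIfComplete()
  }

  private def replyIfComplete(): Unit =
    if (batch.isDefined && online.isDefined) {
      // Placeholder: a real handler would apply both models to the vectorized tweets.
      originalSender ! Predictions(tweets.map(_ => 0.0))
      context.stop(self)
    }
}

object ModelResponseHandlerSketch {
  def props(originalSender: ActorRef, tweets: Seq[String]): Props =
    Props(new ModelResponseHandlerSketch(originalSender, tweets))
}

/** Short-lived parent: creates the handler, asks both trainers, stops when the handler ends. */
class FetchResponseHandlerSketch(batchTrainer: ActorRef,
                                 onlineTrainer: ActorRef,
                                 originalSender: ActorRef) extends Actor {
  def receive: Receive = {
    case FetchResponseSketch(tweets) =>
      val handler = context.actorOf(ModelResponseHandlerSketch.props(originalSender, tweets))
      context.watch(handler)                      // register as monitor (death watch)
      batchTrainer.tell(GetLatestModel, handler)  // trainers reply directly to the handler
      onlineTrainer.tell(GetLatestModel, handler)
    case Terminated(_) =>
      context.stop(self)                          // handler is done, so this actor is too
  }
}
```

Passing the original sender's ActorRef through the constructors lets the final reply bypass the intermediate actors, so none of them needs to stay alive to relay it.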

Model Training and Statistics

The following outline shows how the actors involved in training the machine learning estimators and in serving statistics about their predictive performance interact:

The BatchTrainer receives a Train message as soon as a corpus (a collection of labeled Tweets) has been initialized. This corpus is initialized by the CorpusInitializer and is either created on the fly via Spark's TwitterUtils.createStream (with automatic labeling based on the emoticons ":)" and ":(") or read from a static CSV file provided by Sentiment140. Which of the two is used can be configured via ml.corpus.initialization.streamed in application.conf. For batch training we use the high-level org.apache.spark.ml API. We use grid search with cross-validation to find the best hyperparameters for our LogisticRegression model.
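A grid search with cross-validation over a LogisticRegression pipeline can be set up with the org.apache.spark.ml API roughly as shown below. This is a sketch under assumptions, not the BatchTrainer's actual configuration: the column names "text" and "label", the pipeline stages, the grid values and the number of folds are all chosen for illustration.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

object GridSearchSketch {
  // Assumed input schema: a "text" column with the tweet and a "label" column (0.0 or 1.0).
  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
  val lr        = new LogisticRegression().setMaxIter(10)
  val pipeline  = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

  // Illustrative grid: 2 feature sizes x 3 regularization strengths = 6 candidates.
  val paramGrid = new ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, Array(1000, 10000))
    .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
    .build()

  // Every candidate is trained and evaluated on each fold; the best one is refit on all data.
  val crossValidator = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)
}
```

Calling GridSearchSketch.crossValidator.fit(corpus) on a DataFrame with the assumed columns would return a CrossValidatorModel whose bestModel holds the winning pipeline.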

The OnlineTrainer receives a Train message with a corpus (an RDD[Tweet]) upon successful initialization just like the BatchTrainer. For the online learning approach we use the experimental StreamingLogisticRegressionWithSGD estimator which, as the name implies, uses Stochastic Gradient Descent to update the model continually on each Mini-Batch (RDD) of the DStream created via TwitterUtils.createStream.
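The online approach can be sketched with the spark.mllib streaming API as follows. The feature dimension, step size, number of iterations and the whitespace tokenization are assumptions made for illustration, and the object and method names are hypothetical; the OnlineTrainer's actual settings are not given in this README.

```scala
import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream

object OnlineTrainingSketch {
  val NumFeatures = 1000                      // assumed feature dimension
  private val hashingTF = new HashingTF(NumFeatures)

  /** Turns a labeled tweet into a feature vector via the hashing trick. */
  def vectorize(label: Double, text: String): LabeledPoint =
    LabeledPoint(label, hashingTF.transform(text.toLowerCase.split("\\s+").toSeq))

  /** The streaming estimator needs initial weights of the same dimension as the features. */
  def newEstimator(): StreamingLogisticRegressionWithSGD =
    new StreamingLogisticRegressionWithSGD()
      .setStepSize(0.1)                       // assumed value
      .setNumIterations(10)                   // assumed value
      .setInitialWeights(Vectors.zeros(NumFeatures))

  /** Registers training: the model is updated on every mini-batch (RDD) of the stream. */
  def train(estimator: StreamingLogisticRegressionWithSGD,
            labeledTweets: DStream[(Double, String)]): Unit =
    estimator.trainOn(labeledTweets.map { case (label, text) => vectorize(label, text) })
}
```

After the StreamingContext has been started, estimator.latestModel() returns the model as of the most recently processed mini-batch, which is the kind of snapshot that can be handed to other actors.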

The StatisticsServer receives {Online,Batch}TrainerModel messages and computes performance metrics such as accuracy and area under the ROC curve, which in turn are forwarded to the subscribed EventListeners and finally sent to the client (browser) via WebSocket.
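To make the two metrics named above concrete, here is a plain-Scala illustration of what they measure. It is not the StatisticsServer's code and not how MLlib computes them (MLlib offers BinaryClassificationMetrics for RDDs; whether the project uses it is not stated here). The object and method names are hypothetical.

```scala
object MetricsSketch {
  /** Accuracy: fraction of predictions that equal their labels. */
  def accuracy(predictionAndLabel: Seq[(Double, Double)]): Double =
    if (predictionAndLabel.isEmpty) 0.0
    else predictionAndLabel.count { case (p, l) => p == l }.toDouble / predictionAndLabel.size

  /**
   * Area under the ROC curve, computed as the probability that a randomly chosen
   * positive example gets a higher score than a randomly chosen negative one
   * (ties count as one half). Quadratic in the input size, so for illustration only.
   */
  def areaUnderRoc(scoreAndLabel: Seq[(Double, Double)]): Double = {
    val pos = scoreAndLabel.collect { case (s, 1.0) => s }
    val neg = scoreAndLabel.collect { case (s, 0.0) => s }
    if (pos.isEmpty || neg.isEmpty) Double.NaN // undefined without both classes
    else {
      val wins = for (p <- pos; n <- neg) yield
        if (p > n) 1.0 else if (p == n) 0.5 else 0.0
      wins.sum / (pos.size.toLong * neg.size)
    }
  }
}
```

Accuracy depends on the decision threshold, while the area under the ROC curve summarizes ranking quality across all thresholds, which is why the two are usually reported together.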

The EventListeners are created for each client via Play Framework's built-in WebSocket.acceptWithActor. EventListeners subscribe to EventServer and StatisticsServer. When the connection terminates (e.g. the browser window is closed), the respective EventListener shuts down and unsubscribes from EventServer and/or StatisticsServer via postStop().

The EventServer is created by the Application controller and forwards event messages (progress of corpus initialization) to the client (also via WebSocket).
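The subscribe-on-start, unsubscribe-on-stop lifecycle can be sketched with classic Akka actors as follows. The Subscribe and Unsubscribe messages and both class names are hypothetical stand-ins; the project's actual protocol is not shown in this README.

```scala
import akka.actor.{Actor, ActorRef, Props}

// Hypothetical subscription protocol.
final case class Subscribe(listener: ActorRef)
final case class Unsubscribe(listener: ActorRef)

/** One instance per WebSocket connection; `out` is the actor that writes to the client. */
class EventListenerSketch(out: ActorRef, servers: Seq[ActorRef]) extends Actor {
  override def preStart(): Unit = servers.foreach(_ ! Subscribe(self))
  // Runs when the actor is stopped, for example after the connection has been closed.
  override def postStop(): Unit = servers.foreach(_ ! Unsubscribe(self))

  def receive: Receive = {
    case event => out ! event // forward everything to the client
  }
}

object EventListenerSketch {
  def props(out: ActorRef, servers: Seq[ActorRef]): Props =
    Props(new EventListenerSketch(out, servers))
}

/** Stand-in for EventServer or StatisticsServer: fans messages out to its subscribers. */
class PublisherSketch extends Actor {
  private var listeners = Set.empty[ActorRef]

  def receive: Receive = {
    case Subscribe(l)   => listeners += l
    case Unsubscribe(l) => listeners -= l
    case event          => listeners.foreach(_ ! event)
  }
}
```

With WebSocket.acceptWithActor, Play supplies the `out` reference and stops the actor when the socket closes, so cleanup placed in postStop() runs without any extra connection tracking.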