This is the sample code for the video "Score streaming data in R with Watson Studio Streams Flows". The code in the video is discussed in this blog post on Medium. This repo includes the full application from the video and a modifiable template so you can use it with your own data.
- Precompiled versions of the microservice are in the `bin` folder, so you can try the application without having to compile any code.
- Sample flow from the demo (uses generated data).
- Running the sample
- Modify the sample to use your own model
- The `example` folder contains a template flow and template SPL code so that you can run the sample with your own data.
To use the sample, you need to:
- Import and run the sample flow in Watson Studio Streams flows.
- Run the forecasting microservice in the Streaming analytics service.
First, create an account in Watson Studio and an instance of the Streaming analytics service.
- Sign up for Watson Studio if you haven't already.
- Upload the sample flow:
  - Download the `PredictHotspotUsage.stp` file.
  - From Watson Studio, create a project if you do not have one.
  - Follow these steps to upload the sample flow you just downloaded.
- Start the flow.
Once the flow is running, submit the forecasting microservice. There are two versions of the application:
- `bin/score.Forecast_With_R.sab` just scores the data and publishes the results.
- `bin/score.Forecast_With_ModelUpdates.sab` is configured to connect to Cloud Object Storage (COS) for updates to the model.
To use the application that has model updates enabled, you must also configure Cloud Object Storage, as described after the following steps.
- From the metrics page of the flow, open the Streaming analytics dashboard by clicking Show notifications > Manage [your instance name] in the cloud.
- Once it opens, click the submit button.
- Upload the `bin/score.Forecast_With_R.sab` file.
- Once the application is running, you can view its streams graph in the dashboard.
You can return to the flow and see that the forecast results are being ingested correctly.
If you want to run the version of the application that has model updates, you must:
- Create an instance of Cloud Object Storage in IBM Cloud.
- Configure your Streaming analytics service to connect to your Cloud Object Storage instance as described here.
- Create a bucket called `models-demo` in your COS instance.
This repo also includes a sample flow and template SPL code so you can try out forecasting with your own data and model.
To modify the sample to use your own model:
- Modify the template flow to publish your own data
- Modify the template SPL to ingest the right data
- Change the R scripts to load and score your models
Unlike the video, the template flow converts the stream of data to JSON before publishing it to the Streams instance. This makes it easy for you to set the data schema in the forecasting microservice.
- Download the template flow and upload it to Streams flows. Connect the data stream you wish to publish to the `ConvertToJson` node.
- Run the flow. Once it is running, click the stream being published to see the list of attributes that the microservice will expect.
In this case, the data has the following attributes:

```json
{"id": 443.0, "address": "A717_M", "time": 1568742876.0}
```

These attributes will be used in the SPL code later.
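As a rough aid for the SPL step, the sketch below derives a candidate `InputDataSchema` declaration from one sample record of the published JSON. The helper name `suggest_spl_schema`, the Python-to-SPL type mapping (Python `float` to `float64`, `int` to `int64`, `str` to `rstring`, `bool` to `boolean`) and the `overrides` argument are assumptions for illustration, not part of this repo. JSON cannot tell an epoch timestamp such as `1568742876.0` apart from a genuine float, so treat the output as a starting point and override types where needed.

```python
import json

# Assumed mapping from the Python types that json.loads produces to SPL types.
# Illustrative only: choose narrower types (int32, float32, ...) if your data allows.
# type(True) is bool (not int), so the exact-type lookup keeps booleans separate.
SPL_TYPE_FOR = {
    bool: "boolean",
    int: "int64",
    float: "float64",
    str: "rstring",
}


def suggest_spl_schema(sample_json, overrides=None, type_name="InputDataSchema"):
    """Return a candidate SPL type declaration for one sample JSON record.

    overrides maps attribute names to hand-picked SPL types, e.g. {"time": "int64"}.
    Raises ValueError for values with no obvious SPL type (null, lists, objects).
    """
    record = json.loads(sample_json)
    if not isinstance(record, dict):
        raise ValueError("expected a single JSON object")
    overrides = overrides or {}
    attributes = []
    # json.loads keeps the attribute order of the record, so the declaration does too.
    for name, value in record.items():
        spl_type = overrides.get(name) or SPL_TYPE_FOR.get(type(value))
        if spl_type is None:
            raise ValueError(f"no SPL type for attribute {name!r} ({type(value).__name__})")
        attributes.append(f"{spl_type} {name}")
    return f"type {type_name} = {', '.join(attributes)};"


if __name__ == "__main__":
    sample = '{"id": 443.0, "address": "A717_M", "time": 1568742876.0}'
    print(suggest_spl_schema(sample))
    # The epoch timestamp arrives as a JSON number; override it to get an integer type.
    print(suggest_spl_schema(sample, overrides={"time": "int64"}))
```

With the override, the suggestion for the sample record is `type InputDataSchema = float64 id, rstring address, int64 time;`.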
If you are completely new to SPL, follow the SPL development guide first to learn the basics.
- First, configure your development environment (Atom or VS Code) to develop with SPL. Install the Streams plugins for VS Code or Atom, and then import the source code into the editor.
- Edit `example/ForecastingTemplate.spl` to subscribe to the data you published from Streams flows:
  - At line 10, edit the schema to match the list of attributes:

```
//Change this type to match the type of the attributes you expect
type InputDataSchema = float64 id, float64 time, int32 unique_users, int32 total_users;
```
In our example, the published data was `{"id": 443.0, "address": "A717_M", "time": 1568742876.0}`. Change `InputDataSchema` to match those types, for example:

```
type InputDataSchema = float64 id, rstring address, int64 time;
```
See the SPL documentation for a full list of SPL types.
  - Make sure the topic you are subscribing to matches the topic published from the flow. The default is `inputData`:

```
stream<JsonData> JsonDataToScore = Subscribe() {
    param
        topic : "inputData" ;
}
```
- Edit the `initialize.r` script at line 6 to load your own model. Make sure the model you are using is in the `etc/R` folder of your project.
- Edit the scoring function in `predict.r` to use your own variables.
- Change the `RScript` operator so that it sends the input attributes to the R script under the variable names that `predict.r` expects, by setting the `streamAttributes` and `rObjects` parameters as needed. For example, to map the `id` attribute of your input to a variable called `sensorId` in the R script, you would have:

```
stream<ForecastResult> RScriptResult = RScript(DataToScore) {
    param
        initializationScriptFileName : $appDir + "/etc/R/initialize.r" ; //edit this file to load your model
        rScriptFileName : $appDir + "/etc/R/predict.r" ;
        streamAttributes : id ;
        rObjects : "sensorId" ;
}
```
- Change the `RScript` operator to retrieve the results from your R script:

```
stream<ForecastResult> RScriptResult = RScript(DataToScore) {
    param
        initializationScriptFileName : $appDir + "/etc/R/initialize.r" ; //edit this file to load your model
        rScriptFileName : $appDir + "/etc/R/predict.r" ;
        streamAttributes : id ;
        rObjects : "sensorId" ;
    output
        RScriptResult : forecastedValue1 = fromR("value1"),
                        forecastedValue2 = fromR("value2") ;
}
```
In the example above, `predict.r` sets two variables, `value1` and `value2`, which we want to retrieve.
- Add the output attribute names to the `ForecastResult` type definition on line 12 of `ForecastingTemplate.spl`, so that they match the names assigned in the `output` clause:

```
type ForecastResult = tuple<float32 forecastedValue1, int32 forecastedValue2>, InputDataSchema;
```
- Compile and run the sample: right-click the SPL file in the tree view and select "Build and submit job".
- Once the application is built successfully, go back to the flow to see the results.
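To make the data mapping in the `RScript` steps concrete, here is a plain-Python model of what the operator is configured to do for each tuple: bind each attribute listed in `streamAttributes` to the R object named at the same position in `rObjects`, run the scoring script, then read named R objects back with `fromR(...)` into the `ForecastResult` attributes. It is a simulation under stated assumptions, not the operator's implementation. The positional pairing of `streamAttributes` and `rObjects`, and the copying of same-named input attributes into the output tuple, are assumptions to check against the RScript operator documentation; `run_rscript_step`, `fake_predict` and its arithmetic are hypothetical stand-ins for the operator and for `predict.r`.

```python
from typing import Callable, Dict, List, Mapping

REnv = Dict[str, object]  # stands in for the variables of the R session


def fake_predict(r_env: REnv) -> None:
    """Hypothetical stand-in for predict.r: reads sensorId, sets value1 and value2."""
    r_env["value1"] = float(r_env["sensorId"]) * 0.5  # made-up "forecast"
    r_env["value2"] = 7


def run_rscript_step(
    in_tuple: Mapping[str, object],
    stream_attributes: List[str],
    r_objects: List[str],
    script: Callable[[REnv], None],
    outputs: Mapping[str, str],
) -> Dict[str, object]:
    """Simulate one tuple passing through the RScript operator (not its real code).

    stream_attributes / r_objects: assumed to be paired by position, like
        streamAttributes : id ;  rObjects : "sensorId" ;
    outputs: output attribute -> R object name, like
        forecastedValue1 = fromR("value1")
    """
    if len(stream_attributes) != len(r_objects):
        raise ValueError("streamAttributes and rObjects must have the same length")
    # 1. Copy each listed input attribute into the R session under its R name.
    r_env: REnv = {r_name: in_tuple[attr] for attr, r_name in zip(stream_attributes, r_objects)}
    # 2. Run the scoring script; it reads and writes R objects in r_env.
    script(r_env)
    # 3. Assumption: input attributes are carried over by name (ForecastResult
    #    extends InputDataSchema), then the fromR(...) values are added.
    result = dict(in_tuple)
    for out_attr, r_name in outputs.items():
        if r_name not in r_env:
            raise KeyError(f"the R script did not set {r_name!r}")
        result[out_attr] = r_env[r_name]
    return result


if __name__ == "__main__":
    tup = {"id": 443.0, "address": "A717_M", "time": 1568742876}
    print(run_rscript_step(
        tup,
        stream_attributes=["id"],
        r_objects=["sensorId"],
        script=fake_predict,
        outputs={"forecastedValue1": "value1", "forecastedValue2": "value2"},
    ))
```

Running it prints the input attributes plus `forecastedValue1` and `forecastedValue2`. In this model, a length mismatch between the two parameter lists, or an R object that the script never sets, surfaces as an error, which is a quick way to reason about what to check when the real job fails to produce results.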