
PCF + GCP Retail Demo

Diagram showing major components and how data flows

A demo for retailers to see how PCF and GCP turn streams of data into action.

Prerequisites

  • Install the GCP Service Broker, available on Pivotal Network (source is on GitHub)
  • Java 8 JDK installed
  • Git client installed
  • Using the Git client, clone this repo
  • Change into this newly created directory: cd ./pcf-gcp-retail-demo

Install Spring Cloud Dataflow (SCDF) server

SCDF is the foundation of the data flow through this system. The server orchestrates the data streams, which are composed of modular building blocks; each block is a Source, a Processor, or a Sink. A large set of out-of-the-box components is available, and, since they are Spring Boot apps, it is easy to build a customized module.
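
In the SCDF shell, a stream is declared as a Unix-style pipeline of registered apps. As a quick illustration only (this uses the out-of-the-box "time" source and "log" sink from the starter apps, not this demo's custom modules):

  dataflow:> stream create --name ticktock --definition "time | log" --deploy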

  1. Download the SCDF server and client JAR files, as documented here
  2. Configure the manifest
  3. Ensure RabbitMQ, MySQL, and Redis tiles are installed (Ops Manager)
  4. Create service instances of each of these, using cf cs ... (see the sketch after this list)
  5. Push this app to PCF on GCP
  6. Ensure it is running
  7. Access the SCDF Dashboard (at https://dataflow-server.YOUR_PCF_INSTALL.DOMAIN/dashboard/)
  8. Start the SCDF Client you downloaded in a previous step: java -jar spring-cloud-dataflow-shell-1.1.2.RELEASE.jar
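
A sketch of step 4, assuming typical tile service and plan names; the actual names vary by installation, so confirm them first with cf marketplace, and make sure the instance names match what the SCDF server's manifest expects:

  cf marketplace                                 # confirm the service and plan names on your install
  cf create-service p-rabbitmq standard rabbit   # message broker that binds the stream apps together
  cf create-service p-mysql 100mb my_mysql       # backing store for stream definitions
  cf create-service p-redis shared-vm redis      # analytics / state store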

Set up the SCDF stream "data backbone"

Initially, configure a simple stream to illustrate the following data flow (an example of the POST request appears after the list):

  • Social media API client (see 3, 4, 5 in diagram) periodically POSTs data to ...
  • a SCDF HTTP Source (item 6 in the diagram),
  • which is routed through a custom SCDF Processor (item 8 in diagram), where data normalization and enrichment are done.
  • Finally, the stream terminates with the SCDF Sink component (item 17 in diagram).
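
For reference, the POST from the social media client to the HTTP Source looks roughly like this; the hostname is an example (it is the route of the app whose name ends in "-http"), and the mock-source app set up below performs this call for you:

  curl -X POST -H "Content-Type: application/json" \
       -d '{"date-time": "02/24/17 02:04:23"}' \
       http://dataflow-server-hf30QYI-socialmedia-http.YOUR_PCF_INSTALL.DOMAIN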

Build and upload your SCDF modules

  1. Build the Processor project: ( cd ./transform-proc/ && ./mvnw clean package )
  2. Upload the resulting JAR, ./transform-proc/target/transform-proc-0.0.1-SNAPSHOT.jar, into a Cloud Storage bucket, so SCDF is able to access it (see the gsutil sketch after this list).
  3. Build the Sink project: ( cd ./log-sink/ && ./mvnw clean package )
  4. And upload its JAR, ./log-sink/target/log-sink-0.0.1-SNAPSHOT.jar, to Cloud Storage.
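
One way to do the uploads in steps 2 and 4, assuming the gcloud SDK is installed and a bucket already exists (YOUR_BUCKET is a placeholder). The JARs need to be fetchable by the SCDF server over HTTP, so one option is a public-read ACL; tighten this to suit your security requirements:

  gsutil cp ./transform-proc/target/transform-proc-0.0.1-SNAPSHOT.jar gs://YOUR_BUCKET/
  gsutil cp ./log-sink/target/log-sink-0.0.1-SNAPSHOT.jar gs://YOUR_BUCKET/
  gsutil acl ch -u AllUsers:R gs://YOUR_BUCKET/transform-proc-0.0.1-SNAPSHOT.jar
  gsutil acl ch -u AllUsers:R gs://YOUR_BUCKET/log-sink-0.0.1-SNAPSHOT.jar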

Import these modules into SCDF for use in creating streams

  1. You can do this using the SCDF Dashboard (see above), by clicking the "APPS" tab,
  2. then, click "Bulk Import Applications",
  3. then, under "Out-of-the-box Stream app-starters", click the "Action" button associated with the "Maven based Stream Applications with RabbitMQ Binder" row.
  4. To import the two custom-built apps which were uploaded to Cloud Storage, go back to the "APPS" tab,
  5. then click the "+ Register Application(s)" button and populate the fields for the first app; click the "+" icon under "Actions" to add a second set of inputs for the other app. Check the "Force" box. This demo names them xform-proc and log-ps (the same registration can be done from the SCDF shell; see the sketch after this list).
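
A sketch of that shell-based registration; the URIs are placeholders for the public HTTP URLs of the JARs you uploaded to Cloud Storage:

  dataflow:> app register --name xform-proc --type processor --force --uri https://storage.googleapis.com/YOUR_BUCKET/transform-proc-0.0.1-SNAPSHOT.jar
  dataflow:> app register --name log-ps --type sink --force --uri https://storage.googleapis.com/YOUR_BUCKET/log-sink-0.0.1-SNAPSHOT.jar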

Set up the stream

  1. Still within the SCDF Dashboard, click the "STREAMS" tab.
  2. Click "Create Stream"
  3. From the available modules, click the "http" module and drag it to the canvas area.
  4. Repeat this for "xform-proc" and "log-ps" modules.
  5. With the three modules now on the canvas, use the mouse to connect them in order: http → xform-proc → log-ps.
  6. Click "Create Stream", and fill in a value for the stream name (we use "socialmedia" here).
  7. Check the "Deploy stream(s)" box.
  8. Click "Create".
  9. Click the "Definitions" tab, which should show a view similar to this:

SCDF Dashboard Streams View
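
Equivalently, the whole stream can be defined and deployed in one line from the SCDF shell, assuming the apps were registered under the names used above:

  dataflow:> stream create --name socialmedia --definition "http | xform-proc | log-ps" --deploy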

Set up the social media API client

  • Push the app: ( cd ./mock-source/ && cf push --no-start )
  • Using cf apps, note the value in the "urls" column for the app whose name ends in "-http"
  • Now, create a service named "http-hub" using a URI based on that value: cf cups http-hub -p '{"uri": "http://dataflow-server-hf30QYI-socialmedia-http.YOUR_PCF_INSTALL.DOMAIN"}'
  • Bind this newly pushed app to this service instance: cf bs mock-source http-hub
  • Finally, start this app: cf start mock-source
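
Optionally, confirm that the binding exposed the http-hub URI to the app; the user-provided service, with its uri credential, should appear under VCAP_SERVICES in the output of:

  cf env mock-source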

Simulate the social media API interaction, sending events

  • Find the URL for this "mock-source" app, from the output of cf apps
  • Simulate the flow of events through this endpoint: while true ; do curl http://mock-source.YOUR_PCF_INSTALL.DOMAIN/datetime ; sleep $( perl -e 'print int(1 + 15*rand()) . "\n"' ) ; done
  • Tail the logs for the sink, the app whose name ends in "-log-ps": cf logs dataflow-server-hf30QYI-socialmedia-log-ps

Every so often (at intervals ranging from 1 to 15 seconds), a log entry should appear:

2017-02-26T06:25:51.15-0500 [APP/PROC/WEB/0]OUT 2017-02-26 11:25:51.157  INFO 18 --- [c.socialmedia-1] log.sink                                 : {"date_time": "02/26/17 11:25:51", "source": "mock", "days_until_message": "9 days 'til GCP NEXT"}

Review of what we have so far

  • "mock-source" represents the yet-to-be-implemented social media data source, and we can POST data to it. As envisioned, there would be several such components, all falling within this "DATA" area of the diagram, one per social media platform. The way this is decoupled, by having only a REST interface with the SCDF HTTP Hub, allows for developers to build each of these using whichever technology is most appropriate, the best match for the skills of the team and the social media API.
  • SCDF HTTP Hub is just an out-of-the-box SCDF app. Its role here is simply to accept inputs from any of the disparate sources, and put these into the data stream.
  • SCDF Processor currently accepts input that is just the simple JSON string representing the current date and time, as defined in mock-source: {"date-time": "02/24/17 02:04:23"}. It parses the JSON message, computes the number of days between the given date and the date of GCP NEXT (defined within ./transform-proc/src/main/resources/application.properties), enriches the data stream based on this computation, and emits the result into its outbound channel.
  • SCDF Sink simply takes this input and logs it.

Add the "Data Science Interrogator App", number 9 in the diagram

  1. Stop that curl command loop
  2. Add the new app: ( cd ./ds_app_09/ && cf push )
  3. Use cf apps to find the URL for this new app
  4. Update transform-proc/src/main/resources/application.properties with this URL value
  5. Rebuild this app, then upload it as before (see the sketch after this list)
  6. Using the SCDF Dashboard, delete the existing stream ("Destroy")
  7. Re-create that stream, using "Create Stream", as before
  8. Resume that curl command loop
  9. Resume tailing the logs for the app whose name ends with "-log-ps"
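
A sketch of steps 3 through 5; the property key holding the Interrogator's URL is whichever one ./transform-proc/src/main/resources/application.properties already defines for it, and YOUR_BUCKET is a placeholder:

  cf apps                                                       # note the url of the new app
  vi transform-proc/src/main/resources/application.properties   # point the relevant property at that URL
  ( cd ./transform-proc/ && ./mvnw clean package )
  gsutil cp ./transform-proc/target/transform-proc-0.0.1-SNAPSHOT.jar gs://YOUR_BUCKET/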

Now the log entries appearing here should show new fields in the JSON, similar to this:

2017-02-26T06:25:51.15-0500 [APP/PROC/WEB/0]OUT 2017-02-26 11:25:51.157  INFO 18 --- [c.socialmedia-1] log.sink                                 : {"date_time": "02/26/17 11:25:51", "sentiment": {"magnitude": 0.9, "score": 0.4}, "source": "mock", "days_until_message": "9 days 'til GCP NEXT"}

Review: current state

  • With this change, the SCDF Processor (8) is able to interact with the Data Science Interrogator (9).
  • So far, pending a fleshed-out version of (9), it just adds an example of what the Google Cloud Natural Language API would provide for sentiment: "sentiment": {"magnitude": 0.9, "score": 0.4} (see the sketch after this list)
  • Once that REST interaction is complete, (8) simply emits the enriched message back onto the message queue for downstream processing.
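
For reference, a fleshed-out Interrogator would obtain that sentiment object from the Cloud Natural Language API with a call along these lines; this is only a sketch, API_KEY and the text are placeholders, and a production version would more likely authenticate with credentials from the GCP Service Broker than with an API key:

  curl -s -X POST "https://language.googleapis.com/v1/documents:analyzeSentiment?key=API_KEY" \
       -H "Content-Type: application/json" \
       -d '{"document": {"type": "PLAIN_TEXT", "content": "Loving this retail demo!"}}'
  # The response includes "documentSentiment": {"magnitude": ..., "score": ...}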

Resources

  # Excerpt from the Flask webhook handler. `log` is a logging helper defined
  # elsewhere in the app, and git_webhook_secret is a module-level global read
  # from the environment at startup. Requires:
  #   import hmac
  #   from hashlib import sha1
  #   from hmac import compare_digest
  #   from flask import abort, request
  #
  # Validate the signature against the GitHub Webhook secret, only if the
  # environment variable GIT_WEBHOOK_SECRET is set
  global git_webhook_secret
  if git_webhook_secret:
    log("Comparing SHA1 digests for payload")
    if isinstance(git_webhook_secret, str):
      git_webhook_secret = git_webhook_secret.encode()
    signature = request.headers.get("X-Hub-Signature").split('=')[1]
    mac = hmac.new(git_webhook_secret, msg=request.data, digestmod=sha1)
    if compare_digest(mac.hexdigest(), signature):
      log("Digests match -- proceeding")
    else:
      log("Digests don't match -- aborting")
      abort(403)