
Stream processing with Azure Databricks

This reference architecture shows an end-to-end stream processing pipeline. This type of pipeline has four stages: ingest, process, store, and analysis and reporting. For this reference architecture, the pipeline ingests data from two sources, performs a join on related records from each stream, enriches the result, and calculates an average in real time. The results are stored for further analysis.

Scenario: A taxi company collects data about each taxi trip. For this scenario, we assume there are two separate devices sending data. The taxi has a meter that sends information about each ride: the duration, distance, and pickup and dropoff locations. A separate device accepts payments from customers and sends data about fares. To spot ridership trends, the taxi company wants to calculate the average tip per mile driven, in real time, for each neighborhood.
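The join-and-average step described above can be tried offline on two small CSV files. The sketch below is not the Spark job from the repository: it ignores time windows, and the file layouts and the single ride_id join key are hypothetical simplifications chosen for illustration. It averages the per-ride tip-per-mile ratio within each neighborhood.

```shell
#!/usr/bin/env bash
# Offline sketch of the join-and-aggregate step. Field layouts are hypothetical:
#   rides file: ride_id,neighborhood,trip_distance_miles
#   fares file: ride_id,tip_amount
# Prints "neighborhood,average_tip_per_mile", sorted by neighborhood.
tip_per_mile() {
  local rides_file="$1" fares_file="$2"
  awk -F',' '
    # First file (rides): remember neighborhood and distance for each ride.
    NR == FNR { hood[$1] = $2; miles[$1] = $3; next }
    # Second file (fares): join on ride_id; skip unmatched or zero-distance rides.
    ($1 in hood) && (miles[$1] + 0 > 0) {
      sum[hood[$1]] += $2 / miles[$1]
      count[hood[$1]]++
    }
    END {
      for (h in sum) printf "%s,%.2f\n", h, sum[h] / count[h]
    }
  ' "$rides_file" "$fares_file" | sort
}
```

For example, `tip_per_mile rides.csv fares.csv` prints one line per neighborhood that has at least one matched ride. Fares without a matching ride are dropped, which mirrors the behavior of an inner join.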

Deploy the solution

A deployment for this reference architecture is available on GitHub.


  1. Clone, fork, or download this GitHub repository.

  2. Install Docker to run the data generator.

  3. Install Azure CLI.

  4. Install Databricks CLI.

  5. Install a Java IDE, with the following resources:

    • JDK 1.8
    • Scala SDK 2.11
    • Maven 3.5.4

Download the New York City taxi and neighborhood data files

  1. Create a directory named DataFile in the root of the cloned GitHub repository in your local file system.

  2. Open a web browser and navigate to https://uofi.app.box.com/v/NYCtaxidata/folder/2332219935.

  3. Click the Download button on this page to download a zip file of all the taxi data for that year.

  4. Extract the zip file to the DataFile directory.

    Note: This zip file contains other zip files. Don't extract the child zip files.

    The directory structure must look like the following:


Deploy the Azure resources

  1. From a shell or Windows Command Prompt, run the following command and follow the sign-in prompt:

    az login
  2. Navigate to the folder named azure in the GitHub repository:

    cd azure
  3. Run the following commands to deploy the Azure resources:

    export resourceGroup='[Resource group name]'
    export resourceLocation='[Region]'
    export eventHubNamespace='[Event Hubs namespace name]'
    export databricksWorkspaceName='[Azure Databricks workspace name]'
    export cosmosDatabaseAccount='[Cosmos DB database name]'
    export logAnalyticsWorkspaceName='[Log Analytics workspace name]'
    export logAnalyticsWorkspaceRegion='[Log Analytics region]'
    # Create a resource group
    az group create --name $resourceGroup --location $resourceLocation
    # Deploy resources
    az deployment group create --resource-group $resourceGroup \
        --template-file deployresources.json --parameters \
        eventHubNamespace=$eventHubNamespace \
        databricksWorkspaceName=$databricksWorkspaceName \
        cosmosDatabaseAccount=$cosmosDatabaseAccount \
        logAnalyticsWorkspaceName=$logAnalyticsWorkspaceName \
        logAnalyticsWorkspaceRegion=$logAnalyticsWorkspaceRegion
  4. The output of the deployment is written to the console once complete. Search the output for the following JSON:

"outputs": {
        "cosmosDb": {
          "type": "Object",
          "value": {
            "hostName": <value>,
            "secret": <value>,
            "username": <value>
          }
        },
        "eventHubs": {
          "type": "Object",
          "value": {
            "taxi-fare-eh": <value>,
            "taxi-ride-eh": <value>
          }
        },
        "logAnalytics": {
          "type": "Object",
          "value": {
            "secret": <value>,
            "workspaceId": <value>
          }
        }
}

These values are the secrets that will be added to Databricks secrets in upcoming sections. Keep them secure until you add them in those sections.
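Instead of searching the console text, a single output value can be read back from the deployment record. The sketch below wraps az deployment group show with a JMESPath query. It assumes the deployment kept the default name deployresources (the template file's base name) and that $resourceGroup is still exported; the deploymentName variable and the function name are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: print one value from the deployment outputs.
# Assumptions: $resourceGroup is exported as in step 3, and the deployment
# uses the default name "deployresources" unless $deploymentName overrides it.
deployment_output() {
  local section="$1" key="$2"
  # The key is quoted inside the query because names such as taxi-ride-eh
  # contain hyphens, which JMESPath does not allow in bare identifiers.
  az deployment group show \
    --resource-group "$resourceGroup" \
    --name "${deploymentName:-deployresources}" \
    --query "properties.outputs.${section}.value.\"${key}\"" \
    --output tsv
}
```

For example, `deployment_output eventHubs taxi-ride-eh` would print the taxi-ride connection string, and `deployment_output cosmosDb hostName` the Cassandra host name. The printed values are secrets, so avoid writing them to logs or shared terminals.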

Add a Cassandra table to the Cosmos DB Account

  1. In the Azure portal, navigate to the resource group created in the deploy the Azure resources section above. Click on Azure Cosmos DB Account. Create a table with the Cassandra API.

  2. In the overview blade, click add table.

  3. When the add table blade opens, enter newyorktaxi in the Keyspace name text box.

  4. In the enter CQL command to create the table section, enter neighborhoodstats in the text box beside newyorktaxi.

  5. In the text box below, enter the following:

(neighborhood text, window_end timestamp, number_of_rides bigint,total_fare_amount double, primary key(neighborhood, window_end))
  6. In the Throughput (1,000 - 1,000,000 RU/s) text box, enter the value 4000.

  7. Click OK.
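Taken together, the keyspace, table name, and column list entered above correspond roughly to one CQL statement. The sketch below only prints that statement for reference; it assumes the portal joins the three fields in the usual CREATE TABLE keyspace.table form, and the function name is hypothetical. The throughput setting is a portal option and is not part of the statement shown here.

```shell
#!/usr/bin/env bash
# Sketch: print the CQL statement that the portal fields add up to.
# neighborhood is the partition key and window_end the clustering column,
# so each neighborhood holds one row per window end time.
neighborhoodstats_cql() {
  cat <<'EOF'
CREATE TABLE newyorktaxi.neighborhoodstats (
  neighborhood text,
  window_end timestamp,
  number_of_rides bigint,
  total_fare_amount double,
  PRIMARY KEY (neighborhood, window_end)
);
EOF
}
```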

Add the Databricks secrets using the Databricks CLI

Using the Azure Databricks CLI installed in step 4 of the prerequisites, create the Azure Databricks secret scope:

databricks secrets create-scope --scope "azure-databricks-job"

First, enter the secrets for Event Hubs:

  1. Add the secret for the taxi-ride event hub:

    databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

    Once executed, this command opens the vi editor. Enter the taxi-ride-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. Save and exit vi.

  2. Add the secret for the taxi-fare event hub:

    databricks secrets put --scope "azure-databricks-job" --key "taxi-fare"

    Once executed, this command opens the vi editor. Enter the taxi-fare-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. Save and exit vi.

Next, enter the secrets for Cosmos DB:

  1. Open the Azure portal, and navigate to the resource group specified in step 3 of the deploy the Azure resources section. Click on the Azure Cosmos DB Account.

  2. Using the Azure Databricks CLI, add the secret for the Cosmos DB user name:

    databricks secrets put --scope azure-databricks-job --key "cassandra-username"

    Once executed, this command opens the vi editor. Enter the username value from the cosmosDb output section in step 4 of the deploy the Azure resources section. Save and exit vi.

  3. Next, add the secret for the Cosmos DB password:

    databricks secrets put --scope azure-databricks-job --key "cassandra-password"

    Once executed, this command opens the vi editor. Enter the secret value from the cosmosDb output section in step 4 of the deploy the Azure resources section. Save and exit vi.


If using an Azure Key Vault-backed secret scope, the scope must be named azure-databricks-job and the secrets must have the exact same names as those above.
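The four secrets above can also be set without the vi editor. The sketch below assumes the same legacy Databricks CLI used in this section, whose secrets put command accepts a --string-value option; the function name is hypothetical. Values passed on the command line can end up in shell history and process listings, so the interactive editor remains the more conservative choice.

```shell
#!/usr/bin/env bash
# Sketch: store one secret in the azure-databricks-job scope without opening vi.
# Assumes the legacy Databricks CLI (secrets put with --string-value).
put_job_secret() {
  local key="$1" value="$2"
  databricks secrets put \
    --scope "azure-databricks-job" \
    --key "$key" \
    --string-value "$value"
}
```

A call such as `put_job_secret "taxi-ride" "$rideConnectionString"` would store the taxi-ride connection string, where rideConnectionString is a hypothetical variable holding the taxi-ride-eh output value. The keys taxi-fare, cassandra-username, and cassandra-password follow the same pattern.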

Add the Zillow Neighborhoods data file to the Databricks file system

  1. Create a directory in the Databricks file system:

    dbfs mkdirs dbfs:/azure-databricks-jobs
  2. Navigate to the DataFile directory and enter the following:

    dbfs cp ZillowNeighborhoods-NY.zip dbfs:/azure-databricks-jobs

Add the Azure Log Analytics workspace ID and primary key to configuration files

For this section, you require the Log Analytics workspace ID and primary key. The workspace ID is the workspaceId value from the logAnalytics output section in step 4 of the deploy the Azure resources section. The primary key is the secret value from the same logAnalytics output section.

  1. To configure log4j logging, open \azure\AzureDataBricksJob\src\main\resources\com\microsoft\pnp\azuredatabricksjob\log4j.properties. Edit the following two values:

    log4j.appender.A1.workspaceId=<Log Analytics workspace ID>
    log4j.appender.A1.secret=<Log Analytics primary key>
  2. To configure custom logging, open \azure\azure-databricks-monitoring\scripts\metrics.properties. Edit the following two values:

    *.sink.loganalytics.workspaceId=<Log Analytics workspace ID>
    *.sink.loganalytics.secret=<Log Analytics primary key>
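Both files can be edited by hand, but the same change can be scripted. The sketch below is one possible helper, not part of the repository: it removes any existing line containing the property name followed by an equals sign (including commented-out copies) and appends the new value at the end of the file. It avoids sed replacement strings because Log Analytics keys are base64 text that can contain characters such as / and +.

```shell
#!/usr/bin/env bash
# Sketch: set name=value in a Java properties file.
# Any line containing "name=" is dropped (commented-out copies included),
# and the new assignment is appended at the end of the file.
set_property() {
  local file="$1" name="$2" value="$3" tmp
  tmp="$(mktemp)"
  # -F treats the name literally, so the leading "*" in metrics.properties is safe.
  grep -vF -- "${name}=" "$file" > "$tmp" || true
  printf '%s=%s\n' "$name" "$value" >> "$tmp"
  # Write back through cat so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

For example, `set_property metrics.properties '*.sink.loganalytics.workspaceId' "$workspaceId"` would set the workspace ID, where workspaceId is a hypothetical variable holding the value from the deployment output.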

Build the .jar files for the Databricks job and Databricks monitoring

  1. Use your Java IDE to import the Maven project file named pom.xml located in the root directory.

  2. Perform a clean build. The build produces two files: azure-databricks-job-1.0-SNAPSHOT.jar and azure-databricks-monitoring-0.9.jar.
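If Maven is on your PATH, running mvn clean package from the directory that contains pom.xml is a common command-line equivalent of the IDE build, although this guide only describes the IDE route. Either way, the later steps need the two .jar files, and the sketch below locates them. The function name is hypothetical, and it makes no assumption about which module directory each file lands in.

```shell
#!/usr/bin/env bash
# Sketch: print the path of each expected build artifact under a directory.
# Returns non-zero if either .jar file is missing.
find_built_jars() {
  local root="${1:-.}" jar path status=0
  for jar in azure-databricks-job-1.0-SNAPSHOT.jar azure-databricks-monitoring-0.9.jar; do
    path="$(find "$root" -type f -name "$jar" | head -n 1)"
    if [ -n "$path" ]; then
      echo "$path"
    else
      echo "missing: $jar" >&2
      status=1
    fi
  done
  return "$status"
}
```

Running `find_built_jars .` from the repository root after the build prints one path per file, which can then be passed to the copy commands in the following sections.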

Configure custom logging for the Databricks job

  1. Copy the azure-databricks-monitoring-0.9.jar file to the Databricks file system by entering the following command in the Databricks CLI:

    databricks fs cp --overwrite azure-databricks-monitoring-0.9.jar dbfs:/azure-databricks-job/azure-databricks-monitoring-0.9.jar
  2. Copy the custom logging properties from \azure\azure-databricks-monitoring\scripts\metrics.properties to the Databricks file system by entering the following command:

    databricks fs cp --overwrite metrics.properties dbfs:/azure-databricks-job/metrics.properties
  3. If you haven't yet decided on a name for your Databricks cluster, choose one now. You'll enter this name when you create the cluster in the next section. Copy the initialization script from \azure\azure-databricks-monitoring\scripts\spark-metrics.sh to the Databricks file system by entering the following command:

    databricks fs cp --overwrite spark-metrics.sh dbfs:/databricks/init/spark-metrics.sh

Create a Databricks cluster

  1. In the Databricks workspace, click "Clusters", then click "create cluster". Enter the cluster name you created in step 3 of the configure custom logging for the Databricks job section above.

  2. Select a standard cluster mode.

  3. Set Databricks runtime version to 6.4 (includes Apache Spark 2.4.5, Scala 2.11).

  4. Set Python version to 3.

  5. Set Driver Type to Same as worker.

  6. Set Worker Type to Standard_DS3_v2.

  7. Set Min Workers to 2.

  8. Deselect Enable autoscaling.

  9. Expand Advanced Options and choose the tab Init Scripts.

  10. Enter dbfs:/databricks/init/spark-metrics.sh.

  11. Click the Add button.

  12. Click the Create Cluster button.
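The same cluster settings can be captured in a JSON specification for the Clusters API. The sketch below writes such a file; it is an approximation of the portal steps, not a file from the repository. The field names follow the Databricks Clusters API 2.0, the runtime key 6.4.x-scala2.11 is assumed to correspond to runtime version 6.4, and the function name is hypothetical. A fixed num_workers value stands in for disabling autoscaling, and omitting the driver node type leaves the driver the same as the workers.

```shell
#!/usr/bin/env bash
# Sketch: write a cluster specification that mirrors the portal settings.
# The cluster name must not contain double quotes or backslashes, because it
# is inserted into the JSON text without escaping.
write_cluster_spec() {
  local cluster_name="$1" out_file="$2"
  cat > "$out_file" <<EOF
{
  "cluster_name": "${cluster_name}",
  "spark_version": "6.4.x-scala2.11",
  "node_type_id": "Standard_DS3_v2",
  "num_workers": 2,
  "init_scripts": [
    { "dbfs": { "destination": "dbfs:/databricks/init/spark-metrics.sh" } }
  ]
}
EOF
}
```

With the legacy Databricks CLI, a file written by `write_cluster_spec "my-cluster" cluster.json` could then be submitted with `databricks clusters create --json-file cluster.json`.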

Create a Databricks job

  1. In the Databricks workspace, click "Jobs", "create job".

  2. Enter a job name.

  3. Click "set jar". This opens the "Upload JAR to Run" dialog box.

  4. Drag the azure-databricks-job-1.0-SNAPSHOT.jar file you created in the build the .jar files for the Databricks job and Databricks monitoring section to the "Drop JAR here to upload" box.

  5. Enter com.microsoft.pnp.TaxiCabReader in the Main Class field.

  6. In the arguments field, enter the following:

    -n jar:file:/dbfs/azure-databricks-jobs/ZillowNeighborhoods-NY.zip!/ZillowNeighborhoods-NY.shp --taxi-ride-consumer-group taxi-ride-eh-cg --taxi-fare-consumer-group taxi-fare-eh-cg --window-interval "1 minute" --cassandra-host <Cosmos DB Cassandra host name from above> 
  7. Install the dependent libraries by following these steps:

    1. In the Databricks user interface, click on the home button.

    2. In the Users drop-down, click on your user account name to open your account workspace settings.

    3. Click on the drop-down arrow beside your account name, click on create, and click on Library to open the New Library dialog.

    4. Under the Library Source, select Maven.

    5. Under the Coordinates heading, enter com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.5 in the text box.

    6. Click on Create to open the Artifacts window.

    7. Under Status on running clusters check the Attach automatically to all clusters checkbox.

    8. Repeat steps 1 - 7 for the com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0 Maven coordinate.

    9. Repeat steps 1 - 7 for the com.datastax.spark:spark-cassandra-connector_2.11:2.3.2 Maven coordinate.

    10. Repeat steps 1 - 6 for the org.geotools:gt-shapefile:23.0 Maven coordinate.

    11. Click on Advanced Options.

    12. Enter https://repo.osgeo.org/repository/release/ in the Repository text box.

    13. Click Create to open the Artifacts window.

    14. Under Status on running clusters check the Attach automatically to all clusters checkbox.

  8. Add the dependent libraries added in step 7 to the job created at the end of step 6:

    1. In the Azure Databricks workspace, click on Jobs.

    2. Click on the job name created in step 2 of the create a Databricks job section.

    3. Beside the Dependent Libraries section, click on Add to open the Add Dependent Library dialog.

    4. Under Library From select Workspace.

    5. Click on users, then your username, then click on azure-eventhubs-spark_2.11:2.3.5.

    6. Click OK.

    7. Repeat steps 1 - 6 for spark-cassandra-connector_2.11:2.3.2, gt-shapefile:23.0 and azure-cosmos-cassandra-spark-helper:1.0.0.

  9. Beside Cluster:, click on Edit. This opens the Configure Cluster dialog. In the Cluster Type drop-down, select Existing Cluster. In the Select Cluster drop-down, select the cluster created in the create a Databricks cluster section. Click confirm.

  10. Click run now.

Run the data generator

  1. Navigate to the directory named onprem in the GitHub repository.

  2. Update the values in the file main.env as follows:

    RIDE_EVENT_HUB=[Connection string for the taxi-ride event hub]
    FARE_EVENT_HUB=[Connection string for the taxi-fare event hub]

    The connection string for the taxi-ride event hub is the taxi-ride-eh value from the eventHubs output section in step 4 of the deploy the Azure resources section. The connection string for the taxi-fare event hub is the taxi-fare-eh value from the same eventHubs output section.

  3. Run the following command to build the Docker image.

    docker build --no-cache -t dataloader .
  4. Navigate back to the parent directory.

    cd ..
  5. Run the following command to run the Docker image.

    docker run -v `pwd`/DataFile:/DataFile --env-file=onprem/main.env dataloader:latest

The output should look like the following:

Created 10000 records for TaxiFare
Created 10000 records for TaxiRide
Created 20000 records for TaxiFare
Created 20000 records for TaxiRide
Created 30000 records for TaxiFare
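To check that both streams are advancing, the generator output can be reduced to the latest record count per stream. The sketch below assumes the output lines keep the exact "Created N records for Name" form shown above; the function name is hypothetical. It reads from standard input and prints its summary once the input ends, so it suits a saved log better than a live stream.

```shell
#!/usr/bin/env bash
# Sketch: print the most recent record count reported for each stream.
# Input lines look like "Created 10000 records for TaxiFare".
latest_counts() {
  awk '
    /^Created [0-9]+ records for / { latest[$5] = $2 }
    END { for (stream in latest) print stream, latest[stream] }
  ' | sort
}
```

For example, if the generator output was saved to a file named generator.log (a hypothetical name), `latest_counts < generator.log` prints one line per stream.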

To verify the Databricks job is running correctly, open the Azure portal and navigate to the Cosmos DB database. Open the Data Explorer blade and examine the data in the neighborhoodstats table of the newyorktaxi keyspace.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8