databricks-api-workflow

Workflow client for running jobs with the Databricks REST API

ApiWorkflowClient

The ApiWorkflowClient is a semi-opinionated Python wrapper around the Databricks REST API that executes job runs synchronously: it launches a run and polls until the run completes.

Overview

  • Intended for basic workflow tasks.
  • Launches a run and waits until it is finished (TERMINATED state) by polling the jobs/runs/get REST endpoint.
  • Supports both the jobs/runs/submit and jobs/run-now endpoints.
  • Only supports JAR tasks, but is easy to extend to other task types.
  • ApiWorkflowClient is the main workhorse class.
  • Requires Python 3.
  • Make sure to run python with the -u option for unbuffered output so print statements are displayed in real-time.
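The polling behavior described in the list above can be sketched roughly as follows. This is a minimal illustration, not the actual ApiWorkflowClient code: the function names wait_until_run_is_done and http_get_run, the max_polls guard, and the injectable sleep argument are assumptions made for the example. It relies only on the state.life_cycle_state field of the jobs/runs/get response and on bearer-token authentication.

```python
import json
import time
import urllib.request

# Terminal life_cycle_state values in the Jobs API 2.0. The client described
# above waits for TERMINATED; SKIPPED and INTERNAL_ERROR are included here
# (an assumption of this sketch) so the loop cannot spin forever.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def http_get_run(api_url, token, run_id):
    """Hypothetical helper: call jobs/runs/get and return the parsed JSON."""
    request = urllib.request.Request(
        f"{api_url}/jobs/runs/get?run_id={run_id}",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def wait_until_run_is_done(get_run, run_id, sleep_seconds=3,
                           max_polls=1000, sleep=time.sleep):
    """Poll get_run(run_id) until the run reaches a terminal state.

    get_run is any callable that returns the jobs/runs/get response as a dict.
    """
    for _ in range(max_polls):
        run = get_run(run_id)
        state = run["state"]["life_cycle_state"]
        if state in TERMINAL_STATES:
            return run
        # flush=True has the same effect as running python with -u.
        print(f"Run {run_id} is in {state} life_cycle_state.", flush=True)
        sleep(sleep_seconds)
    raise TimeoutError(f"Run {run_id} did not finish after {max_polls} polls")
```

To poll a real workspace, get_run could be something like lambda run_id: http_get_run(url, token, run_id). Passing the fetch function in as an argument keeps the loop testable without network access.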

jobs/runs/submit

Files

Synopsis

  • Databricks documentation - jobs/runs/submit.
  • Only supports the spark_jar_task task.
  • Inserts command-line arguments into the JSON payload as the spark_jar_task.parameters attribute.
  • Launches a run based upon the JSON spec file.
  • Prints run ID.
  • Polls until cluster is created.
  • Prints cluster ID.
  • Polls until run is finished.
  • Prints run status and log directory.
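As an illustration of the parameter-insertion step in the synopsis above, the following sketch copies a run spec and sets spark_jar_task.parameters from a list of arguments. The function name add_jar_parameters and the abbreviated spec are hypothetical; a real runs_submit.json would also describe the cluster and the jar library, which are omitted here.

```python
import copy
import json


def add_jar_parameters(spec, args):
    """Return a copy of the run spec with args set as spark_jar_task.parameters."""
    payload = copy.deepcopy(spec)  # leave the loaded spec untouched
    task = payload.get("spark_jar_task")
    if task is None:
        raise ValueError("Only specs with a spark_jar_task are supported")
    task["parameters"] = list(args)
    return payload


# Abbreviated, hypothetical spec; cluster and library settings are left out.
spec = {
    "run_name": "example_run",
    "spark_jar_task": {"main_class_name": "org.andre.HelloFelidae"},
}

payload = add_jar_parameters(spec, ["param1", "param2"])
print(json.dumps(payload, indent=2))
```

The resulting dictionary is what would be sent as the JSON body of the jobs/runs/submit request.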

Run

python -u run_submit.py \
  --url https://acme.cloud.databricks.com/api/2.0 \
  --token MY_TOKEN --json_file runs_submit.json --sleep_seconds 3 \
  param1 param2
2019-12-10 14:52:46 New run_id: 2392940
2019-12-10 14:52:46 Start waiting for 'cluster_is_created_for_run'.
2019-12-10 14:52:48 Waiting for 'cluster_is_created_for_run'. run 2392940.
2019-12-10 14:52:51 Done waiting for 'cluster_is_created_for_run'. Cluster 1210-195246-joyed285 has been created for run 2392940.
2019-12-10 14:52:51 Processing time: 4.68 seconds
2019-12-10 14:52:51 cluster_id: 1210-195246-joyed285
2019-12-10 14:52:51 Start waiting for 'until_run_is_done'. Run 2392940.
2019-12-10 14:52:53 Waiting for 'until_run_is_done'. Run 2392940 is in PENDING life_cycle_state.
2019-12-10 14:52:56 Waiting for 'until_run_is_done'. Run 2392940 is in PENDING life_cycle_state.
. . .
2019-12-10 14:53:55 Waiting for 'until_run_is_done'. Run 2392940 is in RUNNING life_cycle_state.
2019-12-10 14:53:58 Done waiting for 'until_run_is_done'. Run 2392940 is in TERMINATED life_cycle_state.
2019-12-10 14:53:58 Processing time: 66.83 seconds
2019-12-10 14:53:58 log_dir: dbfs:/andre/logs/jobs/run_submit/1210-195246-joyed285

jobs/run-now

Files

Synopsis

  • Databricks documentation - jobs/run-now.
  • You must first create the job with create_job.json or through the UI.
  • Only supports the jar_params parameter, that is, jobs with a JAR task.
  • Inserts command-line arguments into the JSON payload as the jar_params attribute.
  • Launches a run for the specified job_id.
  • Prints run ID.
  • Polls until run is finished.
  • Prints run status and log directory.
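The mapping from command-line arguments to the jobs/run-now payload might look like the sketch below. It assumes an argparse-based command line with the same option names as the example run in this section; the actual run_job.py may parse its arguments differently, and the names parse_args and build_run_now_payload are hypothetical.

```python
import argparse


def parse_args(argv):
    """Parse options plus trailing positional arguments for the jar."""
    parser = argparse.ArgumentParser(description="Launch a run with jobs/run-now")
    parser.add_argument("--url", required=True)
    parser.add_argument("--token", required=True)
    parser.add_argument("--job_id", type=int, required=True)
    parser.add_argument("--sleep_seconds", type=int, default=3)
    # Everything after the options is passed to the jar as jar_params.
    parser.add_argument("jar_params", nargs="*")
    return parser.parse_args(argv)


def build_run_now_payload(job_id, jar_params):
    """Build the JSON body for the jobs/run-now request."""
    return {"job_id": job_id, "jar_params": list(jar_params)}


args = parse_args([
    "--url", "https://acme.cloud.databricks.com/api/2.0",
    "--token", "MY_TOKEN",
    "--job_id", "1812",
    "--sleep_seconds", "3",
    "param1", "param2",
])
run_now_payload = build_run_now_payload(args.job_id, args.jar_params)
print(run_now_payload)
```

Unlike jobs/runs/submit, the payload carries only the job ID and the parameters, because the cluster and jar settings already live in the job definition.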

Run

python -u run_job.py \
  --url https://acme.cloud.databricks.com/api/2.0 \
  --token MY_TOKEN --job_id 1812 --sleep_seconds 3 \
  param1 param2

2019-01-03 00:35:58 url: https://demo.cloud.databricks.com/api/2.0
2019-01-03 00:35:58 job_id: 11926
2019-01-03 00:36:00 New run_id: 2374690
2019-01-03 00:36:00 Start waiting for 'until_run_is_done'. Run 2374690.
2019-01-03 00:36:03 Waiting for 'until_run_is_done'. Run 2374690 is in PENDING life_cycle_state.
2019-01-03 00:36:05 Waiting for 'until_run_is_done'. Run 2374690 is in PENDING life_cycle_state.
. . .
2019-01-03 00:37:48 Waiting for 'until_run_is_done'. Run 2374690 is in RUNNING life_cycle_state.
2019-01-03 00:37:51 Waiting for 'until_run_is_done'. Run 2374690 is in TERMINATING life_cycle_state.
2019-01-03 00:37:55 Done waiting for 'until_run_is_done'. Run 2374690 is in TERMINATED life_cycle_state.
2019-01-03 00:37:55 Processing time: 115.29 seconds
2019-01-03 00:37:56 cluster_id: 0103-003601-ales185
2019-01-03 00:37:56 job_status: SUCCESS
2019-01-03 00:37:56 log_dir: dbfs:/andre/logs/jobs/workflowApiClient_run_now/0103-003601-ales185

Sample JAR

A simple jar is provided for testing. Build the jar and copy it to DBFS. See sample_jar and HelloFelidae.scala.

Build with sbt

cd sample_jar

sbt package

databricks fs cp \
  target/scala-2.11/amm-hellofelidae_2.11-0.1-SNAPSHOT.jar \
  dbfs:/andre/jars --overwrite

To test the jar locally:

spark-submit --class org.andre.HelloFelidae --master local[2] \
  target/scala-2.11/amm-hellofelidae_2.11-0.1-SNAPSHOT.jar \
  tiger

+---+--------------+
| id|          name|
+---+--------------+
|200|Sumatran tiger|
|201|    Amur tiger|
|202|  Bengal tiger|
+---+--------------+

Build with Maven

cd sample_jar

mvn clean package

databricks fs cp \
  target/amm-HelloFelidae-1.0-SNAPSHOT.jar \
  dbfs:/andre/jars --overwrite

To test the jar locally:

spark-submit --class org.andre.HelloFelidae --master local[2] \
  target/amm-HelloFelidae-1.0-SNAPSHOT.jar \
  tiger

+---+--------------+
| id|          name|
+---+--------------+
|200|Sumatran tiger|
|201|    Amur tiger|
|202|  Bengal tiger|
+---+--------------+