Airflow is a platform to programmatically author, schedule and monitor workflows.
When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
This app is built and deployed with docker and docker-compose.
Most Airflow jobs are thin wrappers that spin up an EMR cluster for running the job. Be aware that the configuration of the created EMR clusters depends on finding scripts in an S3 location configured by the `SPARK_BUCKET` variable. Those scripts are maintained in `emr-bootstrap-spark` and are deployed independently of this repository. Changes in the behavior of Airflow jobs that are not explained by changes in the source of the Spark jobs or by changes in this repository could be due to changes in the bootstrap scripts.
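If you want to check which scripts a newly created cluster will pick up, one way (assuming the AWS CLI is configured, and noting that the exact key layout inside the bucket may differ) is to list the configured bucket:

```bash
# SPARK_BUCKET is the bucket holding the Spark/EMR scripts (see the production
# environment variables section below); adjust the prefix to your setup.
aws s3 ls "s3://${SPARK_BUCKET}/"
```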
An Airflow container can be built with:

```bash
make build
```
For now, DAGs that use the Databricks operator won't parse until the following environment variables are set (see issue #501):

- `AWS_SECRET_ACCESS_KEY`
- `AWS_ACCESS_KEY_ID`
- `DB_TOKEN`
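If you only need those DAGs to parse locally (not to actually talk to AWS or Databricks), placeholder values are typically enough:

```bash
# Dummy values just to let the DAGs parse; replace with real credentials to run jobs
export AWS_SECRET_ACCESS_KEY=dummy
export AWS_ACCESS_KEY_ID=dummy
export DB_TOKEN=dummy
```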
Airflow database migration is no longer a separate step for dev but is run by the web container, if necessary, on first run. That means, however, that you should run the web container (and the database container, of course) and wait for the database migrations to complete before running individual test commands per below. The easiest way to do this is to run `make up` and let it run until the migrations complete.
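If you want to watch for the migrations finishing, one way (assuming the default docker-compose service names) is to follow the web container's logs in another terminal:

```bash
# The migrations run during the web container's first startup
docker-compose logs --follow web
```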
A single task, e.g. `spark`, of an Airflow dag, e.g. `example`, can be run with an execution date, e.g. `2018-01-01`, in the `dev` environment with:

```bash
export DEV_USERNAME=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
make run COMMAND="test example spark 20180101"
```

The `DEV_USERNAME` is a short string used to identify your EMR instances. This should be set to something like your IRC or Slack handle.
The container will run the desired task to completion (or failure). Note that if the container is stopped during the execution of a task, the task will be aborted. In the example's case, the Spark job will be terminated.
The logs of the task can be inspected in real-time with:

```bash
docker logs -f telemetryairflow_scheduler_1
```
You can see task logs and cluster status on the EMR console.
By default, the results will end up in the `telemetry-test-bucket` in S3. If your desired task depends on other views, it will expect to be able to find those results in `telemetry-test-bucket` too. It's your responsibility to run the tasks in the correct order of their dependencies.
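For illustration, if the task you care about depended on an upstream task in the same DAG (the task name `setup` here is hypothetical), you would run the upstream task first using the same `make run` form, letting each invocation finish before starting the next:

```bash
# Hypothetical upstream task "setup" of the "example" DAG, then the dependent task
make run COMMAND="test example setup 20180101"
make run COMMAND="test example spark 20180101"
```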
CAVEAT: Running `make run` multiple times can spin up multiple versions of the `web` container. It can also fail if you've never run `make up` to initialize the database. An alternative form of the above is to launch the containers and shell into the `web` container to run the `airflow test` command.
In one terminal launch the docker containers:

```bash
make up
```

Note: initializing the web container will run the airflow `initdb`/`upgradedb` commands.

In another terminal shell into the `web` container, making sure to also supply the environment variables, then run the `airflow test` command:

```bash
export DEV_USERNAME=...
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
docker exec -ti -e DEV_USERNAME -e AWS_SECRET_ACCESS_KEY -e AWS_ACCESS_KEY_ID telemetry-airflow_web_1 /bin/bash
airflow test example spark 20180101
```
Tasks often require credentials to access external services. For example, one may choose to store API keys in an Airflow connection or variable. These variables are sure to exist in production but are often not mirrored locally for logistical reasons. Providing a dummy variable is the preferred way to keep the local development environment up to date.
In `bin/run`, please update the `init_connections` and `init_variables` with appropriate strings to prevent broken workflows. To test this, run `bin/test-parse` to check for errors. You may manually test this by restarting the orchestrated containers and checking for error messages within the main administration UI at `localhost:8000`.
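As a rough illustration of the kind of dummy entries involved (the connection and variable names below are placeholders; the real ones live in `bin/run`), the Airflow 1.x CLI can register them like this:

```bash
# Hypothetical dummy connection and variable so DAGs referencing them can parse locally
airflow connections --add --conn_id some_api --conn_type http --conn_host https://example.com
airflow variables --set some_api_key dummy
```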
Assuming you're using macOS and Docker for macOS, start the docker service, click the docker icon in the menu bar, click on preferences and change the available memory to 4GB.
To deploy the Airflow container on the docker engine, with its required dependencies, run:

```bash
make up
```

You can now connect to your local Airflow web console at `http://localhost:8000/`.
All DAGs are paused by default for local instances and our staging instance of Airflow. In order to submit a DAG via the UI, you'll need to toggle the DAG from "Off" to "On". You'll likely want to toggle the DAG back to "Off" as soon as your desired task starts running.
Users on Linux distributions will encounter permission issues with `docker-compose`. This is because the local application folder is mounted as a volume into the running container. The Airflow user and group in the container is set to `10001`.

To work around this, replace all instances of `10001` in `Dockerfile.dev` with the host user id:

```bash
sed -i "s/10001/$(id -u)/g" Dockerfile.dev
```
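Note that the edit only takes effect once the dev image is rebuilt; depending on how your containers are built, that is something like:

```bash
# Rebuild the image so the new uid is baked in, then bring the containers back up
make build
make up
```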
To run a job on Databricks, run `make up` in the background. Follow this guide on generating a token and save it to a secure location. Export the token to an environment variable:

```bash
export DB_TOKEN=<TOKEN>
```

Finally, run the testing command using docker-compose directly:

```bash
docker-compose exec web airflow test example spark 20180101
```

Note: This only works for `telemetry-batch-view` jobs.
Dev changes can be run by simply changing the `DEPLOY_TAG` environment variable to whichever upstream branch you've pushed your local changes to. Afterwards, you'll need to run `make clean`, then `make build`, then `nohup make up &`.
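Put together, that workflow looks like the following (the branch name here is hypothetical):

```bash
# Point DEPLOY_TAG at the upstream branch holding your changes
export DEPLOY_TAG=my-feature-branch
make clean
make build
nohup make up &
```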
From there, you can either set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` in the Dockerfile and run `make up` to get a local UI and run from there, or you can follow the testing instructions above and use `make run`.
For now, follow the steps outlined here to create a service account: https://bugzilla.mozilla.org/show_bug.cgi?id=1553559#c1.
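The commands below assume `GOOGLE_APPLICATION_CREDENTIALS` points at the service account's JSON key file; the path shown here is only an example:

```bash
export GOOGLE_APPLICATION_CREDENTIALS=~/keys/my-sandbox-service-account.json
```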
Enable that service account in Airflow with the following:
```bash
make build && make up
./bin/add_gcp_creds $GOOGLE_APPLICATION_CREDENTIALS
```
From there, connect to Airflow and enable your job.
Dataproc jobs run on a self-contained Dataproc cluster, created by Airflow.
To test these jobs, you'll need a sandbox account and corresponding service account. For information on creating that, see "Testing GKE Jobs". Your service account will need Dataproc and GCS permissions (and BigQuery, if you're connecting to it). Note: Dataproc requires "Dataproc/Dataproc Worker" as well as Compute Admin permissions. You'll need to ensure that the Dataproc API is enabled in your sandbox project.
Ensure that your Dataproc job has a configurable project to write to. Set the project in the DAG entry to be configured based on the development environment; see the `ltv.py` job for an example of that.
From there, run the following:
```bash
make build && make up
./bin/add_gcp_creds $GOOGLE_APPLICATION_CREDENTIALS google_cloud_airflow_dataproc
```
You can then connect to Airflow locally. Enable your DAG and see that it runs correctly.
Note: the canonical reference for production environment variables lives in a private repository.
When deploying to production, make sure to set up the following environment variables:

- `AWS_ACCESS_KEY_ID` -- The AWS access key ID to spin up the Spark clusters
- `AWS_SECRET_ACCESS_KEY` -- The AWS secret access key
- `SPARK_BUCKET` -- The AWS S3 bucket where Spark related files are stored, e.g. `telemetry-spark-emr-2`
- `AIRFLOW_BUCKET` -- The AWS S3 bucket where airflow specific files are stored, e.g. `telemetry-airflow`
- `PUBLIC_OUTPUT_BUCKET` -- The AWS S3 bucket where public job results are stored, e.g. `telemetry-public-analysis-2`
- `PRIVATE_OUTPUT_BUCKET` -- The AWS S3 bucket where private job results are stored, e.g. `telemetry-parquet`
- `AIRFLOW_DATABASE_URL` -- The connection URI for the Airflow database, e.g. `mysql://username:password@hostname:port/database`
- `AIRFLOW_BROKER_URL` -- The connection URI for the Airflow worker queue, e.g. `redis://hostname:6379/0`
- `AIRFLOW_RESULT_URL` -- The connection URI for the Airflow result backend, e.g. `redis://hostname:6379/1`
- `AIRFLOW_GOOGLE_CLIENT_ID` -- The Google Auth client id used for authentication
- `AIRFLOW_GOOGLE_CLIENT_SECRET` -- The Google Auth client secret used for authentication
- `AIRFLOW_GOOGLE_APPS_DOMAIN` -- The domain(s) to restrict Google Auth login to, e.g. `mozilla.com`
- `AIRFLOW_SMTP_HOST` -- The SMTP server to use to send emails, e.g. `email-smtp.us-west-2.amazonaws.com`
- `AIRFLOW_SMTP_USER` -- The SMTP user name
- `AIRFLOW_SMTP_PASSWORD` -- The SMTP password
- `AIRFLOW_SMTP_FROM` -- The email address to send emails from, e.g. `telemetry-alerts@workflow.telemetry.mozilla.org`
- `URL` -- The base URL of the website, e.g. `https://workflow.telemetry.mozilla.org`
- `DEPLOY_ENVIRONMENT` -- The environment currently running, e.g. `stage` or `prod`
- `DEPLOY_TAG` -- The tag or branch to retrieve the JAR from, e.g. `master` or `tags`. You can specify the tag or travis build exactly as well, e.g. `master/42.1` or `tags/v2.2.1`. Not specifying the exact tag or build will use the latest from that branch, or the latest tag (see the example values after this list).
- `ARTIFACTS_BUCKET` -- The S3 bucket where the build artifacts can be found, e.g. `net-mozaws-data-us-west-2-ops-ci-artifacts`
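For illustration, the `DEPLOY_TAG` forms described above map to values like:

```bash
export DEPLOY_TAG=master         # latest build from the master branch
export DEPLOY_TAG=master/42.1    # a specific travis build on master
export DEPLOY_TAG=tags/v2.2.1    # a specific tag
```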
Also, please set:

- `AIRFLOW_SECRET_KEY` -- A secret key for Airflow's Flask-based webserver
- `AIRFLOW__CORE__FERNET_KEY` -- A secret key for saving connection passwords in the DB

Both values should be set by using the cryptography module's fernet tool that we've wrapped in a docker-compose call:

```bash
make secret
```

Run this for each key config variable, and don't use the same value for both!
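For reference, assuming the `cryptography` package is installed locally, the key generation that `make secret` wraps is roughly equivalent to:

```bash
# Generate a Fernet key with the cryptography package
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
```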
Some useful docker tricks for development and debugging:
```bash
# Stop all docker containers:
docker stop $(docker ps -aq)

# Remove any leftover docker volumes:
docker volume rm $(docker volume ls -qf dangling=true)

# Purge docker volumes (helps with mysql container failing to start)
# Careful as this will purge all local volumes not used by at least one container.
docker volume prune
```
Failing CircleCI 'test-environment' check:
```bash
# These commands are from the bin/test-parse script (get_errors_in_listing)
# If --detach is unavailable, make sure you are running the latest version of docker-compose
docker-compose up --detach
docker-compose logs --follow --tail 0 | sed -n '/\[testing_stage_0\]/q'

# Don't pipe to grep to see the full output including your errors
docker-compose exec web airflow list_dags
```
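If `list_dags` doesn't surface the underlying import error, one way to dump the errors directly (a sketch using Airflow 1.x's `DagBag` API) is:

```bash
# Print any DAG import errors collected while loading the dags folder
docker-compose exec web python -c "from airflow.models import DagBag; print(DagBag().import_errors)"
```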
- Check if the task / run you want to re-run is visible in the DAG's Tree View UI
  - For example, the `main_summary` DAG tree view.
  - Hover over the little squares to find the scheduled dag run you're looking for.
- If the dag run is not showing in the Dag Tree View UI (maybe deleted)
  - Browse -> Dag Runs
  - Create (you can look at another dag run of the same dag for example values too)
    - Dag Id: the name of the dag, for example, `main_summary`
    - Execution Date: The date the dag should have run, for example, `2018-05-14 00:00:00`
    - Start Date: Some date between the execution date and "now", for example, `2018-05-20 00:00:05`
    - End Date: Leave it blank
    - State: success
    - Run Id: `scheduled__2018-05-14T00:00:00`
    - External Trigger: unchecked
  - Click Save
  - Click on the Graph view for the dag in question. From the main DAGs view, click the name of the DAG
  - Select the "Run Id" you just entered from the drop-down list
  - Click "Go"
  - Click each element of the DAG and "Mark Success"
  - The tasks should now show in the Tree View UI
- If the dag run is showing in the DAG's Tree View UI
  - Click on the small square for the task you want to re-run
  - Uncheck the "Downstream" toggle
  - Click the "Clear" button
  - Confirm that you want to clear it
  - The task should be scheduled to run again straight away.
- SSH into the ECS container instance
- List docker containers using `docker ps`
- Log in to one of the docker containers using `docker exec -it <container_id> bash`. The web server instance is a good choice.
- Run the desired backfill command, something like `airflow backfill main_summary -s 2018-05-20 -e 2018-05-26`
- Commits to forked repo PRs will trigger CircleCI builds that build the docker container and test python dag compilation. This should pass prior to merging.
- Every commit to master or tag will trigger a CircleCI build that will build and push the container to dockerhub