
Primary language: TypeScript. License: MIT No Attribution (MIT-0)

Using AWS Fargate (ECS) to host Apache Airflow

This repository contains a sample setup for hosting Apache Airflow on AWS ECS using Fargate.
This setup uses the AWS Cloud Development Kit (CDK) to automate resource creation.

Table of Contents

  1. What is AWS Fargate
  2. What is Airflow
  3. How to use this?
  4. Sample DAG Explanation
  5. Configuration Options
  6. Understanding Code Structure
  7. Some Useful Resources

What is AWS Fargate

AWS Fargate is a serverless compute engine for containers that works with both Amazon Elastic Container Service (ECS) and Amazon Elastic Kubernetes Service (EKS). Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.

Source

What is Airflow

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

More info about Airflow can be found here

How to use this?

This setup is based on the AWS CDK, so install the CDK first.

Prerequisites

  1. Node.js => 12.x or later
  2. AWS CLI => Installation Guide
  3. Docker
With these prerequisites in place, install the AWS CDK CLI globally:

$ npm install -g aws-cdk

Once Node.js and the CDK are installed, clone this repository and run the following commands from its root directory:

$ npm install    # Install all necessary dependencies
$ npm run build  # Build this package and generate the CloudFormation stack with its resources
$ cdk deploy     # Deploy the CloudFormation template to the AWS account configured for your AWS CLI

This outputs LoadBalancerDNSName on the terminal, which you can use to access the Airflow webserver UI. Along with LoadBalancerDNSName, you also get the password for the Admin account, which you use to log in to the Airflow UI. You can change this password from the Profile section after logging in. For details about the Admin configuration, check airflow/config/webserver_entry.sh.

To delete this stack, run the following command:

$ cdk destroy

Sample DAG Explanation

This stack creates a workflow (DAG) with 5 tasks:

start_process >> [odd_task, even_task] >> numbers_task >> on_worker_task

This DAG shows how to create parallel tasks and how to use ECSOperator, which spins up an on-demand Fargate task to run a step.

Note: Each sub-folder under tasks results in a new Fargate TaskDefinition. These task definitions are used by ECSOperator.
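To make the one-folder-per-TaskDefinition convention concrete, here is a minimal sketch of how a CDK app could discover those sub-folders at synth time using only Node's fs module. The function name discoverTaskFolders is hypothetical; it is not taken from app/constructs/dag-tasks.ts, which may work differently.

```typescript
import * as fs from "fs";

// Hypothetical helper: returns the names of the immediate sub-folders of
// `tasksDir` (for this repo: "multi_task" and "number_task"). Under the
// convention above, each returned name would back one Fargate TaskDefinition.
function discoverTaskFolders(tasksDir: string): string[] {
  return fs
    .readdirSync(tasksDir, { withFileTypes: true })
    .filter((entry) => entry.isDirectory()) // skip stray files such as READMEs
    .map((entry) => entry.name)
    .sort(); // stable order keeps generated construct IDs deterministic
}

// Usage (the relative path is an assumption about where the app runs):
// const taskFolders = discoverTaskFolders("tasks");
```

Sorting the names is a deliberate choice: CDK construct IDs derived from them stay the same between synths, so CloudFormation does not see spurious replacements.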

  • start_process: a dummy task that runs on the default Airflow worker.
  • odd_task: executes odd_numbers.py, located under tasks/multi_task, on an on-demand Fargate task.
  • even_task: executes even_numbers.py, located under tasks/multi_task, on an on-demand Fargate task.
  • numbers_task: executes numbers.py, located under tasks/number_task, on an on-demand Fargate task.
  • on_worker_task: runs on the default worker and shows how to use PythonOperator to run a task on an Airflow worker.
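The >> chain above is shorthand for upstream/downstream edges: start_process fans out to odd_task and even_task, and both fan back into numbers_task. The sketch below models those edges in TypeScript (to match the CDK code; it is not Airflow's Python API) and groups tasks into stages with Kahn's topological sort. Tasks that share a stage do not depend on each other and may run in parallel; Airflow's scheduler still decides the actual concurrency.

```typescript
// [upstream, downstream] pairs.
type Edge = [string, string];

// Edges spelled out from: start_process >> [odd_task, even_task] >> numbers_task >> on_worker_task
const sampleDagEdges: Edge[] = [
  ["start_process", "odd_task"],
  ["start_process", "even_task"],
  ["odd_task", "numbers_task"],
  ["even_task", "numbers_task"],
  ["numbers_task", "on_worker_task"],
];

// Kahn's algorithm, level by level: every task in a stage has all of its
// upstream tasks in earlier stages.
function toStages(edges: Edge[]): string[][] {
  const inDegree: Record<string, number> = {};
  const downstream: Record<string, string[]> = {};
  for (const [up, down] of edges) {
    inDegree[up] = inDegree[up] ?? 0;
    inDegree[down] = (inDegree[down] ?? 0) + 1;
    if (!downstream[up]) downstream[up] = [];
    downstream[up].push(down);
  }

  const stages: string[][] = [];
  let ready = Object.keys(inDegree).filter((task) => inDegree[task] === 0);
  let visited = 0;
  while (ready.length > 0) {
    stages.push(ready.slice().sort());
    visited += ready.length;
    const next: string[] = [];
    for (const task of ready) {
      for (const child of downstream[task] ?? []) {
        inDegree[child] -= 1;
        if (inDegree[child] === 0) next.push(child);
      }
    }
    ready = next;
  }

  // Tasks left unvisited can only be part of a cycle.
  if (visited !== Object.keys(inDegree).length) {
    throw new Error("cycle detected: the edges do not form a DAG");
  }
  return stages;
}

console.log(toStages(sampleDagEdges).map((stage) => stage.join(", ")).join(" -> "));
// start_process -> even_task, odd_task -> numbers_task -> on_worker_task
```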

Configuration Options

Once deployed, this setup creates the following AWS resources, along with some necessary dependencies:

  • 1 VPC
  • 1 Postgres DB on AWS RDS
  • 1 ECS Cluster
  • 3 Fargate Task Definitions
  • 1 Fargate ECS Service running 3 task instances, each with one container: Airflow Webserver, Scheduler and Worker
  • 1 EC2 Network Load Balancer
  • 1 Security Group: restricts access to all of the above resources to the VPC. Only the webserver can be reached from outside, through the load balancer DNS name

You can find the default configuration in app/config.ts.

Default Postgres RDS config

export const defaultDBConfig: DBConfig = {
  dbName: "farflow",  // DB cluster and instance Name
  port: 5432,  // Port on which db instance runs
  masterUsername: "airflow",  // Username for master-user. Password will be autogenerated and stored in ParameterStore
  instanceType: InstanceType.of(InstanceClass.T2, InstanceSize.SMALL), // Using T2.small for this setup. Upgrade as per your requirements
  allocatedStorageInGB: 25, // 25 GB of storage will be allocated
  backupRetentionInDays: 30 // Backup will be deleted after 30 days
};
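Airflow reaches this database through a SQLAlchemy connection URI (its sql_alchemy_conn setting). How this repository assembles that value is not shown in this README, so the helper below is a hypothetical sketch of the URI shape only. It assumes that dbName is also the database name inside the instance and that the generated password has already been read from Parameter Store and is passed in.

```typescript
// Subset of the repository's DBConfig used here (assumed shape).
interface DbConnectionParts {
  dbName: string;
  port: number;
  masterUsername: string;
}

// Hypothetical helper: builds a URI of the form
// postgresql+psycopg2://<user>:<password>@<host>:<port>/<db>.
// Host and password are parameters because they are only known after deployment.
function buildSqlAlchemyConn(cfg: DbConnectionParts, host: string, password: string): string {
  // Percent-encode credentials so characters such as '@', ':' or '/' in an
  // autogenerated password cannot break the URI.
  const user = encodeURIComponent(cfg.masterUsername);
  const pass = encodeURIComponent(password);
  return `postgresql+psycopg2://${user}:${pass}@${host}:${cfg.port}/${cfg.dbName}`;
}

// Placeholder values only: not a real endpoint or secret.
const conn = buildSqlAlchemyConn(
  { dbName: "farflow", port: 5432, masterUsername: "airflow" },
  "farflow.example.us-east-1.rds.amazonaws.com",
  "p@ss:word/1",
);
console.log(conn);
// postgresql+psycopg2://airflow:p%40ss%3Aword%2F1@farflow.example.us-east-1.rds.amazonaws.com:5432/farflow
```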

Fargate Config

export const defaultWebserverConfig: ContainerConfig = {
  name: "WebserverContainer",
  containerPort: 8080,
  entryPoint: "/webserver_entry.sh"
};

export const defaultSchedulerConfig: ContainerConfig = {
  name: "SchedulerContainer",
  containerPort: 8081,
  entryPoint: "/scheduler_entry.sh"
};

export const defaultWorkerConfig: ContainerConfig = {
  name: "WorkerContainer",
  containerPort: 8082,
  entryPoint: "/worker_entry.sh"
};

export const airflowTaskConfig: AirflowTaskConfig = {
  cpu: 2048,
  memoryLimitMiB: 4096,
  webserverConfig: defaultWebserverConfig,
  schedulerConfig: defaultSchedulerConfig,
  workerConfig: defaultWorkerConfig,
  logRetention: RetentionDays.ONE_MONTH,
  // Uncomment this to have dedicated worker pool that can be auto-scaled as per workerAutoScalingConfig
  // createWorkerPool: true  
};
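The cpu and memoryLimitMiB values are Fargate task sizes, and Fargate accepts only certain pairs: 2048 CPU units (2 vCPU), as in the default above, allows 4096 to 16384 MiB in 1024 MiB steps. Before changing the defaults, a quick check like the following sketch can catch an invalid pair before a deployment is attempted. The table and function names are hypothetical, and only the 0.25 to 4 vCPU sizes are listed; consult the ECS documentation for the larger sizes.

```typescript
// Builds [start, start + step, ..., end] (inclusive).
function range(start: number, end: number, step: number): number[] {
  const values: number[] = [];
  for (let v = start; v <= end; v += step) values.push(v);
  return values;
}

// Allowed task memory (MiB) for each Fargate CPU value (1024 units = 1 vCPU).
// Covers 0.25-4 vCPU only; larger sizes are also available.
const FARGATE_MEMORY_OPTIONS: Record<number, number[]> = {
  256: [512, 1024, 2048],
  512: range(1024, 4096, 1024),
  1024: range(2048, 8192, 1024),
  2048: range(4096, 16384, 1024),
  4096: range(8192, 30720, 1024),
};

// Returns true if Fargate accepts this task-level cpu/memory pair.
function isValidFargateSize(cpu: number, memoryLimitMiB: number): boolean {
  const allowed = FARGATE_MEMORY_OPTIONS[cpu];
  return allowed !== undefined && allowed.some((m) => m === memoryLimitMiB);
}

console.log(isValidFargateSize(2048, 4096)); // true: the default airflowTaskConfig
console.log(isValidFargateSize(2048, 2048)); // false: 2 vCPU needs at least 4096 MiB
```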

Adjust the configuration in this file to match your requirements. If you need a dedicated worker pool for resource-intensive operations that must be available at all times, set createWorkerPool: true under airflowTaskConfig. This creates a separate ECS service whose task runs an Airflow worker container. That service is configured with auto-scaling, so it adds task instances depending on load. The auto-scaling settings can be configured as follows:

export const workerAutoScalingConfig: AutoScalingConfig = {
  minTaskCount: 1,
  maxTaskCount: 5,
  cpuUsagePercent: 70
};
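In the CDK, settings like these usually map onto autoScaleTaskCount({ minCapacity, maxCapacity }) and scaleOnCpuUtilization(id, { targetUtilizationPercent }) on the worker service; the repository's exact wiring may differ. To build intuition for what cpuUsagePercent: 70 means, the sketch below is a simplified, hypothetical model of CPU target tracking: scale the task count in proportion to how far average CPU is from the target, then clamp to the configured bounds. The real AWS algorithm also applies cooldowns and scales in more conservatively, so treat the numbers as approximate.

```typescript
// Assumed to mirror the repository's AutoScalingConfig shape.
interface AutoScalingConfig {
  minTaskCount: number;
  maxTaskCount: number;
  cpuUsagePercent: number;
}

// Simplified target-tracking model: desired = ceil(current * actual / target),
// clamped to [minTaskCount, maxTaskCount].
function desiredTaskCount(cfg: AutoScalingConfig, currentTasks: number, avgCpuPercent: number): number {
  const raw = Math.ceil((currentTasks * avgCpuPercent) / cfg.cpuUsagePercent);
  return Math.min(cfg.maxTaskCount, Math.max(cfg.minTaskCount, raw));
}

const exampleConfig: AutoScalingConfig = { minTaskCount: 1, maxTaskCount: 5, cpuUsagePercent: 70 };

console.log(desiredTaskCount(exampleConfig, 2, 95)); // 3: 2 * 95 / 70 = 2.71, rounded up
console.log(desiredTaskCount(exampleConfig, 4, 100)); // 5: 5.71 rounds up to 6, capped at maxTaskCount
console.log(desiredTaskCount(exampleConfig, 3, 20)); // 1: scales in, but never below minTaskCount
```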

AWS Account and Region Setting

This setup uses the default account and region from your AWS CLI configuration. To deploy it to a different account or region, or to several of them, follow this guide.
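The CDK CLI passes the account and region of your active AWS CLI profile to the app as the CDK_DEFAULT_ACCOUNT and CDK_DEFAULT_REGION environment variables. A pattern shown in the CDK developer guide lets explicit CDK_DEPLOY_ACCOUNT and CDK_DEPLOY_REGION variables take precedence, so one app can target other environments. The sketch below shows that resolution; resolveDeployEnv and the stack class in the comment are hypothetical, and farflow.ts may not pass an env at all.

```typescript
// Shape of the `env` prop a CDK Stack accepts.
interface DeployEnv {
  account?: string;
  region?: string;
}

// Explicit CDK_DEPLOY_* overrides win; otherwise fall back to the CDK_DEFAULT_*
// values that the CDK CLI derives from the active AWS CLI profile.
function resolveDeployEnv(env: Record<string, string | undefined>): DeployEnv {
  return {
    account: env.CDK_DEPLOY_ACCOUNT ?? env.CDK_DEFAULT_ACCOUNT,
    region: env.CDK_DEPLOY_REGION ?? env.CDK_DEFAULT_REGION,
  };
}

// In the CDK entry point this could be used as (hypothetical stack class):
// new FarFlowStack(app, "FarFlow", { env: resolveDeployEnv(process.env) });
console.log(resolveDeployEnv({ CDK_DEFAULT_ACCOUNT: "111111111111", CDK_DEFAULT_REGION: "us-east-1", CDK_DEPLOY_REGION: "eu-west-1" }));
```

With the override shown, the stack would target account 111111111111 in eu-west-1.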

Understanding Code Structure

Let's walk through the code structure and what each file does. This should help when you need to change things.

📦FarFlow
 ┣ 📂airflow                    => Top-level directory that holds Airflow related config
 ┃ ┣ 📂config
 ┃ ┃ ┣ 📜scheduler_entry.sh     => Entrypoint for Scheduler Container.
 ┃ ┃ ┣ 📜webserver_entry.sh     => Entrypoint for Webserver Container. This also initializes the backend database
 ┃ ┃ ┗ 📜worker_entry.sh        => Entrypoint for Worker Container.
 ┃ ┣ 📂dags                     => Holds all the DAGs for this Airflow instance. Add more DAGs here.
 ┃ ┃ ┗ 📜dag.py                 => Sample DAG
 ┃ ┗ 📜Dockerfile               => Dockerfile for Airflow Image, with some dependencies
 ┣ 📂app                        => Base folder for CDK application
 ┃ ┣ 📂constructs               => Holds helper files for CDK setup
 ┃ ┃ ┣ 📜airflow-construct.ts   => Creates Fargate Service holding Airflow
 ┃ ┃ ┣ 📜dag-tasks.ts           => Creates fargate tasks containing modules invoked from DAG using ECSOperator
 ┃ ┃ ┣ 📜rds.ts                 => Creates RDS Postgres instance
 ┃ ┃ ┣ 📜service-construct.ts   => Top level Fargate service helper
 ┃ ┃ ┗ 📜task-construct.ts      => Helper for Dag-tasks Construct
 ┃ ┣ 📜config.ts                => Configuration for entire CDK application
 ┃ ┣ 📜farflow.ts               => Starting point for this CDK application
 ┃ ┗ 📜policies.ts              => Configure policies that will be attached to Airflow instances
 ┣ 📂tasks                      => Sample tasks that will be invoked from DAG using ECSOperator
 ┃ ┣ 📂multi_task               => example task 1
 ┃ ┃ ┣ 📜Dockerfile             => config for container holding this task
 ┃ ┃ ┣ 📜even_numbers.py        => module-1 in this container
 ┃ ┃ ┗ 📜odd_numbers.py         => module-2 in this container
 ┃ ┗ 📂number_task              => example task 2
 ┃ ┃ ┣ 📜Dockerfile             => config for container holding this task
 ┃ ┃ ┗ 📜numbers.py             => only module for this container
 ┣ 📜README.md                  => YOU ARE READING IT
 ┣ 📜cdk.json                   => CDK config
 ┣ 📜package-lock.json          => npm package info (auto-generated)
 ┣ 📜package.json               => npm dependencies
 ┗ 📜tsconfig.json              => Typescript config

Code-tree generated using this plugin

Some Useful Resources