Airflow plugin with AWS operators
Copy the aws_operators directory to the Airflow plugins directory (AIRFLOW_HOME/plugins/ by default).
List of operators by AWS service:
ExecuteLambdaOperator is responsible for triggering an AWS Lambda function.
Example:
ExecuteLambdaOperator(
    task_id='task_with_execute_lambda_operator',
    airflow_context_to_lambda_payload=lambda c: {"date": c["execution_date"].strftime('%Y-%m-%d')},
    additional_payload={"param1": "value1", "param2": 21},
    lambda_function_name="LambdaFunctionName"
)
The above task executes the AWS Lambda function LambdaFunctionName with the payload:
{
    "date": "2018-08-01",
    "param1": "value1",
    "param2": 21
}
where date equals the execution_date of the Airflow DAG run. It is extracted from the Airflow context dictionary by the airflow_context_to_lambda_payload function, and the remaining keys come from additional_payload.
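As a point of reference, here is a minimal sketch of what such an invocation boils down to, assuming the operator merges the context-derived dictionary with additional_payload and calls the function through boto3; the helper below is hypothetical and not part of the plugin:

import json

import boto3


def invoke_lambda(context, airflow_context_to_lambda_payload,
                  additional_payload, lambda_function_name):
    # Build the payload: context-derived fields first, static additions on top.
    payload = airflow_context_to_lambda_payload(context)
    payload.update(additional_payload)

    # Invoke the Lambda function synchronously with the JSON-encoded payload.
    client = boto3.client("lambda")
    return client.invoke(
        FunctionName=lambda_function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload),
    )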
ExecuteRedshiftQueryOperator executes a query against Redshift.
Example - drop a Redshift table:
ExecuteRedshiftQueryOperator(
    task_id='drop_table',
    redshift_conn_id='redshift_dev',
    query='DROP TABLE IF EXISTS TEST_TABLE'
)
The query can also be constructed from the Airflow context (in particular the execution date) by passing a callable.
Example:
ExecuteRedshiftQueryOperator(
    task_id='delete_from_table',
    redshift_conn_id='redshift_dev',
    query=lambda c: "DELETE FROM TEST_TABLE WHERE MONTH = '{y}-{m}'".format(
        y=c["execution_date"].year,
        m=c["execution_date"].strftime("%m")
    )
)
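At runtime an operator like this presumably resolves a callable query against the task context and runs the resulting SQL over the configured connection. A rough sketch of that pattern, using Airflow's PostgresHook (Redshift speaks the Postgres protocol), is shown below; this is an assumption about the mechanics, not the plugin's actual implementation:

from airflow.hooks.postgres_hook import PostgresHook


def run_redshift_query(query, redshift_conn_id, context):
    # Accept either a plain SQL string or a callable that builds SQL from the context.
    sql = query(context) if callable(query) else query

    # Run the statement over the Redshift connection defined in Airflow.
    hook = PostgresHook(postgres_conn_id=redshift_conn_id)
    hook.run(sql)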
ExecuteCopyToRedshiftOperator executes the Redshift COPY command, loading data from S3 into a Redshift table.
Example 1 - append data:
ExecuteCopyToRedshiftOperator(
    task_id='redshift_copy_append',
    redshift_conn_id='redshift_dev',
    s3_bucket='bucket',
    s3_key='key',
    redshift_schema='public',
    table='table',
    iam_role='iam_role',
    mode='append'
)
Example 2 - overwrite table:
ExecuteCopyToRedshiftOperator(
    task_id='redshift_copy_overwrite',
    redshift_conn_id='redshift_dev',
    s3_bucket='bucket',
    s3_key='key',
    redshift_schema='public',
    table='table',
    iam_role='iam_role',
    mode='overwrite',
    copy_params=['CSV']
)
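For orientation, the COPY statement such a task issues looks roughly like the sketch below (built here by a hypothetical helper); whether overwrite mode truncates or recreates the target table first is up to the operator's implementation:

def build_copy_statement(redshift_schema, table, s3_bucket, s3_key,
                         iam_role, copy_params=()):
    # Assemble a Redshift COPY command that loads the S3 prefix via the given IAM role.
    return "COPY {schema}.{table} FROM 's3://{bucket}/{key}' IAM_ROLE '{role}' {params}".format(
        schema=redshift_schema,
        table=table,
        bucket=s3_bucket,
        key=s3_key,
        role=iam_role,
        params=" ".join(copy_params),
    ).strip()

# Example 2 above would produce roughly:
# COPY public.table FROM 's3://bucket/key' IAM_ROLE 'iam_role' CSV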
ExecuteUnloadFromRedshiftOperator executes the Redshift UNLOAD command, exporting the results of a SELECT statement to S3.
Example:
ExecuteUnloadFromRedshiftOperator(
    task_id='redshift_unload',
    redshift_conn_id='redshift_dev',
    select_statement='SELECT * FROM TABLE',
    s3_bucket='bucket',
    s3_key='key',
    iam_role='iam_role',
    unload_params=["DELIMITER AS ';'", "GZIP"]
)
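The corresponding UNLOAD statement would look roughly like the following sketch (again with a hypothetical helper); note that quotes inside the SELECT have to be doubled because the whole query is wrapped in single quotes:

def build_unload_statement(select_statement, s3_bucket, s3_key,
                           iam_role, unload_params=()):
    # Double any quotes inside the SELECT so it survives being wrapped in quotes.
    escaped_select = select_statement.replace("'", "''")

    # Assemble a Redshift UNLOAD command exporting the query results to the S3 prefix.
    return "UNLOAD ('{select}') TO 's3://{bucket}/{key}' IAM_ROLE '{role}' {params}".format(
        select=escaped_select,
        bucket=s3_bucket,
        key=s3_key,
        role=iam_role,
        params=" ".join(unload_params),
    ).strip()

# The example above would produce roughly:
# UNLOAD ('SELECT * FROM TABLE') TO 's3://bucket/key' IAM_ROLE 'iam_role' DELIMITER AS ';' GZIP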
The source S3 key can also be constructed by a custom Python function based on the Airflow context.
Example:
ExecuteCopyToRedshiftOperator(
    # ... remaining arguments as in the examples above ...
    s3_key=lambda context: "year={y}/month={m}/day={d}/".format(
        y=context["execution_date"].year,
        m=context["execution_date"].strftime("%m"),
        d=context["execution_date"].strftime("%d")
    )
)
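Putting the pieces together, a DAG wiring these operators might look like the sketch below. The import path and DAG settings are illustrative and depend on how the plugin is installed and registered in your Airflow environment:

from datetime import datetime

from airflow import DAG

# Illustrative import path - adjust to wherever the plugin exposes its operators.
from aws_operators.operators.redshift_operators import (
    ExecuteCopyToRedshiftOperator,
    ExecuteRedshiftQueryOperator,
)

dag = DAG(
    dag_id="s3_to_redshift",
    start_date=datetime(2018, 8, 1),
    schedule_interval="@daily",
)

# Remove the current month's partition so the load stays idempotent.
delete_partition = ExecuteRedshiftQueryOperator(
    task_id="delete_from_table",
    redshift_conn_id="redshift_dev",
    query=lambda c: "DELETE FROM public.test_table WHERE MONTH = '{y}-{m}'".format(
        y=c["execution_date"].year,
        m=c["execution_date"].strftime("%m"),
    ),
    dag=dag,
)

# Append the matching S3 partition into the table.
copy_partition = ExecuteCopyToRedshiftOperator(
    task_id="redshift_copy_append",
    redshift_conn_id="redshift_dev",
    s3_bucket="bucket",
    s3_key=lambda c: "year={y}/month={m}/".format(
        y=c["execution_date"].year,
        m=c["execution_date"].strftime("%m"),
    ),
    redshift_schema="public",
    table="test_table",
    iam_role="iam_role",
    mode="append",
    copy_params=["CSV"],
    dag=dag,
)

delete_partition >> copy_partition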