
Oozie Workflow to Airflow DAGs migration tool

Primary language: Python. License: Apache License 2.0.

Oozie to Airflow


A tool to easily convert Apache Oozie workflows to Apache Airflow workflows.

The program targets Apache Airflow >= 1.10 and Apache Oozie 1.0 XML schema.

If you want to contribute to the project, please take a look at CONTRIBUTING.md.


Background

Apache Airflow is a workflow management system developed by Airbnb in 2014. It is a platform to programmatically author, schedule, and monitor workflows. Airflow workflows are designed as Directed Acyclic Graphs (DAGs) of tasks in Python. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Apache Oozie is a workflow scheduler system to manage Apache Hadoop jobs. Oozie workflows are also designed as Directed Acyclic Graphs (DAGs) in XML.

There are a few differences noted below:

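|         | Spec.  | Task        | Dependencies                    | "Subworkflows" | Parameterization             | Notification        |
|---------|--------|-------------|---------------------------------|----------------|------------------------------|---------------------|
| Oozie   | XML    | Action Node | Control Node                    | Subworkflow    | EL functions/Properties file | URL based callbacks |
| Airflow | Python | Operators   | Trigger Rules, set_downstream() | SubDag         | jinja2 and macros            | Callbacks/Emails    |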

Running the Program

Note that you need Python >= 3.6 to run the converter.

Installing from PyPI

You can install o2a from PyPI via pip install o2a. After installation, the o2a and o2a-validate-workflows commands should be available on your path.

Installing from the sources

If you use the o2a sources, you can set up the environment in a virtualenv (you can create one using virtualenvwrapper, for example).

While in your virtualenv, you can install all the requirements via pip install -r requirements.txt.

You can add the bin subdirectory to your PATH so that all the scripts below can be run without the ./bin prefix. For example, add a line similar to the one below to your .bash_profile or to bin/postactivate in your virtual environment:

export PATH=${PATH}:<INSERT_PATH_TO_YOUR_OOZIE_PROJECT>/bin

Otherwise you need to run all the scripts from the bin subdirectory, for example:

./bin/o2a --help

In all the example commands below, it is assumed that the o2a scripts are in your PATH, whether installed from PyPI or from the sources.

You can also install o2a from a local folder using pip install -e .

Running the conversion

You can run the program by calling: o2a -i <INPUT_APPLICATION_FOLDER> -o <OUTPUT_FOLDER_PATH>

Example: o2a -i examples/demo -o output/demo

This is the full usage guide, available by running o2a -h:

usage: o2a [-h] -i INPUT_DIRECTORY_PATH -o OUTPUT_DIRECTORY_PATH [-n DAG_NAME]
           [-u USER] [-s START_DAYS_AGO] [-v SCHEDULE_INTERVAL] [-d]

Convert Apache Oozie workflows to Apache Airflow workflows.

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT_DIRECTORY_PATH, --input-directory-path INPUT_DIRECTORY_PATH
                        Path to input directory
  -o OUTPUT_DIRECTORY_PATH, --output-directory-path OUTPUT_DIRECTORY_PATH
                        Desired output directory
  -n DAG_NAME, --dag-name DAG_NAME
                        Desired DAG name [defaults to input directory name]
  -u USER, --user USER  The user to be used in place of all ${user.name}
                        [defaults to user who ran the conversion]
  -s START_DAYS_AGO, --start-days-ago START_DAYS_AGO
                        Desired DAG start as number of days ago
  -v SCHEDULE_INTERVAL, --schedule-interval SCHEDULE_INTERVAL
                        Desired DAG schedule interval as number of days
  -d, --dot             Renders workflow files in DOT format

Structure of the application folder

The input application directory must have the following structure:

<APPLICATION>/
             |- job.properties        - job properties that are used to run the job
             |- hdfs                  - folder with application - should be copied to HDFS
             |     |- workflow.xml    - Oozie workflow xml (1.0 schema)
             |     |- ...             - additional folders required to be copied to HDFS
             |- configuration.template.properties - template of configuration values used during conversion
             |- configuration.properties          - generated properties for configuration values
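Before running a conversion, it can help to check that an application folder matches this layout. The following is a minimal sketch, not part of o2a: the helper name missing_paths and the choice of which files count as required are our own assumptions. It treats job.properties, hdfs/workflow.xml, and configuration.properties (which the examples below ask you to create from the template) as required.

```python
from pathlib import Path
from typing import List

# Assumption: files we treat as required, relative to <APPLICATION>/.
# configuration.properties is created by hand from configuration.template.properties.
REQUIRED_FILES = [
    "job.properties",
    "hdfs/workflow.xml",
    "configuration.properties",
]


def missing_paths(app_dir: str) -> List[str]:
    """Return the (hypothetically) required files that are absent from app_dir."""
    root = Path(app_dir)
    return [name for name in REQUIRED_FILES if not (root / name).is_file()]
```

Calling missing_paths("examples/demo") returns the names of any of these files that are not present, so an empty list means the layout check passed.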

Supported Oozie features

Control nodes

Fork

A fork node splits the path of execution into multiple concurrent paths of execution.

Join

A join node waits until every concurrent execution path of the previous fork node arrives at it. The fork and join nodes must be used in pairs. The join node assumes concurrent execution paths are children of the same fork node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME]" />
        ...
        <path start="[NODE-NAME]" />
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    ...
</workflow-app>
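To make the pairing concrete, here is a minimal sketch using only the Python standard library. It parses a small, made-up workflow (the node names, the fs actions, and the uri:oozie:workflow:1.0 namespace are illustrative) and lists the success-path dependencies. In Airflow terms the result corresponds to fork >> [task_a, task_b] >> join, or the equivalent set_downstream() calls. This is an illustration of the semantics, not how o2a itself is implemented.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple

# Hypothetical workflow: one fork with two parallel fs actions and a join.
WORKFLOW_XML = """
<workflow-app name="fork-join-demo" xmlns="uri:oozie:workflow:1.0">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="task-a"/>
        <path start="task-b"/>
    </fork>
    <action name="task-a">
        <fs><mkdir path="/tmp/o2a-demo/a"/></fs>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
    <action name="task-b">
        <fs><mkdir path="/tmp/o2a-demo/b"/></fs>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
    <join name="join-node" to="end"/>
    <kill name="fail"><message>Failed</message></kill>
    <end name="end"/>
</workflow-app>
"""


def local_name(tag: str) -> str:
    """Drop the '{namespace}' prefix that ElementTree adds to tag names."""
    return tag.rsplit("}", 1)[-1]


def success_edges(xml_text: str) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs along the success path."""
    edges = []
    for node in ET.fromstring(xml_text.strip()):
        kind, name = local_name(node.tag), node.get("name")
        if kind == "fork":
            # A fork starts every <path> concurrently.
            edges += [(name, p.get("start")) for p in node if local_name(p.tag) == "path"]
        elif kind == "join":
            # The join continues only after all paths have arrived.
            edges.append((name, node.get("to")))
        elif kind == "action":
            edges += [(name, c.get("to")) for c in node if local_name(c.tag) == "ok"]
    return edges
```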

Decision

A decision node enables a workflow to make a selection on the execution path to follow.

The behavior of a decision node can be seen as a switch-case statement.

A decision node consists of a list of predicate-transition pairs plus a default transition. Predicates are evaluated in order of appearance until one of them evaluates to true, and the corresponding transition is taken. If none of the predicates evaluates to true, the default transition is taken.

Predicates are JSP Expression Language (EL) expressions (see section 4.2 of the Oozie Workflow Functional Specification) that resolve into a boolean value, true or false. For example: ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>
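The switch-case semantics above (first true predicate wins, otherwise the default) can be sketched in a few lines of Python. This is an illustration, not o2a's code: the names choose_transition and the node names are hypothetical, and the size check is a Python stand-in for the EL predicate shown earlier (Oozie's GB constant is 1024^3 bytes). The shape matches what Airflow's BranchPythonOperator expects, which o2a uses for decision nodes (see the Decision example): a callable that returns the task_id to follow.

```python
from typing import Callable, List, Tuple

# A case pairs a predicate with the name of the node to transition to.
Case = Tuple[Callable[[], bool], str]


def choose_transition(cases: List[Case], default: str) -> str:
    """Mimic an Oozie <switch>: the first true predicate wins, otherwise the default."""
    for predicate, target in cases:
        if predicate():
            return target
    return default


# Illustrative stand-in for ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}.
GB = 1024 ** 3
input_size = 12 * GB

cases = [
    (lambda: input_size > 10 * GB, "process-large-input"),
    (lambda: input_size > 0, "process-small-input"),
]
next_node = choose_transition(cases, default="end")
```

Both predicates are true here, but because cases are evaluated in order, the first one decides the transition.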

Start

The start node is the entry point for a workflow job. It indicates the first workflow node the workflow job must transition to.

When a workflow is started, it automatically transitions to the node specified in the start node.

A workflow definition must have one start node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
  ...
  <start to="[NODE-NAME]"/>
  ...
</workflow-app>

End

The end node is the end of a workflow job. It indicates that the workflow job has completed successfully.

When a workflow job reaches the end it finishes successfully (SUCCEEDED).

If one or more actions started by the workflow job are still executing when the end node is reached, those actions are killed. In this scenario, the workflow job is still considered to have completed successfully.

A workflow definition must have one end node.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>

Kill

The kill node allows a workflow job to exit with an error.

When a workflow job reaches the kill node, it finishes in error (KILLED).

If one or more actions started by the workflow job are executing when the kill node is reached, the actions will be killed.

A workflow definition may have zero or more kill nodes.

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>

EL Functions

As of now, only a minimal set of Oozie EL functions is supported. Each supported Oozie EL function string is mapped to a corresponding Python function through a dictionary defined in utils/el_utils.py. This design allows for a custom EL function mapping if you need one. By default, everything is mapped to the o2a_libs module. This means that to use EL function mapping, you must copy the o2a_libs folder to the Airflow DAG folder, where the Airflow workers pick it up and parse it so that it is available to all DAGs.
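The dictionary lookup described above can be sketched as follows. Everything here is hypothetical: EL_FUNCTION_MAP, evaluate_el, and the two Python ports of Oozie's basic concat() and trim() functions are ours, not o2a's. Unlike o2a, which maps EL calls to functions in o2a_libs that run inside Airflow, this sketch evaluates the call directly. Like the current version of o2a, it handles only a single function call per expression, and arguments must be Python-compatible literals such as single-quoted strings.

```python
import ast
import re
from typing import Any, Callable, Dict


def el_concat(s1: str, s2: str) -> str:
    """Port of Oozie concat(): a null value counts as an empty string."""
    return (s1 or "") + (s2 or "")


def el_trim(s: str) -> str:
    """Approximate port of Oozie trim(): strips surrounding whitespace."""
    return (s or "").strip()


# Hypothetical mapping from Oozie EL function names to Python callables.
EL_FUNCTION_MAP: Dict[str, Callable[..., Any]] = {
    "concat": el_concat,
    "trim": el_trim,
}

# Matches one call such as ${concat('a', 'b')} or ${wf:id()}.
_EL_CALL = re.compile(r"^\$\{\s*((?:\w+:)?\w+)\((.*)\)\s*\}$")


def evaluate_el(expression: str, functions: Dict[str, Callable[..., Any]] = EL_FUNCTION_MAP) -> Any:
    """Look up the EL function in the mapping and call it with literal arguments."""
    match = _EL_CALL.match(expression)
    if match is None:
        raise ValueError("Only a single EL function call is supported: " + expression)
    name, raw_args = match.group(1), match.group(2).strip()
    if name not in functions:
        raise KeyError("No Python mapping for EL function " + name)
    # Parse the argument list as a Python tuple of literals.
    args = ast.literal_eval("(" + raw_args + ",)") if raw_args else ()
    return functions[name](*args)
```

For example, evaluate_el("${concat('foo', 'bar')}") returns "foobar", while a function with no entry in the mapping raises KeyError.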

Examples

All examples can be found in the examples directory.

Demo Example

The demo example contains several action and control nodes. The control nodes are fork, join, decision, start, end, and kill. The action nodes are fs, map-reduce, and pig.

Most of these are already supported, but when the program encounters a node it does not know how to parse, it performs a "skeleton transformation": it converts every unknown node into a dummy node. Because the control flow is preserved, users can then convert those nodes manually if they wish.
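The fallback can be sketched as a lookup table with a default. This is a hypothetical illustration, not o2a's mapper registry: the operator names are taken from the examples in this README and stored as plain strings so the sketch runs without Airflow installed, and using DummyOperator (Airflow's no-op operator) for unknown nodes is our assumption about what a "dummy node" looks like.

```python
from typing import Dict, List, Tuple

# Operators this README lists for each converted Oozie action type.
KNOWN_ACTION_OPERATORS: Dict[str, str] = {
    "ssh": "SSHOperator",
    "map-reduce": "DataProcHadoopOperator",
    "java": "DataProcHadoopOperator",
    "fs": "BashOperator",
    "shell": "BashOperator",
    "distcp": "BashOperator",
    "pig": "DataProcPigOperator",
    "spark": "DataProcSparkOperator",
    "hive": "DataProcHiveOperator",
    "hive2": "DataProcHiveOperator",
    "sub-workflow": "SubDagOperator",
    "email": "EmailOperator",
}

# Assumption: unknown nodes become a no-op placeholder task.
FALLBACK_OPERATOR = "DummyOperator"


def skeleton_transform(nodes: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Map (node_name, action_type) pairs to (node_name, operator), keeping node order."""
    return [(name, KNOWN_ACTION_OPERATORS.get(kind, FALLBACK_OPERATOR)) for name, kind in nodes]
```

Keeping the node names and order unchanged is what preserves the control flow, so a placeholder can later be replaced by hand without rewiring dependencies.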

The demo can be run as:

o2a -i examples/demo -o output/demo

This parses the workflow and writes the output file to the output/demo directory.

Known limitations

The decision node is not fully functional because not all EL functions are currently supported. To run the DAG in Airflow, you may need to edit the generated Python file and change the decision node expression.

Issue in GitHub: Implement decision node

Output

In this example the output (including the sub-workflow DAG) will be created in the ./output/demo/ folder.

Childwf Example

The childwf example is a sub-workflow of the demo example. It can be run as:

o2a -i examples/childwf -o output/childwf

Make sure to first copy examples/childwf/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/childwf/ folder.

Known limitations

No known limitations.

SSH Example

The ssh example can be run as:

o2a -i examples/ssh -o output/ssh

This will convert the specified Oozie XML and write the output into the specified output directory, in this case output/ssh/test_ssh_dag.py.

There are some differences between Apache Oozie and Apache Airflow as far as the SSH specification goes. In Airflow you will have to add/edit an SSH-specific connection that contains the credentials required for the specified SSH action. For example, if the SSH node looks like:

<action name="ssh">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
        <host>user@apache.org</host>
        <command>echo</command>
        <args>"Hello Oozie!"</args>
    </ssh>
    <ok to="end"/>
    <error to="fail"/>
</action>

Then the default Airflow SSH connection, ssh_default, should have at least a password set. You can find it in the Airflow Web UI under Admin > Connections. Connections cannot be edited from the command line, so there you must add a new one instead, for example:

airflow connections --add --conn_id <SSH_CONN_ID> --conn_type SSH --conn_password <PASSWORD>

More information can be found in Airflow's documentation.
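To see which parts of the SSH action go where, here is a minimal stdlib sketch that parses the example action above. It is not o2a's SSH mapper, and the output keys are our own choice: login and host correspond to fields of the Airflow connection discussed above, and command to the command argument of Airflow's SSHOperator.

```python
import xml.etree.ElementTree as ET
from typing import Dict, Optional

SSH_NS = {"ssh": "uri:oozie:ssh-action:0.1"}

# The action node from the example above.
SSH_ACTION_XML = """
<action name="ssh">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
        <host>user@apache.org</host>
        <command>echo</command>
        <args>"Hello Oozie!"</args>
    </ssh>
    <ok to="end"/>
    <error to="fail"/>
</action>
"""


def parse_ssh_action(xml_text: str) -> Dict[str, Optional[str]]:
    """Split an Oozie SSH action into connection fields and a command line."""
    ssh = ET.fromstring(xml_text.strip()).find("ssh:ssh", SSH_NS)
    host_text = ssh.findtext("ssh:host", default="", namespaces=SSH_NS).strip()
    # <host> has the form user@hostname; the user part is optional.
    user, _, host = host_text.rpartition("@")
    command = ssh.findtext("ssh:command", default="", namespaces=SSH_NS).strip()
    args = [arg.text or "" for arg in ssh.findall("ssh:args", SSH_NS)]
    return {
        "login": user or None,
        "host": host,
        "command": " ".join([command] + args),
    }
```

Note that the password never appears in the Oozie XML, which is why it has to be stored in the Airflow connection instead.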

Output

In this example the output will be created in the ./output/ssh/ folder.

The converted DAG uses the SSHOperator in Airflow.

Known limitations

No known limitations.

MapReduce Example

The MapReduce example can be run as:

o2a -i examples/mapreduce -o output/mapreduce

Make sure to first copy examples/mapreduce/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/mapreduce/ folder.

The converted DAG uses the DataProcHadoopOperator in Airflow.

Known limitations

1. Exit status not available

From the Oozie documentation:

The counters of the Hadoop job and job exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the Hadoop jobs ends. This information can be used from within decision nodes and other actions configurations.

Currently we use the DataProcHadoopOperator, which does not store the job exit status in an XCom for other tasks to use.

Issue in GitHub: Implement exit status and counters in MapReduce Action

2. Configuration options

From the Oozie documentation:

Hadoop JobConf properties can be specified as part of

  • the config-default.xml or
  • JobConf XML file bundled with the workflow application or
  • <global> tag in workflow definition or
  • Inline map-reduce action configuration or
  • An implementation of OozieActionConfigurator specified by the <config-class> tag in workflow definition.

Currently the only supported way of configuring the map-reduce action is with the inline action configuration, i.e. using the <configuration> tag in the workflow's XML file definition.
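Inline configuration is a list of <property> elements, each with a <name> and a <value>, inside a <configuration> element. The sketch below collects them into a Python dictionary using only the standard library. It is an illustration rather than o2a's parser; the sample action body and its property values are made up, and it ignores the other configuration sources listed above, just as the converter currently does.

```python
import xml.etree.ElementTree as ET
from typing import Dict

# Illustrative map-reduce action body using inline <configuration>.
MAP_REDUCE_XML = """
<map-reduce xmlns="uri:oozie:workflow:1.0">
    <configuration>
        <property>
            <name>mapred.mapper.class</name>
            <value>org.myorg.WordCount.Map</value>
        </property>
        <property>
            <name>mapred.reducer.class</name>
            <value>org.myorg.WordCount.Reduce</value>
        </property>
    </configuration>
</map-reduce>
"""


def local_name(tag: str) -> str:
    """Drop the '{namespace}' prefix that ElementTree adds to tag names."""
    return tag.rsplit("}", 1)[-1]


def inline_configuration(action_xml: str) -> Dict[str, str]:
    """Collect <property> name/value pairs from every <configuration> element."""
    properties: Dict[str, str] = {}
    for element in ET.fromstring(action_xml.strip()).iter():
        if local_name(element.tag) != "configuration":
            continue
        for prop in element:
            if local_name(prop.tag) != "property":
                continue
            fields = {local_name(child.tag): (child.text or "").strip() for child in prop}
            properties[fields["name"]] = fields.get("value", "")
    return properties
```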

Issues in GitHub:

3. Streaming and pipes

Streaming and pipes are currently not supported.

Issue in GitHub: Implement streaming support

FS Example

The FS example can be run as:

o2a -i examples/fs -o output/fs

Make sure to first copy examples/fs/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/fs/ folder.

The converted DAG uses the BashOperator in Airflow.

Known limitations

Not all FS operations are currently idempotent. This is not a problem when the prepare element is used in other tasks, but it might be a problem in certain situations. Making the operators idempotent requires more complex logic, and support for Pig actions is currently missing.

Issue in GitHub: FS Mapper and idempotence
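For two FS operations the HDFS shell already offers idempotent flags: mkdir -p does not fail if the directory exists, and rm -f does not fail if the path is missing. The sketch below builds such command lines. It is a hypothetical helper, not what o2a's FS mapper generates today, and it deliberately leaves out operations such as move, which need the more complex logic mentioned above.

```python
import shlex


def idempotent_fs_command(operation: str, path: str) -> str:
    """Build an HDFS shell command that can be re-run safely (hypothetical helper)."""
    if operation == "mkdir":
        # -p: create parent directories and do not fail if the directory exists.
        args = ["hdfs", "dfs", "-mkdir", "-p", path]
    elif operation == "delete":
        # -r: delete recursively; -f: do not fail if the path does not exist.
        args = ["hdfs", "dfs", "-rm", "-r", "-f", path]
    else:
        raise ValueError("No idempotent variant sketched for: " + operation)
    # Quote each argument so paths with spaces survive the shell.
    return " ".join(shlex.quote(arg) for arg in args)
```

A command like this could be passed to a BashOperator, which is what the converted FS example uses.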

Java Example

The Java example can be run as:

o2a -i examples/java -o output/java

Make sure to first copy examples/java/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/java/ folder.

The converted DAG uses the DataProcHadoopOperator in Airflow.

Known limitations

1. Overriding the action's main class via the oozie.launcher.action.main.class property is not implemented.

Issue in GitHub: Override Java main class with property

Pig Example

The Pig example can be run as:

o2a -i examples/pig -o output/pig

Make sure to first copy examples/pig/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/pig/ folder.

The converted DAG uses the DataProcPigOperator in Airflow.

Known limitations

1. Configuration options

From the Oozie documentation:

Hadoop JobConf properties can be specified as part of

  • the config-default.xml or
  • JobConf XML file bundled with the workflow application or
  • <global> tag in workflow definition or
  • Inline pig action configuration.

Currently the only supported way of configuring the pig action is with the inline action configuration, i.e. using the <configuration> tag in the workflow's XML file definition.

Issues in GitHub:

Shell Example

The Shell example can be run as:

o2a -i examples/shell -o output/shell

Make sure to first copy examples/shell/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/shell/ folder.

The converted DAG uses the BashOperator in Airflow, which executes the desired shell action with Pig by invoking gcloud dataproc jobs submit pig --cluster=<cluster> --region=<region> --execute 'sh <action> <args>'.
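The gcloud invocation described above can be assembled as follows. This is a minimal sketch of the command's shape, not the exact string o2a generates: the function name is hypothetical, the cluster name and region in the usage are placeholder values, and the whole Pig statement is shell-quoted so it reaches --execute as a single argument.

```python
import shlex
from typing import List


def dataproc_pig_shell_command(cluster: str, region: str, action: str, args: List[str]) -> str:
    """Build `gcloud dataproc jobs submit pig ... --execute 'sh <action> <args>'`."""
    pig_statement = " ".join(["sh", action] + list(args))
    return " ".join([
        "gcloud", "dataproc", "jobs", "submit", "pig",
        "--cluster=" + shlex.quote(cluster),
        "--region=" + shlex.quote(region),
        # Quote the Pig statement so the shell passes it as one argument.
        "--execute", shlex.quote(pig_statement),
    ])
```

For example, dataproc_pig_shell_command("my-cluster", "europe-west1", "echo", ["hello"]) produces gcloud dataproc jobs submit pig --cluster=my-cluster --region=europe-west1 --execute 'sh echo hello'.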

Known limitations

1. Exit status not available

From the Oozie documentation:

The output (STDOUT) of the Shell job can be made available to the workflow job after the Shell job ends. This information could be used from within decision nodes.

Currently we use the BashOperator, which can store only the last line of the job output in an XCom. In this case, that line is not helpful, as it relates to the Dataproc job submission status and not to the Shell action's result.

Issue in GitHub: Finalize shell mapper

2. No Shell launcher configuration

From the Oozie documentation:

Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.

Currently there is no way to specify the shell launcher configuration (it is ignored).

Issue in GitHub: Shell Launcher Configuration

Spark Example

The Spark example can be run as:

o2a -i examples/spark -o output/spark

Make sure to first copy examples/spark/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/spark/ folder.

The converted DAG uses the DataProcSparkOperator in Airflow.

Known limitations

1. Only tasks written in Java are supported

From the Oozie documentation:

The jar element indicates a comma separated list of jars or python files.

The solution was tested with only a single Jar file.

2. No Spark launcher configuration

From the Oozie documentation:

Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.

Currently there is no way to specify the Spark launcher configuration (it is ignored).

3. Not all elements are supported

The following elements are not supported: job-tracker, name-node, master, mode.

Sub-workflow Example

The Sub-workflow example can be run as:

o2a -i examples/subwf -o output/subwf

Make sure to first copy examples/subwf/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output (together with the sub-workflow DAG) will be created in the ./output/subwf/ folder.

The converted DAG uses the SubDagOperator in Airflow.

Known limitations

No known limitations.

DistCp Example

The DistCp example can be run as:

o2a -i examples/distcp -o output/distcp

Make sure to first copy examples/distcp/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/distcp/ folder.

The converted DAG uses the BashOperator in Airflow, which submits the Hadoop DistCp job using the gcloud dataproc jobs submit hadoop command.

Known limitations

The system test of this example fails for unknown reasons when run with Oozie. The converted DAG run by Airflow completes successfully.

Decision Example

The decision example can be run as:

o2a -i examples/decision -o output/decision

Make sure to first copy examples/decision/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/decision/ folder.

The converted DAG uses the BranchPythonOperator in Airflow.

Known limitations

The decision example is not yet fully functional: EL functions are not fully implemented yet, so the condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.

GitHub issue: Implement decision node

EL Example

The Oozie Expression Language (EL) example can be run as: o2a -i examples/el -o output/el

This will showcase the ability to use the o2a_libs directory to map EL functions to Python methods. This example assumes that the user has a valid Apache Airflow SSH connection set up and the o2a_libs directory has been copied to the dags folder.

Please keep in mind that the current version supports only a single EL variable or a single EL function per expression. Variable/function chaining is not currently supported.

Output

In this example the output will be created in the ./output/el/ folder.

Known limitations

The decision example is not yet fully functional: EL functions are not fully implemented yet, so the condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.

GitHub issue: Implement decision node

Hive/Hive2 Example

The Hive example can be run as:

o2a -i examples/hive -o output/hive

Make sure to first copy examples/hive/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/hive/ folder.

The converted DAG uses the DataProcHiveOperator in Airflow.

Known limitations

1. Only the connection to the local Hive instance is supported.

Connection configuration options are not supported.

2. Not all elements are supported

For Hive, the following elements are not supported: job-tracker, name-node. For Hive2, the following elements are not supported: job-tracker, name-node, jdbc-url, password.

The GitHub issue for both problems: Hive connection configuration and other elements

Email Example

The Email example can be run as:

o2a -i examples/email -o output/email

Make sure to first copy examples/email/configuration.template.properties, rename the copy to configuration.properties, and fill in the configuration data.

Output

In this example the output will be created in the ./output/email/ folder.

The converted DAG uses the EmailOperator in Airflow.

Prerequisites

In Oozie the SMTP server configuration is located in oozie-site.xml.

For Airflow it needs to be located in airflow.cfg. Example Airflow SMTP configuration:

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
smtp_host = example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow_user
smtp_password = password
smtp_port = 587
smtp_mail_from = airflow_user@example.com

For more information on setting Airflow configuration options, see the Airflow documentation.

Known limitations

1. Attachments are not supported

Due to the complexity of extracting files from HDFS inside Airflow and providing them for the EmailOperator, the functionality of sending attachments has not yet been implemented.

Solution: Implement a mechanism in O2A to extract a file from HDFS inside Airflow.

GitHub issue: Add support for attachment in Email mapper

2. <content_type> tag is not supported

From Oozie docs:

From uri:oozie:email-action:0.2 one can also specify mail content type as <content_type>text/html</content_type>. “text/plain” is default.

Unfortunately, the only content-type parameter the EmailOperator currently accepts is mime_subtype, and it only works for multipart subtypes, because the operator appends the subtype to the multipart/ prefix. Therefore, passing either html or plain from Oozie makes no sense.

As a result, the email will always be sent with the EmailOperator's default Content-Type value, which is multipart/mixed.
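The multipart/ prefix comes from Python's standard email package. Assuming the SMTP backend builds the message as MIMEMultipart(mime_subtype), as airflow.utils.email.send_email_smtp does in Airflow 1.10 to our understanding, the snippet below shows what Content-Type each subtype would produce. The helper name is ours; the MIMEMultipart behavior is standard library.

```python
from email.mime.multipart import MIMEMultipart


def resulting_content_type(mime_subtype: str) -> str:
    """Return the Content-Type that MIMEMultipart(mime_subtype) produces."""
    return MIMEMultipart(mime_subtype).get_content_type()


print(resulting_content_type("mixed"))  # multipart/mixed (the default)
print(resulting_content_type("html"))   # multipart/html, not text/html
```

This is why mapping Oozie's text/html content type onto mime_subtype cannot produce a text/html message.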

Solution: Modify Airflow's EmailOperator to support more content types.

GitHub issue: Content type support in Email mapper

3. cc and bcc fields are not templated in EmailOperator

Only the 'to', 'subject' and 'html_content' fields in EmailOperator are templated. In practice this covers all fields of an Oozie email action node apart from cc and bcc.

Therefore, if either of these two fields contains an EL function that requires a Jinja expression in Airflow, it will not work: the expression will not be evaluated but treated as a plain string.

Solution: Modify Airflow's EmailOperator to mark more fields as template_fields.

GitHub issue: The CC: and BCC: fields are not templated in EmailOperator