A tool to easily convert between Apache Oozie workflows and Apache Airflow workflows.
The program targets Apache Airflow >= 1.10 and Apache Oozie 1.0 XML schema.
If you want to contribute to the project, please take a look at CONTRIBUTING.md
Apache Airflow is a workflow management system developed by Airbnb in 2014. It is a platform to programmatically author, schedule, and monitor workflows. Airflow workflows are designed as Directed Acyclic Graphs (DAGs) of tasks in Python. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
Apache Oozie is a workflow scheduler system to manage Apache Hadoop jobs. Oozie workflows are also designed as Directed Acyclic Graphs (DAGs) in XML.
There are a few differences noted below:
|         | Spec.  | Task        | Dependencies                    | "Subworkflows" | Parameterization             | Notification        |
|---------|--------|-------------|---------------------------------|----------------|------------------------------|---------------------|
| Oozie   | XML    | Action Node | Control Node                    | Subworkflow    | EL functions/Properties file | URL based callbacks |
| Airflow | Python | Operators   | Trigger Rules, set_downstream() | SubDag         | jinja2 and macros            | Callbacks/Emails    |
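To give a feel for the Airflow side of this table, here is a minimal, hand-written DAG (illustrative only, not o2a output) that uses operators, `set_downstream()` dependencies, a trigger rule and Jinja templating:

```python
# Minimal hand-written Airflow DAG (illustrative only, not o2a output).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="example_dag",
         start_date=datetime(2019, 1, 1),
         schedule_interval="@daily") as dag:
    start = DummyOperator(task_id="start")
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'Hello {{ ds }}'",   # jinja2 templating
    )
    end = DummyOperator(task_id="end", trigger_rule=TriggerRule.ALL_DONE)

    start.set_downstream(say_hello)             # dependencies via set_downstream()
    say_hello.set_downstream(end)
```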
Note that you need Python >= 3.6 to run the converter.
You can install `o2a` from PyPI via `pip install o2a`. After installation, the `o2a` and `o2a-validate-workflows` scripts should be available on your path.
If you use the sources of `o2a`, the environment can be set up via virtualenv (you can create one using virtualenvwrapper, for example). While in your virtualenv, you can install all the requirements via `pip install -r requirements.txt`.
You can add the `bin` subdirectory to your PATH; then all the scripts below can be run without the `./bin` prefix.
This can be done, for example, by adding a line similar to the one below to your `.bash_profile` or to `bin/postactivate` of your virtual environment:
export PATH=${PATH}:<INSERT_PATH_TO_YOUR_OOZIE_PROJECT>/bin
Otherwise you need to run all the scripts from the bin subdirectory, for example:
./bin/o2a --help
In all the example commands below, it is assumed that the `bin` directory is on your PATH, whether o2a was installed from PyPI or from the sources.
You can also install `o2a` from a local folder using `pip install -e .`
You can run the program by calling:
o2a -i <INPUT_APPLICATION_FOLDER> -o <OUTPUT_FOLDER_PATH>
Example:
o2a -i examples/demo -o output/demo
This is the full usage guide, available by running `o2a -h`:
usage: o2a [-h] -i INPUT_DIRECTORY_PATH -o OUTPUT_DIRECTORY_PATH [-n DAG_NAME]
[-u USER] [-s START_DAYS_AGO] [-v SCHEDULE_INTERVAL] [-d]
Convert Apache Oozie workflows to Apache Airflow workflows.
optional arguments:
-h, --help show this help message and exit
-i INPUT_DIRECTORY_PATH, --input-directory-path INPUT_DIRECTORY_PATH
Path to input directory
-o OUTPUT_DIRECTORY_PATH, --output-directory-path OUTPUT_DIRECTORY_PATH
Desired output directory
-n DAG_NAME, --dag-name DAG_NAME
Desired DAG name [defaults to input directory name]
-u USER, --user USER The user to be used in place of all ${user.name}
[defaults to user who ran the conversion]
-s START_DAYS_AGO, --start-days-ago START_DAYS_AGO
Desired DAG start as number of days ago
-v SCHEDULE_INTERVAL, --schedule-interval SCHEDULE_INTERVAL
Desired DAG schedule interval as number of days
-d, --dot Renders workflow files in DOT format
The input application directory has to follow the structure defined as follows:
<APPLICATION>/
|- job.properties - job properties that are used to run the job
|- hdfs - folder with application - should be copied to HDFS
| |- workflow.xml - Oozie workflow xml (1.0 schema)
| |- ... - additional folders required to be copied to HDFS
|- configuration.template.properties - template of configuration values used during conversion
|- configuration.properties - generated properties for configuration values
A fork node splits the path of execution into multiple concurrent paths of execution.
A join node waits until every concurrent execution path of the previous fork node arrives at it. The fork and join nodes must be used in pairs. The join node assumes concurrent execution paths are children of the same fork node.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<fork name="[FORK-NODE-NAME]">
<path start="[NODE-NAME]" />
...
<path start="[NODE-NAME]" />
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
...
</workflow-app>
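In Airflow the same structure is usually expressed with plain task dependencies: the fork fans out to several tasks and the join is a task that depends on all of them. A rough sketch, with made-up task names and not the converter's literal output:

```python
# Rough sketch of a fork/join pair in Airflow (illustrative names only).
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG("fork_join_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

fork = DummyOperator(task_id="fork", dag=dag)       # fan out to the parallel paths
task_a = DummyOperator(task_id="task_a", dag=dag)   # first concurrent path
task_b = DummyOperator(task_id="task_b", dag=dag)   # second concurrent path
join = DummyOperator(task_id="join", dag=dag)       # waits for all paths

fork.set_downstream([task_a, task_b])
join.set_upstream([task_a, task_b])
```

Because the join task's default trigger rule is `all_success`, it only runs once every parallel path has finished, mirroring Oozie's join semantics.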
A decision node enables a workflow to make a selection on the execution path to follow.
The behavior of a decision node can be seen as a switch-case statement.
A decision node consists of a list of predicate-transition pairs plus a default transition. Predicates are evaluated in order of appearance until one of them evaluates to true, and the corresponding transition is taken. If none of the predicates evaluates to true, the default transition is taken.
Predicates are JSP Expression Language (EL) expressions (refer to section 4.2 of the Oozie specification) that resolve to a boolean value, true or false. For example:
`${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}`
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<decision name="[NODE-NAME]">
<switch>
<case to="[NODE_NAME]">[PREDICATE]</case>
...
<case to="[NODE_NAME]">[PREDICATE]</case>
<default to="[NODE_NAME]"/>
</switch>
</decision>
...
</workflow-app>
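In Airflow such a node maps naturally onto a `BranchPythonOperator`, which returns the `task_id` of the branch to follow. A hedged sketch with made-up task names and a placeholder in place of the evaluated EL predicate:

```python
# Illustrative sketch of a decision node as a BranchPythonOperator.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG("decision_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

def choose_branch():
    predicate_is_true = False        # placeholder for the evaluated EL predicate
    return "case_task" if predicate_is_true else "default_task"

decision = BranchPythonOperator(task_id="decision", python_callable=choose_branch, dag=dag)
case_task = DummyOperator(task_id="case_task", dag=dag)
default_task = DummyOperator(task_id="default_task", dag=dag)
decision.set_downstream([case_task, default_task])
```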
The start node is the entry point for a workflow job; it indicates the first workflow node the workflow job must transition to.
When a workflow is started, it automatically transitions to the node specified in the start node.
A workflow definition must have one start node.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<start to="[NODE-NAME]"/>
...
</workflow-app>
The end node is the end for a workflow job; it indicates that the workflow job has completed successfully.
When a workflow job reaches the end node it finishes successfully (SUCCEEDED).
If one or more actions started by the workflow job are still executing when the end node is reached, those actions will be killed. In this scenario the workflow job is still considered to have run successfully.
A workflow definition must have one end node.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<end name="[NODE-NAME]"/>
...
</workflow-app>
The kill node allows a workflow job to exit with an error.
When a workflow job reaches the kill node it finishes in error (KILLED).
If one or more actions started by the workflow job are still executing when the kill node is reached, those actions will be killed.
A workflow definition may have zero or more kill nodes.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<kill name="[NODE-NAME]">
<message>[MESSAGE-TO-LOG]</message>
</kill>
...
</workflow-app>
As of now, a very minimal set of Oozie EL functions is supported. They work through a dictionary that maps each Oozie EL function string to the corresponding Python function; the mapping lives in `utils/el_utils.py`.
This design allows for custom EL function mapping if one so chooses. By default everything gets mapped to the module `o2a_libs`. This means that, in order to use EL function mapping, the `o2a_libs` folder should be copied over to the Airflow DAG folder. It will then be picked up and parsed by the Airflow workers and become available to all DAGs.
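As a purely illustrative sketch of the idea (the mapping table and module names below are assumptions, not the actual contents of `utils/el_utils.py` or `o2a_libs`), the translation can be pictured as a lookup from EL function names to importable Python callables:

```python
# Illustrative only - these names are assumptions, not the actual contents of
# utils/el_utils.py or the o2a_libs package.
EL_TO_PYTHON = {
    "firstNotNull": "o2a_libs.el_basic_functions.first_not_null",  # assumed target
    "wf:user": "o2a_libs.el_wf_functions.user",                    # assumed target
}

def translate_el_function(el_name):
    """Return the dotted Python name to emit for a recognised EL function name."""
    return EL_TO_PYTHON.get(el_name, el_name)

print(translate_el_function("firstNotNull"))  # -> o2a_libs.el_basic_functions.first_not_null
```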
All examples can be found in the examples directory.
The demo example contains several action and control nodes. The control nodes are `fork`, `join`, `decision`, `start`, `end`, and `kill`. As far as action nodes go, there are `fs`, `map-reduce`, and `pig`.
Most of these are already supported, but when the program encounters a node it does not know how to parse, it performs a sort of "skeleton transformation": it converts all unknown nodes to dummy nodes. This allows users to convert those nodes manually if they wish, since the control flow is preserved.
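For example, an unrecognized node might end up as a placeholder similar to the sketch below (illustrative only; the actual generated code may differ):

```python
# Illustrative placeholder produced for an unknown node.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG("demo", start_date=datetime(2019, 1, 1), schedule_interval=None)

# The unknown Oozie node keeps its place in the control flow as a no-op task
# that can later be replaced by hand.
unknown_action = DummyOperator(task_id="unknown_action", dag=dag)
```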
The demo can be run as:
o2a -i examples/demo -o output/demo
This will parse the workflow and write the output files to the `output/demo` directory.
The decision node is not fully functional, as not all EL functions are currently supported. So in order for it to run in Airflow you may need to edit the Python output file and change the decision node expression.
Issue in GitHub: Implement decision node
In this example the output (including the sub-workflow DAG) will be created in the `./output/demo/` folder.
The childwf example is a sub-workflow of the demo example. It can be run as:
o2a -i examples/childwf -o output/childwf
Make sure to first copy `examples/childwf/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/childwf/` folder.
No known limitations.
The ssh example can be run as:
o2a -i examples/ssh -o output/ssh
This will convert the specified Oozie XML and write the output into the specified output directory, in this case `output/ssh/test_ssh_dag.py`.
There are some differences between Apache Oozie and Apache Airflow as far as the SSH specification goes. In Airflow you will have to add/edit an SSH-specific connection that contains the credentials required for the specified SSH action. For example, if the SSH node looks like:
<action name="ssh">
<ssh xmlns="uri:oozie:ssh-action:0.1">
<host>user@apache.org</host>
<command>echo</command>
<args>"Hello Oozie!"</args>
</ssh>
<ok to="end"/>
<error to="fail"/>
</action>
Then the default Airflow SSH connection, `ssh_default`, should have at the very least a password set. This can be found in the Airflow Web UI under Admin > Connections. From the command line it is not possible to edit existing connections, so you must add a new one, for example:
airflow connections --add --conn_id <SSH_CONN_ID> --conn_type SSH --conn_password <PASSWORD>
More information can be found in Airflow's documentation.
In this example the output will be created in the `./output/ssh/` folder.
The converted DAG uses the `SSHOperator` in Airflow.
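A rough sketch of how the ssh action above could look with the `SSHOperator` (not the converter's literal output; the connection supplies the user credentials):

```python
# Rough sketch of the ssh action with SSHOperator (illustrative only).
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

dag = DAG("ssh_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

ssh_task = SSHOperator(
    task_id="ssh",
    ssh_conn_id="ssh_default",            # connection holding user/password
    remote_host="apache.org",             # host part of user@apache.org
    command='echo "Hello Oozie!"',
    dag=dag,
)
```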
No known limitations.
The MapReduce example can be run as:
o2a -i examples/mapreduce -o output/mapreduce
Make sure to first copy `examples/mapreduce/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/mapreduce/` folder.
The converted DAG uses the `DataProcHadoopOperator` in Airflow.
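As a rough illustration (not the converter's literal output; the main class, paths, cluster and region are placeholders), such a task can look like:

```python
# Illustrative DataProcHadoopOperator task with placeholder values.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcHadoopOperator

dag = DAG("mapreduce_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

mapreduce_task = DataProcHadoopOperator(
    task_id="mapreduce",
    main_class="org.apache.hadoop.examples.WordCount",    # placeholder main class
    arguments=["/user/demo/input", "/user/demo/output"],  # placeholder paths
    cluster_name="example-cluster",
    region="europe-west1",
    dag=dag,
)
```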
1. Exit status not available
From the Oozie documentation:
The counters of the Hadoop job and job exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the Hadoop jobs ends. This information can be used from within decision nodes and other actions configurations.
Currently we use the `DataProcHadoopOperator`, which does not store the job exit status in an XCom for other tasks to use.
Issue in Github: Implement exit status and counters in MapReduce Action
2. Configuration options
From the Oozie documentation (the strikethrough is from us):
Hadoop JobConf properties can be specified as part of
- ~~the config-default.xml or~~
- ~~JobConf XML file bundled with the workflow application or~~
- ~~`<global>` tag in workflow definition or~~
- Inline map-reduce action configuration or
- ~~An implementation of OozieActionConfigurator specified by the `<config-class>` tag in workflow definition.~~
Currently the only supported way of configuring the map-reduce action is with the inline action configuration, i.e. using the `<configuration>` tag in the workflow's XML file definition.
Issues in Github:
- Add support for config-default.xml
- Add support for parameters section of the workflow.xml
- Handle global configuration properties
3. Streaming and pipes
Streaming and pipes are currently not supported.
Issue in GitHub: Implement streaming support
The FS example can be run as:
o2a -i examples/fs -o output/fs
Make sure to first copy `examples/fs/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/fs/` folder.
The converted DAG uses the `BashOperator` in Airflow.
Not all FS operations are currently idempotent. This is not a problem if the prepare action is used in other tasks, but it might be a problem in certain situations. Making the operators idempotent requires more complex logic, and support for Pig actions is currently missing.
Issue in Github: FS Mapper and idempotence
The Java example can be run as:
o2a -i examples/java -o output/java
Make sure to first copy `examples/java/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/java/` folder.
The converted DAG uses the `DataProcHadoopOperator` in Airflow.
- Overriding the action's Main class via `oozie.launcher.action.main.class` is not implemented.
Issue in Github: Override Java main class with property
The Pig example can be run as:
o2a -i examples/pig -o output/pig
Make sure to first copy `examples/pig/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/pig/` folder.
The converted DAG uses the `DataProcPigOperator` in Airflow.
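As a rough illustration (not the converter's literal output; the script location, variables, cluster and region are placeholders), such a task can look like:

```python
# Illustrative DataProcPigOperator task with placeholder values.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

dag = DAG("pig_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

pig_task = DataProcPigOperator(
    task_id="pig",
    query_uri="gs://example-bucket/scripts/id.pig",   # placeholder script location
    variables={"INPUT": "/user/demo/input",
               "OUTPUT": "/user/demo/output"},        # placeholder Pig parameters
    cluster_name="example-cluster",
    region="europe-west1",
    dag=dag,
)
```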
1. Configuration options
From the Oozie documentation (the strikethrough is from us):
Hadoop JobConf properties can be specified as part of
- ~~the config-default.xml or~~
- ~~JobConf XML file bundled with the workflow application or~~
- ~~`<global>` tag in workflow definition or~~
- Inline pig action configuration.
Currently the only supported way of configuring the pig action is with the inline action configuration, i.e. using the `<configuration>` tag in the workflow's XML file definition.
Issues in Github:
- Add support for config-default.xml
- Add support for parameters section of the workflow.xml
- Handle global configuration properties
The Shell example can be run as:
o2a -i examples/shell -o output/shell
Make sure to first copy `examples/shell/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/shell/` folder.
The converted DAG uses the `BashOperator` in Airflow, which executes the desired shell action with Pig by invoking `gcloud dataproc jobs submit pig --cluster=<cluster> --region=<region> --execute 'sh <action> <args>'`.
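For orientation, the generated task can be pictured roughly as below (illustrative only; the cluster, region and shell command are placeholders):

```python
# Illustrative BashOperator wrapping the gcloud invocation described above.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("shell_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

shell_task = BashOperator(
    task_id="shell",
    bash_command=(
        "gcloud dataproc jobs submit pig "
        "--cluster=example-cluster --region=europe-west1 "
        "--execute 'sh echo Hello'"
    ),
    dag=dag,
)
```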
1. Exit status not available
From the Oozie documentation:
The output (STDOUT) of the Shell job can be made available to the workflow job after the Shell job ends. This information could be used from within decision nodes.
Currently we use the `BashOperator`, which can store only the last line of the job output in an XCom. In this case the line is not helpful, as it relates to the Dataproc job submission status and not the Shell action's result.
Issue in Github: Finalize shell mapper
2. No Shell launcher configuration
From the Oozie documentation:
Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.
Currently there is no way to specify the shell launcher configuration (it is ignored).
Issue in Github: Shell Launcher Configuration
The Spark example can be run as:
o2a -i examples/spark -o output/spark
Make sure to first copy `/examples/spark/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/spark/` folder.
The converted DAG uses the `DataProcSparkOperator` in Airflow.
1. Only tasks written in Java are supported
From the Oozie documentation:
The jar element indicates a comma separated list of jars or python files.
The solution was tested with only a single Jar file.
2. No Spark launcher configuration
From the Oozie documentation:
Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.
Currently there is no way to specify the Spark launcher configuration (it is ignored).
3. Not all elements are supported
The following elements are not supported: `job-tracker`, `name-node`, `master`, `mode`.
The Sub-workflow example can be run as:
o2a -i examples/subwf -o output/subwf
Make sure to first copy `examples/subwf/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output (together with the sub-workflow DAG) will be created in the `./output/subwf/` folder.
The converted DAG uses the `SubDagOperator` in Airflow.
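A hedged sketch of the pattern (the child DAG below is a trivial stand-in for the converted sub-workflow); note that the child DAG's `dag_id` must be `<parent_dag_id>.<task_id>`:

```python
# Illustrative sketch of a sub-workflow task with SubDagOperator.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"start_date": datetime(2019, 1, 1)}
dag = DAG("examples_subwf", default_args=default_args, schedule_interval=None)

def build_subdag(parent_dag_id, task_id):
    """Return a child DAG whose dag_id is '<parent_dag_id>.<task_id>'."""
    child = DAG("%s.%s" % (parent_dag_id, task_id),
                default_args=default_args, schedule_interval=None)
    DummyOperator(task_id="child_task", dag=child)  # stand-in for converted nodes
    return child

subwf_task = SubDagOperator(
    task_id="subwf",
    subdag=build_subdag(dag.dag_id, "subwf"),
    dag=dag,
)
```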
No known limitations.
The DistCp example can be run as:
o2a -i examples/distcp -o output/distcp
Make sure to first copy `examples/distcp/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/distcp/` folder.
The converted DAG uses the `BashOperator` in Airflow, which submits the Hadoop DistCp job using the `gcloud dataproc jobs submit hadoop` command.
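A rough illustration of such a task (the paths, bucket, cluster and region below are placeholders, not values used by the converter):

```python
# Illustrative BashOperator submitting DistCp via Dataproc (placeholder values).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("distcp_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

distcp_task = BashOperator(
    task_id="distcp",
    bash_command=(
        "gcloud dataproc jobs submit hadoop "
        "--cluster=example-cluster --region=europe-west1 "
        "--class=org.apache.hadoop.tools.DistCp "
        "-- /user/demo/input gs://example-bucket/output"
    ),
    dag=dag,
)
```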
The system test of the example run with Oozie fails for unknown reasons. The converted DAG run by Airflow completes successfully.
The decision example can be run as:
o2a -i examples/decision -o output/decision
Make sure to first copy `examples/decision/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/decision/` folder.
The converted DAG uses the `BranchPythonOperator` in Airflow.
The decision example is not yet fully functional, as EL functions are not yet fully implemented, so the condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.
Github issue: Implement decision node
The Oozie Expression Language (EL) example can be run as:
o2a -i examples/el -o output/el
This will showcase the ability to use the `o2a_libs` directory to map EL functions to Python methods. This example assumes that the user has a valid Apache Airflow SSH connection set up and the `o2a_libs` directory has been copied to the `dags` folder.
Please keep in mind that, as of the current version, only a single EL variable or a single EL function is supported. Variable/function chaining is not currently supported.
In this example the output will be created in the `./output/el/` folder.
The decision example is not yet fully functional, as EL functions are not yet fully implemented, so the condition is hard-coded for now. Once EL functions are implemented, the condition in the example will be updated.
Github issue: Implement decision node
The Hive example can be run as:
o2a -i examples/hive -o output/hive
Make sure to first copy `/examples/hive/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/hive/` folder.
The converted DAG uses the `DataProcHiveOperator` in Airflow.
1. Only the connection to the local Hive instance is supported.
Connection configuration options are not supported.
2. Not all elements are supported
For Hive, the following elements are not supported: `job-tracker`, `name-node`.
For Hive2, the following elements are not supported: `job-tracker`, `name-node`, `jdbc-url`, `password`.
The Github issue for both problems: Hive connection configuration and other elements
The Email example can be run as:
o2a -i examples/email -o output/email
Make sure to first copy `/examples/email/configuration.template.properties`, rename it as `configuration.properties` and fill in with configuration data.
In this example the output will be created in the `./output/email/` folder.
The converted DAG uses the `EmailOperator` in Airflow.
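For orientation, a hedged sketch of such a task is shown below (the addresses, subject and body are placeholders); the SMTP settings themselves come from `airflow.cfg`, as described next.

```python
# Illustrative EmailOperator task with placeholder addresses and content.
from datetime import datetime

from airflow import DAG
from airflow.operators.email_operator import EmailOperator

dag = DAG("email_example", start_date=datetime(2019, 1, 1), schedule_interval=None)

email_task = EmailOperator(
    task_id="email",
    to="someone@example.com",
    cc="copy@example.com",
    subject="Workflow finished",
    html_content="The converted workflow has completed.",
    dag=dag,
)
```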
In Oozie the SMTP server configuration is located in `oozie-site.xml`. For Airflow it needs to be located in `airflow.cfg`.
Example Airflow SMTP configuration:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow_user
smtp_password = password
smtp_port = 587
smtp_mail_from = airflow_user@example.com
For more information on setting Airflow configuration options, see the Airflow documentation.
1. Attachments are not supported
Due to the complexity of extracting files from HDFS inside Airflow and providing them to the `EmailOperator`, the functionality of sending attachments has not yet been implemented.
Solution: Implement in O2A a mechanism to extract a file from HDFS inside Airflow.
Github Issue: Add support for attachment in Email mapper
2. `<content_type>` tag is not supported
From Oozie docs:
From uri:oozie:email-action:0.2 one can also specify mail content type as <content_type>text/html</content_type>. “text/plain” is default.
Unfortunately, the `EmailOperator` currently accepts only the `mime_subtype` parameter. However, this only works for multipart subtypes, as the operator appends the subtype to the `multipart/` prefix. Therefore passing either `html` or `plain` from Oozie makes no sense. As a result the email will always be sent with the `EmailOperator`'s default Content-Type value, which is `multipart/mixed`.
Solution: Modify Airflow's `EmailOperator` to support more content types.
Github Issue: Content type support in Email mapper
3. `cc` and `bcc` fields are not templated in `EmailOperator`
Only the `to`, `subject` and `html_content` fields in `EmailOperator` are templated.
In practice this covers all fields of an Oozie email action node apart from `cc` and `bcc`.
Therefore, if either of these two fields of the action node contains an EL function that would require a Jinja expression in Airflow, it will not work: the expression will not be evaluated but treated as a plain string.
Solution: Modify Airflow's `EmailOperator` to mark more fields as `template_fields`.
Github Issue: The CC: and BCC: fields are not templated in EmailOperator