GoogleCloudPlatform/oozie-to-airflow

Finalize shell mapper when file node functionality is ready

sprzedwojski opened this issue · 4 comments

<workflow-app xmlns="uri:oozie:workflow:1.0" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:1.0">
            <resource-manager>${resourceManager}</resource-manager>
            <name-node>${nameNode}</name-node>
            <prepare>
                <create path="${nameNode}/examples/input-data/demo/pig-node"/>
                <delete path="${nameNode}/examples/output-data/demo/pig-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>java</exec>
            <argument>-version</argument>
            <!--<file>${EXEC}</file>--> <!-- TODO Add the binary upload when file mapper ready -->
            <capture-output/> <!-- This can be ignored by the converter -->
        </shell>
<!--        <ok to="check-output"/>-->
        <ok to="end"/>
        <error to="fail"/>
    </action>
<!--    TODO uncomment once decision has been implemented -->
<!--    <decision name="check-output">-->
<!--        <switch>-->
<!--            <case to="end">-->
<!--                ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}-->
<!--            </case>-->
<!--            <default to="fail-output"/>-->
<!--        </switch>-->
<!--    </decision>-->
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <!--<kill name="fail-output">-->
        <!--<message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>-->
    <!--</kill>-->
    <end name="end"/>
</workflow-app>

Can I ask for a more detailed description? I do not understand what to do here.

This task calls for adding File/Archive support to the Shell mapper.
Based on what we have now, this means three things:

  • Adding File and Archive nodes to the example workflow
  • Using the File/Archive extractors in the mapper
  • Using the results of the extractors in shell.tpl

The last one is the tricky part, and it applies to every template where we use gcloud dataproc jobs submit pig (...) --execute 'sh...': how do we perform the File/Archive action on Dataproc?
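
For the extraction step listed above, here is a minimal Python sketch of pulling file and archive entries out of a shell action. It is not the project's actual extractor: the function names `split_resource` and `extract_resources` are hypothetical, and the sample XML is illustrative (its paths are borrowed from the Pig experiment later in this thread). It assumes the usual Oozie `path#symlink` notation.

```python
import xml.etree.ElementTree as ET

# Namespace copied from the example workflow in this issue.
SHELL_NS = {"shell": "uri:oozie:shell-action:1.0"}


def split_resource(text):
    """Split 'path#symlink' into (path, symlink); symlink is None if absent."""
    path, _, symlink = (text or "").strip().partition("#")
    return path, symlink or None


def extract_resources(shell_xml):
    """Return (files, archives) declared directly under a <shell> element."""
    shell = ET.fromstring(shell_xml)
    files = [split_resource(n.text) for n in shell.findall("shell:file", SHELL_NS)]
    archives = [split_resource(n.text) for n in shell.findall("shell:archive", SHELL_NS)]
    return files, archives


# Illustrative input only; not part of the example workflow yet.
EXAMPLE = """
<shell xmlns="uri:oozie:shell-action:1.0">
    <exec>cat</exec>
    <argument>test_link.txt</argument>
    <file>/user/szymon/examples/apps/pig/test_dir/test.txt#test_link.txt</file>
    <archive>/user/szymon/examples/apps/pig/test_dir/test2.zip#test_zip_dir</archive>
</shell>
"""

files, archives = extract_resources(EXAMPLE)
print(files)
print(archives)
```

The resulting (path, symlink) pairs are what a template such as shell.tpl would need to receive; how to make Dataproc act on them is the open question.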

This task is blocked until we resolve #243

Post-mortem on what was done in trying to solve this issue:

On 18-19.06.2019, Tomek and I (Szymon) worked on adding file/archive support to the Shell mapper.
It was supposed to be a simple task, but it turned out to be anything but.

Approach 1

First of all, we run a shell command using the gcloud dataproc jobs submit pig ... --execute ... command.

We looked at the API and found the following flag:

--properties=[PROPERTY=VALUE,…]
A list of key value pairs to configure Pig.

Using it, we tried setting the appropriate configuration properties to pass the files/archives, based on this SO answer.
The result was something like:

gcloud dataproc jobs submit pig --cluster=oozie-o2a-2cpu \
--region=europe-west3 \
--properties "mapred.cache.file=/user/szymon/examples/apps/demo/workflow.xml#myxml.xml,mapred.create.symlink=yes" \
--execute "sh cat myxml.xml"

Unfortunately, it didn't work. The file myxml.xml was not present in the working directory of the job, so the cat command failed.
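
For reference, the command from Approach 1 can be assembled as in the Python sketch below. It only builds the argument list from the flags shown above and does not fix the problem described: the cached file still did not appear in the job's working directory. The function name `build_pig_submit_command` is hypothetical, and the sketch takes a single file only, on the assumption that commas inside the value would collide with the comma that separates --properties entries.

```python
import shlex


def build_pig_submit_command(cluster, region, shell_command, file=None):
    """Build the gcloud argument list used in Approach 1.

    `file` is an optional (hdfs_path, symlink) pair.
    """
    cmd = [
        "gcloud", "dataproc", "jobs", "submit", "pig",
        f"--cluster={cluster}",
        f"--region={region}",
    ]
    if file is not None:
        path, link = file
        # Same two properties as in the command tried above.
        cmd += [
            "--properties",
            f"mapred.cache.file={path}#{link},mapred.create.symlink=yes",
        ]
    cmd += ["--execute", f"sh {shell_command}"]
    return cmd


cmd = build_pig_submit_command(
    "oozie-o2a-2cpu",
    "europe-west3",
    "cat myxml.xml",
    file=("/user/szymon/examples/apps/demo/workflow.xml", "myxml.xml"),
)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(cmd))
```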

Approach 2

We already have a Pig mapper that produces a DAG with a DataprocPigOperator task running a Pig script, and it already handles file/archive, so we explored this path next.

We found that a .pig script can successfully run a bash command, e.g.:

script.pig

sh ls -al

The file/archive functionality in a Pig script is handled by modifying the script and adding a few SET commands on top, e.g.:

set mapred.create.symlink yes;
set mapred.cache.file hdfs:///user/szymon/examples/apps/pig/test_dir/test.txt#test_link.txt,hdfs:///user/szymon/examples/apps/pig/test_dir/test2.zip#test_link.zip;
set mapred.cache.archives hdfs:///user/szymon/examples/apps/pig/test_dir/test2.zip#test_zip_dir,hdfs:///user/szymon/examples/apps/pig/test_dir/test3.zip#test3_zip_dir,hdfs:///user/szymon/examples/apps/pig/test_dir/testcopy.zip#testcopy_zip_dir;

We ran this on Dataproc and made a few observations:

  • When the HDFS URI points to a non-existent archive, there is an error and the job fails
  • When the HDFS URI points to a non-existent file, no error is thrown and the job completes
  • If we try to sh cat ... the referenced file or a file from a referenced archive (which should be unarchived and present), the command fails with a "file not found" error
  • Running sh ls -al showed only the script.pig file
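
The SET commands described above follow a regular pattern, so they can be generated from lists of (HDFS URI, symlink) pairs. The Python sketch below is an illustration of that pattern, not the Pig mapper's actual code, and `pig_cache_header` is a hypothetical name. As the observations show, emitting these lines did not make the resources visible to sh commands on Dataproc.

```python
def pig_cache_header(files=(), archives=()):
    """Build the SET lines placed at the top of a Pig script.

    `files` and `archives` are sequences of (hdfs_uri, symlink) pairs.
    """
    def joined(pairs):
        return ",".join(f"{uri}#{link}" for uri, link in pairs)

    lines = []
    if files or archives:
        lines.append("set mapred.create.symlink yes;")
    if files:
        lines.append(f"set mapred.cache.file {joined(files)};")
    if archives:
        lines.append(f"set mapred.cache.archives {joined(archives)};")
    return "\n".join(lines)


header = pig_cache_header(
    files=[("hdfs:///user/szymon/examples/apps/pig/test_dir/test.txt", "test_link.txt")],
    archives=[("hdfs:///user/szymon/examples/apps/pig/test_dir/test2.zip", "test_zip_dir")],
)
# Prepend the header to a script body such as the one-line script.pig above.
print(header + "\nsh ls -al")
```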

Additional actions

By running sh pwd we found that the job is actually executed from a /tmp/{job-hash} directory.

This directory is removed after the job completes, so it cannot be inspected afterwards. We placed an sh sleep 1000 inside the Pig script to inspect the directory at runtime, but didn't find the file/archive resources inside.

Conclusions

We didn't manage to find a way to add file/archive functionality to the Shell mapper.

Moreover, it seems that it doesn't work correctly for the Pig mapper either.

We've decided to abandon this problem for now and return to it later.
Things to resolve:

  • Find out where exactly the file/archive resources should be stored in the local cache (local filesystem)
  • Find out if they are stored there in the case of the gcloud dataproc jobs submit pig command as well as the DataprocPigOperator (which probably uses the same command underneath)
  • Check (somehow) whether the file/archive functionality works for Pig at all, possibly by creating a Pig script which makes use of the symlinked file from the local cache