Finalize shell mapper when file node functionality is ready (0/1)
sprzedwojski opened this issue · 4 comments
<workflow-app xmlns="uri:oozie:workflow:1.0" name="shell-wf">
<start to="shell-node"/>
<action name="shell-node">
<shell xmlns="uri:oozie:shell-action:1.0">
<resource-manager>${resourceManager}</resource-manager>
<name-node>${nameNode}</name-node>
<prepare>
<create path="${nameNode}/examples/input-data/demo/pig-node"/>
<delete path="${nameNode}/examples/output-data/demo/pig-node"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>java</exec>
<argument>-version</argument>
<!--<file>${EXEC}</file>--> <!-- TODO Add the binary upload when file mapper ready -->
<capture-output/> <!-- This can be ignored by the converter -->
</shell>
<!-- <ok to="check-output"/>-->
<ok to="end"/>
<error to="fail"/>
</action>
<!-- TODO uncomment once decision has been implemented -->
<!-- <decision name="check-output">-->
<!-- <switch>-->
<!-- <case to="end">-->
<!-- ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}-->
<!-- </case>-->
<!-- <default to="fail-output"/>-->
<!-- </switch>-->
<!-- </decision>-->
<kill name="fail">
<message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!--<kill name="fail-output">-->
<!--<message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>-->
<!--</kill>-->
<end name="end"/>
</workflow-app>
Can I ask for a more detailed description? I do not understand what to do here.
This task calls for adding the File/Archive functionality support to Shell mapper.
According to what we have now this would mean 2 things:
- Adding a File and Archive nodes to the example workflow
- Using the File/Archive extractors in the mapper
- Using the results of the extractors in
shell.tpl
The last one is a tricky part and applies to every template where we use gcloud dataproc jobs submit pig (...) --execute 'sh...'
- how to perform the File/Archive action on Dataproc?
This task is blocked until we resolve #243
Post-mortem on what was done in trying to solve this issue:
On 18-19.06.2019 me (Szymon) and Tomek we were working on adding file/archive support to the Shell mapper.
It was supposed to be a simple task, however it turned out to be anything but.
Approach 1
First of all, the way we run a shell command is using the gcloud dataproc jobs submit pig ... --execute ...
command.
We looked at the API and found the following flag:
--properties=[PROPERTY=VALUE,…]
A list of key value pairs to configure Pig.
Using it we tried setting the appropriate configuration properties to pass the files / archives, based on this SO answer.
The result was something like:
gcloud dataproc jobs submit pig --cluster=oozie-o2a-2cpu \
--region=europe-west3 \
--properties "mapred.cache.file=/user/szymon/examples/apps/demo/workflow.xml#myxml.xml,mapred.create.symlink=yes" \
--execute "sh cat myxml.xml"
Unfortunately, it didn't work. The file myxml.xml
was not present in the working directory of the job and the cat
command was failing.
Approach 2
Knowing that we have a Pig mapper which produces a DAG with a DataprocPigOperator
task running a Pig script and it already handles file/archive, we tried to explore this path.
We found that a .pig
script can successfully run a bash command, e.g.:
script.pig
sh ls -al
The file/archive functionality in a Pig script is handled by modifying the script and adding a few SET
commands on top, e.g.:
set mapred.create.symlink yes;
set mapred.cache.file hdfs:///user/szymon/examples/apps/pig/test_dir/test.txt#test_link.txt,hdfs:///user/szymon/examples/apps/pig/test_dir/test2.zip#test_link.zip;
set mapred.cache.archives hdfs:///user/szymon/examples/apps/pig/test_dir/test2.zip#test_zip_dir,hdfs:///user/szymon/examples/apps/pig/test_dir/test3.zip#test3_zip_dir,hdfs:///user/szymon/examples/apps/pig/test_dir/testcopy.zip#testcopy_zip_dir;
We ran this on Dataproc and there are a few observations:
- When the HDFS URI points to a non-existent
archive
, there is an error and the job fails - When the HDFS URI points to a non-existent
file
, no error is thrown and the job completes - If we try to
sh cat ...
the referencedfile
or a file from a referencedarchive
(which should be unarchived and present), there is an error - file not found - A
sh ls -al
only showed thescript.pig
file
Additional actions
When printing sh pwd
we found out that the job is actually executed from a /tmp/{job-hash}
directory.
It is removed after the job has completed so unable to be inspected. We placed an sh sleep 1000
inside the Pig script to inspect the directory at runtime but didn't find the file/archive resources inside.
Conclusions
We didn't manage to find a way to add file/archive functionality to the Shell mapper.
Moreover, it seems that it doesn't work correctly for the Pig mapper either.
We've decided to abandon this problem for now and return to it later.
Things to resolve:
- Find out where exactly the file/archive resources should be store in the local cache (local filesystem)
- Find out if they are stored there in the case of the
gcloud dataproc jobs submit pig
command as well as theDataprocPigOperator
(which probably uses the same command underneath) - Check (somehow) if for Pig the file/archive functionality works at all - possibly by creating a Pig script which makes use of the symlinked file from the local cache