Cloud Composer DAG: file move for failed Dataflow job does not happen
Srabasti opened this issue · 3 comments
The Composer DAG (simple_load_dag.py), which runs a Dataflow job that ingests data from a GCS bucket and then moves the input file to a different subfolder depending on failure or success, is not working as expected.
My observation is that the move to the failed folder does not happen when the Dataflow job fails. When the Dataflow job runs successfully, the file is moved to the correct folder, as expected.
I replicated the issue, and after checking the error and the code it seems the problem is how the source_object variable is set. In the code it looks like this:
`source_object = models.Variable.get('gcp_input_location') + '/usa_names.csv'`
This is what causes the error: the input location ends up in the path twice, because it is already set in the source_bucket variable, and both variables are passed to the copy command:
`conn.copy(source_bucket, source_object, target_bucket, target_object)`
I therefore got the task to work by removing the input location from source_object, like this:
`source_object = 'usa_names.csv'`
With this change, the failure-move-to-completion task now succeeds.
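For anyone else hitting this, here is a minimal sketch of the corrected setup and of the failure-move task wiring. It assumes the GoogleCloudStorageHook-based move callable and the `gcp_input_location` / `gcp_completion_bucket` Airflow variables; it is not a verbatim copy of simple_load_dag.py, and the DAG id and start date are placeholders:

```python
# Minimal sketch, assuming Airflow 1.x as shipped with Cloud Composer.
import datetime

from airflow import models
from airflow.contrib.hooks import gcs_hook
from airflow.operators import python_operator
from airflow.utils import trigger_rule

# The input location already comes from the Airflow variable used for the
# bucket, so the object name must NOT repeat it (this caused the double path).
source_bucket = models.Variable.get('gcp_input_location')
source_object = 'usa_names.csv'


def move_to_completion_bucket(target_bucket, target_infix, **kwargs):
    """Copy the input file into a success/failure subfolder, then delete it."""
    conn = gcs_hook.GoogleCloudStorageHook()
    target_object = '/'.join([target_infix, source_object])
    conn.copy(source_bucket, source_object, target_bucket, target_object)
    conn.delete(source_bucket, source_object)


with models.DAG('gcs_move_sketch',
                schedule_interval=None,
                start_date=datetime.datetime(2024, 1, 1)) as dag:
    # Fires only when an upstream task (e.g. the Dataflow job) has failed.
    failure_move_task = python_operator.PythonOperator(
        task_id='failure-move-to-completion',
        python_callable=move_to_completion_bucket,
        op_args=[models.Variable.get('gcp_completion_bucket'), 'failure'],
        provide_context=True,
        trigger_rule=trigger_rule.TriggerRule.ONE_FAILED)
```

The key points are that `source_object` carries only the object name and that the failure-move task uses `TriggerRule.ONE_FAILED`, so it only runs when an upstream task fails.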
Thanks for your help in replicating AND resolving this error, @ingdanielmichel!
I had hardcoded the variables and was able to get the success scenario to work, but not the failure scenario.
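For reference, instead of hardcoding, the Airflow variables can be set on the Composer environment. A sketch below; the environment name, location, and bucket values are placeholders, and the exact `variables` subcommand syntax depends on the Airflow version in your environment (Airflow 1.x shown, Airflow 2.x uses `variables set`):

```bash
# Placeholder environment name, location, and bucket values; replace with yours.
gcloud composer environments run my-composer-env \
    --location us-central1 \
    variables -- --set gcp_input_location my-input-bucket

gcloud composer environments run my-composer-env \
    --location us-central1 \
    variables -- --set gcp_completion_bucket my-completion-bucket
```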
It would be great if the documentation could be updated with this fix so that it helps the community.
Please let me know if I can contribute to the patch in any way!