Adding more parameters in dag_args
everglory99 opened this issue · 3 comments
Hi,
From my testing, as well as this schema in the source, it seems that only `catchup`, `max_active_runs`, `concurrency` and `schedule_interval` are allowed in the `dag_args` section. However, there are many more parameters available in the DAG constructor (see this). In most of our production DAGs, we use `dagrun_timeout` and `on_failure_callback`. I am wondering whether it is possible to enable more parameters for the `dag_args` schema without changing the source code, maybe through a plugin? Thanks for the help!
Hi @everglory99, you are correct that right now those are the only arguments supported in `dag_args`. The plugin system does not support additional `dag_args`; however, we can enable more `dag_args` by adding them to the schema that you linked. Any argument supported by that schema will be automatically serialized here and included in the generated Python code.
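A rough sketch of the whitelisting behaviour described above, assuming the schema simply rejects keys it does not know and serializes the rest as keyword arguments for the generated `DAG(...)` call. `ALLOWED_DAG_ARGS` and `render_dag_args` are hypothetical names for illustration only; they are not boundary-layer's actual code, which uses its own schema classes.

```python
# Hypothetical illustration, not boundary-layer's real implementation.
ALLOWED_DAG_ARGS = {'catchup', 'max_active_runs', 'concurrency', 'schedule_interval'}


def render_dag_args(dag_args, allowed=ALLOWED_DAG_ARGS):
    """Reject unknown dag_args, then render the rest as Python keyword arguments."""
    unknown = sorted(set(dag_args) - set(allowed))
    if unknown:
        raise ValueError('Unsupported dag_args: {}'.format(', '.join(unknown)))
    # repr() turns each YAML value into a Python literal for the generated file.
    return ', '.join('{}={!r}'.format(key, dag_args[key]) for key in sorted(dag_args))


print(render_dag_args({'catchup': False, 'max_active_runs': 1}))
# → catchup=False, max_active_runs=1
```

In this sketch, "adding an argument to the schema" corresponds to extending `allowed`, e.g. `ALLOWED_DAG_ARGS | {'dagrun_timeout'}`; nothing outside the schema can widen that set.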
For `dagrun_timeout`, I would want the YAML argument to be an integer (number of seconds). Assuming it requires a `timedelta` object like most of the Airflow timeout arguments, we could then have the schema automatically transform the integer argument to a `timedelta`.
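A minimal sketch of that integer-to-`timedelta` transform, assuming the YAML value is a whole number of seconds as proposed above. `coerce_dagrun_timeout` is a hypothetical helper name; in boundary-layer the conversion would presumably live inside the schema itself. Airflow's `dagrun_timeout` does take a `datetime.timedelta`.

```python
from datetime import timedelta


def coerce_dagrun_timeout(dag_args):
    """Return a copy of dag_args with an integer dagrun_timeout turned into a timedelta.

    Hypothetical helper for illustration only.
    """
    result = dict(dag_args)
    seconds = result.get('dagrun_timeout')
    if seconds is None:
        return result
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(seconds, bool) or not isinstance(seconds, int) or seconds <= 0:
        raise ValueError('dagrun_timeout must be a positive integer number of seconds')
    result['dagrun_timeout'] = timedelta(seconds=seconds)
    return result


args = coerce_dagrun_timeout({'catchup': False, 'dagrun_timeout': 3600})
print(args['dagrun_timeout'])  # → 1:00:00
```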
For `on_failure_callback`, the argument in the YAML would have to be one of the `<<verbatim string>>` arguments; we could have the schema check to confirm that it is.
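One way such a check could look, sketched under the assumption that a verbatim string is any value wrapped in `<<` and `>>` whose contents are copied into the generated code unquoted. `VERBATIM_RE`, `render_callback` and the module `my_callbacks` are hypothetical names, not part of boundary-layer.

```python
import re

# A verbatim string looks like "<<some.python.expression>>".
VERBATIM_RE = re.compile(r'^<<(.+)>>$', re.DOTALL)


def render_callback(value):
    """Validate a callback argument and return the Python source to emit.

    Hypothetical check: only verbatim strings are accepted, because a plain
    string would be rendered as a quoted literal rather than a callable.
    """
    if not isinstance(value, str):
        raise ValueError('on_failure_callback must be a verbatim string')
    match = VERBATIM_RE.match(value.strip())
    if match is None:
        raise ValueError('on_failure_callback must be wrapped in << >>')
    expression = match.group(1).strip()
    if not expression:
        raise ValueError('on_failure_callback must not be empty')
    return expression


print('on_failure_callback=' + render_callback('<<my_callbacks.report_failure>>'))
# → on_failure_callback=my_callbacks.report_failure
```

In this sketch a bare `my_callbacks.report_failure` fails the check; without the `<< >>` wrapper it would otherwise end up in the DAG file as a quoted string instead of a reference to a function.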
If you would like to try making these changes, we would welcome your contributions! Otherwise, I can probably do this myself if you give me a few days. It's an oversight on our part to be missing all of these arguments; we should ideally support every argument that is allowed by the DAG constructor.
Thanks for bringing this to our attention @everglory99 !
Thanks for your help! I have made the changes to the code locally in my branch. I will submit a PR after getting some internal paperwork done for contributing to an open-source repo.
I see that there hasn't been much activity on the current issues in the repo, so I thought I would ask here (also because this is the first result when googling "boundary layer on_failure_callback").
It would be nice to add an example of how to set the `on_failure_callback` property correctly.
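Let's say that I have a function similar to this one:

    from airflow.operators.email_operator import EmailOperator  # Airflow 1.x path; Airflow 2 uses airflow.operators.email

    def report_failure(context):
        # Airflow calls the callback with the context dict of the failed run.
        dag_run = context.get('dag_run')
        send_email = EmailOperator(...)
        send_email.execute(context)
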
How can I inject this code into Boundary Layer? Can I add a custom import (how?) that includes this function and just add `on_failure_callback: <<eyal_custom.report_failure>>`, or am I on the wrong path?
@everglory99, could you maybe attach your full example of adding a custom `on_failure_callback` here?
Thanks!
UPDATE: I managed to get the following working:

    on_failure_callback: "<<lambda context: EmailOperator(...).execute(context)>>"