etsy/boundary-layer

Adding more parameters in dag_args

everglory99 opened this issue

Hi,
From my testing, as well as this schema in the source, it seems that only catchup, max_active_runs, concurrency and schedule_interval are allowed in the dag_args section. However, many more parameters are available in the DAG constructor (see this). In most of our production DAGs we use dagrun_timeout and on_failure_callback. I am wondering whether it is possible to enable more parameters in the dag_args schema without changing the source code, maybe through a plugin? Thanks for the help!

Hi @everglory99, you are correct that right now those are the only arguments supported in dag_args. The plugin system does not support additional dag_args; however, we can enable more dag_args by adding them to the schema that you linked. Any argument supported by that schema will be automatically serialized here and included in the generated Python code.
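To make the "validated by the schema, then serialized into the generated Python code" step concrete, here is a minimal sketch assuming a simple whitelist of allowed keys. `ALLOWED_DAG_ARGS` and `render_dag_args` are hypothetical names for illustration only; boundary-layer's actual schema and code generator are structured differently.

```python
# Hypothetical whitelist mirroring the four keys the schema accepted at the time
ALLOWED_DAG_ARGS = {"catchup", "max_active_runs", "concurrency", "schedule_interval"}


def render_dag_args(dag_args):
    """Validate dag_args against the whitelist and render them as
    keyword-argument source text for a generated DAG(...) call.

    Hypothetical sketch: not boundary-layer's real schema or codegen.
    """
    unknown = set(dag_args) - ALLOWED_DAG_ARGS
    if unknown:
        # Keys missing from the schema are rejected, which is why
        # dagrun_timeout could not be used without a schema change
        raise ValueError("unsupported dag_args: " + ", ".join(sorted(unknown)))
    # repr() turns plain YAML values (bools, ints, strings) into Python literals
    return ", ".join(
        "{}={!r}".format(key, dag_args[key]) for key in sorted(dag_args)
    )
```

For example, `render_dag_args({"catchup": False, "max_active_runs": 1})` yields `catchup=False, max_active_runs=1`, while passing `{"dagrun_timeout": 3600}` raises `ValueError` until the key is added to the whitelist.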

For dagrun_timeout, I would want the YAML argument to be an integer (number of seconds). Assuming it requires a timedelta object, like most of the Airflow timeout arguments, we could then have the schema automatically transform the integer argument into a timedelta.
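A minimal sketch of that integer-to-timedelta transform, assuming the YAML value is a non-negative whole number of seconds. `dagrun_timeout_from_yaml` and `render_timedelta` are hypothetical helpers, not part of boundary-layer; the only fact relied on is that Airflow's `DAG` constructor takes `dagrun_timeout` as a `datetime.timedelta`.

```python
from datetime import timedelta


def dagrun_timeout_from_yaml(value):
    """Convert a YAML integer (seconds) into a timedelta.

    Hypothetical helper illustrating the proposed schema transform.
    """
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("dagrun_timeout must be an integer number of seconds")
    if value < 0:
        raise ValueError("dagrun_timeout must not be negative")
    return timedelta(seconds=value)


def render_timedelta(td):
    # Emit source text for the generated DAG file; assumes the generated
    # module contains "import datetime"
    return "datetime.timedelta(seconds={})".format(int(td.total_seconds()))
```

With this, `dagrun_timeout: 3600` in the YAML would become `datetime.timedelta(seconds=3600)` in the generated code. Keeping the YAML value a plain integer avoids having to parse duration strings in the schema.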

For on_failure_callback, the argument in the YAML would have to be one of the <<verbatim string>> arguments; we could have the schema check to confirm that it is.
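The check could look roughly like this, assuming a verbatim string is any value wrapped in `<<` and `>>` whose inner text is copied into the generated code unquoted. `VERBATIM_RE` and `render_callback` are hypothetical names for illustration, not boundary-layer's implementation.

```python
import re

# A verbatim string is assumed to look like "<<...>>"; DOTALL lets the
# inner expression span multiple lines
VERBATIM_RE = re.compile(r"^<<(.*)>>$", re.DOTALL)


def render_callback(value):
    """Return the Python source for an on_failure_callback value.

    Rejects anything that is not a verbatim string, because a plain string
    would be emitted as a quoted literal rather than a callable.
    """
    match = VERBATIM_RE.match(value) if isinstance(value, str) else None
    if match is None:
        raise ValueError("on_failure_callback must be a <<verbatim string>>")
    # Strip the delimiters and surrounding whitespace; emit the rest as-is
    return match.group(1).strip()
```

Under this sketch, `<<my_callbacks.report_failure>>` renders as the bare expression `my_callbacks.report_failure`, whereas an unwrapped `report_failure` is rejected at validation time instead of producing a DAG whose callback is a string.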

If you would like to try making these changes, we would welcome your contributions! Otherwise, I can probably do this myself if you give me a few days. It's an oversight on our part to be missing all of these arguments; we should ideally support every argument that is allowed by the DAG constructor.

Thanks for bringing this to our attention @everglory99 !

Thanks for your help! I have made changes to the code locally in my branch. I will submit a PR after getting some internal paperwork done for contributing to an open-source repo.

I see that there has not been much activity on current issues in the repo, so I thought I would ask here (also because this is the first result when googling "boundary layer on_failure_callback").

It would be nice to add an example of how to set the on_failure_callback property correctly.
Let's say that I have a method, similar to this one:

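from airflow.operators.email_operator import EmailOperator

def report_failure(context):
    # Airflow passes the task context dict to failure callbacks
    dag_run = context.get('dag_run')
    send_email = EmailOperator(...)  # operator arguments elided
    send_email.execute(context)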

How can I inject this code into boundary-layer? Can I add a custom import (and if so, how?) that includes this function, and then just add
on_failure_callback: <<eyal_custom.report_failure>>, or am I on the wrong path?

@everglory99, could you attach your full example of adding a custom on_failure_callback here?
Thanks!

UPDATE: I managed to get the following working:

on_failure_callback: "<<lambda context:
    EmailOperator(... ).execute(context)>>"