flink-extended/ai-flow

action_on_event_received should only process events sent by the current workflow execution

jiangxin369 opened this issue · 0 comments

Describe the feature

Currently, events are broadcast: once the wanted event is sent, every workflow execution of this workflow is triggered.

Describe the solution you'd like

Because the scheduler dispatcher inspects the context of each event to check whether it contains a workflow execution id, we can inject the runtime context of the task execution into the user-defined event so that the event only affects the current workflow execution.
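To make the dispatching rule concrete, here is a minimal sketch of the filtering described above. It is not the actual scheduler dispatcher code: the `Event` dataclass is a stand-in, `target_executions` is a hypothetical helper, and the JSON layout of the context with a `workflow_execution_id` key is an assumption made only for illustration.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Event:
    """Hypothetical stand-in for the real event class."""
    event_key: str
    message: str
    context: Optional[str] = None


def target_executions(event: Event, running_ids: List[int]) -> List[int]:
    """Return the workflow execution ids that should process the event.

    Assumption: a wrapped event carries a JSON object in its context
    with a "workflow_execution_id" key. Anything else is broadcast.
    """
    if event.context is None:
        return list(running_ids)
    try:
        info = json.loads(event.context)
    except ValueError:
        # The context is not JSON, so treat the event as a broadcast.
        return list(running_ids)
    execution_id = info.get("workflow_execution_id") if isinstance(info, dict) else None
    if execution_id is None:
        return list(running_ids)
    # Only the execution that sent the event is triggered.
    return [i for i in running_ids if i == execution_id]
```

With this rule, an event without execution info keeps today's broadcast behavior, while a wrapped event reaches only the execution named in its context.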

We need the following changes:

  • Add a global variable _CURRENT_TASK_CONTEXT in context.py to store the runtime context of the current task execution.
  • Add get_runtime_task_context and set_runtime_task_context functions to read and write _CURRENT_TASK_CONTEXT.
  • Add a public API called wrap_execution_info_to_context that injects the runtime info into the context of the event before it is sent.

from typing import Optional


_CURRENT_TASK_CONTEXT: Optional[TaskExecutionContext] = None


def set_runtime_task_context(context: TaskExecutionContext):
    global _CURRENT_TASK_CONTEXT
    _CURRENT_TASK_CONTEXT = context


def get_runtime_task_context():
    return _CURRENT_TASK_CONTEXT


def wrap_execution_info_to_context(event: Event):
    """
    Wrap the context of the event with the current workflow execution info.
    An event wrapped this way is only processed by that specific workflow execution.
    """
    pass
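
The stub above leaves the body of wrap_execution_info_to_context open. One possible implementation is sketched below, under stated assumptions: `Event` and `TaskExecutionContext` are simplified stand-ins for the real classes, the `workflow_execution_id` field and the JSON encoding of the context are hypothetical, and raising when no task context is set is a design choice of this sketch rather than part of the proposal.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskExecutionContext:
    """Hypothetical stand-in; the real class carries more runtime info."""
    workflow_execution_id: int


@dataclass
class Event:
    """Hypothetical stand-in for the real event class."""
    event_key: str
    message: str
    context: Optional[str] = None


_CURRENT_TASK_CONTEXT: Optional[TaskExecutionContext] = None


def set_runtime_task_context(context: Optional[TaskExecutionContext]) -> None:
    global _CURRENT_TASK_CONTEXT
    _CURRENT_TASK_CONTEXT = context


def get_runtime_task_context() -> Optional[TaskExecutionContext]:
    return _CURRENT_TASK_CONTEXT


def wrap_execution_info_to_context(event: Event) -> Event:
    """Write the current workflow execution id into the event context."""
    task_context = get_runtime_task_context()
    if task_context is None:
        # Assumption: wrapping outside a running task is a usage error.
        raise RuntimeError("No runtime task context is set.")
    event.context = json.dumps(
        {"workflow_execution_id": task_context.workflow_execution_id}
    )
    return event
```

In this sketch the task runner would call set_runtime_task_context before invoking the user's callable, so the user code only needs to call wrap_execution_info_to_context on the event.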

How to use it?

def func():
    notification_client = get_notification_client()
    event = Event(event_key=EVENT_KEY, message='This is a custom message.')
    
    # Wrap the event with the current workflow execution info.
    wrap_execution_info_to_context(event)
    # Send the event.
    notification_client.send_event(event)

with Workflow(name='workflow') as w1:
    task = PythonOperator(name='task', python_callable=func)

Describe alternatives you've considered

Additional context