Azure/azure-functions-durable-extension

Add a feature to await tasks started by eternal orchestration in preceding runs (AKA maintain task references after ContinueAsNew)


Is your feature request related to a problem? Please describe.
Tasks started in one run of an eternal orchestration cannot be awaited in a later run.

Why this is bad: we often need to process queues of items coming from a database with a limited degree of parallelism. Using the async-await pattern, we could run an eternal loop that fetches some items, starts a pool of tasks, uses WhenAny to wait for a task to complete, and reuses the released slot to schedule more work.

    static async Task Main(string[] args)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 50)); // Simulated items to process
        int maxParallelism = 10; // Maximum parallel tasks
        var tasks = new List<Task>();

        while (queue.Any() || tasks.Any())
        {
            // Fill up to the max parallelism with new tasks
            while (tasks.Count < maxParallelism && queue.TryDequeue(out var item))
            {
                tasks.Add(Task.Run(async () =>
                {
                    await ProcessItemAsync(item);
                }));
            }

            // Wait for any task to complete
            var completedTask = await Task.WhenAny(tasks);

            // Remove the completed task from the list
            tasks.Remove(completedTask);
        }

        Console.WriteLine("All items processed.");
    }

This pattern does not work well in Durable Functions: if the queue holds, say, millions of items, the replay history becomes very long.
To avoid a huge replay history, we can use context.ContinueAsNew. However, if we do, we lose the Task references and can't await them in the new run.

What I would like to propose is the ability to create a Task reference from an InstanceId. With this, a new run of an orchestrator can get references to tasks started by previous runs. Eternal orchestrations would then be able to pass the most important artefacts they have, task references, to their future selves.

 public static async Task LongQueueProcessorOrchestrator(
     [OrchestrationTrigger] TaskOrchestrationContext context, EternalOrchestratorArgs args)
 {
     // Proposed API (does not exist yet): TaskFromInstanceId rebuilds an awaitable
     // task for a sub-orchestration that was started by a previous run.
     var allTasks = args.instanceIds
         .Select(instanceId => (instanceId, task: context.TaskFromInstanceId(instanceId)))
         .ToList();

     var availableParallelism = 10 - allTasks.Count;

     var workload = await context.CallActivityAsync<int[]>("GetSomeWorkload");

     // Start new sub-orchestrations in the free slots
     foreach (var work in workload.Take(availableParallelism))
     {
         // context.NewGuid() is replay-safe, unlike Guid.NewGuid()
         var instanceId = context.NewGuid().ToString();
         allTasks.Add((instanceId, context.CallSubOrchestratorAsync(
             "DoWork",
             work,
             new TaskOptions().WithInstanceId(instanceId))));
     }

     // Workload completed, as well as tasks?
     if (workload.Length == 0 && allTasks.Count == 0)
         return;

     // Wait for any task to complete and release its slot
     var completedTask = await Task.WhenAny(allTasks.Select(t => t.task));
     allTasks.RemoveAll(t => t.task == completedTask);

     // If not, continue looping, handing the remaining instance IDs to the next run
     context.ContinueAsNew(
         new EternalOrchestratorArgs(allTasks.Select(t => t.instanceId).ToArray()));
 }