Add a feature to await tasks started by an eternal orchestration in preceding runs (AKA maintain task references after ContinueAsNew)
Is your feature request related to a problem? Please describe.
When tasks are started in one run of an eternal orchestration, it is impossible to await them from a subsequent run.
Why this is bad: we often need to process queues of items coming from a database with a limited degree of parallelism. With the async-await pattern, we can run an eternal loop that fetches some items, starts a pool of tasks, uses WhenAny to wait for a task to complete, and reuses the released slots to schedule more tasks:
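```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 50)); // Simulated items to process
        int maxParallelism = 10; // Maximum number of tasks running at once
        var tasks = new List<Task>();

        while (!queue.IsEmpty || tasks.Count > 0)
        {
            // Fill up to the max parallelism with new tasks
            while (tasks.Count < maxParallelism && queue.TryDequeue(out var item))
            {
                tasks.Add(Task.Run(() => ProcessItemAsync(item)));
            }

            // Wait for any task to complete
            var completedTask = await Task.WhenAny(tasks);

            // Remove the completed task from the list, freeing its slot
            tasks.Remove(completedTask);
            await completedTask; // Rethrows if processing that item failed
        }

        Console.WriteLine("All items processed.");
    }

    // Placeholder for the real per-item work (hypothetical)
    static Task ProcessItemAsync(int item) => Task.Delay(10);
}
```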
This pattern does not work well in Durable Functions: if the queue holds, say, millions of items, the orchestration history, which is replayed every time the orchestrator resumes, becomes very long.
To avoid a huge replay history, we can use context.ContinueAsNew. However, if we do, we lose the Task references and cannot await them in the new run.
What I would like to propose is the ability to obtain a Task reference from an instance ID. With this functionality, a new run of an orchestrator can get references to tasks started by previous runs. Eternal orchestrations would then be able to pass the most important artefacts they have, their task references, on to their future selves.
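```csharp
public static async Task LongQueueProcessorOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context, EternalOrchestratorArgs args)
{
    const int maxParallelism = 10;

    // PROPOSED API (does not exist today): returns a Task that completes when the
    // sub-orchestration with this instance ID, started by a previous run, completes
    var previousTasks = args.InstanceIds
        .Select(instanceId => (instanceId, task: context.TaskFromInstanceId(instanceId)))
        .ToList();

    var availableParallelism = maxParallelism - previousTasks.Count;
    // Assumes items not started in this run are returned again by the next call
    var workload = await context.CallActivityAsync<int[]>("GetSomeWorkload");
    var tasksToStart = Math.Min(availableParallelism, workload.Length);

    var newTasks = workload.Take(tasksToStart).Select(work =>
    {
        // context.NewGuid() is replay-safe; Guid.NewGuid() is not
        var instanceId = context.NewGuid().ToString();
        return (instanceId, task: context.CallSubOrchestratorAsync(
            "DoWork",
            work,
            new TaskOptions().WithInstanceId(instanceId)));
    });

    var allTasks = previousTasks.Concat(newTasks).ToList();

    // Wait for any task to complete and free its slot
    if (allTasks.Count > 0)
    {
        var completedTask = await Task.WhenAny(allTasks.Select(t => t.task));
        allTasks.RemoveAll(t => t.task == completedTask);
    }

    // Workload completed, as well as tasks?
    if (workload.Length == 0 && allTasks.Count == 0)
        return;

    // If not, continue looping with a fresh history, carrying over the
    // instance IDs of the tasks that are still running
    context.ContinueAsNew(
        new EternalOrchestratorArgs(allTasks.Select(t => t.instanceId).ToArray()));
}

public record EternalOrchestratorArgs(string[] InstanceIds);
```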
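To make the proposed semantics concrete outside Durable Functions, here is a minimal in-memory sketch. It assumes a plain dictionary from instance ID to Task can stand in for the proposed TaskFromInstanceId; RunOnce plays the role of one orchestration run, returning the surviving instance IDs plays the role of ContinueAsNew, and a Queue<int> stands in for GetSomeWorkload. All of these names are hypothetical, and the sketch models neither replay nor durability. It only illustrates that handing instance IDs (rather than Task objects) from run to run is enough to keep the parallelism bound.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class EternalLoopEmulation
{
    // Hypothetical stand-in for durable state: instance ID -> running task
    static readonly ConcurrentDictionary<string, Task> Registry = new();
    static int running, peak, processed;

    // Emulates the proposed TaskFromInstanceId by looking the task up by its ID
    static Task TaskFromInstanceId(string instanceId) => Registry[instanceId];

    // Hypothetical work item; records the highest number of items running at once
    static async Task DoWork(int item)
    {
        int now = Interlocked.Increment(ref running);
        int seen;
        while ((seen = Volatile.Read(ref peak)) < now &&
               Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
        await Task.Delay(5);
        Interlocked.Decrement(ref running);
        Interlocked.Increment(ref processed);
    }

    // One "run": only instance IDs come in and go out. Returning null stands in
    // for finishing the orchestration; returning IDs stands in for ContinueAsNew.
    static async Task<string[]?> RunOnce(string[] instanceIds, Queue<int> source, int maxParallelism)
    {
        var all = instanceIds.Select(id => (id, task: TaskFromInstanceId(id))).ToList();

        // Start new work only in the slots left free by carried-over tasks
        var tasksToStart = Math.Min(maxParallelism - all.Count, source.Count);
        for (int i = 0; i < tasksToStart; i++)
        {
            var id = Guid.NewGuid().ToString();
            var task = DoWork(source.Dequeue());
            Registry[id] = task;
            all.Add((id, task));
        }

        // Wait for one task to finish, then forget it
        if (all.Count > 0)
        {
            var completed = await Task.WhenAny(all.Select(t => t.task));
            var done = all.First(t => t.task == completed);
            all.Remove(done);
            Registry.TryRemove(done.id, out _);
        }

        if (source.Count == 0 && all.Count == 0)
            return null;
        return all.Select(t => t.id).ToArray();
    }

    // Drives runs until the loop finishes; returns (items processed, peak concurrency)
    public static async Task<(int Processed, int Peak)> RunAll(int items, int maxParallelism)
    {
        Registry.Clear();
        running = peak = processed = 0;
        var source = new Queue<int>(Enumerable.Range(1, items));
        string[]? ids = Array.Empty<string>();
        while (ids != null)
            ids = await RunOnce(ids, source, maxParallelism);
        return (processed, peak);
    }
}
```

Calling EternalLoopEmulation.RunAll(50, 10) should report all 50 items processed, with at most 10 of them running at any moment, even though no Task object ever crosses from one run to the next.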