microsoft/kernel-memory

[Feature Request] Handle pipeline failure

marcominerva opened this issue · 3 comments

Context / Scenario

Currently, pipeline failures aren't handled at all:

return new DataPipelineStatus
{
Completed = this.Complete,
Failed = false, // TODO
Empty = this.Files.Count == 0,
Index = this.Index,

The problem

It is important to keep track of errors that occurs during pipeline exectuion.

Proposed solution

We can add a couple of properties to DataPipeline.cs:

/// <summary>
/// The step that failed, if any.
/// </summary>
public string? FailedStep { get; set; } = null;

/// <summary>
/// The error that caused the pipeline to fail, if any.
/// </summary>
public string? FailureReason { get; set; } = null;

FailureReason can be useful to immediately obtain information about the problem, but it is not strictly necessary to implement this feature.

Then, we need to handle exceptions during pipeline execution, both in InProcessPipelineOrchestrator.cs and in DistributedPipelineOrchestrator.cs.

Finally, after updating the DataPipelineStatus.cs class accordingly, we just need this code:

public DataPipelineStatus ToDataPipelineStatus()
{
    return new DataPipelineStatus
    {
        Completed = this.Complete,
        Failed = this.FailedStep != null,
        Empty = this.Files.Count == 0,
        Index = this.Index,
        DocumentId = this.DocumentId,
        Tags = this.Tags,
        Creation = this.Creation,
        LastUpdate = this.LastUpdate,
        Steps = this.Steps,
        RemainingSteps = this.RemainingSteps,
        CompletedSteps = this.CompletedSteps,
        FailedStep = this.FailedStep,
        FailureReason = this.FailureReason,
    };
}

Importance

would be great to have

dluc commented

What about using DataPipeline.Files.LogEntries to know what happened? work is always done on a file, and a global log can be extracted iterating the list of files.

FailedStep can be calculated using Steps and RemainingSteps, there's also CompletedSteps if needed.

I was thinking about a generic FailureReason property because in this way we can centralize all the failures handling, even if the handler does not explicitly set the LogEntries, for example in case of unhandled exceptions. For example, in case of InProcessPipelineOrchestrator (this is just an idea):

public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
{
    // Files must be uploaded before starting any other task
    await this.UploadFilesAsync(pipeline, cancellationToken).ConfigureAwait(false);

    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

    while (!pipeline.Complete)
    {
        string currentStepName = pipeline.RemainingSteps.First();

        if (!this._handlers.TryGetValue(currentStepName, out var stepHandler))
        {
            pipeline.LastUpdate = DateTimeOffset.UtcNow;
            pipeline.Failed = true;
            pipeline.Logs = $"No handler found for step '{currentStepName}'";
            await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

            this.Log.LogError("No handler found for step '{0}'", currentStepName);
            throw new OrchestrationException($"No handler found for step '{currentStepName}'");
        }

        try
        {
            // Run handler
            (bool success, DataPipeline updatedPipeline) = await stepHandler
                .InvokeAsync(pipeline, this.CancellationTokenSource.Token)
                .ConfigureAwait(false);

            pipeline = updatedPipeline;
            pipeline.LastUpdate = DateTimeOffset.UtcNow;

            if (success)
            {
                this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
                pipeline.MoveToNextStep();
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                pipeline.Failed = true;
                pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
            }
        }
        catch (Exception ex)
        {
            // Gets the exception and its inner message, that is some cases is more descriptive.
            var failureReson = ex.Message;
            if (ex.InnerException is not null)
            {
                failureReson += $" ({ex.InnerException.Message})";
            }

            pipeline.LastUpdate = DateTimeOffset.UtcNow;
            pipeline.Failed = true;
            pipeline.Logs = failureReson;
            await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

            this.Log.LogError(ex, "Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
            throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", ex);
        }
    }

    await this.CleanUpAfterCompletionAsync(pipeline, cancellationToken).ConfigureAwait(false);

    this.Log.LogInformation("Pipeline '{0}/{1}' complete", pipeline.Index, pipeline.DocumentId);
}

Note: I have renamed the FailureReason property to Logs.

I have tried to put my idea in a PR: #443. So, you can see if it is the approach you're thinking about.