[Feature Request] Handle pipeline failure
marcominerva opened this issue · 3 comments
Context / Scenario
Currently, pipeline failures aren't handled at all:
kernel-memory/service/Abstractions/Pipeline/DataPipeline.cs
Lines 434 to 439 in 775301a
The problem
It is important to keep track of errors that occur during pipeline execution.
Proposed solution
We can add a couple of properties to DataPipeline.cs:
/// <summary>
/// The step that failed, if any.
/// </summary>
public string? FailedStep { get; set; } = null;
/// <summary>
/// The error that caused the pipeline to fail, if any.
/// </summary>
public string? FailureReason { get; set; } = null;
FailureReason can be useful to immediately obtain information about the problem, but it is not strictly necessary to implement this feature.
Then, we need to handle exceptions during pipeline execution, both in InProcessPipelineOrchestrator.cs and in DistributedPipelineOrchestrator.cs.
Finally, after updating the DataPipelineStatus.cs class accordingly, we just need this code:
public DataPipelineStatus ToDataPipelineStatus()
{
    return new DataPipelineStatus
    {
        Completed = this.Complete,
        Failed = this.FailedStep != null,
        Empty = this.Files.Count == 0,
        Index = this.Index,
        DocumentId = this.DocumentId,
        Tags = this.Tags,
        Creation = this.Creation,
        LastUpdate = this.LastUpdate,
        Steps = this.Steps,
        RemainingSteps = this.RemainingSteps,
        CompletedSteps = this.CompletedSteps,
        FailedStep = this.FailedStep,
        FailureReason = this.FailureReason,
    };
}
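To illustrate why exposing these fields matters, here is a minimal sketch of how a caller might consume them. StatusSketch is a hypothetical, trimmed-down stand-in for DataPipelineStatus (only the failure-related fields are modeled), and Describe plus the sample step name and error text are assumptions made for the example, not part of the library.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for DataPipelineStatus: only the fields
// relevant to failure reporting are modeled here.
public sealed class StatusSketch
{
    public bool Completed { get; set; }
    public bool Failed { get; set; }
    public string? FailedStep { get; set; }
    public string? FailureReason { get; set; }
}

public static class StatusDemo
{
    // Turns a status into a one-line summary suitable for logs or a UI.
    public static string Describe(StatusSketch status)
    {
        if (status.Failed)
        {
            return $"Failed at step '{status.FailedStep}': {status.FailureReason ?? "no reason recorded"}";
        }

        return status.Completed ? "Completed" : "In progress";
    }

    public static void Main()
    {
        // Illustrative values only.
        var failed = new StatusSketch
        {
            Failed = true,
            FailedStep = "gen_embeddings",
            FailureReason = "Embedding generator unavailable",
        };

        Console.WriteLine(Describe(failed));
        Console.WriteLine(Describe(new StatusSketch { Completed = true }));
        Console.WriteLine(Describe(new StatusSketch()));
    }
}
```

With the failure fields in the status, a client polling for progress can distinguish "still running" from "stopped with an error" without inspecting per-file logs.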
Importance
would be great to have
What about using DataPipeline.Files.LogEntries to know what happened? Work is always done on a file, and a global log can be extracted by iterating over the list of files.
FailedStep can be calculated using Steps and RemainingSteps; there's also CompletedSteps if needed.
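A minimal sketch of that suggestion, under stated assumptions: PipelineSketch, FileDetails, and LogEntry below are hypothetical, trimmed-down stand-ins for the real types, and the step names and log text are illustrative. Note that the first remaining step only identifies the failed step once the pipeline is already known to have failed; on a healthy pipeline it is simply the step currently in progress.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the real pipeline types.
public sealed record LogEntry(string Source, string Text);

public sealed class FileDetails
{
    public List<LogEntry>? LogEntries { get; set; }
}

public sealed class PipelineSketch
{
    public List<string> RemainingSteps { get; set; } = new();
    public List<FileDetails> Files { get; set; } = new();

    // The step that was running when processing stopped is the first remaining one.
    public string? CurrentStep => this.RemainingSteps.FirstOrDefault();

    // Flattens the per-file log entries into a single global log.
    public IEnumerable<string> GlobalLog() =>
        this.Files
            .Where(f => f.LogEntries is not null)
            .SelectMany(f => f.LogEntries!)
            .Select(l => $"{l.Source}: {l.Text}");
}

public static class LogDemo
{
    public static void Main()
    {
        var pipeline = new PipelineSketch
        {
            RemainingSteps = { "gen_embeddings", "save_records" },
            Files =
            {
                new FileDetails
                {
                    LogEntries = new() { new LogEntry("gen_embeddings", "Embedding generator unavailable") },
                },
                new FileDetails(), // no log entries recorded for this file
            },
        };

        Console.WriteLine(pipeline.CurrentStep);
        foreach (var line in pipeline.GlobalLog())
        {
            Console.WriteLine(line);
        }
    }
}
```

This covers failures that a handler records itself; an exception that escapes the handler would leave no LogEntries behind, so it would not show up in the flattened log.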
I was thinking about a generic FailureReason property because this way we can centralize all failure handling, even when the handler does not explicitly set the LogEntries, for example in the case of unhandled exceptions. For instance, InProcessPipelineOrchestrator could look like this (this is just an idea):
public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
{
    // Files must be uploaded before starting any other task
    await this.UploadFilesAsync(pipeline, cancellationToken).ConfigureAwait(false);
    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

    while (!pipeline.Complete)
    {
        string currentStepName = pipeline.RemainingSteps.First();

        if (!this._handlers.TryGetValue(currentStepName, out var stepHandler))
        {
            pipeline.LastUpdate = DateTimeOffset.UtcNow;
            pipeline.Failed = true;
            pipeline.Logs = $"No handler found for step '{currentStepName}'";
            await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

            this.Log.LogError("No handler found for step '{0}'", currentStepName);
            throw new OrchestrationException($"No handler found for step '{currentStepName}'");
        }

        try
        {
            // Run handler
            (bool success, DataPipeline updatedPipeline) = await stepHandler
                .InvokeAsync(pipeline, this.CancellationTokenSource.Token)
                .ConfigureAwait(false);

            pipeline = updatedPipeline;
            pipeline.LastUpdate = DateTimeOffset.UtcNow;

            if (success)
            {
                this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
                pipeline.MoveToNextStep();
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                pipeline.Failed = true;
                pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
            }
        }
        catch (Exception ex)
        {
            // Capture the exception message and its inner message, which in some cases is more descriptive.
            var failureReason = ex.Message;
            if (ex.InnerException is not null)
            {
                failureReason += $" ({ex.InnerException.Message})";
            }

            pipeline.LastUpdate = DateTimeOffset.UtcNow;
            pipeline.Failed = true;
            pipeline.Logs = failureReason;
            await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

            this.Log.LogError(ex, "Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
            throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", ex);
        }
    }

    await this.CleanUpAfterCompletionAsync(pipeline, cancellationToken).ConfigureAwait(false);

    this.Log.LogInformation("Pipeline '{0}/{1}' complete", pipeline.Index, pipeline.DocumentId);
}
Note: I have renamed the FailureReason property to Logs.
I have put my idea into a PR: #443, so you can see whether it is the approach you have in mind.