microsoft/kernel-memory

[Bug] Endless pipeline loop when trying to import corrupted document

marcominerva opened this issue · 15 comments

Context / Scenario

In service mode (using a queue), when trying to import a corrupted document (e.g., an invalid PDF file), the decoder throws an exception, but then the message is put back in the queue, generating an endless loop:

warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.100334.6972903.f0f10e85d780488d927f6c03891d20d5' processing failed with exception, 
      putting message back in the queue.
      Message content: {"index":"default","document_id":"98d5b5a2d38e4cf49dbda5b50c4502c2202404161003346000416","execution_id":"e6f9118e170745168f1d0bc1a9cd39b5","steps":["extract","partition","gen_embeddings","save_records"]}

What happened?

If decoding fails with an exception, I expect the document to be marked as failed and not processed again. I also see that the DataPipelineStatus class has a Failed property, but it is always false:

public DataPipelineStatus ToDataPipelineStatus()
{
    return new DataPipelineStatus
    {
        Completed = this.Complete,
        Failed = false, // TODO
        Empty = this.Files.Count == 0,
        Index = this.Index,
        DocumentId = this.DocumentId,
        Tags = this.Tags,
        Creation = this.Creation,
        LastUpdate = this.LastUpdate,
        Steps = this.Steps,
        RemainingSteps = this.RemainingSteps,
        CompletedSteps = this.CompletedSteps,
    };
}
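
For illustration, marking the failure could look something like this (a hypothetical sketch; a Failed flag on the pipeline does not exist today):

// Hypothetical: a flag the orchestrator could set when a handler throws,
// so the document stops being retried and the status reflects the outcome.
public bool Failed { get; set; } = false;

public DataPipelineStatus ToDataPipelineStatus()
{
    return new DataPipelineStatus
    {
        Completed = this.Complete && !this.Failed,
        Failed = this.Failed, // instead of the hard-coded false
        // ...remaining properties as in the snippet above
    };
}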

Importance

I cannot use Kernel Memory

Platform, Language, Versions

Kernel Memory v0.36.240415.2

Relevant log output

dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
      UglyToad.PdfPig.Core.PdfDocumentFormatException: Could not find the version header comment at the start of the document.
         at UglyToad.PdfPig.Parser.FileStructure.FileHeaderParser.Parse(ISeekableTokenScanner scanner, IInputBytes inputBytes, Boolean isLenientParsing, ILog log)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.OpenDocument(IInputBytes inputBytes, ISeekableTokenScanner scanner, InternalParsingOptions parsingOptions)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(IInputBytes inputBytes, ParsingOptions options)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(Stream stream, ParsingOptions options)
         at UglyToad.PdfPig.PdfDocument.Open(Stream stream, ParsingOptions options)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(Stream data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(BinaryData data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.ExtractTextAsync(FileDetails uploadedFile, BinaryData fileContent, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.RunPipelineStepAsync(DataPipeline pipeline, IPipelineStepHandler handler, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.<>c__DisplayClass5_0.<<AddHandlerAsync>b__0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues.<>c__DisplayClass19_0.<<OnDequeue>b__0>d.MoveNext()
info: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message received
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
dluc commented

Does this happen only with SimpleQueues? By design, SimpleQueues doesn't support poison queues, so at dev time one can debug without worrying about the number of retries.

With AzureQueue and RabbitMQ the number of retries should be capped, so that eventually the service stops retrying.

marcominerva commented

I have tried with Azure Queues and verified that the message is moved to a poison queue after a certain number of retries. However, the number of retries is hard-coded:

private const int MaxRetryBeforePoisonQueue = 20;

What do you think about adding this parameter to AzureQueuesConfig?
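
For example (a sketch only; assuming AzureQueuesConfig is a plain options class, with the property name mirroring the existing constant):

public class AzureQueuesConfig
{
    // ...existing settings...

    // Sketch: would replace the hard-coded MaxRetryBeforePoisonQueue constant.
    // How many dequeues to allow before moving a message to the poison queue.
    public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
}

The dequeue handler would then compare message.DequeueCount against this setting instead of the constant, as in the snippet later in this thread.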

As for RabbitMQ, it seems that the message is always put back in the queue:

public void OnDequeue(Func&lt;string, Task&lt;bool&gt;&gt; processMessageAction)
{
    this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
    {
        try
        {
            this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);

            byte[] body = args.Body.ToArray();
            string message = Encoding.UTF8.GetString(body);

            bool success = await processMessageAction.Invoke(message).ConfigureAwait(false);
            if (success)
            {
                this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties.MessageId);
                this._channel.BasicAck(args.DeliveryTag, multiple: false);
            }
            else
            {
                this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue", args.BasicProperties.MessageId);
                this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
            }
        }
#pragma warning disable CA1031 // Must catch all to handle queue properly
        catch (Exception e)
        {
            // Exceptions caught by this block:
            // - message processing failed with exception
            // - failed to delete message from queue
            // - failed to unlock message in the queue
            this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue", args.BasicProperties.MessageId);

            // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
            this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
        }
#pragma warning restore CA1031
    };
}

dluc commented

No problem making it configurable if it helps. If I remember correctly, Azure Queues uses a counter to decide when to discard a message, while RabbitMQ uses an expiration date (time to live). I would check the logs for the message "Message '{0}' received, expires at {1}" and see whether the message actually expires.

marcominerva commented

If I understand correctly the meaning of the requeue parameter:

this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);

It puts the message back in the queue with a new expiration time, endlessly.

By the way, there are some issues in RabbitMQPipeline:

this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);


It seems that all the properties of BasicProperties are null.

@dluc I have made a PR to set the missing properties in RabbitMQ: #454.

After that, I think RabbitMQPipeline should also handle a maximum number of retries and poison queues, like AzureQueuePipeline. In the case of RabbitMQ, it seems the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.
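
For reference, a minimal sketch of declaring such a queue with RabbitMQ.Client (the queue name and the limit are illustrative):

using System.Collections.Generic;
using RabbitMQ.Client;

// Sketch: a quorum queue whose broker-side delivery limit caps redeliveries.
// Quorum queues must be declared durable and non-exclusive.
static void DeclareQuorumQueue(IModel channel)
{
    var arguments = new Dictionary&lt;string, object&gt;
    {
        ["x-queue-type"] = "quorum",
        // After 20 deliveries the broker drops the message,
        // or dead-letters it if a DLX is configured.
        ["x-delivery-limit"] = 20,
    };

    channel.QueueDeclare(
        queue: "km-pipeline",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: arguments);
}

// Note: on each redelivery the broker stamps an "x-delivery-count" header,
// which a consumer can also read to implement its own retry logic.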

dluc commented

@dluc I have made a PR to set the missing properties in RabbitMQ: #454.

After that, I think RabbitMQPipeline should also handle a maximum number of retries and poison queues, like AzureQueuePipeline. In the case of RabbitMQ, it seems the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.

There are a couple of approaches; IIRC all of them require multiple queues per logical queue. Worth reading:

marcominerva commented

So, do you prefer using such solutions instead of the quorum queue?

dluc commented

So, do you prefer using such solutions instead of the quorum queue?

Quorum queues don't seem to offer delayed delivery, do they?

When a message delivery fails, we need to allow a configurable retry strategy, e.g.:

  • retry 10 times, waiting 1 minute
  • retry for 24 hours, using exponential backoff

AFAIK with RabbitMQ it will require multiple queues.
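
Something configurable along these lines could express both examples (a sketch; all names are hypothetical):

using System;

// Illustrative retry-policy options covering the two strategies above:
// a fixed number of attempts with a fixed delay, or a time-boxed
// exponential backoff.
public class RetryPolicyConfig
{
    public int MaxAttempts { get; set; } = 10;
    public TimeSpan InitialDelay { get; set; } = TimeSpan.FromMinutes(1);
    public double BackoffFactor { get; set; } = 1.0; // 1.0 = fixed delay, > 1.0 = exponential
    public TimeSpan MaxRetryWindow { get; set; } = TimeSpan.FromHours(24);
}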

marcominerva commented

Understood, so at the very least we need something like this, right?

https://stackoverflow.com/a/73358042/1728189
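
If I read it correctly, the idea is the usual delayed-retry pattern: publish failed messages to a consumer-less retry queue with a per-queue TTL, and let its dead-letter exchange route expired messages back to the main queue. A minimal sketch (queue names and TTL are illustrative):

using System.Collections.Generic;
using RabbitMQ.Client;

// Sketch of the retry-queue pattern. Nothing consumes "km-pipeline-retry":
// when x-message-ttl expires, the broker dead-letters each message back to
// "km-pipeline", which amounts to a delayed retry.
public static class RetryQueueSketch
{
    public static void Declare(IModel channel)
    {
        var retryArguments = new Dictionary&lt;string, object&gt;
        {
            ["x-message-ttl"] = 60_000,                     // wait 1 minute before retrying
            ["x-dead-letter-exchange"] = "",                // default exchange
            ["x-dead-letter-routing-key"] = "km-pipeline",  // route expired messages back
        };

        channel.QueueDeclare("km-pipeline", durable: true, exclusive: false, autoDelete: false);
        channel.QueueDeclare("km-pipeline-retry", durable: true, exclusive: false, autoDelete: false, retryArguments);
    }

    // On failure: ack the original delivery and republish to the retry queue,
    // instead of BasicNack(requeue: true), which redelivers immediately.
    public static void RetryLater(IModel channel, ulong deliveryTag, IBasicProperties properties, byte[] body)
    {
        channel.BasicAck(deliveryTag, multiple: false);
        channel.BasicPublish(exchange: "", routingKey: "km-pipeline-retry", basicProperties: properties, body: body);
    }
}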

marcominerva commented

Hi @dluc! Finally, I can get back to this issue.

What do you think about implementing in RabbitMQ the same behavior we have in Azure Queues? For reference, this is the current logic on the Azure Queues side:

try
{
    if (message.DequeueCount &lt;= this._config.MaxRetriesBeforePoisonQueue)
    {
        bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
        if (success)
        {
            this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId);
            await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false);
        }
        else
        {
            var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
            this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs",
                message.MessageId, backoffDelay.TotalMilliseconds);
            await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
        }
    }
    else
    {
        this._log.LogError("Message '{0}' reached max attempts, moving to poison queue", message.MessageId);
        await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
    }
}
#pragma warning disable CA1031 // Must catch all to handle queue properly
catch (Exception e)
{
    // Exceptions caught by this block:
    // - message processing failed with exception
    // - failed to delete message from queue
    // - failed to unlock message in the queue
    // - failed to move message to poison queue
    var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
    this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue with a delay of {1} msecs",
        message.MessageId, backoffDelay.TotalMilliseconds);

    // Note: if this fails, the exception is caught by this.DispatchMessages()
    await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
}

dluc commented

hey @marcominerva that would be nice, if possible. It might be a lot of work considering the extra queues to introduce, so before starting, do you think we could break it down into multiple steps, to have separate PRs?

marcominerva commented

Yes. I can start, for example, by creating the extra queues and then opening a draft PR, so you can see the work in progress. Do you agree?

dluc commented

Yes. I can start, for example, by creating the extra queues and then opening a draft PR, so you can see the work in progress. Do you agree?

Yes, please go ahead. What would the names of the queues be? I'm assuming that these extra queues, when missing or deleted, will be automatically recreated without causing errors.

marcominerva commented

Actually, adding poison queue support like in Azure Queues required fewer changes than I expected, so I have made a single PR with all the changes: #648.

Let me know if it is OK, or whether you prefer to split it anyway.