[Bug] Endless pipeline loop when trying to import corrupted document
marcominerva opened this issue · 15 comments
Context / Scenario
In service mode (using a queue), when trying to import a corrupted document (e.g., an invalid PDF file), the decoder throws an exception, but then the message is put back in the queue, generating an endless loop:
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
Message '20240416.100334.6972903.f0f10e85d780488d927f6c03891d20d5' processing failed with exception,
putting message back in the queue.
Message content: {"index":"default","document_id":"98d5b5a2d38e4cf49dbda5b50c4502c2202404161003346000416","execution_id":"e6f9118e170745168f1d0bc1a9cd39b5","steps":["extract","partition","gen_embeddings","save_records"]}
What happened?
If decoding fails with an exception, I expect the document to be marked as failed and not processed again. I also see that the DataPipelineStatus class has a Failed property, but it is always false:
kernel-memory/service/Abstractions/Pipeline/DataPipeline.cs
Lines 432 to 448 in babedc0
Importance
I cannot use Kernel Memory
Platform, Language, Versions
Kernel Memory v0.36.240415.2
Relevant log output
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
UglyToad.PdfPig.Core.PdfDocumentFormatException: Could not find the version header comment at the start of the document.
at UglyToad.PdfPig.Parser.FileStructure.FileHeaderParser.Parse(ISeekableTokenScanner scanner, IInputBytes inputBytes, Boolean isLenientParsing, ILog log)
at UglyToad.PdfPig.Parser.PdfDocumentFactory.OpenDocument(IInputBytes inputBytes, ISeekableTokenScanner scanner, InternalParsingOptions parsingOptions)
at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(IInputBytes inputBytes, ParsingOptions options)
at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(Stream stream, ParsingOptions options)
at UglyToad.PdfPig.PdfDocument.Open(Stream stream, ParsingOptions options)
at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(Stream data, CancellationToken cancellationToken)
at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(BinaryData data, CancellationToken cancellationToken)
at Microsoft.KernelMemory.Handlers.TextExtractionHandler.ExtractTextAsync(FileDetails uploadedFile, BinaryData fileContent, CancellationToken cancellationToken)
at Microsoft.KernelMemory.Handlers.TextExtractionHandler.InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken)
at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.RunPipelineStepAsync(DataPipeline pipeline, IPipelineStepHandler handler, CancellationToken cancellationToken)
at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.<>c__DisplayClass5_0.<<AddHandlerAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
at Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues.<>c__DisplayClass19_0.<<OnDequeue>b__0>d.MoveNext()
info: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
Message received
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
Does this happen only with SimpleQueues? By design, SimpleQueues doesn't support poison queues, so that at dev time one can debug without worrying about the number of retries.
With AzureQueue and RabbitMQ the number of retries should be capped, so that eventually the service stops retrying.
I have tried with Azure Queues and verified that the message is moved to a poison queue after a certain number of retries. However, the number of retries is hard-coded:
What do you think about adding this parameter in AzureQueuesConfig?
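To illustrate the proposal, here is a minimal sketch of what the new setting could look like; the property name `MaxRetriesBeforePoisonQueue`, its default value, and the elided existing members are assumptions, not the actual Kernel Memory code:

```csharp
// Sketch only: a hypothetical new setting replacing the hard-coded retry count.
public class AzureQueuesConfig
{
    // ... existing settings (account, auth, etc.) omitted ...

    // Hypothetical: how many times a message may be dequeued before
    // being moved to the poison queue instead of being retried.
    public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
}
```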
As for RabbitMQ, it seems that the message is always put back in the queue:
kernel-memory/extensions/RabbitMQ/RabbitMQPipeline.cs
Lines 105 to 143 in f266b25
No problem with making it configurable if it helps. If I remember correctly, Azure Queues uses a count to decide when to discard a message, while RabbitMQ uses an expiration date (time to live). I would check the logs for the message "Message '{0}' received, expires at {1}" and see if the message actually expires.
If I correctly understand the meaning of the requeue parameter, it enqueues a new message in the queue, with a new expiration time, endlessly.
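For reference, in the RabbitMQ .NET client the `requeue` flag on `BasicNack` is what controls this behavior. A sketch of the two outcomes (this is illustrative consumer code, not the actual RabbitMQPipeline implementation):

```csharp
// Inside a RabbitMQ.Client consumer callback, after processing fails:

// requeue: true -> the broker puts the message back in the queue, and with
// no delivery limit it is redelivered endlessly on repeated failures.
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);

// requeue: false -> the message is dropped, or routed to the queue's
// dead-letter exchange if one is configured (enabling a poison queue).
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
```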
By the way, there are some issues in RabbitMQPipeline: it seems that all the properties of BasicProperties are null.
@dluc I have made a PR to set the missing properties in RabbitMQ: #454.
After that, I think RabbitMQPipeline should also handle a maximum number of retries and poison queues, like AzureQueuePipeline does. For RabbitMQ, it seems the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.
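A sketch of declaring such a queue with RabbitMQ.Client: quorum queues track redelivery counts, and `x-delivery-limit` makes the broker dead-letter (or drop) a message after that many redeliveries. The queue names and the limit of 20 are hypothetical:

```csharp
// Sketch: declare a quorum queue with a capped delivery count.
// Queue names and the limit are illustrative, not the KM implementation.
var args = new Dictionary<string, object>
{
    ["x-queue-type"] = "quorum",      // quorum queues count redeliveries
    ["x-delivery-limit"] = 20,        // after 20 redeliveries, dead-letter/drop
    ["x-dead-letter-exchange"] = "",  // default exchange routes by queue name
    ["x-dead-letter-routing-key"] = "km-default-poison"
};

channel.QueueDeclare(queue: "km-default", durable: true,
                     exclusive: false, autoDelete: false, arguments: args);
```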
There's a couple of approaches; IIRC all of them require multiple queues per logical queue. Worth reading:
So, do you prefer using such solutions instead of the quorum queue?
Quorum queues don't seem to offer delayed delivery, do they?
When a message delivery fails we need to allow a configurable retry strategy, e.g.
- retry 10 times, waiting 1 minute between attempts
- retry for 24 hours, using exponential backoff
AFAIK with RabbitMQ this will require multiple queues.
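The multi-queue pattern being discussed can be sketched with a consumer-less retry queue whose messages expire and dead-letter back into the work queue, producing a fixed delay between attempts. All queue names and the 60-second TTL are hypothetical:

```csharp
// Sketch of a delayed-retry topology; names and TTL are illustrative.

// Work queue: on failure, messages are nacked with requeue: false and
// dead-lettered into the retry queue.
channel.QueueDeclare(queue: "work", durable: true, exclusive: false,
    autoDelete: false, arguments: new Dictionary<string, object>
    {
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = "work-retry"
    });

// Retry queue: no consumers; messages expire after 60s and are
// dead-lettered back into the work queue, i.e. a fixed-delay retry.
channel.QueueDeclare(queue: "work-retry", durable: true, exclusive: false,
    autoDelete: false, arguments: new Dictionary<string, object>
    {
        ["x-message-ttl"] = 60000,
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = "work"
    });
```

Exponential backoff would need several retry queues with increasing TTLs (or per-message TTLs, with the caveat that RabbitMQ only expires messages at the head of a queue).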
Understood; so at least we need something like this, right?
Hi @dluc! Finally, I can return to this issue.
What do you think about implementing in RabbitMQ the same behavior we have in Azure Queues?
kernel-memory/extensions/AzureQueues/AzureQueuesPipeline.cs
Lines 192 to 231 in 8c0ad8c
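The Azure Queues behavior referenced above essentially hinges on the message's dequeue count. A simplified sketch of the pattern using the Azure.Storage.Queues SDK; the queue clients, variable names, and the limit of 20 are illustrative, not the actual Kernel Memory code:

```csharp
// Simplified sketch of the poison-queue pattern with Azure Storage Queues.
QueueMessage message =
    (await queueClient.ReceiveMessagesAsync(maxMessages: 1)).Value[0];

const int MaxRetries = 20; // hard-coded today; a candidate for AzureQueuesConfig

if (message.DequeueCount > MaxRetries)
{
    // Give up: copy the message to the poison queue, then delete the original.
    await poisonQueueClient.SendMessageAsync(message.MessageText);
    await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
}
else
{
    // Do nothing: when the visibility timeout expires, the message
    // reappears in the queue and is retried.
}
```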
Hey @marcominerva, that would be nice, if possible. It might be a lot of work considering the extra queues to introduce, so before starting, do you think we could break it down into multiple steps, with separate PRs?
Yes. I can start, for example, creating the extra queues and then making a draft PR, so you can see the work in progress. Do you agree?
Yes, please go ahead. What would be the name of the queues? I'm assuming that, if missing or deleted, these extra queues will be automatically recreated without causing errors.
Actually, adding poison queue support like in Azure Queues required fewer changes than I expected, so I have made a single PR with all the changes: #648.
Let me know if it is OK, or if you prefer to split it anyway.