kafka: Extend unit tests
Closed this issue · 4 comments
Description
By leveraging the kfake
package, we can provide pretty good test coverage for the unit tests. We should extend the current code coverage and add a few more tests in these areas:
Producer
- Lossless graceful shutdown (No records should be lost when the producer is closed), both async and sync.
Consumer
- At Most Once Delivery (default).
- At Least Once Delivery.
- Lossless graceful shutdown (No records should be lost when the consumer is closed).
I have finished the Lossless graceful shutdown test and I have a few questions:
(No records should be lost when the consumer is closed)
did you mean when the producer is closed
? I assumed that was a typo and I am closing the producer in the test.
Could you clarify what we want to test with lossless graceful shutdown ?
This is the current test for the consumer graceful shutdown:
apm-queue/kafka/consumer_test.go
Line 372 in 5e50011
We produced some recorded and we're waiting for the consumer to process them. Once that's done we cancel the context:
apm-queue/kafka/consumer_test.go
Lines 409 to 415 in 5e50011
The processor is unblocking the select by reading from the channel and incrementing the counter:
apm-queue/kafka/consumer_test.go
Lines 389 to 396 in 5e50011
At that point the final assertion will make sure the counter match the produced recors:
apm-queue/kafka/consumer_test.go
Line 416 in 5e50011
But this will always succeed because the processor was called and incremented the counter. If the processor is not called the test will timeout early.
My test is similar, except it is using the producer to create records. Apologies if I'm missing something obvious, I feel like the logic of tests is "verify that by closing the consumer after the consumer processed events, we are not losing events". I can't see that logic/test fails, wouldn't it always succeed ? How does shutdown relates to that ? Wouldn't shutdown happens in the middle of processing ? Or even before it ?
@kruskall Yes, the description had a typo, I've corrected it. The intention was to add tests to the producer and ensure that graceful shutdown works as intended.
Regarding the current test. I meant to cancel the context before closing the processed
channel. I committed the wrong thing, but even making the change locally the tests passes:
diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go
index ff9be63..2dd7d67 100644
--- a/kafka/consumer_test.go
+++ b/kafka/consumer_test.go
@@ -408,8 +408,8 @@ func TestGracefulSutdown(t *testing.T) {
go func() { consumer.Run(ctx) }()
select {
case process <- struct{}{}:
- close(process) // Allow records to be processed
cancel() // Stop the consumer.
+ close(process) // Allow records to be processed
case <-time.After(time.Second):
t.Fatal("timed out waiting for consumer to process event")
}
The intention on that test is to wait for the consumer to have read from Kafka, and ensure that after the consumer has been stopped (context cancelled), the records that have been fetched are processed.
Thank you for the clarification! 🙇