elastic/apm-queue

kafka: Extend unit tests


Description

By leveraging the kfake package, we can get good test coverage from unit tests alone. We should extend the current code coverage and add a few more tests in these areas (see the sketch after the list for how kfake can back them):

Producer

  • Lossless graceful shutdown (No records should be lost when the producer is closed), both async and sync.

Consumer

  • At Most Once Delivery (default).
  • At Least Once Delivery.
  • Lossless graceful shutdown (No records should be lost when the consumer is closed).
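As a rough illustration of how kfake can back these tests, here is a minimal sketch of a helper that starts an in-memory cluster and points a franz-go client at it. This is not apm-queue's actual test harness; the topic name, partition count, and client options are placeholders:

import (
	"testing"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
)

func newFakeCluster(t *testing.T) *kgo.Client {
	t.Helper()
	// Start an in-memory Kafka cluster with a single pre-created topic.
	cluster, err := kfake.NewCluster(kfake.SeedTopics(1, "test-topic"))
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(cluster.Close)

	// Point a regular franz-go client at the fake cluster's listeners.
	client, err := kgo.NewClient(
		kgo.SeedBrokers(cluster.ListenAddrs()...),
		kgo.ConsumeTopics("test-topic"),
	)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(client.Close)
	return client
}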

I have finished the Lossless graceful shutdown test and I have a few questions:

(No records should be lost when the consumer is closed)

Did you mean when the producer is closed? I assumed that was a typo, so I am closing the producer in the test.


Could you clarify what we want to test with lossless graceful shutdown?

This is the current test for the consumer graceful shutdown:

func TestGracefulSutdown(t *testing.T) {

We produce some records and wait for the consumer to process them. Once that's done we cancel the context:

select {
case process <- struct{}{}:
	close(process) // Allow records to be processed
	cancel()       // Stop the consumer.
case <-time.After(time.Second):
	t.Fatal("timed out waiting for consumer to process event")
}

The processor unblocks that select by reading from the channel and incrementing the counter:

select {
case <-ctx.Done():
	// The context was cancelled before the batch could be processed: count it as errored.
	errored.Add(int32(len(*b)))
	return ctx.Err()
case <-process:
	// The test unblocked processing: count the batch as processed.
	processed.Add(int32(len(*b)))
}
return nil

At that point the final assertion makes sure the counter matches the produced records:

assert.Eventually(t, func() bool {
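For context, the condition inside that assertion presumably compares the counter against the number of produced records, roughly like this (the records count and the processed.Load() call are my assumptions about the surrounding test, not a quote of the actual code):

assert.Eventually(t, func() bool {
	// Wait until every produced record has been counted as processed.
	return processed.Load() == int32(records)
}, time.Second, 10*time.Millisecond)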

But this will always succeed because the processor was called and incremented the counter. If the processor is not called, the test times out early.
My test is similar, except it uses the producer to create the records. Apologies if I'm missing something obvious, but I feel like the logic of these tests is "verify that by closing the consumer after the consumer has processed events, we are not losing events". I can't see how that logic/test could fail; wouldn't it always succeed? How does shutdown relate to that? Wouldn't shutdown happen in the middle of processing? Or even before it?

@kruskall Yes, the description had a typo; I've corrected it. The intention was to add tests for the producer and ensure that graceful shutdown works as intended.


Regarding the current test: I meant to cancel the context before closing the process channel. I committed the wrong thing, but even after making the change locally the test passes:

diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go
index ff9be63..2dd7d67 100644
--- a/kafka/consumer_test.go
+++ b/kafka/consumer_test.go
@@ -408,8 +408,8 @@ func TestGracefulSutdown(t *testing.T) {
 		go func() { consumer.Run(ctx) }()
 		select {
 		case process <- struct{}{}:
-			close(process) // Allow records to be processed
 			cancel()       // Stop the consumer.
+			close(process) // Allow records to be processed
 		case <-time.After(time.Second):
 			t.Fatal("timed out waiting for consumer to process event")
 		}

The intention of that test is to wait for the consumer to have read from Kafka, and to ensure that after the consumer has been stopped (context cancelled), the records that have already been fetched are still processed.
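To make that ordering concrete, here is a stripped-down, Kafka-free sketch of the pattern; the stand-in processor and the counters below are placeholders, not the real apm-queue code:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

func main() {
	const produced = 10

	// In the real test this is the context passed to consumer.Run(ctx).
	_, cancel := context.WithCancel(context.Background())

	var processed atomic.Int32
	fetched := make(chan struct{})      // closed once the batch has been read from "Kafka"
	allowProcess := make(chan struct{}) // closed to let the processor finish
	done := make(chan struct{})

	// Stand-in processor: it already holds a fetched batch, so it processes
	// the batch even though shutdown starts before processing is allowed.
	go func() {
		defer close(done)
		close(fetched)
		<-allowProcess
		processed.Add(produced)
	}()

	<-fetched           // 1. wait for the consumer to have read the records
	cancel()            // 2. stop the consumer; no further records will be fetched
	close(allowProcess) // 3. the in-flight batch is still processed

	<-done
	fmt.Println("processed:", processed.Load(), "of", produced) // prints: processed: 10 of 10
}

In the real test the same roles are played by the process channel and the cancel() call shown in the diff above.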

Thank you for the clarification! 🙇

I think this can be closed. All the tests mentioned in the description should be present.

@marclop Am I missing something?