citrusframework/citrus

Issue receiving kafka messages via the same endpoint for multiple tests (quarkus)

dhmuech opened this issue · 2 comments

Citrus Version

4.1.1

Question

How can I utilize the same endpoint in multiple tests for both sending and receiving?

What I've tried so far

I'm trying to use Citrus in a Quarkus application for integration testing with Kafka. My goal is to test a processor class across multiple tests. For example, in the first test I send data to topic A, the processor performs some actions, and a message with other data is created on topic B. In the second test I send different data to topic A, the processor performs its actions, and a message should again be sent to topic B.
The issue I'm facing is that each test works fine on its own, but when I run all the tests together, the second test does not receive its message. However, when I check Kafka using kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic data --from-beginning, I can see that the message was sent and can be consumed.

I've checked the Citrus documentation, and it mentions that the consumer groups setting may cause a "timeout," but I don't understand why. If both tests use the same consumer group, I would expect to receive the message twice if the default offset-reset is set to earliest.

Perhaps I've misunderstood the Citrus framework, or there might be a configuration error on my part.

Here is an example class to reproduce the issue I'm facing.

If I execute all tests, I will not receive the message 'data-2' in the receive_2_ShouldSuccess() test.

Console output: [screenshot not included]

import io.quarkus.test.junit.QuarkusTest;
import org.citrusframework.TestCaseRunner;
import org.citrusframework.annotations.CitrusResource;
import org.citrusframework.kafka.endpoint.KafkaEndpoint;
import org.citrusframework.quarkus.CitrusSupport;
import org.junit.jupiter.api.Test;

import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.actions.SendMessageAction.Builder.send;
import static org.citrusframework.kafka.endpoint.builder.KafkaEndpoints.kafka;

@QuarkusTest
@CitrusSupport
class ExampleTestTests {

    private final KafkaEndpoint dataEndpoint = kafka().asynchronous().topic("data").timeout(100000L).build();

    @CitrusResource
    private TestCaseRunner t;

    @Test
    void receive_1_ShouldSuccess() {
        //Arrange & Act
        t.when(send().endpoint(dataEndpoint)
                .message()
                .body("data-1"));

        //Assert
        t.then(receive().endpoint(dataEndpoint)
                .message()
                .body("data-1"));
    }

    @Test
    void receive_2_ShouldSuccess() {
        //Arrange & Act
        t.when(send().endpoint(dataEndpoint)
                .message()
                .body("data-2"));

        //Assert
        t.then(receive().endpoint(dataEndpoint)
                .message()
                .body("data-2"));
    }
}

Additional information

<quarkus.platform.version>3.6.0</quarkus.platform.version>

Please have a look at the sample-quarkus project, which shows how to set up a KafkaEndpoint in a CitrusEndpointConfig class (https://github.com/citrusframework/citrus-samples/blob/main/demo/sample-quarkus/src/test/java/org/apache/camel/demo/CitrusEndpointConfig.java) that is reused in many tests.

https://github.com/citrusframework/citrus-samples/tree/main/demo/sample-quarkus

I think the root cause is that Quarkus creates a new test instance for each test, so you get a fresh Kafka endpoint instance for every test when the endpoint is a private final member of your test class.

Please let me know if that helps.
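
To illustrate the explanation above, here is a minimal plain-Java sketch. It does not use Citrus or Kafka: FakeEndpoint, PerInstanceTest and Registry are hypothetical stand-ins invented for this example. It only shows that a field initializer runs once per test instance, whereas a lookup in a shared registry returns the same object every time.

```java
import java.util.HashMap;
import java.util.Map;

public class EndpointLifecycleSketch {

    /** Hypothetical stand-in for a Kafka endpoint. */
    static final class FakeEndpoint {
        final String topic;

        FakeEndpoint(String topic) {
            this.topic = topic;
        }
    }

    /** Mirrors the test class from the issue: the endpoint is an instance field. */
    static final class PerInstanceTest {
        final FakeEndpoint dataEndpoint = new FakeEndpoint("data");
    }

    /** Hypothetical registry that builds each named endpoint once and then reuses it. */
    static final class Registry {
        private final Map<String, FakeEndpoint> endpoints = new HashMap<>();

        FakeEndpoint lookup(String name) {
            return endpoints.computeIfAbsent(name, FakeEndpoint::new);
        }
    }

    /** Simulates two test methods that each run on their own test instance. */
    public static boolean perInstanceFieldsShareEndpoint() {
        PerInstanceTest first = new PerInstanceTest();
        PerInstanceTest second = new PerInstanceTest();
        return first.dataEndpoint == second.dataEndpoint;
    }

    /** Simulates two test methods that both look up the endpoint in one registry. */
    public static boolean registryLookupsShareEndpoint() {
        Registry registry = new Registry();
        return registry.lookup("data") == registry.lookup("data");
    }

    public static void main(String[] args) {
        System.out.println(perInstanceFieldsShareEndpoint()); // prints "false"
        System.out.println(registryLookupsShareEndpoint());   // prints "true"
    }
}
```

If the explanation holds, the second test in the issue talks to a different endpoint object than the first one, which is what moving the endpoint into a shared configuration class avoids.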

Thanks, works like a charm :)
I followed the example and added a CitrusEndpointConfig class that defines the dataEndpoint. Additionally, I added the annotations @CitrusConfiguration(classes = { CitrusEndpointConfig.class }) and @CitrusEndpoint.
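
For readers landing here later, the fix described above might look roughly like the following sketch. It is not the poster's actual code. The endpoint settings are copied from the original example; the use of @BindToRegistry, the import packages for the annotations, and the assumption that the @CitrusEndpoint field is matched to the configuration method by name are modeled on the linked sample and should be checked against it.

```java
// File 1: CitrusEndpointConfig.java (class name taken from the thread)
import org.citrusframework.kafka.endpoint.KafkaEndpoint;
import org.citrusframework.spi.BindToRegistry;

import static org.citrusframework.kafka.endpoint.builder.KafkaEndpoints.kafka;

public class CitrusEndpointConfig {

    // Assumption: the method name is used as the name in the Citrus registry.
    @BindToRegistry
    public KafkaEndpoint dataEndpoint() {
        return kafka().asynchronous().topic("data").timeout(100000L).build();
    }
}

// File 2: ExampleTestTests.java
import io.quarkus.test.junit.QuarkusTest;
import org.citrusframework.TestCaseRunner;
import org.citrusframework.annotations.CitrusConfiguration;
import org.citrusframework.annotations.CitrusEndpoint;
import org.citrusframework.annotations.CitrusResource;
import org.citrusframework.kafka.endpoint.KafkaEndpoint;
import org.citrusframework.quarkus.CitrusSupport;
import org.junit.jupiter.api.Test;

import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.actions.SendMessageAction.Builder.send;

@QuarkusTest
@CitrusSupport
@CitrusConfiguration(classes = { CitrusEndpointConfig.class })
class ExampleTestTests {

    // Injected from the shared configuration instead of being built per test instance.
    // Assumption: the field name matches the method name in CitrusEndpointConfig.
    @CitrusEndpoint
    private KafkaEndpoint dataEndpoint;

    @CitrusResource
    private TestCaseRunner t;

    @Test
    void receive_2_ShouldSuccess() {
        // Send to the shared endpoint, then expect the same payload back.
        t.when(send().endpoint(dataEndpoint)
                .message()
                .body("data-2"));

        t.then(receive().endpoint(dataEndpoint)
                .message()
                .body("data-2"));
    }
}
```

The two classes belong in separate files under the test sources; they are shown in one block only for brevity.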