smallrye/smallrye-reactive-messaging

How to use companion.consumeXYZ() twice?

Closed this issue · 2 comments

Here is a simple test:

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
class KafkaCompanionIT {

    @InjectKafkaCompanion
    KafkaCompanion companion;

    @AfterEach
    void clearAllRecords() {
        companion.topics().clear();
    }

    @RepeatedTest(10)
    void bar() {
        String data = RandomStringUtils.randomAlphanumeric(100);
        ProducerRecord<String, String> record = new ProducerRecord<>("bar", data);
        companion.produceStrings().fromRecords(record).awaitCompletion();

        ConsumerTask<String, String> records = companion.consumeStrings().fromTopics("bar").awaitNextRecord();
        assertThat(records.count(), is(1L));
        assertThat(records.getLastRecord().value(), is(data));
    }
}

The test succeeds only for repetition 1.
All other repetitions fail, e.g. with

java.lang.AssertionError: 
Expected: is <1L>
     but: was <2L>

or

java.lang.AssertionError: 
Expected: is "pNorBO2fTDAS88nyny7HVyfmvyNEDmlwjAQTIMAsKCiEI3V9PD7VIIEGj1mCh2CIqMFzE6aV2eclc7jkDYBclnDAwBVZkntO9NCj"
     but: was "k9rFJpanJ7GWHQ8jQ6CaRrEJyFLNGrEAdm17qjH3M6094IZwUec6vyLT6plckzf0EAGD5FqxgHLZKIOcSJvtYNU0uwSB4nLwv6x8"

Any ideas? Thanks!

Here is another test with the same problem:

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
class KafkaCompanionOtherIT {

    @InjectKafkaCompanion
    KafkaCompanion companion;

    @AfterEach
    void clearAllRecords() {
        companion.topics().clear();
    }

    @Test
    void test1() {
        String data = RandomStringUtils.randomAlphanumeric(100);
        ProducerRecord<String, String> record = new ProducerRecord<>("bar", data);
        companion.produceStrings().fromRecords(record).awaitCompletion();

        ConsumerTask<String, String> records = companion.consumeStrings().fromTopics("bar").awaitNextRecord();
        assertThat(records.count(), is(1L));
        assertThat(records.getLastRecord().value(), is(data));
    }

    @Test
    void test2() {
        String data = RandomStringUtils.randomAlphanumeric(100);
        ProducerRecord<String, String> record = new ProducerRecord<>("bar", data);
        companion.produceStrings().fromRecords(record).awaitCompletion();

        ConsumerTask<String, String> records = companion.consumeStrings().fromTopics("bar").awaitNextRecord();
        assertThat(records.count(), is(1L));
        assertThat(records.getLastRecord().value(), is(data));
    }

    @Test
    void test3() {
        String data = RandomStringUtils.randomAlphanumeric(100);
        ProducerRecord<String, String> record = new ProducerRecord<>("bar", data);
        companion.produceStrings().fromRecords(record).awaitCompletion();

        ConsumerTask<String, String> records = companion.consumeStrings().fromTopics("bar").awaitNextRecord();
        assertThat(records.count(), is(1L));
        assertThat(records.getLastRecord().value(), is(data));
    }
}

It works only for test1.

I don't remember exactly, but I think you need to pass the names of the topics you want to clear, e.g. companion.topics().clear("bar");
Also, clearing the topic doesn't clear the offsets.
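
To make the two points above concrete, here is a minimal in-memory sketch of a single topic partition. It is plain Java and does not use the companion or Kafka at all: the class PartitionLogSketch and all of its methods are hypothetical names made up for illustration. It assumes that the no-argument clear() leaves "bar" untouched and that each new consumer starts reading from the earliest retained record; neither is confirmed in this thread. Under those assumptions the second repetition sees two records, which would match the "was <2L>" failure. The sketch also shows that deleting records moves the start of the log forward rather than restarting offsets at 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one topic partition; not part of any library.
final class PartitionLogSketch {
    private final List<String> retained = new ArrayList<>(); // records still in the log
    private long logStartOffset = 0; // offset of the first retained record
    private long logEndOffset = 0;   // offset the next appended record will get

    // Appends a record and returns the offset assigned to it.
    long append(String value) {
        retained.add(value);
        return logEndOffset++;
    }

    // Models deleting all records: the data is removed, but offsets do not restart.
    void clear() {
        retained.clear();
        logStartOffset = logEndOffset;
    }

    // What a consumer with no committed offsets would see when starting at "earliest".
    List<String> readFromEarliest() {
        return new ArrayList<>(retained);
    }

    long logStartOffset() {
        return logStartOffset;
    }

    long logEndOffset() {
        return logEndOffset;
    }

    public static void main(String[] args) {
        // Topic never cleared between repetitions: the old record is still there.
        PartitionLogSketch notCleared = new PartitionLogSketch();
        notCleared.append("repetition-1");
        notCleared.append("repetition-2");
        System.out.println(notCleared.readFromEarliest().size()); // prints 2

        // Topic cleared between repetitions: only the new record is visible,
        // but it is stored at offset 1, not 0.
        PartitionLogSketch cleared = new PartitionLogSketch();
        cleared.append("repetition-1");
        cleared.clear();
        long offset = cleared.append("repetition-2");
        System.out.println(cleared.readFromEarliest().size()); // prints 1
        System.out.println(offset);                            // prints 1
    }
}
```

If this model matches what happens in the tests, passing the topic name to clear(), or using a different topic name in each test, should keep a later test from reading records left behind by an earlier one. I have not verified either against the companion.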