Using EmbeddedKafkaCluster with KafkaConsumer
I have tried to use EmbeddedKafkaCluster with KafkaProducer and KafkaConsumer. It works with KafkaProducer, but not with KafkaConsumer. My code is below.
public class InputConsumerTest {

    private final static String TOPIC = "topic";
    private final static String KEY = "bla";
    private final static String KEY_2 = "bla bla";
    private final static String INPUT = "blubb";
    private final static String INPUT_2 = "blubb blubb";
    private final static String ENCRYPTED = "gnampf";
    private final static String ENCRYPTED_2 = "gnampf gnampf";
    private static final String ENCRYPTION_KEY = "p4yzvXSakknClU";

    @ClassRule
    public static final EmbeddedKafkaCluster KAFKA_CLUSTER = EmbeddedKafkaCluster.provisionWith(
            EmbeddedKafkaClusterConfig.useDefaults());

    public void setupKafka() throws InterruptedException {
        delay(5);
        KAFKA_CLUSTER.start();
        KAFKA_CLUSTER.createTopic(TopicConfig.forTopic(TOPIC).useDefaults());
        delay(5);
    }

    @Test
    public void test1() throws InterruptedException {
        setupKafka();

        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_CLUSTER.getBrokerList());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 10; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(TOPIC, Integer.toString(i * 2), Integer.toString(i * 2)));

        ObserveKeyValues<String, String> observeRequest = ObserveKeyValues.on(TOPIC, 1).useDefaults();
        List<String> observedValues = KAFKA_CLUSTER.observeValues(observeRequest);
        // observedValues is read successfully, so producing works.
        producer.close();

        props.put("bootstrap.servers", KAFKA_CLUSTER.getBrokerList());
        props.put("group.id", "test5");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        ConsumerRecords<String, String> records = consumer.poll(100);
        // records is empty. It should not be.
    }
}
I fail to see why this is a problem with kafka-junit.
Your consumer does not override the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG configuration property. This is set to "latest" by default, meaning that the consumer will start polling from the tail of the topic. So, as you already asserted correctly using kafka-junit, there are indeed records in the topic, but your consumer does not see them because it starts reading from the latest offset.
Set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" and try again.
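For illustration, here is a minimal sketch of the consumer setup with that property applied. It assumes the same TOPIC and KAFKA_CLUSTER fields from your test, imports for ConsumerConfig, StringDeserializer, and Collections, and a fresh Properties object so the producer's serializer settings are not carried over:

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.getBrokerList());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Without this, the default of "latest" makes the consumer start at the tail of the topic.
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, String> records = consumer.poll(1000);
// records should now contain the previously produced messages.
consumer.close();

Note that the first poll can return before the group rebalance has completed, so you may need to poll in a loop or use a longer timeout to reliably see the records.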
The Confluent Community Slack is probably a better place to ask for help on Kafka-related questions.