PulsarKafkaProducer/Consumer do not set required configuration properties.
afedulov opened this issue · 0 comments
afedulov commented
Describe the bug
new PulsarKafkaProducer<>(
props, new StringSerializer(), new StringSerializer<>());
fails with
org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
Similarly, PulsarKafkaConsumer fails with missing key.deserializer/value.serializer
.
To Reproduce
import java.util.Properties;
import org.apache.kafka.clients.producer.PulsarKafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
public class ReproducePulsarBug {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:49377");
// Only works if below two lines are commented in:
// props.put("key.serializer", StringSerializer.class.getName());
// props.put("value.serializer", StringSerializer.class.getName());
new PulsarKafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
}
}
Additional context
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka-original</artifactId>
<version>2.8.0</version>
</dependency>