DeadLetterPolicy in Apache Pulsar Storm Adapter Not implemented
Opened this issue · 0 comments
Describe the bug
v2.11.0
There is no way to negatively acknowledge the consumer and the registry method for DeadLetterPolicy is broken
ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>(); subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); subscriptionConfig.setSubscriptionType(SubscriptionType.Shared); subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder() .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic()).build());
PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2( spoutConfiguration, ((ClientBuilderImpl) createBuilder(viestiSourceConfig)) .getClientConfigurationData() .clone(), subscriptionConfig);
This above code doesnt stick while creating PulsarSpout.
`
static class SpoutConsumer implements PulsarSpoutConsumer {
private Consumer<byte[]> consumer;
public SpoutConsumer(Consumer<byte[]> consumer) {
this.consumer = consumer;
}
public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
return this.consumer.receive(timeout, unit);
}
public void acknowledgeAsync(Message<?> msg) {
this.consumer.acknowledgeAsync(msg);
}
public void close() throws PulsarClientException {
this.consumer.close();
}
public void unsubscribe() throws PulsarClientException {
this.consumer.unsubscribe();
}
}
`
Also there is no Mechanism to negativelyAcknowledge a message.
Expected behavior
While Setting DeadletterPolicy It should not drop it while serialising. Negative Acks support should be there
Desktop (please complete the following information):
MacOs Ventura 13.4.1
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
Apache Storm 2.2.1
Trying to consume from Pulsar Topic in Apache Storm