apache/pulsar-adapters

DeadLetterPolicy not implemented in Apache Pulsar Storm adapter

Describe the bug
v2.11.0
There is no way to negatively acknowledge a message through the spout's consumer, and registering a DeadLetterPolicy on the consumer configuration does not work.

    ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
    subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
    subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
    subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
            .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic())
            .build());

    PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
            spoutConfiguration,
            ((ClientBuilderImpl) createBuilder(viestiSourceConfig))
                    .getClientConfigurationData()
                    .clone(),
            subscriptionConfig);

The configuration above does not stick when the PulsarSpout is created: the DeadLetterPolicy is dropped.

```java
static class SpoutConsumer implements PulsarSpoutConsumer {
    private Consumer<byte[]> consumer;

    public SpoutConsumer(Consumer<byte[]> consumer) {
        this.consumer = consumer;
    }

    public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        return this.consumer.receive(timeout, unit);
    }

    public void acknowledgeAsync(Message<?> msg) {
        this.consumer.acknowledgeAsync(msg);
    }

    public void close() throws PulsarClientException {
        this.consumer.close();
    }

    public void unsubscribe() throws PulsarClientException {
        this.consumer.unsubscribe();
    }
}
```

There is also no mechanism to negatively acknowledge a message: SpoutConsumer exposes no negativeAcknowledge method.

Expected behavior

When a DeadLetterPolicy is set, it should not be dropped during serialization. Negative acknowledgements should also be supported.
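
To make the expected behavior concrete, here is a minimal in-memory sketch of how a negative-ack passthrough and a redelivery limit could interact. Everything in it is hypothetical: `SimpleConsumer`, `InMemoryConsumer`, `SpoutConsumerSketch`, and `drain` are stand-ins written for this illustration, not Pulsar or pulsar-storm classes, and messages are plain strings. It only models the idea that a failed message is redelivered until a `maxRedeliverCount` budget is exceeded and then lands in a dead-letter list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative toy model only; none of these types come from Pulsar or pulsar-storm.
class NackSketch {

    /** Hypothetical stand-in for the consumer operations a spout wrapper needs. */
    interface SimpleConsumer {
        String receive();                      // returns null when nothing is pending
        void acknowledge(String msg);
        void negativeAcknowledge(String msg);  // the operation this issue asks for
    }

    /** Toy consumer: redelivers nacked messages until maxRedeliverCount is exceeded. */
    static class InMemoryConsumer implements SimpleConsumer {
        private final Deque<String> pending = new ArrayDeque<>();
        private final Map<String, Integer> nackCounts = new HashMap<>();
        private final int maxRedeliverCount;
        final List<String> acked = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();

        InMemoryConsumer(int maxRedeliverCount, List<String> messages) {
            this.maxRedeliverCount = maxRedeliverCount;
            this.pending.addAll(messages);
        }

        public String receive() {
            return pending.poll();
        }

        public void acknowledge(String msg) {
            acked.add(msg);
        }

        public void negativeAcknowledge(String msg) {
            int count = nackCounts.merge(msg, 1, Integer::sum);
            if (count > maxRedeliverCount) {
                deadLetters.add(msg);  // redelivery budget exhausted
            } else {
                pending.add(msg);      // schedule another delivery
            }
        }
    }

    /** Wrapper in the spirit of SpoutConsumer, extended with a fail() passthrough. */
    static class SpoutConsumerSketch {
        private final SimpleConsumer consumer;

        SpoutConsumerSketch(SimpleConsumer consumer) {
            this.consumer = consumer;
        }

        String receive() { return consumer.receive(); }
        void ack(String msg) { consumer.acknowledge(msg); }
        void fail(String msg) { consumer.negativeAcknowledge(msg); }
    }

    /** Drains the consumer; messages starting with "bad" always fail. Returns total deliveries. */
    static int drain(SpoutConsumerSketch spout) {
        int deliveries = 0;
        String msg;
        while ((msg = spout.receive()) != null) {
            deliveries++;
            if (msg.startsWith("bad")) {
                spout.fail(msg);
            } else {
                spout.ack(msg);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        InMemoryConsumer consumer = new InMemoryConsumer(2, Arrays.asList("ok-1", "bad-1"));
        int deliveries = drain(new SpoutConsumerSketch(consumer));
        System.out.println("deliveries=" + deliveries
                + " acked=" + consumer.acked
                + " deadLetters=" + consumer.deadLetters);
    }
}
```

In the real adapter, the equivalent of `fail` would presumably delegate to the Pulsar client's `Consumer.negativeAcknowledge(Message<?>)`, and the redelivery limit would come from the `DeadLetterPolicy` on the consumer configuration, which is why both pieces need to survive spout creation.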

Desktop (please complete the following information):
macOS Ventura 13.4.1
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
Apache Storm 2.2.1

Trying to consume from a Pulsar topic in Apache Storm.