streamnative/pulsar-spark

[BUG] Incorrect subscription name used by PulsarProvider when reader subscription name is provided already

grssam opened this issue · 6 comments

Describe the bug
Consider a scenario:

predefinedSubscription option is not provided.
pulsar.reader.subscriptionName option is provided
Optionally, subscriptionPrefix may or may not be provided.

In this case, pulsar-spark subscribes to the topic using a subscription name derived from $driverGroupIdPrefix-$topicPartition, which in turn resolves to $subscriptionPrefix-$topicPartition, because that is the value passed to the PulsarMetadataReader object and other places.

The issue is that, because the option pulsar.reader.subscriptionName is set, the actual reader will use a partition-agnostic, common subscription name instead.
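A minimal sketch of the mismatch described above, written in Python purely for illustration. Only the option key pulsar.reader.subscriptionName and the $subscriptionPrefix-$topicPartition pattern come from this issue; the function names, the partition labels, and the fallback used when no override is set are assumptions and do not mirror the connector's Scala code.

```python
from typing import Dict, List, Tuple

# Option key named in this issue.
READER_SUBSCRIPTION_KEY = "pulsar.reader.subscriptionName"


def derived_subscription(subscription_prefix: str, topic_partition: str) -> str:
    """Build a name following the $subscriptionPrefix-$topicPartition pattern."""
    return f"{subscription_prefix}-{topic_partition}"


def subscription_names(
    options: Dict[str, str],
    subscription_prefix: str,
    topic_partitions: List[str],
) -> List[Tuple[str, str]]:
    """Return (metadata-side name, reader-side name) for each partition.

    Assumption: without the override, the reader side is modelled as using
    the same derived name; the connector's real fallback may differ.
    """
    override = options.get(READER_SUBSCRIPTION_KEY)
    pairs = []
    for partition in topic_partitions:
        derived = derived_subscription(subscription_prefix, partition)
        reader_side = override if override is not None else derived
        pairs.append((derived, reader_side))
    return pairs


# Hypothetical partition labels and option values.
partitions = ["events-partition-0", "events-partition-1"]
pairs = subscription_names({READER_SUBSCRIPTION_KEY: "my-sub"}, "spark", partitions)

# Every partition where the two sides disagree on the subscription name.
mismatched = [pair for pair in pairs if pair[0] != pair[1]]
```

With the override set, every partition ends up in `mismatched`: the metadata side uses a per-partition name while the reader side uses the single configured name.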

To Reproduce
See the scenario described above.

Expected behavior
Ideally, if a low-level (reader-level) override is used, PulsarProvider and PulsarMetadataReader should use that value instead of deriving it from the combination of subscriptionPrefix and topicPartition.
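The expected behavior could be sketched as a single resolution function shared by every component that needs a subscription name. This is an illustration in Python, not the connector's implementation: the function name `resolve_subscription` is hypothetical, and the precedence between predefinedSubscription and pulsar.reader.subscriptionName is an assumption, since the issue only covers the case where predefinedSubscription is absent.

```python
from typing import Mapping

# Assumed precedence: an explicit predefinedSubscription first, then the
# reader-level override. The issue does not specify this ordering.
OVERRIDE_KEYS = ("predefinedSubscription", "pulsar.reader.subscriptionName")


def resolve_subscription(
    options: Mapping[str, str],
    subscription_prefix: str,
    topic_partition: str,
) -> str:
    """Pick one subscription name for both metadata and data reads."""
    for key in OVERRIDE_KEYS:
        value = options.get(key)
        if value:  # skip missing or empty values
            return value
    # No override: fall back to the $subscriptionPrefix-$topicPartition pattern.
    return f"{subscription_prefix}-{topic_partition}"


# Hypothetical values for illustration.
with_override = resolve_subscription(
    {"pulsar.reader.subscriptionName": "my-sub"}, "spark", "events-partition-0"
)
without_override = resolve_subscription({}, "spark", "events-partition-0")
```

Routing every lookup through one function like this would keep the provider, the metadata reader, and the data readers from disagreeing on the name.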

nlu90 commented

@grssam These reader configs are for the Readers used when consuming data during the micro-batch execution. Please let me know if those readers don't take the config.

Readers created by PulsarProvider or PulsarMetadataReader (or PulsarHelper) are not for consuming data but for metadata checking and bookkeeping. They may be replaced with consumers in a follow-up change and don't need to follow the reader configs.

grssam commented

> @grssam These reader configs are for the Readers used when consuming data during the micro-batch execution. Please let me know if those readers don't take the config.

Agreed. This works as expected, but..

> Readers created by PulsarProvider or PulsarMetadataReader (or PulsarHelper) are not for consuming data but for metadata checking and bookkeeping. They may be replaced with consumers in a follow-up change and don't need to follow the reader configs.

The main objective is to be able to configure these subscription names as well; the value could come from a new property. This is really important for systems that use Pulsar with their own Authorization Provider logic, which checks that only deterministically valid subscription names are used to read from any given topic.
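To illustrate why this matters, here is a sketch of one hypothetical policy such a check might apply: an exact allow-list of subscription names per role. The issue does not describe the actual rule, so the function name, the role name, and the allow-list structure are all assumptions, and this is not Pulsar's AuthorizationProvider interface.

```python
from typing import Dict, Set


def is_subscription_allowed(
    allowed: Dict[str, Set[str]], role: str, subscription: str
) -> bool:
    """Hypothetical check: the role may only use names on its allow-list."""
    return subscription in allowed.get(role, set())


# Hypothetical policy: the Spark job may use exactly one subscription name.
allowed = {"spark-job": {"spark-job-sub"}}

# The explicitly configured name passes the check.
configured_ok = is_subscription_allowed(allowed, "spark-job", "spark-job-sub")

# A name derived per partition is not on the list and is rejected.
derived_ok = is_subscription_allowed(
    allowed, "spark-job", "spark-events-partition-0"
)
```

Under a policy like this, any subscription the connector creates with a derived name would be rejected, even though the data readers themselves use the configured one.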

nlu90 commented

> This is really important for systems using pulsar with their own Authorization Provider logic where there is a check to confirm that only deterministically valid subscription names are being used to read from any given topic.

Good point.
So the "hidden" feature request here is to make all subscriptions used by the connector deterministic.
I can do a follow-up check to see how we can make this happen after the pulsar admin dependency removal.

grssam commented

In a way, yes, this is a feature request. I would also like to understand the rationale behind the $subscriptionPrefix-$topicPartition approach, rather than simply using a $subscriptionName to begin with. I might be missing some context.

nlu90 commented

We have a config called predefinedSubscription that lets users provide a defined subscription name.

The $subscriptionPrefix-$topicPartition naming is legacy code developed by the team at the very beginning of the connector. I don't know the motivation behind it either.

nlu90 commented

PulsarProvider and PulsarHelper (previously called PulsarMetadataReader) will only use a consumer for offset management with the above-linked PR changes.

The Reader is dedicated to actual data consumption.