apache/pulsar-client-go

[Bug] DLQ producer name conflicts when same name consumer sends messages to DLQ

geniusjoe opened this issue · 0 comments

Expected behavior

Two consumers with same consumer name should both create dlq successfully.

Actual behavior

One consumer throw exception below:
ERRO[0069] Failed to create DLQ producer dlq-topic="persistent://<tenant>/<namespace>/<topic>-<subscription>-DLQ" error="server error: ProducerBusy: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name '-<subscription>-<consumerName>-DLQ' is already connected to topic"

Steps to reproduce

Run two consumers with same configuration below, then one of the consumers will throw "Producer already connected to topic" exception:

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "<pulsar service>:8080",
		Authentication: pulsar.NewAuthenticationToken("<token>"),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "<tenant>/<namespace>/<topic>",
		SubscriptionName: "<subscription>",
		Type:             pulsar.Shared,
		RetryEnable:      true,
		Name:             "<consumerName>",
		DLQ: &pulsar.DLQPolicy{
			MaxDeliveries: 3,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
		consumer.ReconsumeLater(msg, 2*time.Second)
		time.Sleep(1 * time.Second)
	}
}

I think this issue is related to 21589, 21888. Pulsar allows users to create multiple consumers with same name in one subscription, but it forbids to create same name producers.
Before v0.12 there is no default dlq producer names, so that same name consumers can create empty name dlq producers successfully. But when it comes to v0.12, two same name consumers will create two same name dlq producers and errors occurred. I think maybe we need to add a random suffix in dlq producer name like: opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName, generateRandomName())

System configuration

Pulsar version: Release v0.12.1