apache/pulsar-client-go

Got "Failed to create consumer at reconnect" when stopping the Pulsar broker and restarting it later

zhangbiao2009 opened this issue · 3 comments

Expected behavior

No error is expected.

Actual behavior

The consumer logs "Failed to create consumer at reconnect" after the broker is restarted.

Steps to reproduce

  1. Run the Go example program below.
  2. Stop the Pulsar broker.
  3. Watch the program's output. After this error message appears, start the Pulsar broker again: error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
  4. The "Failed to create consumer at reconnect" error now appears.

Example program to reproduce:

package main

import (
	"context"
	"fmt"
	"log"

	"net/http"
	_ "net/http/pprof"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {

	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// Create a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar client: %v", err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-subscription",
		Type:             pulsar.KeyShared,
	})
	if err != nil {
		fmt.Printf("Could not create consumer: %v\n", err)
		return
	}
	go func() {
		for {
			msg, err := consumer.Receive(context.Background())
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				break
			}

			// Process the message
			fmt.Printf("Received message msgId: %v -- content: '%s'\n",
				msg.ID(), string(msg.Payload()))

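			// Acknowledge the message; Ack returns an error that should not be ignored
			if err := consumer.Ack(msg); err != nil {
				log.Printf("Error acknowledging message: %v", err)
			}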
		}
	}()

	// Keep the main function alive to allow the goroutine to run
	select {}
}

System configuration

MacBook Pro (2021)
Go version: 1.23.0
pulsar-client-go version: 0.13.1

Pulsar broker version: latest Docker image, started with this command:

    docker run -it \
      -p 6650:6650 \
      -p 8080:8080 \
      apachepulsar/pulsar:latest \
      bin/pulsar standalone

@nodece replied: my-topic does not exist.


@nodece Thanks for the reply.

I think I see what you mean: the reconnect fails because the topic no longer exists after I re-ran the Docker Pulsar command. The topic was stored inside the Docker container, so it was lost when I restarted the container. This error wouldn't happen in a real deployment because topics are stored in BookKeeper. Is that correct?

I also tried mounting a volume so that topics are stored persistently:

    docker run -it \
      -p 6650:6650 \
      -p 8080:8080 \
      -v pulsar_data:/pulsar/data \
      apachepulsar/pulsar:latest \
      bin/pulsar standalone

With the volume, the consumer reconnects successfully when I repeat the test.

However, I still think this is an issue. When the Topic Not Found error happened, I checked the running goroutines, and consumer.Receive() was still blocked. If the consumer cannot reconnect, consumer.Receive() should return an error; otherwise the caller has no way to react and stays stuck there. What do you think?

We need to introduce a Go context here, which can be used to break out of the blocked receive.