Got "Failed to create consumer at reconnect" when stopping the Pulsar broker and restarting it later
zhangbiao2009 opened this issue · 3 comments
Expected behavior
Expect no error happens.
Actual behavior
Steps to reproduce
- Run the Go example program.
- Stop the Pulsar broker.
- Observe the output of the Go example, and start the Pulsar broker again after you see this error message:
error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
- You can now see the "Failed to create consumer at reconnect" error.
Example program to reproduce:
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Expose pprof so that running goroutines can be inspected
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// Create a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar client: %v", err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-subscription",
		Type:             pulsar.KeyShared,
	})
	if err != nil {
		fmt.Printf("Could not create consumer: %v\n", err)
		return
	}

	go func() {
		for {
			msg, err := consumer.Receive(context.Background())
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				break
			}
			// Process the message
			fmt.Printf("Received message msgId: %v -- content: '%s'\n",
				msg.ID(), string(msg.Payload()))
			// Acknowledge the message
			consumer.Ack(msg)
		}
	}()

	// Keep the main function alive to allow the goroutine to run
	select {}
}
System configuration
Macbook Pro 2021
Go version: 1.23.0
Pulsar Go SDK version: 0.13.1
Pulsar broker version: latest Docker image. I run it with this command:
docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone
my-topic does not exist.
@nodece Thanks for the reply.
I think I understand what you mean: the reconnect fails because the topic no longer exists after I re-run the Pulsar docker command. The topic was stored locally in the Docker container, so it was lost when I restarted the container. This error wouldn't happen in a real environment because topics are stored in BookKeeper. Is that correct?
I also tried specifying a volume so that topics are stored persistently:
docker run -it -p 6650:6650 -p 8080:8080 -v pulsar_data:/pulsar/data apachepulsar/pulsar:latest bin/pulsar standalone
With the volume in place, the consumer reconnects successfully when I repeat my test.
However, I still think this is an issue. I checked the running goroutines when the Topic Not Found error happened, and consumer.Receive() still blocks in this case. I think that if the consumer cannot reconnect successfully, consumer.Receive() should return an error; otherwise the caller can do nothing about it and just gets stuck there. What do you think?
We need to introduce the Golang context, which can help us break out of the receiving process.