Receiving protobuf data triggers a kafka unknown error
hookover opened this issue · 2 comments
hookover commented
version:
kafka: 1.1.0
sarama-cluster: 2.1.15
sarama: 1.22.1
topic: A, string data
eg:
producer
>a
>b
>c
topic: B, protobuf byte data (canal reads binlog data and sends it to Kafka: https://github.com/alibaba/canal)
eg:
*��
mysql-bin.000001ǟ�. *UTF-80��ɷ-8B
rupiah_loanJuser_loan_planP�Xb
kafka-console-consumer on topics A and B: no errors
topic A consumer: no errors
topic B consumer errors:
// This is printed every 3 seconds:
Errors kafka: error while consuming rupiah_loan_user_loan/0: kafka server: Unexpected (unknown?) server error.
code:
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
customer, err := cluster.NewConsumer([]string{app.Cfg.App.GetString("KAFKA_BROKER_LIST")}, "dev.test", []string{"credit_data_plus_topic","rupiah_loan_user_loan_plan"}, config)
if err != nil {
	fmt.Println("failed to start Kafka consumer")
	return // customer is nil here, so do not enter the loop
}
for {
	select {
	case noti := <-customer.Notifications():
		fmt.Printf("noti: %v\n", noti)
	case err := <-customer.Errors():
		fmt.Printf("errors:%v\n", err)
	case msg := <-customer.Messages():
		fmt.Println(msg.Topic, msg.Offset)
	default:
		// Nothing ready on any channel: wait one second and poll again.
		time.Sleep(time.Second * 1)
		fmt.Println("1s interval")
	}
}
log:
rupiah_loan_user_loan_plan 208
rupiah_loan_user_loan_plan 210
rupiah_loan_user_loan_plan 212
rupiah_loan_user_loan_plan 214
rupiah_loan_user_loan_plan 216
rupiah_loan_user_loan_plan 218
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 220
rupiah_loan_user_loan_plan 222
rupiah_loan_user_loan_plan 224
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 226
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
..........
But when I restart the program, the topic is consumed from beginning to end.
However, if config.Consumer.Offsets.Initial is set to sarama.OffsetNewest, only one message is consumed and then the error is reported continuously.
Like this:
# go run main.go
rupiah_loan_user_loan_plan 250
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
^Csignal: interrupt
# go run main.go
rupiah_loan_user_loan_plan 260
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
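For anyone trying to reproduce this, the two runs above differ only in Consumer.Offsets.Initial. Below is a minimal configuration sketch of that difference. The helper name newConsumerConfig is hypothetical and not part of sarama or sarama-cluster. The config.Version line is an untested assumption: as far as I know, sarama 1.22.1 assumes a much older protocol version unless told otherwise, so declaring the broker version may be worth checking. It has not been confirmed as the cause or a fix for this error.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

// newConsumerConfig is a hypothetical helper (not a library function) that
// builds the consumer config from this report with a chosen initial offset.
func newConsumerConfig(initial int64) *cluster.Config {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	// sarama.OffsetOldest starts from the beginning of the partition when the
	// group has no committed offset; sarama.OffsetNewest starts from the end.
	config.Consumer.Offsets.Initial = initial
	// Assumption, not a confirmed fix: declare the broker version (1.1.0 in
	// this report) instead of relying on the library default.
	config.Version = sarama.V1_1_0_0
	return config
}

func main() {
	oldest := newConsumerConfig(sarama.OffsetOldest)
	newest := newConsumerConfig(sarama.OffsetNewest)
	fmt.Println("initial offsets:", oldest.Consumer.Offsets.Initial, newest.Consumer.Offsets.Initial)
	fmt.Println("declared version:", oldest.Version)
}
```

Either config can be passed to cluster.NewConsumer in place of the one in the code above. If the error still appears with the version declared, that would rule this guess out.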
hookover commented
I solved the problem by switching to another library.
wbrucelee commented
I am running into this problem as well.