bsm/sarama-cluster

Receiving protobuf data triggers a kafka unknown error

hookover opened this issue · 2 comments

version:
kafka: 1.1.0
sarama-cluster: 2.1.15
sarama: 1.22.1

topic: A, string data
eg:

producer
>a
>b 
>c

topic: B, protobuf byte data (canal reads binlog data and sends it to kafka: https://github.com/alibaba/canal)
eg:

  *��
mysql-bin.000001ǟ�. *UTF-80��ɷ-8B
                                 rupiah_loanJuser_loan_planP�Xb
	

kafka-console-consumer on topics A and B: no errors
my consumer on topic A: no errors
my consumer on topic B: errors

// This is printed every 3 seconds:
Errors kafka: error while consuming rupiah_loan_user_loan/0: kafka server: Unexpected (unknown?) server error.

code

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Join consumer group "dev.test" and subscribe to both topics.
	customer, err := cluster.NewConsumer([]string{app.Cfg.App.GetString("KAFKA_BROKER_LIST")}, "dev.test", []string{"credit_data_plus_topic", "rupiah_loan_user_loan_plan"}, config)
	if err != nil {
		// Stop here: customer is nil and must not be used below.
		panic(fmt.Sprintf("failed to start kafka consumer: %v", err))
	}
	for {
		select {
		case noti := <-customer.Notifications():
			fmt.Printf("noti: %v\n", noti)
		case err := <-customer.Errors():
			fmt.Printf("errors:%v\n", err)
		case msg := <-customer.Messages():
			fmt.Println(msg.Topic, msg.Offset)
		default:
			// Nothing is ready: wait one second before polling again.
			time.Sleep(time.Second * 1)
			fmt.Println("idle for 1s")
		}
	}

log:

rupiah_loan_user_loan_plan 208
rupiah_loan_user_loan_plan 210
rupiah_loan_user_loan_plan 212
rupiah_loan_user_loan_plan 214
rupiah_loan_user_loan_plan 216
rupiah_loan_user_loan_plan 218
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 220
rupiah_loan_user_loan_plan 222
rupiah_loan_user_loan_plan 224
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
rupiah_loan_user_loan_plan 226
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
..........

When I restart the program, however, the topic is consumed from beginning to end.
If config.Consumer.Offsets.Initial is set to sarama.OffsetNewest instead, a single message is consumed and the error is then reported continuously.
For example:

#  go run main.go
rupiah_loan_user_loan_plan 250
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
^Csignal: interrupt

# go run main.go
rupiah_loan_user_loan_plan 260
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.
errors:kafka: error while consuming rupiah_loan_user_loan_plan/0: kafka server: Unexpected (unknown?) server error.

I solved the problem by switching to another library.

I have run into this problem as well.