conf := *common.Config{
BackOffTime: 2,
MaximumRetry: 3,
Version: "2.5.1",
Group: "my-group",
Host: []string{"localhost:9092"},
Debug: true,
AutoCommit: false,
}
broker, err = NewBroker(common.KafkaBrokerType, conf)
if err != nil {
panic(err)
}
handler := func(ctx context.Context, msg []byte) {
// Do something
}
broker.RegisterHandler("my-topic", handler)
go broker.Start(func(ctx context.Context, err error) {
// Do something after broker is cleaned-up
})
examples handler
func (c Customer) newHandler(topic string) (handler kafka1.Handler, err error) {
handler = func(ctx context.Context, msg []byte) {
switch topic {
case "test":
// Do something
err = c.customer.MsgReceiver(ctx, msg)
c.kafka.Session().Commit()
default:
fmt.Println(string(msg), "with default")
}
}
return handler, err
}
msg := []byte("my message")
err = broker.SendTopicMessage("my-topic", msg)