mailgun/kafka-pixy

gRPC GoLang quickstart guide


It would be great to see a gRPC quickstart guide for GoLang :)

Another 👍 from me. This is what it currently says in the quick-start-golang.md file:

If you need this, please create an issue and I will add the content in no time.

😄

@hermanschaaf yeah, it does not look like "no time" at all, but I finally did it. Please check it out and let me know if it is good enough.

Great, thanks @horkhe! I had to make some tweaks due to variable naming / syntax errors in the sample code, but got it working. Here is the modified version of RunConsumeNAck that works for me:


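// Assumed imports (the pb path is an assumption about the generated package):
//   "context", "github.com/pkg/errors", "google.golang.org/grpc/codes",
//   "google.golang.org/grpc/status", pb "github.com/mailgun/kafka-pixy/gen/golang".
// `client` is assumed to be a package-level pb.KafkaPixyClient created elsewhere.
//
// Runs the consume-n-ack loop until the context is done. Note that the inner
// gRPC calls do not use the provided context. That is intentional: we want the
// current request to finish gracefully, otherwise the previously consumed
// message may not be properly acknowledged and will be consumed again.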
func RunConsumeNAck(ctx context.Context, group, topic string, msgHandler func(msg []byte)) error {
	// Consume first message.
	var rs *pb.ConsRs
	var err error
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		rs, err = client.ConsumeNAck(context.TODO(), &pb.ConsNAckRq{
			Topic: topic,
			Group: group,
			NoAck: true,
		})
		if err != nil {
			if status.Code(err) == codes.NotFound {
				continue
			}
			return errors.Wrap(err, "while consuming first")
		}
		break
	}
	msgHandler(rs.Message)
	// Run consume+ack loop.
	ackPartition := rs.Partition
	ackOffset := rs.Offset
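loop:
	for {
		select {
		case <-ctx.Done():
			// Leave the loop instead of returning, so that the last
			// consumed message is acknowledged below.
			break loop
		default:
		}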
		rs, err = client.ConsumeNAck(context.TODO(), &pb.ConsNAckRq{
			Topic:        topic,
			Group:        group,
			AckPartition: ackPartition,
			AckOffset:    ackOffset,
		})
		if err != nil {
			if status.Code(err) == codes.NotFound {
				continue
			}
			return errors.Wrap(err, "while consuming")
		}
		ackPartition = rs.Partition
		ackOffset = rs.Offset
		msgHandler(rs.Message)
	}
	// Ack the last consumed message.
	_, err = client.Ack(context.TODO(), &pb.AckRq{
		Topic:     topic,
		Group:     group,
		Partition: ackPartition,
		Offset:    ackOffset,
	})
	if err != nil {
		return errors.Wrap(err, "while acking last")
	}
	return nil
}

@hermanschaaf thank you for the corrections, I made your changes in the PR.