
kafka(consumer): Consider processing multiple events

Closed this issue · 2 comments


kafka.partitionConsumer.consume, ranges over a channel of [].kgo.Record, then ranges over each of the kgo.Record, translates each of the kgo.Record into an apmqueue.Record and calls processor.Process(processCtx, record) for each of the records.


Lines 457 to 465 in c9b9cb1

for records := range pc.records {
// Store the last processed record. Default to -1 for cases where
// only the first record is received.
last := -1
for i, msg := range records {
meta := make(map[string]string)
for _, h := range msg.Headers {
meta[h.Key] = string(h.Value)

Any processor that has a mutex or any synchronisation primitives would benefit from receiving a single call with a list of apmqueue.Record, rather than individual calls.


  • Decreased contention on sync primitives for processors that require it.
  • More predictable behaviour based on the apmqueue.Processor API.


  • Increased memory usage (Could be mitigated by reusing slices using a sync.Pool)

Based on recent benchmarks it doesn't look like this is necessarily the bottleneck, so deprioritizing this for 2-3 iterations.

axw commented

This also conflicts with #277, so we'd need to decide which one to implement.