elastic/apm-queue

kafka(consumer): Consider processing multiple events

Closed this issue · 2 comments

Description

kafka.partitionConsumer.consume ranges over a channel of []*kgo.Record batches, then iterates over each kgo.Record in a batch, translates it into an apmqueue.Record, and calls processor.Process(processCtx, record) once per record.

apm-queue/kafka/consumer.go, lines 457 to 465 in c9b9cb1:

```go
for records := range pc.records {
	// Store the last processed record. Default to -1 for cases where
	// only the first record is received.
	last := -1
	for i, msg := range records {
		meta := make(map[string]string)
		for _, h := range msg.Headers {
			meta[h.Key] = string(h.Value)
		}
		// ... (excerpt ends at line 465)
	}
}
```

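Any processor that guards its state with a mutex or another synchronisation primitive would benefit from receiving a single call with a batch of apmqueue.Record values rather than one call per record, since it could acquire the lock once per batch instead of once per record.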

Advantages

  • Decreased contention on sync primitives for processors that require it.
  • More predictable behaviour based on the apmqueue.Processor API.

Disadvantages

  • Increased memory usage, since a slice of apmqueue.Record has to be built for each batch (could be mitigated by reusing slices via a sync.Pool).
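The sync.Pool mitigation could look roughly like the sketch below. `Record`, `batchPool` and `withBatch` are hypothetical; the pool stores `*[]Record` so that putting a buffer back does not allocate a new slice header, and elements are zeroed before reuse so pooled buffers do not keep record payloads alive:

```go
package main

import (
	"fmt"
	"sync"
)

// Record is a hypothetical stand-in for apmqueue.Record.
type Record struct {
	Topic string
	Value []byte
}

// batchPool hands out reusable *[]Record buffers.
var batchPool = sync.Pool{
	New: func() any {
		s := make([]Record, 0, 64)
		return &s
	},
}

// withBatch borrows a buffer, fills it via fill, passes the batch to
// process, then clears the buffer and returns it to the pool.
// process must not retain the slice after it returns.
func withBatch(fill func([]Record) []Record, process func([]Record) error) error {
	bufp := batchPool.Get().(*[]Record)
	batch := fill((*bufp)[:0])
	err := process(batch)
	// Zero the elements so the pooled buffer drops references to payloads.
	for i := range batch {
		batch[i] = Record{}
	}
	*bufp = batch[:0]
	batchPool.Put(bufp)
	return err
}

func main() {
	raw := [][]byte{[]byte("a"), []byte("b")}
	fill := func(buf []Record) []Record {
		for _, v := range raw {
			buf = append(buf, Record{Topic: "events", Value: v})
		}
		return buf
	}
	_ = withBatch(fill, func(batch []Record) error {
		fmt.Println(len(batch)) // 2
		return nil
	})
}
```

The trade-off is an ownership rule: since the buffer is reused, a processor that needs records after `process` returns would have to copy them.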

Recent benchmarks suggest this is not necessarily the bottleneck, so it is being deprioritized for the next 2-3 iterations.

axw commented

This also conflicts with #277, so we'd need to decide which one to implement.