jet/kafunk

Pool buffers

There is a prototype using the .NET BufferManager here. The interface is:

type IBufferPool
val alloc : int -> byte[]
val free : byte[] -> unit
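
To make the interface concrete, here is a minimal sketch of one possible implementation. It is not the BufferManager-based prototype mentioned above: `SimpleBufferPool` and its power-of-two size classes are hypothetical, and writing `IBufferPool` as an interface with abstract members is an assumption about its final shape.

```fsharp
open System.Collections.Concurrent

/// The interface above, written as an F# interface (an assumed shape).
type IBufferPool =
  abstract alloc : int -> byte[]
  abstract free : byte[] -> unit

/// Hypothetical pool: one bag of free buffers per power-of-two size class.
type SimpleBufferPool () =
  let buckets = ConcurrentDictionary<int, ConcurrentBag<byte[]>>()

  // Round a requested size up to the next power of two (minimum 1).
  // Assumes requests stay well below 2^30 bytes.
  let sizeClass (n:int) =
    let mutable s = 1
    while s < n do s <- s * 2
    s

  let bucket (size:int) =
    buckets.GetOrAdd(size, fun _ -> ConcurrentBag<byte[]>())

  interface IBufferPool with
    // Reuse a free buffer of the right size class, or allocate a new one.
    // The returned array may be larger than requested and is not zeroed on reuse.
    member __.alloc (size:int) =
      let sc = sizeClass size
      match (bucket sc).TryTake() with
      | true, buf -> buf
      | _ -> Array.zeroCreate sc

    // Only keep buffers whose length matches a size class; anything else
    // is dropped and left to the garbage collector.
    member __.free (buf:byte[]) =
      if buf.Length > 0 && sizeClass buf.Length = buf.Length then
        (bucket buf.Length).Add buf
```

Because `alloc` may hand back a buffer larger than requested, callers in this sketch would need to track the logical length separately, for example with an `ArraySegment<byte>`.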

Pooling buffers for requests is trivial because the lifecycle is controlled explicitly by the Chan module: the region in which a request buffer is live is well known.
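
As a rough illustration of why the request side is easy, the buffer can be allocated and freed around a single scoped operation. This is only a sketch: `withBuffer` is a hypothetical helper, not code from the Chan module, and `IBufferPool` is again assumed to be an interface with abstract members.

```fsharp
/// Assumed shape of the pool interface.
type IBufferPool =
  abstract alloc : int -> byte[]
  abstract free : byte[] -> unit

/// Hypothetical helper: run `f` with a pooled buffer and return the buffer
/// to the pool when `f` completes, whether it succeeds or throws.
let withBuffer (pool:IBufferPool) (size:int) (f:byte[] -> Async<'a>) : Async<'a> =
  async {
    let buf = pool.alloc size
    try
      return! f buf
    finally
      pool.free buf }
```

The buffer must not escape `f` (for instance by being captured in the result), since it may be handed to another caller as soon as the `finally` block runs.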

Pooling buffers for responses is more challenging, because the response lifecycle is in the hands of the user. For example, the Consumer.stream operation yields control to the caller, so the library cannot tell when a message set is no longer in use.

Some options:

  • Embed a decoder (ConsumerMessageSet -> 'a) into the consumer such that after decoding, the buffers could be freed.
  • Only use pooling in conjunction with Consumer.consume, with a contract to free the buffers corresponding to the message set after the user-provided handler completes.
  • Have ConsumerMessageSet implement IDisposable, along with a finalizer, and urge users to dispose when done.
  • Provide an explicit "finalization queue" such that consumed message set buffers are freed automatically once the fetch cursor is ahead by N, on the assumption that the message set has been used by then. This can be used in conjunction with the prior option, or without it, forgoing the need to implement IDisposable.
  • Allow the user to configure IBufferPool and take on the responsibility of freeing buffers for `ConsumerMessageSet`.
  • Consider a linear/affine type system approach, as in here. Perhaps as a linear variant of AsyncSeq that eschews buffering, thereby giving us a controlled lifecycle for its elements.
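
To compare the handler-scoped and IDisposable options above, here is a small sketch of how they might fit together. All names are hypothetical: `PooledMessageSet` stands in for a `ConsumerMessageSet` backed by a pooled buffer, `consumeWith` stands in for a Consumer.consume-style entry point, and `IBufferPool` is assumed to be an interface with abstract members.

```fsharp
open System
open System.Threading

/// Assumed shape of the pool interface.
type IBufferPool =
  abstract alloc : int -> byte[]
  abstract free : byte[] -> unit

/// Hypothetical stand-in for a message set backed by a pooled buffer.
type PooledMessageSet (pool:IBufferPool, buffer:byte[]) =
  let mutable disposed = 0

  // Return the buffer exactly once, even if Dispose is called repeatedly
  // or both Dispose and the finalizer run.
  let release () =
    if Interlocked.Exchange(&disposed, 1) = 0 then pool.free buffer

  /// Reading this after disposal is a use-after-free hazard.
  member __.Buffer = buffer

  interface IDisposable with
    member this.Dispose () =
      release ()
      GC.SuppressFinalize this

  // Safety net for users who forget to dispose.
  override __.Finalize () = release ()

/// Handler-scoped variant: the buffer is freed as soon as the
/// user-provided handler completes.
let consumeWith (pool:IBufferPool) (size:int) (handler:PooledMessageSet -> Async<unit>) : Async<unit> =
  async {
    use ms = new PooledMessageSet(pool, pool.alloc size)
    do! handler ms }
```

The finalizer only bounds the damage of a missed `Dispose`: buffers are still held until the garbage collector runs, which is presumably why the finalization queue is listed as an alternative.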