qnikst/co-log-concurrent

Allow for batch message processing

martyall opened this issue · 7 comments

The library only allows for the background thread to process messages one at a time (see the definition here)

For my use case I'm hoping to use the background logger to log to both Elasticsearch and Postgres, and it would be extremely useful for performance reasons to be able to use the batch write operations. What do you think about putting in a hook to be able to create a background logger that can perform a log action over messages :: [msg] where messages is the value of the "flush queue" at any given time?

Terribly sorry for missing this comment; for some reason GitHub notifications do not work for me.

I see two options for you:

  1. keep the API as is and use it to provide batching
  2. extend the API

Keeping as-is

Use the log action (LogAction IO msg) to populate a buffer (say an IORef or some other buffer) and the flush action to send the buffered messages to the external service (pseudo code):

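buffer <- newIORef DList.empty
forkBackgroundLogger c
   -- the per-message action only appends to the buffer
   (LogAction $ \msg -> modifyIORef' buffer (`DList.snoc` msg))
   -- flush swaps in an empty buffer and sends the old contents in one call
   (do msgs <- atomicModifyIORef' buffer (\x -> (DList.empty, x))
       flushToExternalService (DList.toList msgs))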

Extend the API

  1. introduce a method that is aware of batches
forkBackgroundLoggerBatch :: Capacity -> LogAction IO [msg] -> IO () -> IO (BackgroundWorker msg)
  2. write forkBackgroundLogger c l a as forkBackgroundLoggerBatch c (LogAction (traverse_ (mask_ . unLogAction l))) a

The former seems simpler, but the latter is more general, especially if you have another use case for flush.
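To illustrate the second option, here is a minimal sketch of lifting a per-message action to a batch action. The name perMessage is hypothetical, and LogAction is redefined locally (mirroring the newtype from co-log-core) only so that the sketch compiles with base alone:

```haskell
import Control.Exception (mask_)
import Data.Foldable (traverse_)

-- Local stand-in that mirrors co-log-core's LogAction (an assumption made
-- so this sketch depends on base only).
newtype LogAction m msg = LogAction { unLogAction :: msg -> m () }

-- Hypothetical adapter: run the per-message action on every message of a
-- batch, in order, masking asynchronous exceptions around each write.
perMessage :: LogAction IO msg -> LogAction IO [msg]
perMessage (LogAction act) = LogAction (traverse_ (mask_ . act))

main :: IO ()
main = unLogAction (perMessage (LogAction putStrLn)) ["first", "second"]
```

With an adapter like this, a batch-aware fork function could serve both the per-message and the bulk use case.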

What would you think of those solutions?

When I implemented it for our use case, I just vendored and modified the forkBackgroundLogger function like this:

forkBatchBackgroundLogger :: Capacity -> Log.LogAction IO [msg] -> IO () -> IO (BackgroundWorker msg)
forkBatchBackgroundLogger (Capacity cap lim) logAction flush = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  tid <- forkFinally
    (forever $ do
      msgs <- atomically $ fetch $ readTBQueue queue
      unless (null msgs) $
        mask_ . Log.unLogAction logAction $ msgs
      flush)
    (\_ ->
       (do msgs <- atomically $ many $ readTBQueue queue
           unless (null msgs) $
             mask_ . Log.unLogAction logAction $ msgs
           flush)
         `finally` atomically (writeTVar isAlive False))
  pure $ BackgroundWorker tid (writeTBQueue queue) isAlive
  where
    fetch
      | Just n <- lim = someN n
      | otherwise = some
    someN :: Natural -> STM a -> STM [a]
    someN 0 _ = pure []
    someN n f = (:) <$> f <*> go (n - 1) where
      go 0 = pure []
      go k = ((:) <$> f <*> go (k-1)) <|> pure []
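The batch read used here waits for one message and then takes more while they are immediately available, up to a limit. A minimal sketch of that read pattern follows; readBatch is a hypothetical helper, and it uses tryReadTBQueue instead of the Alternative-based some/<|> combination, so it is a variant for illustration rather than the code above:

```haskell
import Control.Concurrent.STM

-- Hypothetical helper: block until one item is available, then take up to
-- (n - 1) more without blocking, so a batch holds between 1 and n items.
readBatch :: Int -> TBQueue a -> STM [a]
readBatch n q
  | n <= 0 = pure []
  | otherwise = (:) <$> readTBQueue q <*> go (n - 1)
  where
    go 0 = pure []
    go k = do
      next <- tryReadTBQueue q
      case next of
        Nothing -> pure []              -- queue drained, stop early
        Just x  -> (x :) <$> go (k - 1)

main :: IO ()
main = do
  q <- newTBQueueIO 10
  atomically $ mapM_ (writeTBQueue q) "abcde"
  first <- atomically (readBatch 3 q)
  rest  <- atomically (readBatch 3 q)
  print (first, rest)  -- prints ("abc","de")
```

Because the queue itself stays bounded, writers still block once it is full, regardless of how large each read batch is.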

At this point I don't remember enough about how things work to say whether you can do this without an API change. What do you think?

I think the approach with IORef will cover exactly your use case. However, I'm not sure about the performance implications of that solution. Your approach should consume less memory and be more efficient, but I wonder about the actual numbers.

But what about the bounded queue? Does the back pressure property still hold with your implementation?

Yes. Here is what happens: you have the original forkBackgroundLogger

forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO () -> IO (BackgroundWorker msg)
forkBackgroundLogger (Capacity cap lim) logAction flush = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  tid <- forkFinally
    (forever $ do
      msgs <- atomically $ fetch $ readTBQueue queue
      for_ msgs $ mask_ . unLogAction logAction
      flush)
    (\_ ->
       (do msgs <- atomically $ many $ readTBQueue queue
           for_ msgs $ mask_ . unLogAction logAction
           flush)
         `finally` atomically (writeTVar isAlive False))
  pure $ BackgroundWorker tid (writeTBQueue queue) isAlive

It remains exactly the same: it has back pressure, executes the action once per item, and flushes once per bulk. So what you want is a logger that delays the action until the flush. You can do it in several ways; the simplest and most general is to write to a mutable variable (the simplest one is IORef) and have the flush action read that variable and execute a bulk action.
In order to make appending fast you can use a difference list (DList msg), so you have:

postpone :: IORef (DList msg) -> msg -> IO ()
postpone ref msg = modifyIORef' ref (`DList.snoc` msg)

and

flush :: IORef (DList msg) -> ([msg] -> IO ()) -> IO ()
flush ref real_action = atomicModifyIORef' ref (\x -> (DList.empty, x)) >>= real_action . DList.toList

This way you write all the messages at once, each block no bigger than the lim, and no more than cap messages in the queue.
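For a version that can be tried without the dlist package, here is a minimal sketch that swaps the difference list for a plain list built newest-first and reversed at flush time. The names postpone and flushWith are illustrative, and print stands in for the real bulk write:

```haskell
import Data.IORef

-- Record one message. Messages are stored newest-first, so each append
-- is a constant-time cons.
postpone :: IORef [msg] -> msg -> IO ()
postpone ref msg = modifyIORef' ref (msg :)

-- Atomically take the buffered messages and leave the buffer empty, then
-- pass them to the bulk action in arrival order. Skips empty batches.
flushWith :: IORef [msg] -> ([msg] -> IO ()) -> IO ()
flushWith ref bulk = do
  msgs <- atomicModifyIORef' ref (\xs -> ([], reverse xs))
  if null msgs then pure () else bulk msgs

main :: IO ()
main = do
  ref <- newIORef []
  mapM_ (postpone ref) ["a", "b", "c"]
  flushWith ref print  -- prints ["a","b","c"]
  flushWith ref print  -- buffer is empty, prints nothing
```

Here postpone would be wrapped in the LogAction passed to forkBackgroundLogger, and flushWith applied to the buffer and the bulk write would be passed as the flush action.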

However, I'm not sure that gathering messages in a difference list inside the IORef is the fastest option. Depending on your concrete use case you can use better strategies, for example gathering the messages in an Aeson builder or a raw buffer. Some sort of mutable variable is still needed either way.

Hope that helps.

@martyall hello! Sorry for bothering you, but I just want to make sure the issue is resolved for you. Do you still have questions, or do you think the library still needs an improvement to support your use case?

Hey! Sorry for not responding. This seems like it would work fine!