Allow for batch message processing
martyall opened this issue · 7 comments
The library only allows for the background thread to process messages one at a time (see the definition here)
For my use case I'm hoping to use the background logger to log to both Elasticsearch and Postgres, and it would be extremely useful for performance reasons to be able to use their batch write operations. What do you think about putting in a hook that makes it possible to create a background logger that performs a log action over messages :: [msg], where messages is the value of the "flush queue" at any given time?
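For illustration, such a hook could accept a log action over whole batches. A hypothetical sketch (Message and bulkIndex are illustrative names, not part of the library):

import Colog (LogAction (..))
import Control.Monad (unless)

data Message                      -- your log record type

bulkIndex :: [Message] -> IO ()   -- e.g. one Elasticsearch bulk request
bulkIndex = undefined             -- stand-in for the real batch writer

batchAction :: LogAction IO [Message]
batchAction = LogAction $ \msgs -> unless (null msgs) (bulkIndex msgs)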
Terribly sorry for missing this comment, for some reason GitHub notifications do not work for me.
I see two options for you:
- keep the API as is and use it to provide batching
- extend the API
Keeping as-is
Use the log action (LogAction msg) to populate a buffer (say an IORef or some other mutable buffer) and the flush action to send to the external service (pseudocode):
buffer <- newIORef DList.empty
forkBackgroundLogger c
  (LogAction $ \msg -> modifyIORef' buffer (`DList.snoc` msg))
  (do msgs <- atomicModifyIORef' buffer (\x -> (DList.empty, x))
      flushToExternalService (DList.toList msgs))
Extending the API
- introduce a method that is aware of batches:
  forkBackgroundLoggerBatch :: Capacity -> LogAction IO [msg] -> IO () -> IO (BackgroundWorker msg)
- express forkBackgroundLogger c l a as forkBackgroundLoggerBatch c (LogAction (traverse_ (mask_ . unLogAction l))) a, as sketched below
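Spelled out, that rewrite would look like this (a sketch assuming the proposed forkBackgroundLoggerBatch exists):

import Colog (LogAction (..))
import Control.Exception (mask_)
import Data.Foldable (traverse_)

forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO () -> IO (BackgroundWorker msg)
forkBackgroundLogger c l a =
  forkBackgroundLoggerBatch c (LogAction (traverse_ (mask_ . unLogAction l))) a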
The former seems simpler, but the latter is more general, especially if you have another use case for flush.
What would you think of those solutions?
When I implemented it for our use case, I just vendored and modified the forkBackgroundLogger function like this:
-- Imports needed: Control.Applicative (many, some, (<|>)), Control.Concurrent (forkFinally),
-- Control.Concurrent.STM, Control.Exception (finally, mask_), Control.Monad (forever, unless),
-- Numeric.Natural (Natural); BackgroundWorker and Capacity come from the vendored co-log internals.
forkBatchBackgroundLogger :: Capacity -> Log.LogAction IO [msg] -> IO () -> IO (BackgroundWorker msg)
forkBatchBackgroundLogger (Capacity cap lim) logAction flush = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  tid <- forkFinally
    (forever $ do
      -- drain up to 'lim' messages in one transaction and log them as a batch
      msgs <- atomically $ fetch $ readTBQueue queue
      unless (null msgs) $
        mask_ . Log.unLogAction logAction $ msgs
      flush)
    (\_ ->
      -- on shutdown, drain whatever is left, flush, and mark the worker dead
      (do msgs <- atomically $ many $ readTBQueue queue
          unless (null msgs) $
            mask_ . Log.unLogAction logAction $ msgs
          flush)
      `finally` atomically (writeTVar isAlive False))
  pure $ BackgroundWorker tid (writeTBQueue queue) isAlive
  where
    fetch
      | Just n <- lim = someN n
      | otherwise = some
    -- read at least one and at most n items (note: 'go (n - 1)' keeps the
    -- batch within the limit; 'go n' would fetch up to n + 1 items)
    someN :: Natural -> STM a -> STM [a]
    someN 0 _ = pure []
    someN n f = (:) <$> f <*> go (n - 1)
      where
        go 0 = pure []
        go k = ((:) <$> f <*> go (k - 1)) <|> pure []
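For reference, constructing the worker might then look like this (a hypothetical usage sketch; Message and bulkIndex are illustrative names for your record type and batch writer):

startWorker :: IO (BackgroundWorker Message)
startWorker =
  forkBatchBackgroundLogger
    (Capacity 4096 (Just 512))     -- queue bound and maximum batch size
    (Log.LogAction bulkIndex)      -- runs once per drained batch
    (pure ())                      -- nothing extra to do on flush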
At this point I don't remember enough about how things work to say whether you can do this without an API change. What do you think?
I think the approach with IORef will cover exactly your use case. However, I'm not sure about the performance implications of that solution: your approach should consume less memory and be more efficient, but I wonder about the actual numbers.
But what about the bounded queue? Does the back-pressure property still hold with your implementation?
Yes, so here is what happens. You have the original forkBackgroundLogger:
forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO () -> IO (BackgroundWorker msg)
forkBackgroundLogger (Capacity cap lim) logAction flush = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  tid <- forkFinally
    (forever $ do
      msgs <- atomically $ fetch $ readTBQueue queue
      for_ msgs $ mask_ . unLogAction logAction
      flush)
    (\_ ->
      (do msgs <- atomically $ many $ readTBQueue queue
          for_ msgs $ mask_ . unLogAction logAction
          flush)
      `finally` atomically (writeTVar isAlive False))
  pure $ BackgroundWorker tid (writeTBQueue queue) isAlive
  -- ('fetch' here is the same bounded drain helper as in the batch version above)
It remains exactly the same: it has back pressure, runs the action once per item, and flushes once per bulk. So what you want is a logger that delays the action until the flush. You can do that in several ways; the simplest and most general is to write to a mutable variable (the simplest being an IORef) and have the flush action read that variable and execute a bulk action.
To make appending fast you can use a difference list (DList msg), so you have:
postpone :: IORef (DList msg) -> msg -> IO ()
postpone ref msg = modifyIORef' ref (`DList.snoc` msg)
and
flush :: IORef (DList msg) -> ([msg] -> IO ()) -> IO ()
flush ref real_action =
  atomicModifyIORef' ref (\x -> (DList.empty, x)) >>= real_action . DList.toList
This way you write all the messages at once, each block no bigger than lim, and with no more than cap messages in the queue.
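Put together, wiring those two helpers into forkBackgroundLogger looks like this (a minimal sketch, assuming a bulk writer of type [msg] -> IO ()):

import Colog (LogAction (..))
import Colog.Concurrent
import qualified Data.DList as DList
import Data.IORef (newIORef)

withBatching :: Capacity -> ([msg] -> IO ()) -> IO (BackgroundWorker msg)
withBatching capacity bulkWrite = do
  ref <- newIORef DList.empty
  forkBackgroundLogger capacity
    (LogAction (postpone ref))   -- buffer each message as it arrives
    (flush ref bulkWrite)        -- drain the buffer once per flush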
However, I'm not sure that gathering messages in a difference list inside an IORef is the fastest thing in the world, so depending on your concrete use case you can use better strategies: for example, gathering the messages in an Aeson/ByteString builder or a raw buffer. Either way, some sort of mutable variable is still needed.
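For instance, the builder variant could accumulate JSON-encoded messages directly (a sketch assuming ToJSON msg and newline-delimited output, which is what e.g. the Elasticsearch bulk API expects):

import Data.Aeson (ToJSON, toEncoding)
import Data.Aeson.Encoding (fromEncoding)
import Data.ByteString.Builder (Builder, char7)
import Data.IORef (IORef, modifyIORef')

postponeBuilder :: ToJSON msg => IORef Builder -> msg -> IO ()
postponeBuilder ref msg =
  modifyIORef' ref (\b -> b <> fromEncoding (toEncoding msg) <> char7 '\n')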
Hope that helps.
@martyall hello! Sorry for bothering you, but I just want to check whether the issue is resolved for you, or whether you still have questions or think that the library needs an improvement to support your use case.
Hey! Sorry for not responding, this seems like it would work fine!