awakesecurity/gRPC-haskell

Server crashes when using async with ServerStreaming

lzszt opened this issue · 2 comments

lzszt commented

I'm trying to implement a server that spawns an asynchronous action.
I then call the send function from within this async block.
This results in the server crashing with various errors:

E0511 07:02:51.828270196   10996 server.cc:201]              assertion failed: queue.Pop() == nullptr
Aborted (core dumped)
double free or corruption (out)
Aborted (core dumped)
double free or corruption (!prev)
Segmentation fault (core dumped)
corrupted size vs. prev_size while consolidating
Aborted (core dumped)

The server implementation looks like this:

startSubscriptionHandler ::
  ServerRequest
    'ServerStreaming
    Api.StartStreamRequest
    Api.StreamNotification ->
  IO
    (ServerResponse 'ServerStreaming Api.StreamNotification)
startSubscriptionHandler (ServerWriterRequest _metadata Api.StartStreamRequest send) = do
  reponseCode <- do
    async $
      forM_ [1 :: Word32 .. 1000000] $ \i -> do
        res <- send $ Api.StreamNotification i
        case res of
          Left err -> fail $ "error: " <> show err
          Right () -> pure ()
    pure StatusOk
  pure $ ServerWriterResponse mempty reponseCode mempty

main :: IO ()
main = do
  Api.dataApiServer
    ( Api.DataApi
        startSubscriptionHandler
    )
    defaultServiceOptions

Am I doing something wrong? Or is the use of async in this context not supported?

gbaz commented

This looks wrong to me. My recollection is that sending the response finishes the interaction -- so you can only send the response after you finish sending the streaming results.
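
To illustrate the ordering described above, here is a minimal sketch that uses only base and no gRPC types. The Send alias, sendAll, and fakeSend are hypothetical names introduced for this example; the real send callback reports failures with the library's own error type rather than String. The point is that every send completes on the calling thread before the caller goes on to build its final response.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Data.Word (Word32)

-- Hypothetical stand-in for the send callback a streaming handler receives.
-- Assumption: failures are modelled as String here for simplicity.
type Send msg = msg -> IO (Either String ())

-- Send every message in order on the calling thread, stopping at the first
-- failure. Nothing is forked, so control returns to the caller only after
-- the whole stream has been written or a send has failed.
sendAll :: Send msg -> [msg] -> IO (Either String ())
sendAll _ [] = pure (Right ())
sendAll send (m : ms) = do
  res <- send m
  case res of
    Left err -> pure (Left err)
    Right () -> sendAll send ms

main :: IO ()
main = do
  sent <- newIORef (0 :: Int)
  -- Fake callback that only counts how many messages it was given.
  let fakeSend :: Send Word32
      fakeSend _ = modifyIORef' sent (+ 1) >> pure (Right ())
  result <- sendAll fakeSend [1 .. 1000]
  -- Only at this point would a handler construct and return its response.
  count <- readIORef sent
  putStrLn (show result ++ ", messages sent: " ++ show count)
```

Applied to the handler in the original post, this would presumably mean dropping the async wrapper so that the forM_ loop runs to completion before ServerWriterResponse is returned.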

lzszt commented

Thank you, this seems to have fixed the problem.