Hopac/Hopac

Stream merge all question

Closed this issue · 2 comments

haf commented
-          |> List.traverseJobM (fun (s,i) -> infra.subscribeToStream s i)
-          |> Job.map (Stream.ofSeq >> Stream.mergeAll)
+          |> List.traverseJobM (fun (s, i) ->
+            job {
+              let! s = infra.subscribeToStream s i
+              do! logger.infoWithBPJ (eventX "got stream")
+              do! Job.start (Stream.iterFun (fun e -> printfn "%A" e) s)
+              return ()
+            })

The removed stuff never yields events. The added stuff prints them. Doesn't mergeAll merge all streams? What am I missing?

haf commented

      Alt.choose [
        inputs ^=> function
          | Stream.Cons (event, next) ->
            logger.verboseWithBPJ (eventX "Writer received event {eventId}" >> setField "eventId" event.id)
            >>=. persistEvent pw event
            >>= persistWriter
            >>= running next
          | Stream.Nil ->
            let msg = "The writer got Stream.Nil – but the stream should be infinite."
            logger.error (eventX msg)
            failwith msg

        shutdown ^=> shuttingDown
      ]
      :> Job<_>

This is how we iterate it.

haf commented

Of course, traverseJobM is to blame. At least I had isolated the two lines of code that mattered ;)