Stream merge all question
Closed this issue · 2 comments
haf commented
- |> List.traverseJobM (fun (s,i) -> infra.subscribeToStream s i)
- |> Job.map (Stream.ofSeq >> Stream.mergeAll)
+ |> List.traverseJobM (fun (s, i) ->
+ job {
+ let! s = infra.subscribeToStream s i
+ do! logger.infoWithBPJ (eventX "got stream")
+ do! Job.start (Stream.iterFun (fun e -> printfn "%A" e) s)
+ return ()
+ })
The removed stuff never yields events. The added stuff prints them. Doesn't mergeAll
merge all streams? What am I missing?
haf commented
Alt.choose [
inputs ^=> function
| Stream.Cons (event, next) ->
logger.verboseWithBPJ (eventX "Writer received event {eventId}" >> setField "eventId" event.id)
>>=. persistEvent pw event
>>= persistWriter
>>= running next
| Stream.Nil ->
let msg = "The writer got Stream.Nil – but the stream should be infinite."
logger.error (eventX msg)
failwith msg
shutdown ^=> shuttingDown
]
:> Job<_>
This is how we iterate it.
haf commented
Of course, traverseJobM
is to blame. At least I had isolated the two lines of code that mattered ;)