clj-commons/manifold

manifold.stream/on-closed never called upstream

stijnopheide opened this issue · 5 comments

Manifold's map, filter, and similar operators don't call the source's on-closed handler immediately when their output stream is closed; the handler only fires once a value is put into the source.

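For example, with (require '[manifold.stream :as s]), closing the output of s/map does not trigger s1's handler: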

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out))
=> nil

If you then put a value into s1, the on-closed handler does fire:

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out)
  (s/put! s1 1))
closed s1
=> << true >>

I'm not sure whether this is a bug, but I have two questions:

  1. Why is on-closed not called immediately on the upstream source?
  2. Why is put! returning << true >>?

I came across this behaviour while investigating a problem I ran into with concat:

(let [s1 (s/stream)
      s2 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      _ (s/on-closed s2 #(println "closed s2"))
      out (s/concat (s/->source [s1 s2]))]
  (s/close! out)
  [(s/put! s1 1) (s/put! s2 2) (s/take! out)])
=> [<< true >> << true >> << nil >>]

Here, the on-closed handlers of the two sources are never called. Is this intended behaviour?

The use case is watching a directory for new files by concatenating a stream of the files that already exist with a stream fed by a hawk watch. When the resulting stream is closed, the hawk watch should be stopped (the stop function is registered as an on-closed callback on the hawk stream), but that callback is never called.

Or, perhaps better phrased as a question: how can I ensure that everything upstream gets closed?

This is the result of a bug in concat, which closes the stream of streams but not the individual streams themselves. I'm pushing a fix.

To be clear, there is also the separate issue that on-closed isn't triggered immediately, but only once a put! or take! fails. This is a design decision, since it makes the lifecycle simpler to reason about in the implementation, but I think it is a consistent source of confusion for people using the library. I'll need to give this a bit more thought, but for now I plan to leave things as they are.
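Until that changes, one way to release upstream resources eagerly is to propagate the close yourself. The following is a minimal sketch under stated assumptions, not Manifold's own mechanism: close-upstream-on-close! is a hypothetical helper, and the operator's output is connected with s/connect into a plain stream out that you create, so the on-closed callback is registered on a stream you own. The rest uses only manifold.stream functions (stream, map, connect, on-closed, close!, closed?, put!).

```clojure
(ns example.eager-close
  (:require [manifold.stream :as s]))

;; Hypothetical helper, not part of Manifold's API: as soon as `downstream`
;; is closed, close every stream in `upstreams` instead of waiting for the
;; next put! into them to fail.
(defn close-upstream-on-close!
  [downstream upstreams]
  (s/on-closed downstream
               (fn []
                 (doseq [u upstreams]
                   (s/close! u)))))

;; Usage, adapted from the s/map example in this issue.
(let [s1  (s/stream)
      _   (s/on-closed s1 #(println "closed s1"))
      out (s/stream)]
  ;; feed the mapped values into a stream we created ourselves
  (s/connect (s/map inc s1) out)
  (close-upstream-on-close! out [s1])
  ;; closing `out` now closes s1 right away, which runs s1's on-closed handler
  (s/close! out)
  ;; s1 is already closed here, so this put! is not accepted
  (s/put! s1 1))
```

The trade-off is that the upstream streams are closed unconditionally, so this only suits sources that have no other consumers. For the directory-watching case, passing the hawk stream as one of the upstreams would stop the watch as soon as the combined stream is closed, assuming its on-closed callback stops the watch as described above.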

I was planning to use a stream's on-closed event to propagate upstream and close a RethinkDB changefeed query (a query that streams changes to a table). However, with the current behaviour the changefeed would only be closed after RethinkDB tried to send another change down the line. It's not the end of the world: many queries can run over a single TCP connection, so you're just leaving the changefeed active on the server, but it is a little unclean.

Thanks for the concat fix, @ztellman. Regarding when on-closed is triggered, I'm in the same situation as @danielcompton: some extra resources stay in use, so it's not a big deal. What might be more troubling is that the first put! after a close! still succeeds; only the puts after that return false.

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out)
  [(s/put! s1 1) (s/put! s1 2)])
closed s1
=> [<< true >> << false >>]