clj-commons/claypoole

Fully lazy version of pmap

mjwillson opened this issue · 10 comments

This is a really neat library which does almost exactly what I need!

One thing kinda confused me / gave me reservations, though, and that's this "eager streaming" thing. I may have gotten the wrong end of the stick, but I'm not really clear on why (other than convenience for a subset of use cases) this is mandatory, rather than taking the more idiomatic Clojure approach: lazy by default, letting people stick a dorun / doseq / doall at the end if they want to force the work to happen. (Incidentally, I notice the examples in your README do this, which would seem to make the eagerness redundant.)

As you mention, eagerly consuming the input means that a lot of memory can be consumed by unrealised futures if pmapping over something like (range 1000000).

It looks like you're not holding onto the head of the input within pmap itself, nor evaluating all of the input before work starts (which was my initial concern).

But it seems the problem extends to any case where the consumer can't keep up with the rate at which pmap generates output. In these cases you could end up bringing more and more results into memory until you get an OOM: the consumer holds onto the seq at the point it has consumed up to, while the producer (pmap) eagerly produces results faster than the consumer advances past them.

In my case I'm doing something like (->> list-of-urls (pmap download) ... (pmap do-something-expensive)). I can download URLs faster than I can process them. But I sure as hell don't want to eagerly bring them into memory as fast as I can download them, since I'll OOM. I don't want to buffer any more data than is required by the rate-limiting step in the pipeline, i.e. one piece of input for every thread in the thread pool which is doing the bottleneck work.

It seems to me that forcing eagerness at every step along the pipeline makes this hard to do, although it's quite likely I'm missing something. I can't see how to do this (allow the bottleneck step to rate-limit how much data is pulled through preceding steps) without laziness.

Do you have any pointers on what to do in cases like this? Or is there any chance we could add lazy pmap as an option (better yet make it lazy by default)?

Cheers!

Just edited this -- I realised you're not holding onto the head of the input nor the output (right?).

But it seems the problem still remains if pmap is able to produce output faster than the output can be consumed: pmap will eagerly rush ahead of the consumer, buffering more and more results in memory.

Whatever ends up being decided, it'd be good to clarify some of this stuff a bit more in the README. This eager streaming thing feels like quite a subtle topic, and not necessarily the way people are used to things working in Clojure, so it would be good to clarify the rationale and any trade-offs.

Yeah, Claypoole doesn't hold onto the head of the input or the output. And to prevent the runaway task creation, it only realizes a few more futures than are currently being processed in the threadpool (in claypoole/buffer-blocking-seq), but the machinery for that is kind of complex. So, at least some of your worries are already taken care of, though of course your main fear is correct: two pmaps going at different speeds could build up a growing buffer between them.

I wanted Claypoole to be eager because it's very hard to get some of the desired behavior with lazy sequences. One of the bad things about built-in pmap is that if one item hangs, e.g.

(pmap #(Thread/sleep %) (cons 100000 (range 10)))

then the pmap stops doing anything in parallel--it finishes ncpus+2 parallel tasks and then doesn't start any more work until the slow task is done. That bothers me because I want parallelism in order to get things done quickly! I can't see a good way to do that while using laziness (other than upmap, which could be undesirable for other reasons, e.g. trying to line up input and output).
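The stall described above can be observed directly. A sketch (exact counts depend on core count; `repeat` is used here because chunked seqs like `range` would muddy the numbers):

```clojure
;; Count how many tasks built-in pmap has started while the slow head
;; task blocks the consumer. With a read-ahead of only ncpus + 2, far
;; fewer than all 101 tasks get started during those 5 seconds.
(def started (atom 0))

(let [results (pmap (fn [ms] (swap! started inc) (Thread/sleep ms) ms)
                    (cons 5000 (repeat 100 1)))]  ; repeat: not chunked
  (first results)  ; blocks ~5 s on the head task
  @started)        ; roughly ncpus + 3, nowhere near 101
```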

I also wanted to be able to get tasks started while the main thread kept working. I was worried about minimizing latencies, and every delay seemed threatening. It's possible that I over-optimized for that case.

I certainly hadn't thought much about your use case; I was using parallelism to work on a set of things that would easily fit in memory. I've fixed a few issues that people encountered when using large or infinite sequences, but not your issue.

Is there a good way to obtain the laziness you want without sometimes starving the threadpool of work accidentally? Forcing people to use upmap when they want to avoid such starvation could work; it would probably have more intuitive resource usage. But it'd require the user to do more complex things (e.g. upmaps that return both the input and the output) in some cases. And there would still be some hidden buffers behind the scenes, since we'd probably want to realize at least a few more tasks than threads in the threadpool (because we want to avoid ever having the threadpool wait for tasks to be realized).
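The "upmaps that return both the input and the output" trick can be spelled out like this, assuming claypoole's documented `cp/upmap` / `cp/threadpool` API (`slow-square` is a made-up helper for illustration):

```clojure
(require '[com.climate.claypoole :as cp])

;; Unordered results lose their pairing with inputs; mapping to
;; [input output] pairs restores it, at the cost of a clumsier call site.
(def pool (cp/threadpool 4))

(defn slow-square [x] (Thread/sleep (rand-int 50)) (* x x))

;; Each result carries its input along, so the caller can re-associate:
(def pairs (cp/upmap pool (fn [x] [x (slow-square x)]) (range 10)))

(into {} pairs)  ; all ten {input square} entries, regardless of completion order
(cp/shutdown pool)
```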

Interesting, yeah that explains it, thanks!

I'll have a think about this. It does seem hard to avoid the problem you mention with pmap if you want laziness, because you don't want to get too far ahead of the consumer, but the consumer has to see the result of the stalling slow operation before it can "give permission" for more results to be computed.

On first glance it seems like two things are possible, at least:

  • Lazy, non-stalling version of upmap
  • Lazy version of pmap which has a configurable maximum output buffer size, allowing you to trade off the risk of stalling against the size of the output buffer. Setting it to infinity would behave more like your pmap; setting it to zero (or perhaps ncpus+2?) would, I think, give something closer to the standard pmap behaviour. I'd just like something in between :)
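The second option can be sketched by taking clojure.core/pmap's structure and making its fixed ncpus + 2 read-ahead a parameter (`pmap-buffered` is a hypothetical name, not claypoole's API):

```clojure
(defn pmap-buffered
  "Sketch: clojure.core/pmap's implementation with the read-ahead made
  configurable. Larger n risks a bigger output buffer; smaller n risks
  starving the threads. Hypothetical, not part of claypoole."
  [n f coll]
  (let [rets (map #(future (f %)) coll)
        step (fn step [[x & xs :as vs] fs]
               (lazy-seq
                (if-let [s (seq fs)]
                  ;; Keep fs n items ahead of the results we hand out:
                  (cons (deref x) (step xs (rest s)))
                  ;; Input exhausted; drain the remaining futures:
                  (map deref vs))))]
    (step rets (drop n rets))))

;; Reads at most ~n items ahead of the consumer, so even an infinite
;; input is safe:
(take 5 (pmap-buffered 2 inc (range)))  ; => (1 2 3 4 5)
```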

I think these would address my use case. In fact the lazy upmap might be sufficient for a lot of what I want to do if it was applied to transformed input data using some tricks like you mention. But I'm not sure this covers all my use cases.

Let me know if you're open to either of these anyway; if so, I'll see if I can put a pull request together, or at least share a standalone implementation of the above.

I think having lazy methods available would be wonderful. I'm at a conference right now, and even otherwise I'm not sure how fast I'd get around to this task, but if you can provide a pull request, I'd find that wonderful!

I think if I were building it, I'd make a separate com.climate.claypoole.lazy namespace and put lazy versions of the functions there. Your proposed functions sound pretty good. Figuring out how to configure the pmap is a little tricky (I find clean API design takes a lot of effort), but maybe you'll find it easier than I would.

Cool, OK. Yeah a separate namespace is probably the easiest way to go. How tricky this is might depend how precisely you'd want it to match the semantics of your upmap, e.g. when it comes to the cancel on error stuff.

I have a quick implementation of a lazy non-stalling upmap here though, which doesn't exactly match yours but is getting there:

(import '(java.util.concurrent ExecutionException LinkedBlockingQueue))

(deftype CapturedThrowable [^Throwable throwable])

(defn upmap
  ([f coll]
     (upmap f coll (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([f coll max-concurrent]
     ;; We maintain a queue for results of size equal to the maximum
     ;; number of concurrent tasks we want to aim for.
     ;;
     ;; "Taken slots" are spaces in this result queue which are either
     ;; filled with a completed-but-not-yet-consumed result, or
     ;; reserved for a result whose computation is in progress. We can
     ;; track the number of taken slots without needing to track
     ;; completion of tasks, since we know how many tasks we initiated
     ;; and how many results have been consumed.
     ;;
     ;; If we initiate new tasks to fill free (not taken) slots up to
     ;; the maximum concurrency level, the total number of tasks in
     ;; progress will never exceed this maximum level. (It may be less
     ;; than the maximum though, if completed results are left
     ;; unconsumed on the result queue, as may be the case if the
     ;; consumer can't keep up with the producer.)
     (let [result-q (LinkedBlockingQueue. (int max-concurrent))]
       (letfn [(make-result-seq [input-seq num-taken-slots]
                 (lazy-seq
                  ;; First, fill as many free slots as we can by
                  ;; consuming input and firing off fresh tasks on it:
                  (let [num-free-slots (- max-concurrent num-taken-slots)
                        [inputs-to-start-processing remaining-input] (split-at num-free-slots input-seq)]
                    (doseq [input inputs-to-start-processing]
                      (future (.put result-q (try
                                               (f input)
                                               (catch Throwable e
                                                 (CapturedThrowable. e))))))

                    (let [new-num-taken-slots (+ num-taken-slots (count inputs-to-start-processing))]
                      ;; If after trying to fill any free slots, none are
                      ;; taken, we're done:
                      (when-not (zero? new-num-taken-slots)
                        ;; Otherwise a slot is taken, so a result is
                        ;; either queued or pending, so safe to block
                        ;; until a result is available from the queue:
                        (cons (let [result (.take result-q)]
                                (if (instance? CapturedThrowable result)
                                  (throw (ExecutionException. (.throwable ^CapturedThrowable result)))
                                  result))
                              (make-result-seq remaining-input
                                               ;; Now we pulled a result off the
                                               ;; queue, an extra slot is free:
                                               (dec new-num-taken-slots))))))))]
         (make-result-seq coll 0)))))

It runs at most max-concurrent tasks at once, and it won't stall unless it's blocked on all of the concurrent tasks. Whenever a new cons cell is consumed from the lazy seq it returns, it launches new tasks to fill any "free slots" (see comments above) up to the maximum concurrency level, so it's lazy and will only read its input at most max-concurrent items ahead of the consumer.
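A quick usage check of the upmap above (results arrive in completion order, and infinite inputs are safe because the read-ahead is bounded):

```clojure
;; Unordered: results may arrive in any order, so sort to compare.
(sort (upmap #(* % %) (range 10)))       ; => (0 1 4 9 16 25 36 49 64 81)

;; Bounded read-ahead means an infinite input seq is fine:
(count (take 20 (upmap inc (range) 4)))  ; => 20
```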

It doesn't use a custom threadpool just yet, but that shouldn't be too hard to add: just another argument for the threadpool, and switch it to use your implementation of future. In that case I guess I'd default max-concurrent to the size of the pool.

For the API, putting max-concurrent as the last argument doesn't work very well, since map takes a variable number of sequences to map over, and that sequence has to be last. I'd say using the pool size as max-concurrent and dropping the parameter might be good.

The overall algorithm sounds reasonable to me.

@mjwillson I've added a branch with some lazy code, https://github.com/TheClimateCorporation/claypoole/tree/lazy

I'm still having some problems with automatically shutting down pools, and I don't have tests so anything could be wrong with it, but it's a start.

@mjwillson I've added a pull request #22 . Please feel free to review it and see if it does what you need.

I chose a slightly different implementation than you did; I went with something closer to the core pmap implementation.