gerritjvv/kafka-fast

Thread leak when retrieving Kafka topic offsets

Closed this issue · 4 comments

I'm using the following piece of code to retrieve Kafka topic offsets:

(let [metadata-producer (metadata-request-producer (first (zk/get-brokers zk-client)) 9092 {})
      metadata          (get-metadata [metadata-producer] {})
      topics            (keys metadata)]
  (apply merge
         (for [[topic offset-info] (apply (partial merge-with concat)
                                          (map second
                                               (get-broker-offsets {:offset-producers (ref {})}
                                                                   metadata topics
                                                                   {:use-earliest false})))]
           {topic
            (->> offset-info
                 (map #(select-keys % [:offset :partition]))
                 (sort-by :partition)
                 (map :offset)
                 (into []))})))

After running this code for the first time, a lot of "async-thread-macro-XXX" threads are created (in our case about 5K threads). Each subsequent execution doubles this number.
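As a way to confirm the leak, something along these lines can count the live threads with that name prefix before and after each run (a sketch using plain JVM thread enumeration, nothing kafka-fast specific; the async-macro-thread-count name is illustrative):

(defn async-macro-thread-count
  "Counts live JVM threads whose name starts with \"async-thread-macro\"."
  []
  (->> (Thread/getAllStackTraces)
       (.keySet)
       (filter #(.startsWith (.getName ^Thread %) "async-thread-macro"))
       count))

;; compare (async-macro-thread-count) before and after running the snippet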

I still have to test it, but I think the issue is with placing {:offset-producers (ref {})} inside the for loop.

The offset-producers ref is meant to be a global or contextual cache in which connections to the brokers are cached and created only once; it is this connection creation that spawns the threads and other resources.
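A minimal sketch of that intended usage (the offset-producers-cache name and the broker-offsets wrapper are illustrative, not part of the library): define the ref once per application so the cached producer connections survive across calls.

;; created once, shared by every call
(defonce offset-producers-cache (ref {}))

(defn broker-offsets
  ;; illustrative wrapper: every call reuses the same producer cache
  [metadata topics]
  (get-broker-offsets {:offset-producers offset-producers-cache}
                      metadata topics
                      {:use-earliest false}))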

Try the following at the top of the let:

(let [broker               {:host (first (zk/get-brokers zk-client)) :port 9092}
      metadata-producer    (metadata-request-producer (:host broker) (:port broker) {})
      offset-producers-ref (ref {broker metadata-producer}) ;; cache the metadata-producer

And where you call get-broker-offsets, do:

(get-broker-offsets {:offset-producers offset-producers-ref}
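For reference, the whole snippet with the suggestion applied would look roughly like this (untested; everything is carried over from the code above):

(let [broker               {:host (first (zk/get-brokers zk-client)) :port 9092}
      metadata-producer    (metadata-request-producer (:host broker) (:port broker) {})
      offset-producers-ref (ref {broker metadata-producer}) ;; cache the connection once
      metadata             (get-metadata [metadata-producer] {})
      topics               (keys metadata)]
  (apply merge
         (for [[topic offset-info] (apply (partial merge-with concat)
                                          (map second
                                               (get-broker-offsets {:offset-producers offset-producers-ref}
                                                                   metadata topics
                                                                   {:use-earliest false})))]
           {topic
            (->> offset-info
                 (map #(select-keys % [:offset :partition]))
                 (sort-by :partition)
                 (map :offset)
                 (into []))})))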

Please try the above solution and ping me if it worked or didn't work :)

We have already moved to using the Kafka Java API, but I will try it later.

OK, I hope it works well for you.
Closing the ticket for the moment given that it's not a bug. Please reopen if the fix does not work for you or you run into any more issues.