mpenet/jet

Any way to hook the client GET up to a server response (like a pipe in Node.js)?

Closed this issue · 25 comments

I have to serve up some data, and I've got everything working with core.async and jet (great job!!)

I think I am missing something in the docs, though (or in my head!)... Can I hook up the client reader to the server writer? That would save me from reading the entire object before serving it.

I am sure this must be possible, but I just can't see a simple way to achieve it.

I would be grateful for any help / tips.

Thanks

Ray
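For reference, core.async's own counterpart to a node.js-style pipe is clojure.core.async/pipe, which moves each value from one channel to another as soon as it arrives and, by default, closes the destination when the source closes. A minimal sketch, assuming plain strings stand in for body chunks; the channel names are hypothetical and this is not jet's API:

```clojure
(ns pipe-sketch
  (:require [clojure.core.async :as async :refer [chan pipe put! close! <!!]]))

(defn pipe-chunks
  "Sketch: pushes chunks into a hypothetical `source` channel, pipes it into
  `sink`, and returns everything that came out of `sink`."
  [chunks]
  (let [source (chan 16)   ; stands in for a client response body
        sink   (chan 16)]  ; stands in for what the server writes from
    ;; pipe forwards each value as soon as it is available and, by default,
    ;; closes sink when source closes.
    (pipe source sink)
    (doseq [c chunks] (put! source c))
    (close! source)
    (<!! (async/into [] sink))))
```

Calling (pipe-chunks ["a" "b" "c"]) returns the chunks in order, and because the sink is closed when the source closes, a consumer of the sink knows when the stream has ended.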

I am not sure I understand what you mean; can you give me an example?

Hi Max,

Maybe it already works as I expect (having thought about it a little more after posting the question!)...

Here is a small piece of code that I am playing with:

(defn respond-within-sla [expected-result-milliseconds respond-ok respond-later route-transformer & args]
  (let [data-channel (timeout expected-result-milliseconds)
        response-channel (chan)]
    (go (if-let [data (<!! data-channel)]
          (time (let [response (<!! (respond-ok data))
                      response-headers (:headers response)]
                  (>! response-channel {:body    (<!! (:body response))
                                        :headers response-headers
                                        :status  200})))
          ; else, nothing back from the URL transformer
          (>! response-channel (respond-later))))
    (go
      (>! data-channel (apply route-transformer args)))
    response-channel))

This is the wiring code:

(defn generate-response [s3-data url]
  (let [sla (or (env :sla-milliseconds) 100)]
    (respond-within-sla sla s3-data-fetcher sla-fail-message s3-transform-full-path s3-data url)))

(defn async-responder [request]
  (generate-response s3-cache-record-from-db (request/request-url request)))

(defn -main [& [port]]
  (let [port (Integer. (or port (env :port) 5000))]
    (jet/run-jetty {:port port :join? false :ring-handler async-responder})))

The code does what I want it to do (it looks up the location of a resource on S3 and then serves it) but I want to confirm that it really behaves the way I would like under the covers.

My goal is to stream the data out to the client whilst reading the body, and I think it actually does that. If that is the case, please confirm. Otherwise, my original question stands ;-)

Thanks

Ray

I think that's what you get, yes, though "streaming" is a big word here, since you just send the data in one shot (you have a single chunk here).
Also, don't use the double-bang core.async functions (<!!, >!!) inside go blocks.
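A sketch of respond-within-sla reworked along those lines, assuming only core.async: the go block uses the parking operations (<!, >!, alts!) instead of <!!, and the SLA is enforced with alts! over the transformer's result and a timeout channel, rather than by putting data onto the channel returned by timeout (as an implementation detail, core.async may hand the same timeout channel to callers whose deadlines are only a few milliseconds apart). Running route-transformer on async/thread is an assumption here, made because it may block on I/O; the body channel is passed through untouched.

```clojure
(ns sla-sketch
  (:require [clojure.core.async :as async :refer [chan go <! >! <!! alts! timeout]]))

(defn respond-within-sla
  "Sketch: yields (respond-ok data) as a 200 response if route-transformer
  finishes within the SLA, otherwise yields (respond-later)."
  [expected-result-milliseconds respond-ok respond-later route-transformer & args]
  (let [data-channel     (chan 1)  ; receives the transformer's result
        response-channel (chan 1)] ; handed back to the server
    ;; Assumption: the transformer may block on I/O, so run it on a real
    ;; thread rather than tying up one of the go-block dispatch threads.
    (async/thread
      (when-some [data (apply route-transformer args)]
        (async/put! data-channel data)))
    (go
      ;; alts! parks until either the data or the timeout arrives.
      (let [[data port] (alts! [data-channel (timeout expected-result-milliseconds)])]
        (if (= port data-channel)
          (let [response (<! (respond-ok data))]  ; <! (not <!!) inside go
            (>! response-channel {:body    (:body response) ; body channel passed through
                                  :headers (:headers response)
                                  :status  200}))
          ;; Timed out: fall back to the "try again later" response.
          (>! response-channel (respond-later)))))
    response-channel))
```

The design choice is to keep everything inside go non-blocking, so slow upstream calls never starve core.async's small dispatch pool.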

Oops, yes, thanks for noticing that. I should claim it's a typo, but really I just copied and pasted some code around and missed it!! Oops, I mean, missed it!

Using some modified code, I run into a problem with big files:

"s3-transform-dropping-brand-country from http://localhost:5000/ACME/RM/18mb-file-0 to https://s3-eu-west-1.amazonaws.com/ray-s3-test/18mb-file-0"
"Elapsed time: 1877.598 msecs"
2014-10-14 22:17:23.606:INFO:oejc.ResponseNotifier:HttpClient@2134583487-711: Exception while notifying listener org.eclipse.jetty.client.HttpRequest$11@2c90e5e4
java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.|(< (.size puts) impl/MAX-QUEUE-SIZE)
        at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)

Here is the modified code:

    (go (if-let [data (<! data-channel)]
          (time (let [response (<! (respond-ok data))
                      response-headers (:headers response)]
                  (if (= 200 (:status response))
                    (>! response-channel {:body    (<! (:body response))
                                          :headers response-headers
                                          :status  200})
                    (>! response-channel {:body   (str "Problem getting data from S3. Status code: " (:status response))
                                          :status (:status response)}))))
          ; else, nothing back from the URL transformer
          (>! response-channel (respond-later))))

I also have problems with files larger than 100k, so I am now a little confused by your "single chunk" comment.

Any further comments or ideas, in case I have still done something wrong?

Thanks

Ray

FYI, here is what wget shows against my server:

$ wget http://localhost:5000/ACME/RM/1k-file-0
--2014-10-14 22:53:11--  http://localhost:5000/ACME/RM/1k-file-0
Resolving localhost... 127.0.0.1, ::1
Connecting to localhost|127.0.0.1|:5000... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1078 (1.1K) [text/plain]
Saving to: ‘1k-file-0’

100%[=======================================================================================================================================================>] 1,078       --.-K/s   in 0s      

2014-10-14 22:53:16 (68.5 MB/s) - ‘1k-file-0’ saved [1078/1078]

$ wget http://localhost:5000/ACME/RM/001_tcm-13-537.jpg
--2014-10-14 22:53:23--  http://localhost:5000/ACME/RM/001_tcm-13-537.jpg
Resolving localhost... 127.0.0.1, ::1
Connecting to localhost|127.0.0.1|:5000... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22231 (22K) [image/jpeg]
Saving to: ‘001_tcm-13-537.jpg.1’

63% [===============================================================================================>                                                        ] 14,178      --.-K/s  eta 8s      
^C

I have to hit Ctrl-C, as it stops responding at this point.

I think I get it now... In my code as it stands, the body must be fully read before it is written to the response channel. And somehow this is stalling, although I admit that I don't know why.

Do you have any suggestions on how I can restructure the code to avoid this problem and really stream the body?

Actually, no: I think you just hit the default response buffer size set by Jetty!

Try setting it to a higher value; see an example here: https://github.com/pyr/cyanite/blob/b593fac79009b901a4169958850926db0d6645af/src/io/cyanite/es_client.clj#L16-L17

It's now set to 4M by default as of [cc.qbits/jet "0.5.0-beta1"]. Could you confirm and close this if it's working for you?

I get this after the upgrade:
Exception in thread "async-dispatch-7" java.lang.ClassCastException: qbits.jet.client.http$client cannot be cast to org.eclipse.jetty.client.HttpClient

In the code above, respond-ok is mapped to this:

(defn s3-data-fetcher [url]
  (http-client/get http-client/client url))

Can you show me the code? It seems like you are passing the function as the HttpClient value somewhere...

Oh poop, paren error... please ignore.

I have assumed that, with the upgrade, I don't need to change any code to set buffer sizes. The behaviour with the new code is still the same:

i-Ray-MacBook-Pro-2:functional-url-mapper ray$ curl http://localhost:5000/ACME/RM/1k-file-0 > /dev/null
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1078  100  1078    0     0    972      0  0:00:01  0:00:01 --:--:--   972
i-Ray-MacBook-Pro-2:functional-url-mapper ray$ curl http://localhost:5000/ACME/RM/100k-file-0 > /dev/null
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  9  160k    9 16384    0     0    979      0  0:02:47  0:00:16  0:02:31     0
^C

Oh, and FYI, I have now posted all of the code on GitHub:
https://github.com/raymcdermott/functional-url-mapper

Could you make a minimal test case? Also, since it's a server issue, maybe you are running into a buffer size issue there too (see https://github.com/mpenet/jet/blob/master/src/clj/qbits/jet/server.clj#L82), meaning you might need to increase the defaults there: (run-jetty {:output-buffer-size ...})

I tried setting the output-buffer-size but it did not have any impact.

Here is a (fairly) minimal case:
https://github.com/raymcdermott/functional-url-mapper/blob/master/src/functional_url_mapper/test_case.clj

You need to clone the repo and run that as a main.

I hope this is sufficiently minimal but let me know if you would prefer me to use another approach.

Ray

To run the test case, stand up the server:
$ lein run -m functional-url-mapper.test_case

In another terminal window, run wget/curl against http://localhost:5000/anything-you-want.txt

In the test case, we never emit more than 16384 bytes.

This means we are either not reading more than that amount or not outputting more than that amount. Given that the default output buffer is 32768 bytes, it implies a reading problem.

If I fiddle with the request/response buffer sizes in the client (as you did in the new code), I cannot raise that maximum, although setting them to lower numbers can bring it down.

It seems like the stream is being read twice (once for headers and once for the body perhaps) and after that, nothing is being read.

Just an idea / theory for you to consider.

OK, the test case makes the issue more evident: this is a chunked transfer from S3, so the body channel does not carry a single value.

So just removing the <! on the body channel fixes the issue, since jet streams the channel transparently:

https://github.com/raymcdermott/functional-url-mapper/blob/master/src/functional_url_mapper/test_case.clj#L12 should read (>! response-channel {:body (:body response) ...

With this:

$ curl  http://localhost:5000/anything-you-want.txt > /dev/null
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  160k  100  160k    0     0   205k      0 --:--:-- --:--:-- --:--:--  205k
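The fix above can be checked without S3 or jet. In this sketch, fetch is a hypothetical stand-in for (http-client/get client url): it returns a channel yielding a {:status :headers :body} map whose :body is itself a channel of chunks. The handler forwards that channel as-is, so every chunk reaches the consumer instead of only the first one.

```clojure
(ns body-passthrough-sketch
  (:require [clojure.core.async :as async :refer [chan go put! <! >! <!!]]))

(defn proxy-response
  "Sketch: turns the upstream response from (fetch url) into a Ring-style
  response map, forwarding the upstream :body channel without reading it."
  [fetch url]
  (let [response-channel (chan 1)]
    (go
      (let [response (<! (fetch url))
            status   (:status response)]
        (>! response-channel
            (if (= 200 status)
              ;; Forward the whole chunk channel; taking (<! (:body response))
              ;; here would deliver only the first chunk.
              {:status 200 :headers (:headers response) :body (:body response)}
              {:status status
               :body   (str "Problem getting data from S3. Status code: " status)}))))
    response-channel))
```

Whoever consumes :body (jet, in this thread) then drains the chunks as they arrive, which is what makes the transfer a real stream.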

Good progress! That works for the modestly sized files and reads more elegantly too. Will it also work on non-chunked servers, or is that something I should check in the headers and branch on?

Anyway, that aside, we're still not quite there yet... the larger file (which is also public) still throws the exception:

"s3-transform-dropping-brand-country from http://localhost:5000/ACME/RM/18mb-file-0 to https://s3-eu-west-1.amazonaws.com/ray-s3-test/18mb-file-0"
2014-10-16 23:10:24.198:INFO:oejc.ResponseNotifier:HttpClient@1613880613-148: Exception while notifying listener org.eclipse.jetty.client.HttpRequest$11@77fa0ef0
java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.|(< (.size puts) impl/MAX-QUEUE-SIZE)

Oh, and that file is misnamed: it is actually 31.1 MB in size. Don't let it bother you.
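The AssertionError in both logs comes from core.async rather than Jetty: a channel accepts at most 1024 pending put! operations (impl/MAX-QUEUE-SIZE) waiting for a taker. The sketch below, assuming only core.async, reproduces the limit and shows what the suggested "windowed buffer" does:

```clojure
(ns pending-puts-sketch
  (:require [clojure.core.async :as async :refer [chan put! sliding-buffer <!!]]))

(defn overflow?
  "Puts n values onto an unbuffered channel that nobody reads from. Each put!
  stays pending; returns true if core.async rejects one with its assertion."
  [n]
  (let [c (chan)]
    (try
      (dotimes [i n] (put! c i))
      false
      (catch AssertionError _ true))))

(defn sliding-window
  "Puts n values onto a channel with a sliding buffer of the given size, then
  returns what is left in it: only the newest values survive."
  [n size]
  (let [c (chan (sliding-buffer size))]
    (dotimes [i n] (put! c i))
    (async/close! c)
    (<!! (async/into [] c))))
```

Note that a sliding or dropping buffer discards values, which would corrupt an HTTP body, so here the more useful fix is to keep something draining the body channel rather than to change the buffer type.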

The default response buffer in the client is set to 4M, so a 31M file will cause trouble.
Everything works smoothly with this code for me:

(ns functional-url-mapper.test_case
  (:require [clojure.core.async :as async :refer [<! >! chan go]]
            [qbits.jet.server :as jet]
            [qbits.jet.client.http :as http-client]))

(defn async-responder [url]
  (let [response-channel (chan)]
    ;; note: a new client is built on every request here; outside a test case,
    ;; reusing a single client is likely preferable
    (go (let [response (<! (http-client/get (http-client/client
                                             ;; set a large max response buffer size
                                             {:response-buffer-size (* 100 1024 1024)})
                                            url))
              response-headers (:headers response)]
          (if (= 200 (:status response))
            ;; pass the body channel through untouched: jet streams it to the client
            (>! response-channel {:body    (:body response)
                                  :headers response-headers
                                  :status  200})
            (>! response-channel {:body   (str "Problem getting data from S3. Status code: " (:status response))
                                  :status (:status response)}))))
    response-channel))

(defn responder [request]
  ;; ignore the request and hard-code the URL for the purpose of a minimal
  ;; test case... this file is publicly available
  (async-responder "https://s3-eu-west-1.amazonaws.com/ray-s3-test/18mb-file-0"))

(defn -main []
  (jet/run-jetty {:port 5000 :ring-handler responder}))

Are you sure that this is the right way to fix this problem? What happens when serving videos?

I'll have a second look at this. Even using http://download.eclipse.org/jetty/stable-9/apidocs/org/eclipse/jetty/client/util/BufferingResponseListener.html, a max value apparently has to be supplied, but this might at least make the response buffer size more manageable.