valyala/fasthttp

Response body io.Reader

qJkee opened this issue · 40 comments

qJkee commented

Hey guys. How do I set a custom response body writer?
I've tried to use SetBodyStream, but I don't see any body reader on the response that I could pass in as an io.Reader.
P.S. I'm trying to implement a throttler.

qJkee commented

The problem is that I can't get/reach Response.bodyStream

Here is a simple example that throttles the response to 2 bytes per second:

package main

import (
	"bufio"
	"io"
	"net/http"
	"time"

	"github.com/valyala/fasthttp"
)

func rateLimited(ctx *fasthttp.RequestCtx) {
	body := []byte("this is a test")

	ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
		i := 0
		for i < len(body) {
			end := i + 2
			if end > len(body) {
				end = len(body)
			}

			w.Write(body[i:end])
			w.Flush()

			i += 2

			time.Sleep(time.Second)
		}
	})
}

func main() {
	go func() {
		if err := fasthttp.ListenAndServe(":8080", rateLimited); err != nil {
			panic(err)
		}
	}()

	// Give the server a moment to start before connecting.
	time.Sleep(time.Millisecond * 100)

	res, err := http.Get("http://localhost:8080")
	if err != nil {
		panic(err)
	}

	for {
		var b [2]byte

		n, err := res.Body.Read(b[:])
		if n > 0 {
			println(string(b[:n]))
		}
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
	}
}
qJkee commented

It's correct.
But I need to read the body from a fasthttp.Response (I've made a request to another website), not from a static one

qJkee commented

@erikdubbelboer
It will return a []byte, not an io.Reader

@qJkee look at my example! it's not using any io.Reader.

dgrr commented

I think @qJkee wants to make a client request and read the response from an io.Reader. And this is not possible using fasthttp.

@qJkee if that is what you want I'm afraid @dgrr is correct that it's not possible. Fasthttp always reads the whole response body before returning.

qJkee commented

Why can't you provide some getter/setter for that?
It would be good to be able to replace the io.Reader with a custom one. For example, for throttling downloads of huge files it would be a perfect solution.

@qJkee because this requires huge internal changes. The way things work internally in fasthttp wasn't designed for this at all.

To demonstrate this I actually wrote a version that would add support for this with backwards compatibility and keeping it possible to work with 0 heap allocations.

You can find my changes here: erikdubbelboer@6951527

It's quite ugly and complicated code. It introduces 2 more sync.Pool uses with each request. And it increases the coupling between the different components which I don't like that much either.

In the current state the API could also use some improvements. Right now you have to set Response.ReturnBodyReader to true in which case after the request Response.BodyReader contains an io.ReadCloser which allows you to read the body.

Just like net/http it requires you to read the whole body and call Response.BodyReader.Close() for the connection to be reused for future requests. On top of that, right now it also requires calling ReleaseBodyReader(Response.BodyReader), but I think this can be merged into the Response.BodyReader.Close call.

A full example can be found in the benchmarks that I added to make sure I introduced no new heap allocations.

When I have time in the future I might improve this and at some point merge it into the master branch.

Hi @erikdubbelboer, I tried your code. The "Do" call would not block, but I could not read anything from resp.BodyReader until the "Do" call timed out.
My code is like this:

	client := &fasthttp.Client{
		DialDualStack: true,
		// ReadTimeout:                   time.Second,
		DisableHeaderNamesNormalizing: true,
	}
	var req fasthttp.Request
	req.Header.SetMethod("POST")
	// set some header
	req.SetRequestURI(url)
	req.SetBody(data)

	resp := fasthttp.AcquireResponse()
	resp.ReturnBodyReader = true
	err := client.Do(&req, resp)
	// check err && return resp

Another goroutine reading resp.BodyReader would block until the timeout (if the client's ReadTimeout is set).

Did I miss something?

I would have to see more code. I just wrote this simple test program which works fine:

package main
  
import (
  "fmt"
  "io/ioutil"
  "time"

  "github.com/erikdubbelboer/fasthttp"
)

func readBody(resp *fasthttp.Response) {
  body, err := ioutil.ReadAll(resp.BodyReader)
  if err != nil {
    panic(err)
  }
  fmt.Printf("read %d bytes of body\n", len(body))
}

func main() {
  client := &fasthttp.Client{
    DialDualStack:                 true,
    DisableHeaderNamesNormalizing: true,
  }

  req := fasthttp.AcquireRequest()
  req.Header.SetMethod("GET")
  req.SetRequestURI("https://www.google.com/")

  resp := fasthttp.AcquireResponse()
  resp.ReturnBodyReader = true

  err := client.Do(req, resp)
  if err != nil {
    panic(err)
  }

  go readBody(resp)

  fmt.Println("after go readBody")

  // give readBody 10 seconds to read the body before we exit.
  time.Sleep(time.Second * 10)
}

@erikdubbelboer Hi. My case is that the client/server will hold the connection for a very long time (like forever). The server will send messages at uncertain intervals over this connection, and ReadAll will block. As far as I can see, the BodyReader will only be filled after the connection is closed. Am I right?

The server's code is like this

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			panic("expected http.ResponseWriter to be an http.Flusher")
		}
		for i := 1; ; i++ {
			fmt.Fprintf(w, "Chunk #%d\n", i)
			flusher.Flush() // Trigger "chunked" encoding and send a chunk...
			time.Sleep(500 * time.Millisecond)
		}
	})

	log.Print("Listening on localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

I think I know what is happening. You are trying to read into a buffer that is bigger than the current chunk, and it hangs trying to read those extra bytes (which only arrive with the next chunk).

I have changed my implementation a bit so Read() now returns as soon as a chunk is finished. This way the following example works fine for me:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/erikdubbelboer/fasthttp"
)

func readBody(resp *fasthttp.Response) {
	// 40 bytes is bigger than the chunk of 9 we're about to receive.
	b := make([]byte, 40)

	for {
		if n, err := resp.BodyReader.Read(b); err != nil {
			break
		} else {
			log.Printf("body: %q", b[:n])
		}
	}
}

func doRequest() {
	client := &fasthttp.Client{
		DialDualStack:                 true,
		DisableHeaderNamesNormalizing: true,
	}

	req := fasthttp.AcquireRequest()
	req.Header.SetMethod("GET")
	req.SetRequestURI("http://localhost:8080/test")

	resp := fasthttp.AcquireResponse()
	resp.ReturnBodyReader = true

	err := client.Do(req, resp)
	if err != nil {
		panic(err)
	}

	go readBody(resp)

	log.Println("after go readBody")
}

func main() {
	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			panic("expected http.ResponseWriter to be an http.Flusher")
		}
		for i := 1; ; i++ {
			fmt.Fprintf(w, "Chunk #%d\n", i)
			flusher.Flush() // Trigger "chunked" encoding and send a chunk...
			time.Sleep(500 * time.Millisecond)
		}
	})

	go func() {
		log.Print("Listening on localhost:8080")
		log.Fatal(http.ListenAndServe(":8080", nil))
	}()

	// Give the server 100ms to start.
	time.Sleep(time.Millisecond * 100)

	doRequest()

	// Exit the program after 10 seconds.
	time.Sleep(time.Second * 10)
}

OK. It works. And here comes another question: how can I set the proper buffer size...? I just want to use some code like this:

reader := bufio.NewReader(data) // I have to use bufio.NewReaderSize(data, 100) instead
for {
	fmt.Printf("decoding %#v\n", reader)
	lengthStr, err := reader.ReadString('\n')
	fmt.Printf("find a ln\n")
	if err != nil {
		return err
	}
	// do something.
}

For background: this bufio.NewReader used an http.Response before, and I want to change it to a fasthttp.Response.
Thanks!

You don't really need to set an initial buffer size. Let the reader figure that out itself. Your code works fine. Try replacing the function in my example with this:

func readBody(resp *fasthttp.Response) {
  reader := bufio.NewReader(resp.BodyReader)

  for {
    s, err := reader.ReadString('\n')
    if err != nil {
      break
    } else {
      log.Printf("body: %q", s)
    }
  }
}

Not specifying a size would bring back the first problem I mentioned above... I can never read the '\n' out of resp.BodyReader.

The problem you had was fixed when I introduced the commit that makes Read return when a chunk is finished.

Sorry. Missed this.
It now works fine. Thanks!

Fasthttp always reads the whole response body before returning

@erikdubbelboer is that still true, or is there a way to read the response body with an io.Reader?

@Arnold1 I'm afraid this is still true. The code I wrote above to add support for this isn't fit to be merged, and I'm not sure I could ever get it good enough. Reading the whole body is just how fasthttp was designed, I'm afraid.

@erikdubbelboer what's the best way to achieve high throughput + low latency with fasthttp? My server will receive a high number of concurrent requests and needs to perform a small calculation for each request (~5-15ms).

I have code like this:

router := fasthttprouter.New()
router.POST("/calcResp", calcResp)
log.Fatal(fasthttp.ListenAndServe(":8080", router.Handler))

func calcResp(ctx *fasthttp.RequestCtx) error {
	var request = Request{}

	// check content length

	body := ctx.PostBody()
	err := json.Unmarshal(body, &request)
	// do error handling

	modelData := readModelData(someURL)

	timeOutChan := time.After(30 * time.Millisecond)

	var allResp []*response
	for _, model := range modelData {
		select {
		default:
			// if calc takes longer than 30ms send back http.StatusRequestTimeout
		case <-timeOutChan:
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
		}

		// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
		response := calcResponse(jsonData)
		allResp = append(allResp, response)
	}

	// json marshal
	ctx.Write(allResponseJSON)
}

Also, where can I set the fasthttp.Server Concurrency?

To set server options use:

	s := &fasthttp.Server{
		Handler:               router.Handler,
		ReadTimeout:           time.Hour,
		WriteTimeout:          time.Hour,
		ReadBufferSize:        4096 * 6,
		WriteBufferSize:       4096 * 6,
		NoDefaultServerHeader: true,
	}
	log.Fatal(s.ListenAndServe(":8080"))

You probably don't have to change the concurrency as the default is 262144 concurrent connections which should always be enough.

If you really want to have the best performance I would suggest not using fasthttprouter but doing your own simple routing. Unless of course your routing is complex in which case fasthttprouter is the better option.

For JSON I would suggest using https://github.com/mailru/easyjson for best performance.

Your timeout code doesn't make sense this way; it needs to be something like this:

	done := make(chan struct{})

	// Preallocate the correct slice capacity already for optimal performance.
	allResp := make([]*response, 0, len(modelData))
	
	// Do the processing in a goroutine so we can abandon it if it takes too long.
	go func() {
		defer close(done)

		// In theory you could also do each calcResponse in parallel goroutines
	// to speed it up even more if it's slow.
		for _, model := range modelData {
			// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
			response := calcResponse(jsonData)
			allResp = append(allResp, response)
		}
	}()

	// Either wait until the processing is done, or the timeout.
	select {
	case <-done:
	case <-time.After(30 * time.Millisecond):
		ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
		respBody, _ := json.Marshal(errMessage)
		ctx.Write(respBody)
		return // If we timeout we should return here and not send back the normal response.
	}
	
	ctx.Write(allResponseJSON)

Hi, can you do the following without fasthttprouter?

router := fasthttprouter.New()
router.POST("/getResp", logWrapper(HttpErrWrapper(calcResp)))

Just check the paths in your one handler:

var (
	strSlashGetResp = []byte("/getResp")
)

func handler(ctx *fasthttp.RequestCtx) error {
	if bytes.Equal(ctx.Path(), strSlashGetResp) {
		// ...
	}
}

@erikdubbelboer how can I run the `for _, model := range modelData` loop in parallel?

Something like:

	done := make(chan struct{}, len(modelData))

	// Preallocate the correct slice capacity already for optimal performance.
	allResp := make([]*response, 0, len(modelData))
	var allRespMu sync.Mutex

	// Do each calcResponse in a parallel goroutine
	// to speed it up even more if it's slow.
	for _, model := range modelData {
		go func(model *ModelData) {
			defer func() {
				done <- struct{}{}
			}()

			// do calculation ... 5-7ms, but can be called multiple times, bc i range over modelData...
			response := calcResponse(jsonData)

			allRespMu.Lock()
			allResp = append(allResp, response)
			allRespMu.Unlock()
		}(model)
	}

	// Either wait until the processing is done, or the timeout.
	for range modelData {
		select {
		case <-done:
		case <-time.After(30 * time.Millisecond):
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
			return // If we timeout we should return here and not send back the normal response.
		}
	}

	allRespMu.Lock()
	respBody, _ := json.Marshal(allResp)
	allRespMu.Unlock()
	ctx.Write(respBody)
Or you could even use the channel to pass down the results instead of using a Mutex. That would be better but I don't have the time to write that now.

@erikdubbelboer
is there any issue with i do the following?

// ctx from fasthttp ... ctx *fasthttp.RequestCtx
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
resultsCh := make(chan *Response, len(modelData))
results := make([]*Response, len(modelData))

for _, model := range modelData {
	go func(model *ModelData) {
		// do calculation
		response := calcResponse(jsonData)

		resultsCh <- &Response{
			Results:  response,
			MetaData: model.ModelID,
			// and other things...
		}
	}(model)
}

i := 0
for {
	select {
	case <-timeoutCtx.Done():
		ctx.SetStatusCode(fasthttp.StatusRequestTimeout) // 408

		// i might want to return partial data...
		/*
			respBody, _ := json.Marshal(Results{Results: results[0:i]})
			ctx.Write(respBody)
		*/
		return nil
	case r := <-resultsCh:
		results[i] = r
		i++
		if i == len(modelData) {
			respBody, _ := json.Marshal(Results{Results: results})
			ctx.Write(respBody)
			return nil
		}
	}
}

@Arnold1 Yes, that's perfect.

For some reason the code works slower... hm...

@erikdubbelboer
what is the diff between 1 and 2?

1.)

func calcResponse(ctx *fasthttp.RequestCtx) {
	timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
	//...
}

2.)

func calcResponse(ctx *fasthttp.RequestCtx) {
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	//...
}

In case of 1 your context will also be cancelled if you shut down the server using server.Shutdown().

@erikdubbelboer is there any obvious reason why the mutex version works faster than the context version I posted before?

In theory the channel version should be slightly faster as channels can be slightly faster for the scheduler. But in practice they are probably the same speed.

@erikdubbelboer

type ErrorHandlerFunc func(ctx *fasthttp.RequestCtx) error

func startApp() {
    router := fasthttprouter.New()
    router.POST("/Response", LogWrapper(HTTPErrorWrapper(ResponseRate)))
}

func LogWrapper(f fasthttp.RequestHandler) fasthttp.RequestHandler {
}

func HTTPErrorWrapper(errorHandlerFunc ErrorHandlerFunc) fasthttp.RequestHandler {
}

func ResponseRate(ctx *fasthttp.RequestCtx) error {
	//....
	for range modelData {
		// Either wait until the processing is done, or the timeout.
		select {
		case <-done:
		case <-time.After(30 * time.Millisecond):
			ctx.SetStatusCode(fasthttp.StatusRequestTimeout)
			respBody, _ := json.Marshal(errMessage)
			ctx.Write(respBody)
			return nil // If we timeout we should return here and not send back the normal response.
		}
	}
	//...
}

Is there a way I can benchmark the code? I want to figure out why the version with context is slower...

@erikdubbelboer
`case <-time.After(30 * time.Millisecond):` ... I think that's not what I want... is it 30ms per item? I want the total max time to be 30ms...

Use: https://golang.org/pkg/testing/#hdr-Benchmarks

In that case just assign the time.After(30 * time.Millisecond) to a variable outside the loop so it is only started once.

@erikdubbelboer cool, do you have a sample where you benchmark fasthttp code? Should I send requests to the /Response endpoint and benchmark that?

Fenny commented

I think his question has been answered with examples and links to specific resources, can we close this? @erikdubbelboer