creachadair/jrpc2

Canceled request contexts are not propagated to handlers

Closed this issue · 8 comments

pbedat commented

Hello again,

in one of my projects I had the following problem:

One of the jrpc2 handlers interacted with browser automation that got stuck from time to time.
I didn't impose timeouts on the automation code, because I wanted to let the client handle the timeouts.
Although the client times out the jrpc2 call, the context in the handler doesn't seem to fire on the ctx.Done() channel.

Here is some test code that demonstrates my expectations. Both tests fail:

0385d04

Am I doing something wrong here?

Why is the concurrency baked into the Server in the first place? Couldn't this be an implementation detail of the channel?

In the first of those two tests, the server handler and the test code both wait for a value on the done channel. This is a deadlock since nothing ever writes a value there or closes it. Did you intend for the server to close(done) instead?
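Why close(done) rather than a send: in that test two goroutines (the handler and the test body) both receive from done, and a receive on a closed channel returns immediately for every receiver. A minimal standard-library sketch of that behavior; releaseAll is a hypothetical helper, not part of the original test:

```go
package main

import "sync"

// releaseAll starts n goroutines that each wait on done, then closes done.
// A receive from a closed channel succeeds immediately, so one close releases
// every waiter; if nothing ever sends on or closes done, they all block forever.
func releaseAll(n int) int {
	done := make(chan struct{})
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		released int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}
	close(done) // the missing step: without it, wg.Wait below never returns
	wg.Wait()
	return released
}
```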

Re: concurrency. I'm not sure I understand your suggestion. Are you saying every implementation of the channel interface should have to be aware of the packet structure and route requests and responses? That would essentially duplicate what the server does across all the implementations of that interface.

pbedat commented

Okay, I think it's all about expectations. Let me try to break this down with another example.
In any client/server architecture, timeouts are a fundamental concept that has to be handled.
So I expect the server to handle cancellation of requests gracefully.

Here are two examples:

func httpServer() {
	mux := http.NewServeMux()

	wg := sync.WaitGroup{}
	wg.Add(1)

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(200)
		<-r.Context().Done()
		wg.Done()
		log.Print("done")
	})

	srv := httptest.NewServer(mux)

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, "GET", srv.URL, nil)
	if err != nil {
		panic(err)
	}

	go srv.Client().Do(req)

	wg.Wait()
}

func jrpcServer() {
	wg := sync.WaitGroup{}
	wg.Add(1)

	srv := server.NewLocal(handler.Map{
		"test": func(ctx context.Context, r *jrpc2.Request) (any, error) {
			<-ctx.Done()
			wg.Done()
			log.Print("done")
			return nil, nil
		},
	}, nil)

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
	defer cancel()
	go srv.Client.Call(ctx, "test", nil)

	wg.Wait()
}

The first example (http) runs to completion and prints "done".
The second example (jrpc2) never completes. The goroutine would leak and occupy a worker thread forever.

If this is intended, the docs should give more guidance on the topic, because in my opinion the library deviates from the usual Go idioms.

RE concurrency:
Why are you using worker threads in the first place? In Go, where goroutines are cheap, a "goroutine per request" execution model could scale better when the work involves a lot of I/O. What was your motivation for worker threads?
My example with channels was just a way to ask if it would be possible to make the execution model configurable. Right now "worker threads" are hard-wired.

In any client/server architecture, timeouts are a fundamental concept that has to be handled. So I expect the server to handle cancellation of requests gracefully.

There are two separate topics here: (1) Responding correctly when the context times out, and (2) propagating a client-side timeout to the server.

(1) is already handled by both the client and the server.

For (2) there is no standard solution; you have to decide how to implement it. There are basically two approaches you could take:

  • Include a timeout in the request parameters, and handle it explicitly in the handler on the server side.
  • Add a separate method to the service to cancel a request, and call it (separately) when the client context terminates.

The latter is what LSP does, and it's why the OnCancel hook is provided in the client options. LSP has a $/cancelRequest method that receives the ID of a request to cancel. In this library, the Server's CancelRequest method lets you cancel a request by ID.

In your example here, the server handler will not return until the server is stopped, but the client still handles the cancellation. I slightly reworked your test to illustrate this:

func TestConc(t *testing.T) {
	done := make(chan bool)

	loc := server.NewLocal(handler.Map{
		"test": handler.New(func(ctx context.Context) error {
			defer close(done)
			<-ctx.Done()
			return nil
		}),
	}, &server.LocalOptions{
		Server: &jrpc2.ServerOptions{Concurrency: 1},
	})

	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(50*time.Millisecond, cancel)
	rsp, err := loc.Client.Call(ctx, "test", nil)
	t.Logf("rsp=%v, err=%v", rsp, err)

	select {
	case <-done:
		t.Log("OK")
	case <-time.After(200 * time.Millisecond):
		t.Error("timeout")
	}
}

As you noted, the server handler in this case does not get cancelled, but the client correctly does. Here is the above example again slightly reworked, to add a cancel handler. Note that you will need to allow server concurrency of at least 2, since the cancel handler needs to execute in parallel.

func TestConc(t *testing.T) {
	done := make(chan bool)

	loc := server.NewLocal(handler.Map{
		"test": handler.New(func(ctx context.Context) error {
			defer close(done)
			<-ctx.Done()
			return nil
		}),
		"cancel": handler.New(func(ctx context.Context, id [1]string) error {
			jrpc2.ServerFromContext(ctx).CancelRequest(id[0])
			return nil
		}),
	}, &server.LocalOptions{
		Server: &jrpc2.ServerOptions{Concurrency: 2},
		Client: &jrpc2.ClientOptions{
			OnCancel: func(cli *jrpc2.Client, rsp *jrpc2.Response) {
				cli.Notify(context.Background(), "cancel", []string{rsp.ID()})
			},
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(50*time.Millisecond, cancel)
	rsp, err := loc.Client.Call(ctx, "test", nil)
	t.Logf("rsp=%v, err=%v", rsp, err)

	select {
	case <-done:
		t.Log("OK")
	case <-time.After(time.Second):
		t.Error("timeout")
	}
}

RE concurrency: Why are you using worker threads in the first place? In Go, where goroutines are cheap, a "goroutine per request" execution model could scale better when the work involves a lot of I/O. What was your motivation for worker threads? My example with channels was just a way to ask if it would be possible to make the execution model configurable. Right now "worker threads" are hard-wired.

Sorry, I don't think I understand what you're asking. This library does not construct any threads; each request is indeed handled by a goroutine, as you suggest. If you look at the dispatchLocked method of the server, you'll see that each handler is invoked in a goroutine.

The server does maintain two additional goroutines while the server is running, one to multiplex the channel (the "reader") and one to dispatch requests to handlers (so that the server can respect the user's configured limit on concurrency). But again, those are just normal goroutines.


pbedat commented

Oof, I totally overlooked that cancellation in JSON-RPC is nonstandard and only part of the LSP spec :(
This is such a footgun... You could mention it in the OnCancel hook documentation

It would be really nice to add this nonstandard behaviour to the options, similar to AllowServerPush. Then cancellation would be just an idiomatic call to the cancel function returned by context.WithCancel.

Re concurrency:

So the Concurrency is just a limit? Okay, I expected this to control the number of worker processes, because it defaults to runtime.NumCPU(). Why not allow something like jrpc2.ConcurrencyUnlimited here?

You could mention it in the OnCancel hook documentation

Sure, we can document this more. #105

I don't think the OnCancel hook is the right place for it, since by the time you've found that, you already know you need it. I added a section to the package doc, though, and would welcome your comments.

It would be really nice to add this nonstandard behaviour to the options.

I used to have it baked in, but then it only works for implementations that use this package, or emulate its custom solution. I think it's better to leave the choice of how to do it up to the user, though I do agree it makes sense to document it better.

So the Concurrency is just a limit? Okay, I expected this to control the number of worker processes, because it defaults to runtime.NumCPU(). Why not allow something like jrpc2.ConcurrencyUnlimited here?

Yes, it is an upper bound on the number of handlers that will be permitted to execute in parallel by the server. Requests in excess of that will be held until earlier handlers complete.

It doesn't really make sense to allow unbounded concurrency; eventually you saturate the CPU on the server and it has trouble even servicing the queue. That said, if you really want to have "effectively unbounded" concurrency you can set the option to some large value like 1000000. I don't recommend it, but the library won't prevent it.

pbedat commented

Thanks. I think #105 will help a lot, and yeah, that's absolutely the right place, much better than the OnCancel docs.

I think it's better to leave the choice of how to do it up to the user

Hmm reading:

The AllowPush server option allows handlers to "push" requests back to the client. This is a non-standard extension of JSON-RPC used by some applications such as the Language Server Protocol (LSP).

I thought you would be open to adding another option ;)
Never mind, I think I'll try to write some WithCancel decorators.

It doesn't really make sense to allow unbounded concurrency

Absolutely not, but NumCPU() is very conservative when your handlers are just doing some I/O. Either way, the performance of the application is affected; a higher limit just scales much better with, e.g., web application workloads.
I think I'll give it a very high limit, because rate limiting the frontend will save my IPC channels from too much load.

The AllowPush server option allows handlers to "push" requests back to the client. This is a non-standard extension of JSON-RPC used by some applications such as the Language Server Protocol (LSP).

I thought you would be open to adding another option ;)

There is an option; that's what OnCancel is. 🙂 It does need to be more than just true or false, though, unlike push. It's not only a question of whether to send a message on cancellation; the implementation also needs to decide how to send it (for example: Is it a call or a notification? What name? What argument syntax? Should timeouts be parameters and cancellations be calls? And so on.)

None of these are huge decisions, but fixing a specific choice pins the implementation to a specific approach. So the option does require more choices than push did.
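To make the argument-syntax choice concrete, here are two cancellation notifications an OnCancel hook might send. The first follows the LSP convention ($/cancelRequest with params {"id": ...}); the second matches the "cancel" method with a one-element array used in the second TestConc above. This is a standard-library sketch; notification, lspCancel and positionalCancel are hypothetical names, and request IDs are assumed to be strings (LSP also permits integer IDs).

```go
package main

import "encoding/json"

// notification is a JSON-RPC 2.0 notification: a request with no "id",
// so the peer sends no reply.
type notification struct {
	JSONRPC string `json:"jsonrpc"`
	Method  string `json:"method"`
	Params  any    `json:"params"`
}

// lspCancel encodes the LSP-style cancellation: named params {"id": ...}.
func lspCancel(id string) ([]byte, error) {
	return json.Marshal(notification{
		JSONRPC: "2.0",
		Method:  "$/cancelRequest",
		Params:  map[string]string{"id": id},
	})
}

// positionalCancel encodes the variant from the test: method "cancel",
// params as a one-element array holding the request ID.
func positionalCancel(id string) ([]byte, error) {
	return json.Marshal(notification{JSONRPC: "2.0", Method: "cancel", Params: []string{id}})
}
```

Both are equally valid JSON-RPC; the point is that client and server have to agree on one of them, which is why the choice is left to the user.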

Never mind, I think I'll try to write some WithCancel decorators.

If you only need a default timeout, you could do that with a NewContext hook, e.g.,

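func() context.Context {
	// The cancel func is discarded because the hook can only return a context;
	// the timer's resources are released once the deadline passes.
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}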

or similar.

Absolutely not, but NumCPU() is very conservative when your handlers are just doing some I/O.

It's fine to set a higher value; that's why the knob is there. Although, it's worth noting that if most request handlers are short-lived, there isn't likely to be a lot of contention anyway. On the other hand, if the handlers are mostly I/O bound, you'll also need to make sure you don't run your process out of file descriptors. NumCPU is only meant to be a safe default.

[A] higher limit just scales much better with, e.g., web application workloads.

I have to admit, I've never seen anyone try to use JSON-RPC for web applications. Since browsers don't speak JSON-RPC natively, I'd expect a web app to have to package its API calls in HTTP anyway, and if you have HTTP there's no real need for JSON-RPC. HTTP already has its own connection framing, metadata, request/response tracking, and so on: all the things that JSON-RPC defines for use in contexts that don't have those features (e.g., sockets and pipes).

It should work, but in a web API with performance concerns, I'd consider just using HTTP directly instead of JSON-RPC.

I think this is addressed now, but if you have other related concerns feel free to reopen it.