sourcegraph/conc

context cancel propagation does not work if all tasks are completed, but you haven't called Wait().

serbrech opened this issue · 6 comments

repro:

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
p := pool.New().WithMaxGoroutines(20)
for ctx.Err() == nil {
	p.WithErrors().WithContext(ctx).Go(func(ctx context.Context) error { return nil })
	fmt.Println("pushed task")
}

Ctrl+C does not exit the program.

Side note:
I wrote this code thinking that the pool would release a worker when a task is done.
I misunderstood the use case of conc. I thought I could use it as a simple concurrency limiter with backpressure, and continuously push tasks to it as fast as it can handle them.

The fact that I have to call Wait() makes it more of a batch-processing helper than a concurrency helper.

I looked at Stream, and was surprised that it also requires calling Wait().

Hi @serbrech! The problem with your example is that you're reconfiguring the pool on every iteration of the for loop. Calling any of the With* methods after calling Go() is unsupported. It will work as expected if you call WithErrors() and WithContext() outside the for loop.

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer cancel()
	p := pool.New().WithMaxGoroutines(20).WithContext(ctx)
	for ctx.Err() == nil {
		p.Go(func(ctx context.Context) error { return nil })
		fmt.Println("pushed task")
	}
	p.Wait()

However, Wait() should always be called. If Wait() is not called, there is no guarantee that all the running goroutines have finished, and any panics from child goroutines will simply be ignored. conc is a library for scoped concurrency, so the entire library is designed around the guarantee that, after calling Wait(), there are no leftover goroutines.

That makes sense, and it now behaves as I expected, thanks!

I do think that setting up callbacks for errors/panics would be very useful, so that they can be surfaced/handled right away, and separately from the task itself.
Right now the only way to retrieve the errors from the tasks seems to be to call Wait()?

Is that something you've considered?

What did you have in mind here? In the case that you want to handle errors early with a callback, couldn't you just add that to the closure you're giving to conc?

Something like:

errCallback := func(err error) {
  fmt.Println("live error:", err)
}
p := pool.New().WithErrors()
for i := 0; i < 100; i++ {
  p.Go(func() (err error) {
    // Report the error as soon as this task returns, before Wait() collects it.
    defer func() { if err != nil { errCallback(err) } }()
    return thingThatMightError()
  })
}
fmt.Println("completed error:", p.Wait())

Thanks!
This looks fine, I agree :). May I suggest adding something like this in an examples file or folder?
I think the simplicity of conc is great, but it can get in the way of "how do I do x?"

Thanks for the recommendation! An examples folder sounds great, I think that could be quite useful 🙂

I just created #80 to track the addition of an examples folder. Closing this in favor of that issue. Thanks for the suggestion!