riverqueue/river

Job cancellation doesn't work in database/sql driver


Hi!

I am using the example SortWorker job, with small adjustments, to test cancellation of longer-running jobs, but cancellation does not seem to work.

import (
  "context"
  "log"
  "sort"
  "time"

  "github.com/riverqueue/river"
)

const SortJobKind = "sort"

type SortArgs struct {
  Strings []string `json:"strings"`
}

func (SortArgs) Kind() string {
  return SortJobKind
}

type SortWorker struct {
  river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
  log.Printf("STARTING: %d", job.ID)
  select {
  case <-ctx.Done():
    log.Printf("CANCELLED: %d", job.ID)
    return ctx.Err()
  case <-time.After(20 * time.Second):
    log.Printf("WILL DO: %d", job.ID)
  }

  items := job.Args.Strings
  sort.Strings(items)
  log.Printf("RESULT: %+v\n", items)
  return nil
}

When the job appears in the user interface (riverui), I click Cancel, but the job keeps running. In the logs I see STARTING followed eventually by WILL DO, but never CANCELLED.

The only sign of a cancellation attempt is the job's metadata field, which contains:

{
  "cancel_attempted_at": "2024-10-02T....."
}

Am I doing something wrong or is there a potential bug?

I am running v0.12.1 and using riverdatabasesql.New, though the underlying connection is pgx, wrapped with opencensus tracing (ocsql).

My guess is that this is due to the database/sql driver not currently supporting LISTEN functionality, so there's no way for the worker to find out that the job has been cancelled.

I believe we will need to move forward with something like #352 for this to be viable, since plain database/sql offers no way to access the underlying connection and run a listen loop on it.

To confirm: are you using pgx under the hood somehow, and are you using pgbouncer?

Yes, it's pgx, but it isn't created directly. Instead, it is wrapped with ocsql (OpenCensus), which registers its own wrapper driver name in order to do tracing. This driver is instantiated through the standard sql.Open.

No, pgbouncer is not used.

My understanding from the docs was that when LISTEN is not available, River falls back to polling, with the associated latency and performance costs, to deliver the same functionality. Does this mean River should not be used with anything other than a direct pgx connection, and that it is not functional without LISTEN?

It also doesn't work without the OpenCensus wrapping.

That is, using sql.Open("pgx", "....") followed by riverdatabasesql.New.

Right, the issue is that River's dbsql driver doesn't support LISTEN at all at the moment, so it can't receive notifications. #352 was my idea for how we could solve that. I think while @brandur and I were discussing it, we were mostly focused on whether it was needed as an optimization for job fetching, and didn't really consider job cancellation.

We built a poll-only mode that covers some of the other features that otherwise use listen/notify, but there is no such fallback for cancellation. The only way a running job actually gets cancelled is if the worker it's running on receives the cancel signal over listen/notify, which means cancelling running jobs just doesn't work at all with the dbsql driver as of today.

In theory, the producer could poll the table for cancellations of any jobs it is actively working, but that's a lot of extra query load for a rare event. I really think some kind of pubsub listener setup is the only good way for this feature to work, and we should probably figure out how to make that happen in the dbsql driver.

I understand. Thanks for getting back to me so quickly. I am still in the research phase. I really liked River, since it fits our needs extremely well and it answers questions that other frameworks don't bother asking (ensuring atomic/consistent job scheduling), but unfortunately I might need to consider other options in this case, since switching to a raw pgx connection does not seem possible with the current code base.

Just to make sure I understand the use case: are you saying that in your setup it would be difficult to make a pgx pool solely for the 1-conn-per-client listener process (as illustrated in #352)? Or that it would be difficult for you to switch to a raw pgx pool for the rest of your codebase?

With the approach in #352 the rest of your codebase would still be able to use whatever abstraction you have today, so long as you can also provide the River driver with a plain pgx pool that it can check out a conn from solely for the purpose of listening for pubsub events.

Would love to hear more about it if there's more to the story so I can make sure I'm considering the full set of use cases. Otherwise we could probably ship something like that PR pretty quickly IMO.

Our workers and clients would run within the same Go app.

Am I understanding correctly that the sole purpose of the db connection that is passed to the RiverClient is for listening for jobs?

Does this mean that within client logic, if I were to use the plain SQL connection (extracted from GORM) to schedule a job, notification and atomic subscription would work correctly?

Also, does this mean that it should not be a problem to use GORM within the Worker's Work logic and use something like JobCompleteTx again without issues?

If the pgx connection is used only for listening, leader election, and the like, and is completely isolated from any of our own business/transaction logic, then it should be possible to create a separate connection for RiverClient. Is there anything I should be aware of as a caveat in such a setup?

@mokiat well, my comments were probably pretty confusing in light of how #352 has rotted since it was opened (including being based on an earlier merged PR but showing a huge diff). I just updated it, so if you check it out again, the following will probably make much more sense.

Am I understanding correctly that the sole purpose of the db connection that is passed to the RiverClient is for listening for jobs?

Nope! There are essentially 3 categories of database connection usage:

  1. At job insertion time, the provided tx is used (if you use a *Tx insert method). Otherwise the main database pool is used.
  2. A River Client that's working jobs will use the main database pool for fetching jobs, marking them as completed, and for other internal maintenance operations.
  3. A River Client will also check out a dedicated connection from the database pool to use as a Postgres LISTEN listener. There's one of these per working Client today, but only with the pgx driver, because the database/sql interface does not support the low-level operations required for LISTEN.

The proposal in #352 is to allow an alternate way of obtaining connections for use case (3) above, and it only applies to drivers that don't otherwise support LISTEN. Essentially, it provides an alternate constructor to the standard New, called NewWithListener, that takes both a standard pool and a riverdriver.Listener. In that proposal, the latter can be allocated with a riverpgxv5.NewListener constructor. It's a bit odd, as you end up using both the pgxv5 driver and the database/sql driver, but if you're using pgxv5 under the hood anyway, it's generally not that hard IMO. All you need is to construct an isolated pgxpool used solely for checking out listener connections, and then provide that to your driver on init.

Also, does this mean that it should not be a problem to use GORM within the Worker's Work logic and use something like JobCompleteTx again without issues?

If the pgx connection is used only for listening, leader election, and the like, and is completely isolated from any of our own business/transaction logic, then it should be possible to create a separate connection for RiverClient. Is there anything I should be aware of as a caveat in such a setup?

With the riverdatabasesql driver you should be able to use the same dbsql-based ORM anywhere in your codebase, including in your workers. And if we were to move forward on #352, there shouldn't be any loss of functionality from this approach—just the regular performance degradation of using dbsql vs direct pgx.

I hope that all makes sense! Can you let me know if this answers your questions and if you think this would address your concerns?

I think I have a better understanding now.

My idea and ask was slightly different, however. Let's ignore #352 for a second.

What would happen if I instead used riverpgxv5.New, as it's available right now, to instantiate a RiverClient, but continued using *sql.DB for all our internal business logic (i.e. I would have two separate connections to the database)? Would this create a problem for operations like InsertTx and JobCompleteTx, where a transaction created from the default *sql.DB connection would be passed?

In other words, the river.Client would be instantiated with a pgx connection pool, but all other places in the program would use a separate, standard *sql.DB pool.

I am assuming this would work but just want to make sure.

OK, after a bit more experimenting, I realize why #352 is required. It all comes down to the InsertTx method, which is strict about the Tx type: a River client built on pgx expects a pgx transaction.

I switched to the commit from the PR to give it a try. Scheduling a job from a transaction and cancelling from the UI both work as expected. However, cancelling from code does not: neither JobCancelTx nor JobCancel seems to have any effect. The job runs to completion and is marked as completed.

Also, a side note for the docs: the GORM ConnPool can be of type *gorm.PreparedStmtTX rather than *sql.Tx, in which case ConnPool.(*gorm.PreparedStmtTX).Tx.(*sql.Tx) is required.