geofmureithi/apalis

0.6.0-rc7 crashing

kdesjard opened this issue · 10 comments

After making a number of changes to get 0.6.0-rc7 to compile, I've set the storage backend as always (which includes my request type):

.backend(storage.clone())

But now I get:

internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyType>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyType>>)

Why do I need to use .data now??

There is always something that slips through a refactor.
I will write a quick fix.

Ok thanks. I do like the new Request struct layout as I was doing this myself in my own struct. Overall, it's much simpler now.

After consideration, this will possibly be resolved externally. Here are some thoughts:

  1. Adding back an implicit Data Layer for the backend would impose a clone for all backends which is not the case for Redis.
  2. We need to standardize dependency injection; we can't inject the backend only in some cases.
  3. Not every job needs access to the storage, so I think an extra .data(...) line would not hurt your case.

Let me know your thoughts.
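To make the trade-off in the points above concrete, here is a minimal std-only sketch of a type-keyed data registry. It is not apalis's implementation: `DataMap`, `FakeStorage`, and the error text are hypothetical names chosen for illustration. It shows why a value has to be registered explicitly (and cloned) before a job can ask for it by type, and why a lookup for an unregistered type fails.

```rust
use std::any::{type_name, Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a worker's shared-data registry (not an apalis type).
#[derive(Default)]
pub struct DataMap {
    entries: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl DataMap {
    /// Analogue of a builder's `.data(value)` call: stores one value per type.
    pub fn data<T: Clone + Send + Sync + 'static>(mut self, value: T) -> Self {
        self.entries.insert(TypeId::of::<T>(), Box::new(value));
        self
    }

    /// Looks a value up by type and clones it out.
    /// A type that was never registered yields an error instead of a panic.
    pub fn get<T: Clone + Send + Sync + 'static>(&self) -> Result<T, String> {
        self.entries
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
            .ok_or_else(|| format!("missing entry for `{}`", type_name::<T>()))
    }
}

/// Hypothetical storage handle. Cloning only bumps the `Arc` reference count,
/// which is the kind of clone an implicit data layer would force on a backend.
#[derive(Clone, Debug, PartialEq)]
pub struct FakeStorage {
    pub pool: Arc<String>,
}
```

In this model, a backend that is passed only to `.backend(...)` never enters the registry, so a job that asks for it by type finds nothing unless the same value is also passed to `.data(...)`.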

Ok, if I add the storage via .data, then I get:

Worker missing required context: Missing the an entry for apalis_core::task::task_id::TaskId. Did you forget to add `.data(<apalis_core::task::task_id::TaskId>)

I am updating documentation on this.
Since the introduction of Parts, the following are injected directly:

  • TaskId
  • Attempt
  • Namespace
  • Ctx dependent on the Backend

These are no longer available via Data(T); just inject them directly as T into your job function.

Let me know if that helps. You can also look at the fn-args example in the v0.6 tree.
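The distinction described in this comment, between values carried in the request's parts and values registered as shared data, can be sketched with std-only Rust. Every name here (`Request`, `Parts`, `Data`, `FromRequest`, `with_data`) is a hypothetical stand-in and not the real apalis API; the sketch only illustrates why `TaskId` would be taken as a plain `T` while registered values are wrapped in `Data<T>`.

```rust
use std::any::{type_name, Any, TypeId};
use std::collections::HashMap;

// All types below are hypothetical stand-ins, not the real apalis types.
#[derive(Clone, Debug, PartialEq)]
pub struct TaskId(pub String);

/// Per-request metadata that travels with the request itself.
pub struct Parts {
    pub task_id: TaskId,
}

/// A request: job arguments, its parts, and the worker's shared data.
pub struct Request<Args> {
    pub args: Args,
    pub parts: Parts,
    pub data: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl<Args> Request<Args> {
    pub fn new(args: Args, task_id: &str) -> Self {
        let parts = Parts { task_id: TaskId(task_id.to_string()) };
        Request { args, parts, data: HashMap::new() }
    }

    /// Registers a shared value, keyed by its type.
    pub fn with_data<T: Send + Sync + 'static>(mut self, value: T) -> Self {
        self.data.insert(TypeId::of::<T>(), Box::new(value));
        self
    }
}

/// Wrapper marking a value that must have been registered as shared data.
#[derive(Debug, PartialEq)]
pub struct Data<T>(pub T);

/// Anything a job function could take as an argument in this sketch.
pub trait FromRequest<Args>: Sized {
    fn from_request(req: &Request<Args>) -> Result<Self, String>;
}

// Parts are read straight from the request; no registration is needed.
impl<Args> FromRequest<Args> for TaskId {
    fn from_request(req: &Request<Args>) -> Result<Self, String> {
        Ok(req.parts.task_id.clone())
    }
}

// Shared data is looked up by type and fails if it was never registered.
impl<Args, T: Clone + Send + Sync + 'static> FromRequest<Args> for Data<T> {
    fn from_request(req: &Request<Args>) -> Result<Self, String> {
        req.data
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
            .map(Data)
            .ok_or_else(|| format!("missing entry for `{}`", type_name::<T>()))
    }
}
```

Under these assumptions, asking for `Data<TaskId>` fails because `TaskId` lives in the parts and was never registered as data, which mirrors the "missing required context" message reported above.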

Creating the worker pool:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .backend(storage.clone())
    .build_fn(execute);

The function passed to build_fn, modeled on the fn-args example:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>,
    task_id: Data<TaskId>,
    _ctx: Data<SqlContext>,

then I get the crash:

thread 'tokio-runtime-worker' panicked at ..cargo/git/checkouts/apalis-2e5337d3a5750988/d62281f/packages/apalis-core/src/worker/mod.rs:468:45:                                                                                                                   
internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyReq>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyReq>>) 

You need to do the following:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .data(storage.clone())
    .backend(storage.clone())
    .build_fn(execute);

And for your job fn:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>, // Now included via .data(...)
    task_id: TaskId, // TaskId is injected directly
    _ctx: SqlContext // Ctx is injected directly
)

With that setup I get many trait errors like:

72 | .build_fn(execute);
| ^^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>, which is required by apalis::prelude::WorkerBuilder<MyReq, SqlContext, PostgresStorage<MyReq>, Stack<Data<PostgresStorage<MyReq>>, Stack<TraceLayer, Identity>>, _>: apalis::prelude::WorkerFactory<_, _, apalis::prelude::ServiceFn<_, _, _, _>>

71 | .backend(storage.clone())
| ^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>

Well, it seems I messed up somewhere.
Looking at the docs, it seems I was wrong about some of those; I must have missed that during the refactor. I will write a patch and clarify.