absinthe-graphql/dataloader

Error when used in transactions

arnodirlam opened this issue · 16 comments

Hi there,

we're using Dataloader without Absinthe and came across this error:

** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.3039.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 1562ms.

when running the following code in a test:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  Dataloader.new()
  |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
  |> Dataloader.load(:repo, :roles, user)
  |> Dataloader.run()
end)
|> MyApp.Repo.transaction()

Outside of an Ecto.Multi, it works:

Dataloader.new()
|> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
|> Dataloader.load(:repo, :roles, user)
|> Dataloader.run()

Is this something that can and should be fixed in Dataloader?

Thanks a lot for this great project! 🙌

What version of Dataloader / Ecto are you using?

Tried with

  • Dataloader 1.0.8 and master
  • Ecto 3.6.2 and master

Interesting. Dataloader.Ecto explicitly sets the caller value to the pid that calls load in order to avoid this issue. A test case that reproduces this would be very helpful; if you have time, please open a PR with one.

Turns out Ecto.Multi.run/3 just expects an :ok tuple, which we did not return. So this works:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  loader =
    Dataloader.new()
    |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
    |> Dataloader.load(:repo, :roles, user)
    |> Dataloader.run()

  {:ok, loader}
end)
|> MyApp.Repo.transaction()

Thanks a lot for your time and help! 🙏 Closing this issue.

We ran into issues again, now with a correct return value for Multi.run/3. Please see my updated PR branch.

[error] Task #PID<0.938.0> started from #PID<0.936.0> terminating
** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.931.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 999ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:593: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:526: Ecto.Adapters.SQL.execute/5
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:192: Ecto.Repo.Queryable.execute/4
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:17: Ecto.Repo.Queryable.all/3
    (dataloader 1.0.8) lib/dataloader/ecto.ex:328: Dataloader.Ecto.run_batch/6
    (dataloader 1.0.8) lib/dataloader/ecto.ex:656: Dataloader.Source.Dataloader.Ecto.run_batch/2
    (dataloader 1.0.8) lib/dataloader/ecto.ex:614: anonymous fn/2 in Dataloader.Source.Dataloader.Ecto.run_batches/1
    (elixir 1.12.1) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.12.1) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.15.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<14.77029692/1 in Dataloader.Source.Dataloader.Ecto.run_batches/1>, [{{:queryable, #PID<0.931.0>, Dataloader.User, :one, :id, %{}}, #MapSet<[{490, 490}]>}]]

Grateful for any hints or pointers! 🙏

@benwilson512 not sure if you've seen that I re-opened this. I tried to fix it, but didn't get very far.

Can you confirm that it's an issue in dataloader, or am I overlooking something again? 🙂

@arnodirlam are you able to create an example I can run?

@benwilson512 Yes, the tests in the linked PR should fail: #130

I'm seeing something similar when using a transaction. As far as I can tell, the issue is:

  • Test process calls a function that starts a Repo.transaction
  • Inside transaction a Dataloader is created and run
  • Dataloader.Ecto.run_batches spawns async tasks to run the batches
  • The run_batch function (now in a separate process) calls a Repo function
  • The Repo function attempts to checkout a connection
  • The only connection is being held by Repo.transaction in the first step, which is in a different process
  • The run_batch function times out and fails
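To make the steps above concrete without a database, here is a simplified model in plain Elixir. It is an assumption-laden sketch, not how Ecto or DBConnection actually track a checked-out connection: a hypothetical "connection" is stored in the calling process's dictionary, and a task spawned inside the "transaction" (as Dataloader.Ecto.run_batches does) cannot see it. ConnOwnershipDemo and its functions are invented for illustration.

```elixir
defmodule ConnOwnershipDemo do
  # Hypothetical stand-in for a checked-out connection. Real Ecto/DBConnection
  # bookkeeping differs; this only models "state owned by one process".
  @key :demo_checked_out_conn

  # Runs `fun` while the current process "holds" the connection.
  def transaction(fun) do
    Process.put(@key, :demo_conn)

    try do
      fun.()
    after
      Process.delete(@key)
    end
  end

  # True only in the process that entered transaction/1.
  def checked_out?, do: Process.get(@key) != nil
end

# Checked from the transaction's own process: the "connection" is visible.
same_process =
  ConnOwnershipDemo.transaction(fn -> ConnOwnershipDemo.checked_out?() end)

# Checked from a Task spawned inside the transaction: the child process has
# its own process dictionary, so it sees no "connection".
child_process =
  ConnOwnershipDemo.transaction(fn ->
    Task.async(fn -> ConnOwnershipDemo.checked_out?() end) |> Task.await()
  end)

# same_process == true, child_process == false
```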

@arnodirlam I created #134 to solve my issue with Repo.transaction. I suspect this may solve your problem too.

Thanks a lot for confirming the issue and for coming up with a PR, @giddie 🙏

I've done more research and found out the following:

Why does dataloader use async processes?

Partly for error handling, partly for performance optimization.

The hierarchy of processes looks as follows:

  1. A process calls Dataloader
  2. A process is spawned for loading all sources (using Task.async + Task.await) [Dataloader.run/1]
  3. A process is spawned for each source (using Task.async_stream) [Dataloader.run_tasks/3]
  4. For each source, a process is spawned for loading all batches (using Task.async + Task.await) [Dataloader.Ecto.run/1]
  5. For each source, a process is spawned for each batch (using Task.async_stream) [Dataloader.Ecto.run_batches/1]

Spawning 2. and 4. serves to catch errors raised while loading sources and batches, respectively. These processes trap exits.
Spawning 3. and 5. loads sources and batches in parallel. These processes are also used to enforce timeouts on loading each batch.
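The timeout enforcement at levels 3 and 5 relies on Task.async_stream. A minimal, database-free illustration, where Process.sleep/1 stands in for a batch query and the durations and the 200 ms timeout are arbitrary values chosen for this example:

```elixir
# Each element stands in for one batch; the value is how long it "takes" (ms).
batch_durations = [10, 20, 1_000]

results =
  batch_durations
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      {:loaded, ms}
    end,
    # Per-task timeout; with :kill_task a slow task is killed and reported
    # as {:exit, :timeout} instead of crashing the caller.
    timeout: 200,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# results == [{:ok, {:loaded, 10}}, {:ok, {:loaded, 20}}, {:exit, :timeout}]
```

Whether Dataloader passes exactly these options is not shown here; the point is only that the timeout is a property of the spawned tasks, so it disappears once batches run synchronously.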

Why doesn't that work with transactions?

A transaction is bound to a DB connection that's held by the process calling Dataloader. DB connections, however, cannot be shared between processes.

I believe it is not about the sandbox. Transactions are per process, you cannot share a transaction across multiple process. If you have work you want to process in parallel, you can do all of the normalization on the side and leave the final step of inserting the data to the main process.

-- José Valim on Ecto mailing list (Mar 13, 2017)

How does Ecto itself handle this?

Ecto's preloader checks whether the process it's called from already has a checked-out connection. If so, it preloads synchronously instead of in parallel (using Task.async_stream):

    if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
         Keyword.get(opts, :in_parallel, true) do
      preloaders
      |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
      |> Enum.map(fn {:ok, assoc} -> assoc end)
    else
      Enum.map(preloaders, &(&1.(opts)))
    end

-- shortened version of the code in Ecto.Repo.Preloader introduced by José Valim in 78ba871
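The same decision can be sketched as a generic helper. MaybeParallel is hypothetical, not part of Ecto or Dataloader; the checked_out? flag is passed in explicitly, whereas real code would derive it from the repo (for example with Repo.in_transaction?/0). Returning self() from the mapped function shows which process did the work:

```elixir
defmodule MaybeParallel do
  # Mirrors the shape of the preloader check: run in parallel only for two or
  # more items AND when the caller holds no connection.
  def map(items, fun, checked_out?) do
    if match?([_, _ | _], items) and not checked_out? do
      items
      |> Task.async_stream(fun, timeout: :infinity)
      |> Enum.map(fn {:ok, result} -> result end)
    else
      # Synchronous path: everything runs in the calling process, so a
      # connection it has checked out stays usable.
      Enum.map(items, fun)
    end
  end
end

parent = self()
whoami = fn _item -> self() end

# Caller "in a transaction": both items are handled by the caller itself.
in_tx = MaybeParallel.map([:a, :b], whoami, true)

# No transaction, two items: each item is handled by a spawned task.
no_tx = MaybeParallel.map([:a, :b], whoami, false)

# A single item never spawns a task (the match?([_, _ | _], ...) guard).
single = MaybeParallel.map([:a], whoami, false)
```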

How to fix the issue?

By restructuring Dataloader so that it does not spawn processes when it's run from within a transaction. Instead of using Task.async to catch errors, we could wrap the data-loading calls in try/rescue blocks. Would that cover all error cases?
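A minimal sketch of the try/rescue idea, using a hypothetical SafeLoad.run/1 helper that keeps the work in the calling process (and therefore on its connection) while still converting failures into tagged tuples:

```elixir
defmodule SafeLoad do
  # Hypothetical helper: runs `fun` in the calling process and turns failures
  # into tagged tuples instead of isolating them in a Task.
  def run(fun) do
    try do
      {:ok, fun.()}
    rescue
      # raise/1 and runtime errors such as ArithmeticError
      exception -> {:error, exception}
    catch
      # throw/1 and exit/1 called from inside `fun`
      kind, reason -> {:error, {kind, reason}}
    end
  end
end

ok = SafeLoad.run(fn -> 1 + 2 end)
raised = SafeLoad.run(fn -> raise "boom" end)
thrown = SafeLoad.run(fn -> throw(:halt) end)
exited = SafeLoad.run(fn -> exit(:shutdown) end)
```

As a partial answer to the question above: this covers raises, throws and exit/1 calls made inside the function, but it does not handle exit signals sent to the calling process by linked processes, which a process trapping exits would instead receive as messages.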

In particular, I suggest the following changes (will update according to feedback):

  • Replace uses of Task.async with counterparts using try/rescue (levels 2. and 4. in process hierarchy)
  • Categorize sources into those that need to be run synchronously (i.e. Ecto sources with a checked out connection) and those that can be run asynchronously. Run the async ones using Task.async, then run the synchronous ones sequentially, then Task.await all async ones.
  • Load batches synchronously if the source needs to be run synchronously
  • For Ecto sources loading batches synchronously, handle timeouts using the :timeout option on Ecto.Repo functions, since timeouts would no longer be enforced by processes spawned with Task.async_stream. The timeout for loading each batch equals the total timeout (for all batches) minus the time already spent loading previous batches.
  • Optimization: Don't spawn processes to load only a single source or batch (see Ecto.Repo.Preloader code above)
  • Optional: Add a source option :in_parallel to force running in parallel (similar to @giddie's PR #134, but named as in Ecto); up to you, @giddie
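The second and fourth bullets could look roughly like this. SyncAwareRunner, the {name, run_sync?, fun} source shape and the millisecond budget are all invented for this sketch; a real batch function would forward the remaining budget as the :timeout option of its Ecto.Repo call:

```elixir
defmodule SyncAwareRunner do
  # Hypothetical sketch; names and data shapes are invented. A source is
  # {name, run_sync?, fun}, where run_sync? would be true for an Ecto source
  # whose repo has a checked-out connection.
  def run_sources(sources) do
    {sync, async} = Enum.split_with(sources, fn {_name, sync?, _fun} -> sync? end)

    # 1. Start the async sources first so they overlap with the sync ones.
    tasks = Enum.map(async, fn {name, _, fun} -> {name, Task.async(fun)} end)

    # 2. Run the sync sources one after another in the calling process.
    sync_results = Enum.map(sync, fn {name, _, fun} -> {name, fun.()} end)

    # 3. Collect the async results.
    async_results = Enum.map(tasks, fn {name, task} -> {name, Task.await(task)} end)

    Map.new(sync_results ++ async_results)
  end

  # Runs batch functions sequentially. Each receives the time left in the
  # overall budget (ms); once the budget is used up, remaining batches are
  # skipped and reported as {:error, :timeout}.
  def run_batches_sequentially(batch_funs, total_timeout) do
    started = System.monotonic_time(:millisecond)

    Enum.map(batch_funs, fn batch_fun ->
      remaining = total_timeout - (System.monotonic_time(:millisecond) - started)

      if remaining > 0, do: {:ok, batch_fun.(remaining)}, else: {:error, :timeout}
    end)
  end
end

parent = self()

results =
  SyncAwareRunner.run_sources([
    {:repo, true, fn -> self() end},
    {:api, false, fn -> self() end}
  ])

batches =
  SyncAwareRunner.run_batches_sequentially(
    [
      fn remaining ->
        Process.sleep(30)
        remaining
      end,
      fn remaining -> remaining end
    ],
    10
  )
```

The budget is advisory in this sketch: the first batch ignores it and overruns, so only the later batch is skipped. With a real Repo call receiving the budget as :timeout, the overrunning query itself would be cut off.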

I know it's quite a bit of refactoring, but I think it's worth it!

Looking forward to feedback and discussion! @giddie @benwilson512 😄

Also @seddy who worked on the async parts a lot 👌

seddy commented

Don't think you meant me @arnodirlam, though I'm more than happy to take credit for someone else's work obviously 😹

EDIT: Unless you're referring to this thing I did ages ago and had completely forgotten about 😅 #41

So this wasn't you? @seddy ce9ac3f 🤔

EDIT: Yea, we're talking ages ago 😄

Great work @arnodirlam. Looks like you went a few steps further than me by delving into Ecto to find a better solution and writing it all up :) Your recommendations sound good to me.

Handled now in 1.0.11. To use it, pass the async? option when you call Dataloader.new/1:

loader = Dataloader.new(async?: !MyApp.Repo.in_transaction?())

This will be handled automatically in 2.0.