Repo.stream seems unsupported
mpcjanssen opened this issue · 6 comments
It seems that Repo.stream does not actually stream the query results. Instead, the full result set is loaded and only then turned into a stream.
For example, the following code fails with a timeout error:
```elixir
def query do
  import Ecto.Query

  query = from r in Table, select: r

  Repo.transaction(fn ->
    Repo.stream(query)
    |> Stream.map(&IO.inspect/1)
    |> Stream.run()
  end)
end
```

Running the same code against a PostgreSQL database immediately starts printing results, as expected.
Interesting. I've not used stream before. I will look into this.
@mpcjanssen What version of ecto and sqlite_ecto2 are you using? Is it possible you have no data in your table? I am able to stream using very similar code to what you posted.
@ConnorRigby I am sure I have data in the table (2 million records). I get a timeout when executing the query, so I think the sqlite_ecto2 adapter fetches the full result first and then converts it to a stream. The request times out before the full result (2 million records) has been retrieved.
With PostgreSQL, a database cursor is used instead, so results are fetched incrementally in batches rather than all at once (see https://hexdocs.pm/postgrex/Postgrex.html#stream/4).
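For illustration, here is a minimal pure-Elixir sketch of cursor-style streaming built on Stream.resource/3. It does not talk to a database: the list rows stands in for a table, and the hypothetical CursorSketch.fetch/3 plays the role of a cursor fetch that returns at most max_rows rows per round trip (Postgrex.stream/4 takes a comparable max_rows option). This is not Postgrex's implementation; it only shows that consuming the first few rows triggers only the fetches needed for them.

```elixir
defmodule CursorSketch do
  # Hypothetical stand-in for a cursor fetch: returns up to `max_rows`
  # rows starting at `offset`, and counts round trips in the process
  # dictionary so the laziness is observable.
  def fetch(rows, offset, max_rows) do
    Process.put(:fetches, Process.get(:fetches, 0) + 1)
    Enum.slice(rows, offset, max_rows)
  end

  def stream(rows, max_rows) do
    Stream.resource(
      # start: position the "cursor" at the first row
      fn -> 0 end,
      # next: fetch one batch, or halt once the cursor is exhausted
      fn offset ->
        case fetch(rows, offset, max_rows) do
          [] -> {:halt, offset}
          batch -> {batch, offset + length(batch)}
        end
      end,
      # after: a real adapter would close the cursor here
      fn _offset -> :ok end
    )
  end
end
```

For example, CursorSketch.stream(Enum.to_list(1..10), 2) |> Enum.take(3) returns [1, 2, 3] after only two fetches, no matter how large the underlying list is.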
I have also done some investigation, and it seems that the sqlitex binding doesn't actually support cursors or row-by-row results.
More details:
:sqlite_ecto2, "2.2.4"
:sqlitex, "1.4.2"
The actual error when running the query:
```
Interactive Elixir (1.7.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> AppModule.query
12:46:09.590 [error] Sqlite.DbConnection.Protocol (#PID<0.277.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.278.0> timed out because it checked out the connection for longer than 15000ms
** (MatchError) no match of right hand side value: {:error, %Sqlite.DbConnection.Error{connection_id: nil, message: "Disconnected", sqlite: nil}}
    (sqlite_ecto2) lib/sqlite_db_connection/protocol.ex:206: Sqlite.DbConnection.Protocol.handle_transaction/3
    (db_connection) lib/db_connection.ex:1343: DBConnection.handle/5
    (db_connection) lib/db_connection.ex:1329: DBConnection.conclude_meter/6
    (db_connection) lib/db_connection.ex:1207: DBConnection.run_begin/3
    (db_connection) lib/db_connection.ex:798: DBConnection.transaction/3
```

Okay, I understand now. I'll have to read how Postgres implements streaming. SQLite does support cursors if I remember correctly, so we should be able to implement something. For what it's worth, you can set pool_timeout: :infinity, but as you've noticed, you are not really streaming at that point; you are loading every record.
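SQLite's C API is itself row-at-a-time: sqlite3_step advances a prepared statement and reports either a new row (SQLITE_ROW) or completion (SQLITE_DONE). Here is a sketch of how a stream could sit on top of such an interface, assuming a hypothetical StepSketch.step/1 that mimics that contract over an in-memory statement. It is not a real sqlitex or esqlite function, only an illustration of the shape an adapter could take.

```elixir
defmodule StepSketch do
  # Hypothetical in-memory "prepared statement". step/1 mimics the
  # sqlite3_step contract: one row per call, then :done. Row-producing
  # steps are counted in the process dictionary to show the laziness.
  def step(%{remaining: [row | rest]} = stmt) do
    Process.put(:steps, Process.get(:steps, 0) + 1)
    {:row, row, %{stmt | remaining: rest}}
  end

  def step(%{remaining: []}), do: :done

  def stream(rows) do
    Stream.resource(
      # start: "prepare" the statement
      fn -> %{remaining: rows} end,
      # next: step once and emit a single row, or halt when done
      fn stmt ->
        case step(stmt) do
          {:row, row, next_stmt} -> {[row], next_stmt}
          :done -> {:halt, stmt}
        end
      end,
      # after: a real binding would finalize the statement here
      fn _stmt -> :ok end
    )
  end
end
```

Taking two rows from StepSketch.stream([:a, :b, :c]) steps the statement exactly twice. Because Stream.resource/3 always runs its after function, abandoning the stream early would still give a real binding the chance to finalize the statement.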
Also, for what it's worth, here is the responsible source:
https://github.com/Sqlite-Ecto/sqlite_ecto2/blob/master/lib/sqlite_db_connection/stream.ex#L7-L23
You called it. We are just trying to execute the entire query.
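To make the difference concrete, here is a sketch of the eager behaviour described in this thread: the whole query runs up front and its rows are then wrapped in a stream. EagerSketch and execute_all/1 are hypothetical names, and this is not the code at the linked stream.ex. Even though the consumer takes only one row, every row has already been loaded, which matches the checkout timeout reported above for a 2 million row table.

```elixir
defmodule EagerSketch do
  # Hypothetical stand-in for executing the entire query: it records how
  # many rows were materialized before the first one is handed out.
  def execute_all(rows) do
    Process.put(:rows_loaded, length(rows))
    rows
  end

  def stream(rows) do
    Stream.resource(
      # start: run the whole query up front (the expensive part)
      fn -> execute_all(rows) end,
      # next: emit the already-loaded rows in one go, then halt
      fn
        [] -> {:halt, []}
        loaded -> {loaded, []}
      end,
      fn _acc -> :ok end
    )
  end
end
```

The result is still an Elixir stream, so downstream code looks the same as in the cursor case; the cost simply moves entirely into the start function.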
And for reference, here is the Postgres implementation.