danielberkompas/elasticsearch-elixir

Repo.stream no longer supports preload

jhonathas opened this issue · 19 comments

In version 0.4.0 it was possible to use a load function, as can be seen in the link below, but this option has been removed. I believe we will need the load function again. I'll explain below.
https://github.com/infinitered/elasticsearch-elixir/blob/58948d14a5806d76aa469702c39eef8643738ac4/guides/upgrading/0.4.x_to_0.5.x.md

In Ecto 3.0, support for preload together with Repo.stream has been removed, as can be seen in these links:

https://elixirforum.com/t/repo-stream-with-preload-new-warning/17043

elixir-ecto/ecto@6655a9a#diff-122d0a4bbce6a65cc1523584a00193aaR138

Without the option to preload alongside the stream, the only remaining option is to preload within Elasticsearch.Document, which is bad because each record is preloaded individually; we lose batched preloading.
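
For illustration, the per-record alternative would look roughly like this (a sketch only, with a hypothetical MyApp.Post schema and a :comments association; the Document callbacks may differ between library versions):

defimpl Elasticsearch.Document, for: MyApp.Post do
  def id(post), do: post.id
  def routing(_post), do: false

  def encode(post) do
    # One extra query per document -- this is exactly the per-record
    # preload (N+1) we want to avoid.
    post = MyApp.Repo.preload(post, :comments)
    %{title: post.title, comments: Enum.map(post.comments, & &1.body)}
  end
end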

I think that by including the load option again, we could go back to doing things like:

  def load(schema, offset, limit) do
    schema
    |> offset(^offset)
    |> limit(^limit)
    |> Repo.all()
    |> Repo.preload([:ad, :address, :medias])
  end

I don't know if I explained this clearly. If you have any doubts, I can try to give more examples.

Offset and limit are poor for performance. We are going to need to adopt a cursor-based solution for paging through documents.

Preload is a must, as are inner joins. How can we index data that needs data from related tables? Currently, it is very limited to flat tables.

To overcome Ecto 3 blocking preloads in the stream, I just replaced Repo.stream/1 with Repo.all/1 in Elasticsearch.Store. I can't see a better solution.

But in doing so, you will have an N+1 problem: instead of a single preload query, as with the stream, you will have a preload query for each row returned by all/1.

Currently I use all/1, but I know it is not a good solution, especially as my database grows.

No, I have no N+1 problem. For 3 inner joins I have 4 queries altogether.

For me it did not preload, even when I tried to use a join :(

Also curious about solutions; I'm stuck with an MSSQL db and Tds.Ecto doesn't support stream at all 🤣

@ksherman This is something I want to address in version 2.0.0. I'm going to recommend that you use a stream based on cursors. You can build one pretty easily using the paginator library.

defmodule CursorStream do
  def new(repo, query, opts \\ []) do
    Stream.resource(
      fn ->
        %{
          query: query,
          entries: [],
          cursor: nil,
          repo: repo,
          opts: opts
        }
      end,
      &next/1,
      &cleanup/1
    )
  end

  defp next(%{entries: []} = state) do
    opts = Keyword.merge(state.opts, after: state.cursor)
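    # `paginate/2` is expected to come from `use Paginator` in the repo module.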
    page = state.repo.paginate(state.query, opts)

    case page.entries do
      [h | entries] ->
        {[h], %{state | entries: entries, cursor: page.metadata.after}}

      [] ->
        {:halt, state}
    end
  end

  defp next(%{entries: [h | t]} = state) do
    {[h], %{state | entries: t}}
  end

  defp cleanup(_state) do
    # no op
  end
end

# Usage
CursorStream.new(MyApp.Repo, query, opts_for_paginator)

Cursors avoid the limit/offset performance problems, as well as the transaction problems of Repo.stream.
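
For completeness, wiring that into a store could look roughly like this. This is a sketch under a few assumptions: MyApp.Repo calls use Paginator, and the schema can be cursored by :id.

defmodule MyApp.PostStore do
  @behaviour Elasticsearch.Store

  import Ecto.Query
  alias MyApp.Repo

  @impl true
  def stream(schema) do
    # Paginator expects the query to be sorted by the cursor fields.
    query = from(s in schema, order_by: [asc: s.id])
    CursorStream.new(Repo, query, cursor_fields: [:id], limit: 500)
  end

  @impl true
  def transaction(fun) do
    # Cursor paging doesn't need a long-lived transaction, but the
    # Store behaviour still requires this callback.
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end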

@danielberkompas Okay, I tried this out, and I think it's kind of just running the same page of products over and over? My CursorStream is what you pasted above. This is what I added to the store:

  @impl true
  def stream(schema) do
    CursorStream.new(
      Repo,
      from(s in schema, where: s."FoodItemID" > 9000 and s."FoodItemID" < 9050, preload: :brand),
      cursor_fields: [:id],
      limit: 50
    )
  end

The where on the FoodItemID was just to start with a small collection of results; this table has > 1.5M rows. There are only 9 products that match that range, and the mix command just seems to index those 9 products over and over.

@ksherman Are you sure that your ID field is named :id? It looks like it might be capitalized, ID?
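
Also, and this is an assumption on my part about how paginator behaves, so double-check it: the query has to be sorted by the same fields you pass in cursor_fields, or the cursor can't advance. Something like:

  from(s in schema,
    where: s."FoodItemID" > 9000 and s."FoodItemID" < 9050,
    order_by: [asc: s."FoodItemID"],
    preload: :brand
  )

with cursor_fields: [:FoodItemID].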

Hey! Sorry for the late reply. I've tried a few different options in cursor_fields. Our id is actually :FoodItemID, and using that doesn't seem to stop it from looping either.

I know offsets aren't desirable, but for now this worked for me:

  def new(query, chunk_size \\ 500, offset \\ 0) do
    Stream.resource(
      # Start paging from the given offset.
      fn -> offset end,
      fn
        :stop ->
          {:halt, :stop}

        offset ->
          rows = EfAPI.Repo.all(from(query, limit: ^chunk_size, offset: ^offset))

          # A short page means we've reached the end of the result set.
          if Enum.count(rows) < chunk_size do
            {rows, :stop}
          else
            {rows, offset + chunk_size}
          end
      end,
      fn _ -> :ok end
    )
  end
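
A hypothetical usage sketch (Post and :brand are placeholders; the function above is assumed to live in a module named OffsetStream and uses the hardcoded EfAPI.Repo, with import Ecto.Query in scope). Offset paging is only deterministic with a stable sort order, so an order_by is included:

  Post
  |> order_by(asc: :id)
  |> preload(:brand)
  |> OffsetStream.new(500)
  |> Enum.take(10)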

To overcome Ecto 3 blocking preloads in the stream, I just replaced Repo.stream/1 with Repo.all/1 in Elasticsearch.Store. I can't see a better solution.

One option would be to perform a join query and programmatically hydrate the structs, for example:

from(comment in Comment,
  join: post in assoc(comment, :post),
  select: {comment, post}
)
|> Repo.stream()
|> Stream.map(fn {comment, post} -> Map.put(comment, :post, post) end)

I'll test this approach. Today I'm doing the preload inside the document, but it gets too slow...
Thanks!


Thanks, @tcrossland.
The problem with this approach is when we need to use has_many associations.
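
With a has_many, the join duplicates the parent row once per child, so the rows have to be regrouped by hand. A hedged sketch of one way to do it (Post and Comment are placeholders, and it assumes the query is ordered by the parent id so that a parent's children arrive as consecutive rows):

from(p in Post,
  join: c in assoc(p, :comments),
  order_by: p.id,
  select: {p, c}
)
|> Repo.stream()
# Group consecutive rows belonging to the same post...
|> Stream.chunk_by(fn {p, _c} -> p.id end)
# ...and rebuild each post with its comments attached.
|> Stream.map(fn [{p, _} | _] = rows ->
  Map.put(p, :comments, Enum.map(rows, fn {_p, c} -> c end))
end)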

@diegonogueira

You could use Stream.chunk_every/2 and Stream.flat_map/2 to preload records in batches.

I'm doing something similar with larger CSVs that I need to build:

# ...
|> Repo.stream()
|> Stream.chunk_every(10)
|> Stream.flat_map(fn chunk ->
  Repo.preload(chunk, Feed.product_preloads())
end)
# ... Do something with your stream

This preloads records in groups of 10, and flat_map flattens the chunks back into a single stream. The performance overhead here seems to be negligible in my testing so far.

Thanks, @seanwash!
It works! :)

I created a stream_preload function in my Repo module.

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  def reload(%module{id: id}) do
    get(module, id)
  end

  def stream_preload(stream, size, preloads) do
    stream
    |> Stream.chunk_every(size)
    |> Stream.flat_map(fn chunk ->
      # preload/2 is defined on this module by `use Ecto.Repo`.
      preload(chunk, preloads)
    end)
  end
end

And the store file:

defmodule MyApp.PostStore do
  @behaviour Elasticsearch.Store
  alias MyApp.Repo

  @impl true
  def stream(schema) do
    schema
    |> Repo.stream()
    |> Repo.stream_preload(500, :comments)
  end

  @impl true
  def transaction(fun) do
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end

config.exs

...
bulk_page_size: 500,
bulk_wait_interval: 0
...

Thanks

Thanks, @seanwash!

It worked very well!