danielberkompas/elasticsearch-elixir

Indexing fails on second Elasticsearch.Store load?

Closed this issue · 26 comments

So I have implemented the Store, the Document protocol, and a mapping file, and I'm nearly getting things working with your great library.

Unfortunately it seems to be calling the Elasticsearch.Store load function twice, even though there are fewer than 5000 entries for the first index. I would have thought it would never try to load again when the number of entries returned is less than the bulk page size?
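For what it's worth, the paging behaviour I'd expect looks roughly like this (my own sketch and assumption, not the library's actual implementation):

```elixir
# Minimal sketch of the paging behaviour I'd expect (an assumption, not
# the library's actual code): keep loading pages until a page comes back
# shorter than the page size, then stop without another load call.
defmodule PagingSketch do
  @page_size 5000

  def stream(load_fun) do
    Stream.resource(
      fn -> 0 end,
      fn
        :done ->
          {:halt, :done}

        offset ->
          page = load_fun.(offset, @page_size)

          if length(page) < @page_size do
            # Short page: emit it, then halt on the next iteration
            {page, :done}
          else
            {page, offset + @page_size}
          end
      end,
      fn _ -> :ok end
    )
  end
end
```

With my three entries, `load_fun` would be called exactly once here, which is why the second call surprised me.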

Returned from Elasticsearch.Store:

[
    %MyApp.MyThing{....etc...},
    %MyApp.MyThing{....etc...},
    %MyApp.MyThing{....etc...}
]

It then gives me an ArgumentError when my Repo returns an empty list:

SELECT .... FROM tasks AS i0 LIMIT $1 OFFSET $2 [5000, 5000]
** (ArgumentError) argument error
    :erlang.apply([], :load, [])
    (elasticsearch) lib/elasticsearch/storage/data_stream.ex:57: Elasticsearch.DataStream.load_page/4
    (elixir) lib/stream.ex:1361: Stream.do_resource/5
    (elixir) lib/stream.ex:1536: Enumerable.Stream.do_each/4
    (elixir) lib/enum.ex:1911: Enum.reduce/3
    (elasticsearch) lib/elasticsearch/indexing/bulk.ex:81: Elasticsearch.Index.Bulk.upload/4
    (elasticsearch) lib/elasticsearch/indexing/index.ex:33: Elasticsearch.Index.hot_swap/4
    (elasticsearch) lib/mix/elasticsearch.build.ex:57: Mix.Tasks.Elasticsearch.Build.build/3
    (elasticsearch) lib/mix/elasticsearch.build.ex:39: anonymous fn/3 in Mix.Tasks.Elasticsearch.Build.run/1
    (elixir) lib/enum.ex:1911: anonymous fn/3 in Enum.reduce/3
    (elixir) lib/enum.ex:3251: Enumerable.List.reduce/3
    (elixir) lib/enum.ex:1911: Enum.reduce/3
    (elasticsearch) lib/mix/elasticsearch.build.ex:37: Mix.Tasks.Elasticsearch.Build.run/1
    (mix) lib/mix/task.ex:314: Mix.Task.run_task/3
    (mix) lib/mix/cli.ex:80: Mix.CLI.run_task/2
    (elixir) lib/code.ex:677: Code.require_file/2

I am using your library inside an umbrella project, so that might have something to do with it? I'm a bit confused about where to go from here! Ta!

@aphillipo thanks for the excellent bug report! I am investigating this now and will try and have a fix up by the end of the day.

@aphillipo as a followup, it would be helpful to see more details on your Elasticsearch.Store implementation. I'm having trouble reproducing the described behavior.

Sure!

defmodule Fixes.IssueStore do
  @behaviour Elasticsearch.Store
  import Ecto.Query, only: [from: 2]

  @impl Elasticsearch.Store
  def load(Fixes.Issue, offset, limit) do
    query = from(i in Fixes.Issue, limit: ^limit, offset: ^offset, preload: [:creator])
    Fixes.Repo.all(query)
  end
end

And my issue_impl.ex is:

defimpl Elasticsearch.Document, for: Fixes.Issue do
  def id(issue), do: issue.id
  def type(_issue), do: "issue"
  def parent(_issue), do: false

  def encode(issue) do
    %{
      issue_id: issue.issue_id,
      title: issue.title,
      description: issue.description,
      states: issue.states,
      creator: get_user_map(issue.creator)
    }
  end

  def get_user_map(user) do
    %{
      username: user.username,
      email: user.email,
      display_name: user.display_name,
      deleted_at: user.deleted_at
    }
  end
end

I think I might have spotted it! The first argument to load should be a parameter, not the literal Fixes.Issue 🤦🏻‍♂️

Sorry to have wasted your time!

defmodule Fixes.IssueStore do
  @behaviour Elasticsearch.Store
  import Ecto.Query, only: [from: 2]

  @impl Elasticsearch.Store
  def load(queryable, offset, limit) do
    query = from(i in queryable, limit: ^limit, offset: ^offset, preload: [:creator])
    Fixes.Repo.all(query)
  end
end

Still doesn't work sadly :-(

@aphillipo I haven't been able to reproduce this error yet. It appears that somehow, somewhere, the store module is being set to an empty list, causing an error when Elixir tries to call store.load:

:erlang.apply([], :load, [])
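That ArgumentError can be reproduced in isolation (a standalone snippet, nothing library-specific):

```elixir
# Reproducing the ArgumentError in isolation: a dynamic call such as
# store.load() compiles down to :erlang.apply(store, :load, args),
# which raises ArgumentError when store is not a module atom.
store = []

result =
  try do
    store.load()
  rescue
    ArgumentError -> :not_a_module
  end

IO.inspect(result)  # => :not_a_module
```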

I haven't been able to find anything that would do that yet. I have to move on for now, but maybe you can try out my branch and see if it helps?

{:elasticsearch, github: "infinitered/elasticsearch", branch: "10-argument-error"}

Yes thanks for looking. Seems very strange, I'll take a look!

I'm not entirely sure that I'm hitting the same problem, but at least the error message looks exactly like what @aphillipo posted in the beginning.

I've played around with the examples from the README earlier on, which is why the error message mentions the post type. I was trying to extend the code using another event type when I started to see the error.

I'm in load_page/4 and also seeing that store is an empty list, which made me curious what the other parameter values would be. offset and limit do look okay, but source ...

first run, everything okay:

source: MyApp.Event
store: MyApp.ElasticsearchStore
offset: 0
limit: 5000

while on the second run, it breaks due to:

source: %Elasticsearch.Exception{
  col: nil,
  line: nil,
  message: "Rejecting mapping update to [events-1523862388] as the final mapping would have more than 1 type: [post, event]",
  query: nil,
  raw: %{
    "_id" => "V+4Jvn21KnFHpE80KOs+glpUWznkv1gI7ZOjKhzRvrM=",
    "_index" => "events-1523862388",
    "_type" => "event",
    "error" => %{
      "reason" => "Rejecting mapping update to [events-1523862388] as the final mapping would have more than 1 type: [post, event]",
      "type" => "illegal_argument_exception"
    },
    "status" => 400
  },
  status: 400,
  type: "illegal_argument_exception"
}
store: []
offset: 0
limit: 5000

Because of #14, I'm thinking that this project is mainly based on 5.x, because for version 6.x onward there is an article about the Removal of mapping types which explains that:

Indices created in Elasticsearch 6.0.0 or later may only contain a single mapping type. Indices created in 5.x with multiple mapping types will continue to function as before in Elasticsearch 6.x. Mapping types will be completely removed in Elasticsearch 7.0.0.

I don't see (yet) why there would ever be two types in one index in my situation, because I've defined two indexes, each with a separate type. But according to the error message, I'd say they get merged somehow.

HTH

Ooookay, kind of "heading down the wrong rabbit hole", at least for me. @aphillipo, what does your JSON schema look like?

I've realized that I (blindly) copy & pasted the sample (post) schema to my new event structure. That led to the error message I posted above, because the index looked like this while not having any documents:

{
    "events-1523866031": {
        "mappings": {
            "post": {
                "properties": {
                    "author": {
                        "type": "text"
                    },
                    "title": {
                        "type": "text"
                    }
                }
            }
        }
    }
}

where you can see "post" on line 4. Having such a structure conflicts with a newly arriving document where type=event is provided.

{
  "mappings": {
    "_doc": {
      "properties": {
        "issue_id": {
          "type": "text",
          "fields": {
            "exact_issue_id": { "type": "keyword" }
          }
        },
        "title": { "type": "text" },
        "description": { "type": "text" },
        "type": { "type": "keyword" },
        "creator": {
          "properties": {
            "username": {
              "type": "text"
            },
            "display_name": {
              "type": "text"
            },
            "user_id": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}

Sorry, I should have been more precise, but I'm guessing you're facing the same situation. You had

def type(_issue), do: "issue"

while your mapping shows _doc; that would be the same conflict I had. To get more debugging output, adding the following function to your Store module should output a message similar to the one I got:

def load(%Elasticsearch.Exception{} = exception, _, _, _) do
  IO.inspect(exception, label: "Elasticsearch.Exception")
end

(given all I know about my error right now).

Otherwise I'd suggest you debug your local copy of the dependency (adding some IO.inspect calls in Elasticsearch.DataStream.load_page/4 did the trick for me) and tell Mix to use the local one by adding path to your dependency in mix.exs:

{:elasticsearch, "~> 0.1.1", path: "deps/elasticsearch"}

@steffkes that worked thanks so much!

@danielberkompas, so what seems to be happening "on the second run" is basically a failure while executing the first run, which only becomes visible as part of the second one.

I'm not deep enough into this library yet to decide where to place additional error handling. I'd say we need to drop out of the stream handling in this case and just bark; there is no way we could fix this at runtime otherwise.

My first idea was that we could maybe avoid the redundant definition of the type, but I can't see how that would work. So we're only able to provide better error messages when it actually happens, or am I missing something?

Better errors would be great if we've both bumped into this - that's why I haven't closed the issue yet.

So, if I'm understanding this right, this can happen whenever DataStream encounters an exception. I'll look into reproducing this via test.

Regarding the differences between Elasticsearch 5 and 6, are you saying that Elasticsearch no longer supports having multiple document types in the same index?

Regarding the differences between Elasticsearch 5 and 6, are you saying that Elasticsearch no longer supports having multiple document types in the same index?

This is how the Removal of mapping types article sounds to me, yes. They describe the drawbacks of the old approach as well as a few alternatives for new versions and the future.

So, if I'm understanding this right, this can happen whenever DataStream encounters an exception. I'll look into reproducing this via test.

Yes, exactly this. Based on the idea of placing the handling inside Store.load/4, I've refactored it, but I'm not sure it's the best place for it, or the best way to handle it. This patch is based on the v0.1.1 tag:

diff --git a/lib/elasticsearch/storage/data_stream.ex b/lib/elasticsearch/storage/data_stream.ex
index 1ad24fa..3ce9a3a 100644
--- a/lib/elasticsearch/storage/data_stream.ex
+++ b/lib/elasticsearch/storage/data_stream.ex
@@ -39,6 +39,10 @@ defmodule Elasticsearch.DataStream do
     {[], 0, Config.all()[:bulk_page_size]}
   end

+  defp next(_, %Elasticsearch.Exception{} = exception, _) do
+    raise exception
+  end
+
   # If no items, load another page of items
   defp next({[], offset, limit}, source, store) do
     load_page(source, store, offset, limit)

At least this stops the stream cleanly and provides a stack trace. Probably not the most valuable one, but better than nothing, I'd say.
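To illustrate why this stops things cleanly, here is a standalone sketch (not the library code) showing that an exception raised inside a Stream.resource next function propagates straight out of the enumeration instead of being threaded along as data:

```elixir
# Standalone sketch: raising inside the stream's next function ends the
# enumeration with a proper exception and stack trace, rather than
# passing the error value on to the next iteration.
stream =
  Stream.resource(
    fn -> 0 end,
    fn
      2 -> raise(ArgumentError, "simulated Elasticsearch.Exception")
      n -> {[n], n + 1}
    end,
    fn _ -> :ok end
  )

result =
  try do
    Enum.to_list(stream)
  rescue
    e in ArgumentError -> {:raised, e.message}
  end

IO.inspect(result)  # => {:raised, "simulated Elasticsearch.Exception"}
```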

I'm going to patch the issue with DataLoader to raise any Elasticsearch exception it encounters like @steffkes suggests. Then, I'll cut the 0.2.0 release, and then focus on upgrading the library for Elasticsearch 6+. Some things will need to change now that we won't be having multiple types per index.

I'm confused again. I can't find any way that the store in DataStream would raise an exception, or that this exception would somehow get converted into the store position in the args.

It must be happening somewhere else in the execution chain, especially since DataStream has nothing to do with creating Elasticsearch indexes. It simply loads data from your data store.

Could either of you provide an example application? Or more detailed reproduction steps?

I don't think it's the store raising this one. To me it seems the origin is the HTTP API; at least that would make sense, given that the same behaviour can be seen when you use curl to send records to Elasticsearch's bulk endpoint.

The patch is just an idea/suggestion for how to make the error visible. As I've said before, I'm not sure it's the best place, or even a correct place at all; it was just the first part of the chain where I could spot and handle it.

I'll see that I put an example together where it's obvious, and probably even find out where a better place to hook in would be.

Before any of this happens, can we not just check that the Document impl's type and the mapping use a consistent key? That's what solved my issue.

Well yes, that would already help :) Do you have an idea how/where to check this? Given the current structure, a type can be part of multiple indexes, and at runtime we don't know which index we're writing to (if I'm not mistaken), at least not anywhere I'd integrate such an assertion.

On the other hand, the way it's suggested, it would probably catch all error cases, no matter what: everything from "not able to connect to this host" to "dude, you've made a configuration mistake" would be caught this way.
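As a rough illustration, such a check could look like this (a hypothetical helper, not part of the library; it assumes you have the mapping JSON decoded into a map):

```elixir
# Hypothetical consistency check (not part of the library): compare the
# type returned by the Elasticsearch.Document implementation with the
# top-level keys under "mappings" in the index's mapping JSON.
defmodule MappingCheck do
  def consistent?(document_type, mapping) when is_map(mapping) do
    mapping
    |> Map.get("mappings", %{})
    |> Map.keys()
    |> Enum.member?(document_type)
  end
end
```

With the mapping posted above, `MappingCheck.consistent?("issue", mapping)` would return false, because the mapping uses `_doc`, which is exactly the mismatch we both hit.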

Could either of you provide an example application? Or more detailed reproduction steps?

Kept it as simple as possible, @danielberkompas: configuration (which, by the way, yielded another issue that I'll file afterwards), (static) store, entity & mapping: https://github.com/steffkes/elasticsearch-elixir-10

The relevant thing to note is the different identifiers between Elasticsearch's posts mapping and the post entity: one time we use post, the other time not-post.

Thanks for doing this @steffkes!

you're welcome @aphillipo, this was a nice trip :)