mattbaird/elastigo

Bulk indexing small batches can fail

Closed this issue · 5 comments

Thanks for all the work on bulk indexing. Last night I was running a process that indexes documents using the global bulk indexor, and I found that small batches which finish before BulkDelaySeconds has elapsed never get flushed to Elasticsearch. I had assumed that calling Flush would block long enough to flush and retrieve the result, but it does not. The only way I have been able to get around this is to sleep until (I think) the request with the data has hit Elasticsearch.

Any suggestions on how to deal with this situation would be great. I have the following func running in a goroutine; it blocks until all the documents on the channel have been read and the channel has been closed by the routine that is sending documents:

func StartElasticsearchIndexer(indexName string, queue chan Document, wg *sync.WaitGroup) {
    done := make(chan bool)
    core.BulkDelaySeconds = 1
    core.BulkIndexorGlobalRun(1, done)

    for document := range queue {
        json, _ := Transform(document)
        date := time.Now() // not sure where to set this
        err := core.IndexBulk(indexName, string(document.Type), strconv.Itoa(document.Id), &date, string(json))

        // This should probably explode if there is an error so we can try again?
        if err != nil {
            panic(err.Error())
        }
    }
    // The queue channel is closed by the sender when there are no more docs to process.
    mx.Log.Info("Document channel closed.")
    mx.Log.Info(fmt.Sprintf("%d errors while indexing", core.BulkErrorCt))

    // The call to Flush should be blocking (?), but it returns before the
    // buffered docs actually reach Elasticsearch, so we sleep.
    core.BulkIndexorGlobalFlush()
    time.Sleep(1500 * time.Millisecond)

    // Tell the indexer to pack it up ...
    done <- true
    // ... but that isn't enough either: we still have to wait for the
    // in-flight request to finish. This is a terrible solution.
    time.Sleep(200 * time.Millisecond)

    // Finally, we can stop blocking the main goroutine.
    wg.Done()
}

I should add that BulkIndexorGlobalFlush was added so that I could get access to Flush on the global instance (which is not exported).
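
For reference, the wrapper is essentially a one-line pass-through along these lines (a minimal sketch; the name of the unexported package-level indexor is my guess, not the actual variable):

    // Hypothetical sketch of the wrapper: expose Flush on the package's
    // unexported global bulk indexor ("globalBulkIndexor" is an assumed name).
    func BulkIndexorGlobalFlush() {
        globalBulkIndexor.Flush()
    }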

Thank you for the great writeup and for finding this. I believe there was an error in the logic: I created a unit test that failed, fixed it, and also exposed some of the hidden global indexor functionality.
araddon@b1ed1ec

Logical Error:
araddon@b1ed1ec#L0L178

I'll get a pull request in for this.

Thanks! It was very late last night when I found this, so I appreciate you fixing it so fast. I'll pull and give it a whirl.

Any update on this? I have a problem that may be related: the bulk indexer never finishes. Example:

var done chan bool
bulker := core.NewBulkIndexorErrors(1, 1)
bulker.Run(done)
for ... {
  ...
  bulker.Index("widgets", "widget", i, nil, widget)
}
log.Printf("flushing")
bulker.Flush()
log.Printf("waiting")
done <- true
log.Printf("done")

With any number of documents passed to bulker.Index(), this program hangs at "waiting". The done chan is never read.
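
One thing I am not certain matters here, but worth flagging: done is declared with var done chan bool and never initialized with make, so it is a nil channel, and in Go a send on a nil channel blocks forever. A minimal standalone illustration (not elastigo-specific):

package main

func main() {
  var nilCh chan bool // declared but not made: a nil channel
  _ = nilCh
  // nilCh <- true // a send on a nil channel would block forever

  done := make(chan bool) // initialized channel
  go func() { <-done }()  // a receiver
  done <- true            // succeeds once the receiver is ready
}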

Also, a suggestion: consider reducing the number of methods available for configuring the bulk indexer, and removing the global bulk indexer and the convenience methods. There are currently too many ways to use the bulk indexer, and none of them is more powerful or more capable than the others; the extra entry points just add unnecessary complexity to the API.
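
To sketch the shape I have in mind (purely hypothetical names, not the current API):

import "time"

// Hypothetical slimmed-down surface: one constructor returning one type,
// no package-level global indexer and no convenience wrappers.
type BulkIndexer interface {
  Run(done chan bool)
  Index(index, docType, id string, date *time.Time, body string) error
  Flush()
}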

Thanks for writing this library!!

Hi Insasho, did you merge Aaron's changes and retest?

Could you write a test for this?

Agreed on the API, it could be simplified. Any suggestions?


Closed with #21