mattbaird/elastigo

Fundamental flaw in bulk indexer?

Opened this issue · 8 comments

We've been trying to track down some issues where there are holes in our imported data, yet no errors come through. I think I've identified a race condition where the last documents may never be flushed to Elasticsearch.

In startHttpSender, we wait on both b.sendBuf and b.httpDoneChan inside a select loop. If we receive on httpDoneChan, we return and the goroutine finishes. During the Stop process, we first push the last docs, if there are any, onto sendBuf and then send true on httpDoneChan.

Go guarantees FIFO delivery on a single channel, but this does not extend to a select over multiple channels: per the spec, when more than one case is ready, select chooses among them pseudo-randomly, and our tests bear this out. I believe that in some cases the httpDoneChan receive will beat the sendBuf receive, the goroutine will exit, Stop will return, and sendBuf will be left holding data that is never received. I imagine this is more likely when the sendBuf case of the select is still processing at the moment Stop is called, since then neither channel is ready to be received on immediately, and the race occurs.
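To make this concrete, here is a minimal, self-contained reconstruction of the pattern as I understand it. The method bodies are my own paraphrase of the behaviour described above, not the actual elastigo source:

package main

import (
    "bytes"
    "fmt"
    "time"
)

type BulkIndexer struct {
    sendBuf      chan *bytes.Buffer
    httpDoneChan chan bool
}

func (b *BulkIndexer) startHttpSender() {
    go func() {
        for {
            select {
            case buf := <-b.sendBuf:
                fmt.Println("sent:", buf.String()) // stands in for the real HTTP send
            case <-b.httpDoneChan:
                return // can be chosen even while sendBuf still holds the final flush
            }
        }
    }()
}

func (b *BulkIndexer) Stop() {
    b.sendBuf <- bytes.NewBufferString("last docs") // push any remaining docs
    b.httpDoneChan <- true                          // then signal shutdown
}

func main() {
    b := &BulkIndexer{
        sendBuf:      make(chan *bytes.Buffer, 1),
        httpDoneChan: make(chan bool, 1),
    }
    b.startHttpSender()
    b.Stop()
    time.Sleep(100 * time.Millisecond)
    // Some runs print "sent: last docs", some print nothing:
    // the done case can win and the final buffer is dropped.
}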

Here's some sample code that shows how the ordering of input to the channels does not matter when selecting on them, and also shows that this can't be solved by closing the channels either :(. The playground link doesn't demonstrate the close issue, but running it on a real machine will show that the count output varies.

http://play.golang.org/p/a0RbfN8uQ8

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string, 10)
    ch2 := make(chan string, 10)
    go func() {
        count := 0
    LOOP:
        for {
            select {
            case a, more := <-ch1:
                if !more {
                    // ch1 closed; give up even though ch2 may still
                    // hold buffered values we haven't received
                    break LOOP
                }
                count++
                fmt.Println(a)
            case a, more := <-ch2:
                if !more {
                    break LOOP
                }
                count++
                fmt.Println(a)
            }
        }
        fmt.Println(count) // varies from run to run on a real machine
    }()
    for i := 0; i < 10; i++ {
        ch1 <- "a"
        ch2 <- "b"
    }
    close(ch1)
    close(ch2)
    time.Sleep(time.Second)
}

This is quite possible. This code needs some love. If you have some proposals, I'm open to helping.

My personal opinion is that it's trying to be a little too "asynchronous". We've moved to a simpler system that has a synchronous interface for any single "batch" of entities (which is what we were trying to use the BulkIndexer for), whereas the BulkIndexer feels like it's designed to be more of a long-running process.

We communicate via a docChan and an errChan. A goroutine pulls from the docChan until it's closed and sends the docs it receives to Elasticsearch in batches whenever it reaches the threshold, sending any errors (and nil) back on the errChan and using a sync.WaitGroup to ensure all requests are processed before closing the errChan. The main thread loops through the entities, sends them on the docChan, closes the docChan, then loops over the errChan until it's closed. This has worked seamlessly for us, and it's a very simple interface.
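In outline it looks something like the sketch below; the names (Doc, IndexBatch, sendBulk) and the stubbed send are made up for illustration, since the real code is tied to our own entity types:

package main

import (
    "fmt"
    "sync"
)

type Doc string

// sendBulk stands in for the actual bulk request to Elasticsearch.
func sendBulk(batch []Doc) error {
    fmt.Printf("indexing %d docs\n", len(batch))
    return nil
}

func IndexBatch(docs []Doc, threshold int) error {
    docChan := make(chan Doc)
    errChan := make(chan error)
    var wg sync.WaitGroup

    go func() {
        batch := make([]Doc, 0, threshold)
        flush := func() {
            wg.Add(1)
            go func(b []Doc) {
                defer wg.Done()
                errChan <- sendBulk(b) // errors (and nil) go back to the caller
            }(batch)
            batch = make([]Doc, 0, threshold)
        }
        for doc := range docChan {
            batch = append(batch, doc)
            if len(batch) >= threshold {
                flush()
            }
        }
        if len(batch) > 0 {
            flush() // the final partial batch is always sent
        }
        wg.Wait()      // every request has reported its result...
        close(errChan) // ...before the caller's error loop can end
    }()

    for _, d := range docs {
        docChan <- d
    }
    close(docChan)

    var firstErr error
    for err := range errChan {
        if err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

func main() {
    docs := []Doc{"a", "b", "c", "d", "e"}
    if err := IndexBatch(docs, 2); err != nil {
        fmt.Println("bulk index failed:", err)
    }
}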

Obviously this was domain-specific to our code, since we knew how to parse our entities into bulk-indexer-compatible documents, but I wouldn't imagine this would be too hard to replicate as BatchIndexer-style code. That could be implemented in parallel to the BulkIndexer.

The BulkIndexer has a lot of issues; a simpler BulkIndexer would be appreciated. The current one does not let the client deal with errors, and it's also very likely to drop data on shutdown of an app, since you don't know whether something was indexed or not.

Yes, I've been trying to track down errors in the bulk indexer and it's really difficult. Also, there seems to be a memory leak in NewBulkIndexerErrors: if an update or index operation fails and is retried, it seems to leak memory.

The background goroutine stuff really doesn't belong in the library, in my opinion. The library should give you a simple bulk API that the caller can wrap in a goroutine if they want to make it async.
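For example (BulkSend here is a hypothetical synchronous API, not something elastigo exposes today):

package main

import (
    "fmt"
)

// BulkSend is a hypothetical synchronous bulk call: it blocks until
// Elasticsearch responds and returns any error directly to the caller.
func BulkSend(docs []string) error {
    fmt.Printf("indexed %d docs\n", len(docs))
    return nil
}

func main() {
    docs := []string{"a", "b", "c"}

    // Callers who want async behaviour can wrap it themselves:
    errs := make(chan error, 1)
    go func() {
        errs <- BulkSend(docs)
    }()

    // ... do other work ...

    if err := <-errs; err != nil {
        fmt.Println("bulk send failed:", err)
    }
}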

@vrecan @kristen1980 How about setting a custom Sender function attribute on the BulkIndexer? Is that not a good way to deal with errors? What kind of errors are you talking about that can't be caught this way?
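For reference, something along these lines is what I mean. This is a sketch: I'm assuming Sender has the signature func(*bytes.Buffer) error with indexer.Send as the default behaviour, so check it against the version you're on:

package main

import (
    "bytes"
    "log"

    elastigo "github.com/mattbaird/elastigo/lib"
)

func main() {
    conn := elastigo.NewConn()
    indexer := conn.NewBulkIndexer(4)
    // Wrap the default send so every error is surfaced to the caller.
    indexer.Sender = func(buf *bytes.Buffer) error {
        if err := indexer.Send(buf); err != nil {
            log.Printf("bulk send failed: %v", err)
            return err
        }
        return nil
    }
    indexer.Start()
    defer indexer.Stop()
    // ... feed documents to the indexer as usual ...
}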

@snikch Were you using an old version of elastigo? httpDoneChan has been removed since Nov 2014 (9fd168f), and startHttpSender looks quite different from what you describe. Does this race issue still exist?

Hrm, it's possible. This was over a year ago, so I've really got no memory of the whole situation. Sorry!