mattbaird/elastigo

BulkIndex failing to capture errors

Closed this issue · 6 comments

In the BulkSend function in bulk.go, err is nil even when the bulk response contains errors.

code:

func BulkSend(buf *bytes.Buffer) error {
    _, err := api.DoCommand("POST", "/_bulk", nil, buf)
    if err != nil {
        BulkErrorCt += 1
        return err
    }
    return nil
}

Sample response that comes back with errors. Because err is nil, none of these errors make it onto the ErrorChannel:

[{"took":4,"errors":true,"items":[{"index":{"_index":"myindex_v1","_type":"myfiles","_id":"aaaaac7683dddfee51e0725b0908e2b0","status":400,"error":....etc....

It looks like a bulk request returns a 200 status code and carries its errors in the response body, and this code in DoCommand doesn't take that into consideration:

httpStatusCode, body, err = req.Do(&response)

    if err != nil {
        return body, err
    }
    if httpStatusCode > 304 {

I have an update to the BulkSend function that gets these errors put on the channel. Example code is below; let me know if you want it as a 2.0 pull request. (I'd move the struct out of the function; it's only there to keep the code example short.)

The idea is that the bulk response is 200 OK but contains a JSON object with an errors flag that indicates whether any items failed. You have to unmarshal the JSON blob and check whether errors == true. With this change an error message is correctly put on the ErrorChannel.

// This does the actual send of a buffer, which has already been formatted
// into bytes of ES formatted bulk data
func BulkSend(buf *bytes.Buffer) error {

    type responseStruct struct {
        Took   int64                    `json:"took"`
        Errors bool                     `json:"errors"`
        Items  []map[string]interface{} `json:"items"`
    }

    response := responseStruct{}

    body, err := api.DoCommand("POST", "/_bulk", nil, buf)

    if err != nil {
        BulkErrorCt += 1
        return err
    }
    // check for response errors, bulk insert will give 200 OK but then include errors in response
    jsonErr := json.Unmarshal(body, &response)
    if jsonErr == nil {
        if response.Errors {
            BulkErrorCt += uint64(len(response.Items))
            return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]", len(response.Items))
        }
    }
    return nil
}
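
One thing to note about the code above: len(response.Items) is the total number of items in the bulk response, not only the ones that failed. A minimal sketch of counting just the failed items follows. It assumes each entry in items is keyed by its action name (as with "index" in the sample response) and that a failed item carries an "error" field or a status of 400 or higher. The names bulkResponse and countFailedItems are hypothetical and are not part of elastigo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkResponse mirrors the fields of the _bulk reply shown in this thread.
// Hypothetical type, not part of elastigo.
type bulkResponse struct {
	Took   int64                               `json:"took"`
	Errors bool                                `json:"errors"`
	Items  []map[string]map[string]interface{} `json:"items"`
}

// countFailedItems is a hypothetical helper. It returns how many items in a
// bulk response body failed. Assumption: a failed item has an "error" field
// or a "status" of 400 or higher.
func countFailedItems(body []byte) (int, error) {
	var resp bulkResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		// Unlike the snippet above, a decode failure is reported to the caller.
		return 0, fmt.Errorf("decoding bulk response: %v", err)
	}
	if !resp.Errors {
		return 0, nil
	}
	failed := 0
	for _, item := range resp.Items {
		// Each item is an object with a single action key, e.g. "index".
		for _, result := range item {
			if _, hasErr := result["error"]; hasErr {
				failed++
				continue
			}
			// encoding/json decodes JSON numbers into float64.
			if status, ok := result["status"].(float64); ok && status >= 400 {
				failed++
			}
		}
	}
	return failed, nil
}
```

BulkSend could then pass body to countFailedItems and use the returned count for both BulkErrorCt and the error message, so the reported number matches the items that actually failed.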

You might consider using

return fmt.Errorf("Bulk Insertion Error... [%d]", len(response.Items))

updated based on schleppy's comment

@mattbaird looks like you added the code example in commit:
dbd553d

want me to close this issue?

yes please.