olivere/elastic

fatal error: concurrent map read and map write

ppg-coder opened this issue · 1 comments

I use github.com/olivere/elastic/v7 v7.0.22 and go 1.15

I use a bulk processor to batch index and update documents. My setup code looks like this:

// BulkProcessor builds a bulk processor on i.EsClient, named after
// i.EsIndexName, configured with Workers(5), BulkActions(4096),
// BulkSize(2 << 20) and FlushInterval(2 * time.Second), then returns the
// result of calling Do(ctx) on it.
//
// The After callback only logs: it reports a commit-level error when err is
// non-nil, and otherwise one line per response item that carries an error.
func (i *SpuEs) BulkProcessor(ctx context.Context) (*elastic.BulkProcessor, error) {
	// logOutcome is installed as the After callback of the processor.
	logOutcome := func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
		switch {
		case err != nil:
			// The whole commit failed; response may be nil here, so only
			// count failed items when one was returned.
			var failedNum int
			if response != nil {
				failedNum = len(response.Failed())
			}
			xlog.WithEvent(i.EsIndexName).Errorf("BulkProcessor failed, id:%v, req_num:%v, failed_num:%v, err:%v", executionID, len(requests), failedNum, err)

		case response != nil && response.Errors:
			// The commit went through but the response flags errors:
			// log each item that has an error attached.
			for _, item := range response.Items {
				for action, result := range item {
					if result.Error != nil {
						xlog.WithEvent(i.EsIndexName).Errorf("BulkProcessor failed, key:%v, res:%v, err:%v", action, result.Result, result.Error.Reason)
					}
				}
			}
		}
	}

	return i.EsClient.BulkProcessor().
		Name(i.EsIndexName).
		Workers(5).
		BulkActions(4096).
		BulkSize(2 << 20).
		FlushInterval(2 * time.Second).
		After(logOutcome).
		Do(ctx)
}

After my service starts, the following fatal error occurs intermittently:

fatal error: concurrent map read and map write

goroutine 455 [running]:
runtime.throw(0x1d408db, 0x21)
	/usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc0034599c8 sp=0xc003459998 pc=0x437cb2
runtime.mapaccess2(0x1a1ac80, 0xc005cb8ff0, 0xc00aea08c0, 0xc00aea08c0, 0xc0055fe160)
	/usr/local/go/src/runtime/map.go:469 +0x25b fp=0xc003459a08 sp=0xc0034599c8 pc=0x40f91b
reflect.mapaccess(0x1a1ac80, 0xc005cb8ff0, 0xc00aea08c0, 0x1d2b0ae)
	/usr/local/go/src/runtime/map.go:1309 +0x3f fp=0xc003459a40 sp=0xc003459a08 pc=0x469d5f
reflect.Value.MapIndex(0x1a1ac80, 0xc005cb8ff0, 0x15, 0x196f860, 0xc00aea08c0, 0x98, 0x19e70e0, 0xc00aea09c0, 0x94)
	/usr/local/go/src/reflect/value.go:1189 +0x16e fp=0xc003459ab8 sp=0xc003459a40 pc=0x4a3d0e
encoding/json.mapEncoder.encode(0x1d88598, 0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0x1a10100)
	/usr/local/go/src/encoding/json/encode.go:801 +0x30d fp=0xc003459c30 sp=0xc003459ab8 pc=0x75bdad
encoding/json.mapEncoder.encode-fm(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0x2bd0100)
	/usr/local/go/src/encoding/json/encode.go:777 +0x65 fp=0xc003459c70 sp=0xc003459c30 pc=0x768405
encoding/json.(*encodeState).reflectValue(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0xc003450100)
	/usr/local/go/src/encoding/json/encode.go:358 +0x82 fp=0xc003459ca8 sp=0xc003459c70 pc=0x758f42
encoding/json.(*encodeState).marshal(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x100, 0x0, 0x0)
	/usr/local/go/src/encoding/json/encode.go:330 +0xf4 fp=0xc003459d08 sp=0xc003459ca8 pc=0x758b34
encoding/json.Marshal(0x1a1ac80, 0xc005cb8ff0, 0x45, 0xc000f29950, 0x45, 0x0, 0x0)
	/usr/local/go/src/encoding/json/encode.go:161 +0x52 fp=0xc003459d80 sp=0xc003459d08 pc=0x757fb2
github.com/olivere/elastic/v7.(*BulkIndexRequest).Source(0xc006bbaee0, 0xc003459ee0, 0x2, 0x0, 0x1, 0x1)
	/go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_index_request.go:240 +0x39a fp=0xc003459e68 sp=0xc003459d80 pc=0x147efba
github.com/olivere/elastic/v7.(*bulkWorker).work(0xc001994b80, 0x1f72580, 0xc0003c1a80)
	/go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_processor.go:484 +0x46b fp=0xc003459fc8 sp=0xc003459e68 pc=0x148260b
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1374 +0x1 fp=0xc003459fd0 sp=0xc003459fc8 pc=0x4707e1
created by github.com/olivere/elastic/v7.(*BulkProcessor).Start
	/go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_processor.go:336 +0x1ce