go-playground/pool

Should the pool instance be reused?

sudo-suhas opened this issue · 2 comments

Hey @joeybloggs, thanks for this package. The abstractions make it dead simple to start and manage worker pools.

How do you recommend I use the pool instance? Should I create it once and reuse it (by making it package scoped) even if I only ever create and use a batch inside a single function? Here's the code:

package util

import (
	"runtime"

	jsoniter "github.com/json-iterator/go"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"gopkg.in/go-playground/pool.v3"
)

// jsoniter.ConfigFastest marshals floats with 6 digits of precision (lossy),
// which is significantly faster. It also does not escape HTML.
var json = jsoniter.ConfigFastest

// Unmarshal uses jsoniter for efficiently unmarshalling the byte stream into
// the struct pointer.
func Unmarshal(bs []byte, v interface{}) error {
	// See https://github.com/json-iterator/go/blob/master/example_test.go#L69-L88
	iter := json.BorrowIterator(bs)
	defer json.ReturnIterator(iter)

	iter.ReadVal(v)
	if iter.Error != nil {
		log.WithError(iter.Error).
			Error("Got error while trying to unmarshal value into given struct")
		return errors.Wrap(iter.Error, "util: unmarshal using jsoniter.ConfigFastest failed")
	}
	return nil
}

func unmarshalWorker(bs []byte, val interface{}) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			// return values not used
			return nil, nil
		}

		return nil, Unmarshal(bs, val)
	}
}

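// BulkUnmarshal unmarshals each element of bytesSlice into the corresponding
// element of vals, running the work in parallel on a limited worker pool.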
func BulkUnmarshal(bytesSlice [][]byte, vals []interface{}) error {
	if len(bytesSlice) != len(vals) {
		return errors.New(
			"util: bulk unmarshal failed: length of bytes slice did not match targets",
		)
	}

	p := pool.NewLimited(uint(runtime.NumCPU() * 2))
	defer p.Close()

	batch := p.Batch()

	go func() {
		for idx, bs := range bytesSlice {
			batch.Queue(unmarshalWorker(bs, vals[idx]))
		}

		// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK.
		// Cancel() calls QueueComplete() internally.
		batch.QueueComplete()
	}()

	for res := range batch.Results() {
		if err := res.Error(); err != nil {
			batch.Cancel()
			return errors.Wrap(err, "util: bulk unmarshal failed")
		}
	}

	return nil
}

Hey @sudo-suhas, it depends.

Do you want the pool limit to apply to your entire application (package scoped), or to each function call separately?

That's the rule of thumb I would use.
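To make the difference concrete, here is a rough sketch that uses only the standard library. It does not use the pool package's API: `limiter`, `sharedCall`, `perCall`, and `peak` are hypothetical names, and a buffered channel stands in for a limited pool. With a package-scoped limiter, concurrent callers compete for the same slots; with one limiter per call, each caller gets its own slots, so the total can grow to callers × limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a hypothetical stand-in for a limited pool: a buffered channel
// used as a counting semaphore.
type limiter chan struct{}

// run executes n copies of task, never more than cap(l) at once among all
// callers that share l, and returns once every copy has finished.
func (l limiter) run(n int, task func()) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		l <- struct{}{} // acquire a slot; blocks while the limiter is full
		go func() {
			defer wg.Done()
			defer func() { <-l }() // release the slot after the task returns
			task()
		}()
	}
	wg.Wait()
}

const limit = 2

// shared is package scoped, so its limit covers the whole application.
var shared = make(limiter, limit)

// sharedCall reuses the package-scoped limiter.
func sharedCall(n int, task func()) { shared.run(n, task) }

// perCall builds a fresh limiter, so the limit applies to this call only.
func perCall(n int, task func()) { make(limiter, limit).run(n, task) }

// peak starts `callers` concurrent invocations of call and reports the
// highest number of tasks observed in flight at the same time.
func peak(call func(int, func()), callers int) int64 {
	var inFlight, highest int64
	task := func() {
		cur := atomic.AddInt64(&inFlight, 1)
		for {
			old := atomic.LoadInt64(&highest)
			if cur <= old || atomic.CompareAndSwapInt64(&highest, old, cur) {
				break
			}
		}
		time.Sleep(5 * time.Millisecond) // simulate some work
		atomic.AddInt64(&inFlight, -1)
	}
	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			call(8, task)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&highest)
}

func main() {
	// The package-scoped peak never exceeds limit; the per-call peak can
	// reach callers*limit.
	fmt.Println("package scoped peak:", peak(sharedCall, 3))
	fmt.Println("per call peak:", peak(perCall, 3))
}
```

In terms of the code in the question, calling `pool.NewLimited` inside `BulkUnmarshal` corresponds to `perCall`: every concurrent call gets its own set of workers. Moving the pool to package scope corresponds to `sharedCall`.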

OK, that makes sense. Closing this. Thanks again.

However, I saw some performance issues when I tried to use this lib to marshal a slice in parallel. I'll probably open a new issue later with the code and a benchmark.