sink batches items submitted from different flows (for example, separate goroutines) so that an expensive operation runs once per batch instead of once per item.
Install using Go modules:
go get -u github.com/ormanli/sink
cfg := sink.Config[int, int]{
	MaxItemsForBatching:   10,                    // Maximum number of items per batch. Mandatory, can't be less than 1.
	MaxTimeoutForBatching: 10 * time.Millisecond, // Maximum time to wait for inputs. Mandatory, can't be less than 1 millisecond.
	AddPoolSize:           10,                    // Goroutine pool size for add operations. Mandatory, can't be less than 1.
	CallbackPoolSize:      10,                    // Goroutine pool size for callback operations. Mandatory, can't be less than 1.
	ExpensivePoolSize:     10,                    // Goroutine pool size for expensive operations. Mandatory, can't be less than 1.
	ExpensiveOperation: func(i []int) ([]int, error) {
		time.Sleep(time.Second)
		return i, nil
	}, // The function that is called with the batched items. Mandatory.
	Logger: customLogger, // Optional. If not provided, the log package is used.
}
Sink starts processing a batch as soon as MaxItemsForBatching items have arrived or MaxTimeoutForBatching has elapsed, whichever comes first.
NewSink creates a sink that processes inputs according to the provided configuration. When the sink is no longer required, stop it by calling the Close method.
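s, err := sink.NewSink[int, int](cfg) // type parameters must match those of cfg
if err != nil {
	return err
}
defer s.Close() // stop the sink when it is no longer required
_, err = s.Add(10)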
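Add returns a value, which suggests that each caller receives the output for its own input even though the expensive operation runs once for the whole batch. The standalone sketch below shows one way such routing could work. It is an illustration under that assumption, not sink's actual code: `request`, `process`, and `double` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// request pairs an input with the channel its caller is waiting on.
type request struct {
	in    int
	reply chan int
}

// process runs the expensive operation once for the whole batch and hands
// each result back to the caller that submitted the matching input.
func process(batch []request, expensive func([]int) []int) {
	inputs := make([]int, len(batch))
	for i, r := range batch {
		inputs[i] = r.in
	}
	outputs := expensive(inputs)
	for i, r := range batch {
		r.reply <- outputs[i]
	}
}

// double stands in for an expensive call such as a bulk write.
func double(in []int) []int {
	out := make([]int, len(in))
	for i, v := range in {
		out[i] = v * 2
	}
	return out
}

func main() {
	queue := make(chan request)
	results := make([]int, 3)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { // each goroutine plays the role of one flow
			defer wg.Done()
			r := request{in: i + 1, reply: make(chan int, 1)}
			queue <- r
			results[i] = <-r.reply // block until this caller's result arrives
		}(i)
	}

	// Gather one batch of three requests, whatever order they arrive in.
	batch := make([]request, 0, 3)
	for len(batch) < 3 {
		batch = append(batch, <-queue)
	}
	process(batch, double)

	wg.Wait()
	fmt.Println(results) // [2 4 6]
}
```

Because every request carries its own reply channel, the order in which items reach the batch does not affect which caller gets which result.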
NewSinkWithContext creates a sink that processes inputs according to the provided configuration. It runs until the given context is canceled.
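ctx, cancel := context.WithCancel(context.Background())
defer cancel() // canceling the context stops the sink
s, err := sink.NewSinkWithContext[int, int](ctx, cfg) // type parameters must match those of cfg
if err != nil {
	return err
}
_, err = s.Add(10)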
To run the tests, run the following command:
make test