/sink

Primary LanguageGoApache License 2.0Apache-2.0

sink

sink allows you to batch expensive operations using items from different flows.

GitHub Codecov GitHub Workflow Status

Installation

Install using go modules

  go get -u github.com/ormanli/sink

Usage/Examples

Configuration

cfg := sink.Config[int, int]{
    MaxItemsForBatching:   10, // Maximum number of items to batch inputs, mandatory, can't be less than 1.
    MaxTimeoutForBatching: 10 * time.Millisecond, // Maximum time to wait for inputs, mandatory, can't be less than 1 millisecond.
    AddPoolSize:           10, // Add operation goroutine pool size, mandatory, can't be less than 1.
    CallbackPoolSize:      10, // Callback operation goroutine pool size, mandatory, can't be less than 1.
    ExpensivePoolSize:     10, // Expensive operation goroutine pool size, mandatory, can't be less than 1.
    ExpensiveOperation: func(i []int) ([]int, error) {
        time.Sleep(time.Second)

        return i, nil
    }, // Actual function that is called with batched items, mandatory.
    Logger:                 customLogger, // Logger is optional, if not provided log package used.
}

Sink will either wait until MaxItemsForBatching of items to arrive or wait until MaxTimeoutForBatching to start processing batched items.

Sink

Processes given inputs by provided configuration. When it is no longer required, stop sink by calling Close method.

s, err := sink.NewSink[dummy, dummy](cfg)
defer s.Close()

_, err = s.Add(dummy{i: 10})

Sink With Context

Processes given inputs by provided configuration. It will run until the given context is canceled.

ctx, cncl := context.WithCancel(context.Background())
s, err := sink.NewSinkWithContext[dummy, dummy](ctx, cfg)
defer cncl()

_, err = s.Add(dummy{i: 10})

Running Tests

To run tests, run the following command

  make test

Authors