eapache/channels

Batching channel contains hidden type errors

rickb777 opened this issue · 2 comments

BatchingChannel has an error in the type of its output channel. This is hidden when using interface{}, but is apparent as soon as any other interface is substituted.

It should instead be

type BatchingChannel struct {
    input chan interface{}
    output chan []interface{}
    buffer        []interface{}
    size          BufferCap
}

There are corresponding changes needed in NewBatchingChannel() and Out().

The following illustrates this better:

package channels

type thing interface {
    X() string
}

// BatchingChannel implements the Channel interface, with the change that instead of producing individual elements
// on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel
// will panic, that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).
type BatchingChannel struct {
    input         chan thing
    output chan   []thing
    buffer        []thing
    size          BufferCap
}

func NewBatchingChannel(size BufferCap) *BatchingChannel {
    if size == None {
        panic("channels: BatchingChannel does not support unbuffered behaviour")
    }
    if size < 0 && size != Infinity {
        panic("channels: invalid negative size in NewBatchingChannel")
    }
    ch := &BatchingChannel{make(chan thing), make(chan []thing), nil, size}
    go ch.batchingBuffer()
    return ch
}

func (ch *BatchingChannel) In() chan<- thing {
    return ch.input
}

func (ch *BatchingChannel) Out() <-chan []thing {
    return ch.output
}

func (ch *BatchingChannel) Len() int {
    return len(ch.buffer)
}

func (ch *BatchingChannel) Cap() BufferCap {
    return ch.size
}

func (ch *BatchingChannel) Close() {
    close(ch.input)
}

func (ch *BatchingChannel) shutdown() {
    ch.output <- ch.buffer
    close(ch.output)
}

func (ch *BatchingChannel) batchingBuffer() {
    for {
        if len(ch.buffer) == 0 {
            elem, open := <-ch.input
            if open {
                ch.buffer = append(ch.buffer, elem)
            } else {
                ch.shutdown()
                return
            }
        } else if ch.size != Infinity && len(ch.buffer) >= int(ch.size) {
            ch.output <- ch.buffer
            ch.buffer = nil
        } else {
            select {
            case elem, open := <-ch.input:
                if open {
                    ch.buffer = append(ch.buffer, elem)
                } else {
                    ch.shutdown()
                    return
                }
            case ch.output <- ch.buffer:
                ch.buffer = nil
            }
        }
    }
}

I left the output channel as just interface{} so that it fulfills the Channel interface. As per the test code in batching_channel_test.go you can simply assert the output to []interface{} and then use the resulting slice.

Of course if you are taking a copy and manually substituting types, then it makes sense to just make the output channel a []thing, but then you're not fulfilling the Channel interface at that point anyways.

It was confusing, so I added a bit of extra documentation: https://godoc.org/github.com/eapache/channels#BatchingChannel.Out

Let me know if there's anything else that's not clear.

Evan