This project is just a training project for the moment. The goal is for me to get more familiar with Go concurrency primitives and philosophy.
All I did was take this article, https://go.dev/blog/pipelines, and turn it into reusable code: separating the concurrency logic from the user logic, and moving functions to interfaces.
A pipeline has to start with a Producer and end with a Consumer. In between, Subjects can be added to perform further processing. A Subject is simply a type that is both a Consumer and a Producer, so it can consume values from an upstream layer and produce new values for a downstream layer.
For the moment there is no notion of connecting nodes; only linear pipelines are supported.
The Pipeline type aggregates the Producer, the Consumer and the Subjects, and starts the flow of values through the pipeline.
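The roles described above can be sketched as small generic interfaces over channels, with a `run` function standing in for what the Pipeline type does when it wires a linear chain. This is a minimal sketch under assumptions: every name and method signature here (`Producer`, `Consumer`, `Subject`, `Produce`, `Consume`, `items`, `square`, `collect`, `run`) is hypothetical and chosen for illustration, not taken from this package's API.

```go
package main

import "fmt"

// Hypothetical interfaces illustrating the three roles; the real package's
// method names and signatures may differ.

// Producer exposes a channel of values for the layer downstream.
type Producer[T any] interface {
	Produce() <-chan T
}

// Consumer attaches to a channel of values coming from the layer upstream.
type Consumer[T any] interface {
	Consume(in <-chan T)
}

// Subject is both a Consumer and a Producer, so it can sit between layers.
type Subject[T any] interface {
	Consumer[T]
	Producer[T]
}

// items is a Producer that emits a fixed list of values.
type items[T any] struct{ values []T }

func (p items[T]) Produce() <-chan T {
	out := make(chan T)
	go func() {
		defer close(out) // closing tells downstream that no more values follow
		for _, v := range p.values {
			out <- v
		}
	}()
	return out
}

// square is a Subject: Consume starts a goroutine that squares each input
// value, and Produce exposes the channel the squares are written to.
type square struct{ out chan int }

func newSquare() *square { return &square{out: make(chan int)} }

func (s *square) Consume(in <-chan int) {
	go func() {
		defer close(s.out)
		for v := range in {
			s.out <- v * v
		}
	}()
}

func (s *square) Produce() <-chan int { return s.out }

// collect is a Consumer that blocks until its input is closed, keeping every value.
type collect struct{ got []int }

func (c *collect) Consume(in <-chan int) {
	for v := range in {
		c.got = append(c.got, v)
	}
}

// Compile-time check that *collect satisfies Consumer[int].
var _ Consumer[int] = (*collect)(nil)

// run wires a linear Producer -> Subject -> Consumer chain and returns
// what the sink received once the source is exhausted.
func run(values []int) []int {
	var src Producer[int] = items[int]{values: values}
	var mid Subject[int] = newSquare()
	sink := &collect{}
	mid.Consume(src.Produce())
	sink.Consume(mid.Produce()) // blocks until the chain is drained
	return sink.got
}

func main() {
	fmt.Println(run([]int{4, 9})) // [16 81]
}
```

Closing each channel when a layer is done is what lets the sink's loop end, which is how a blocking run can know that all input values have been consumed.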
Additional types are provided for the user's convenience:
- The `DelegateProducer` is a producer where values are generated by a custom factory function provided by the user.
- The `ItemsProducer` is a producer where values are given upfront by the user as immediate values when constructing the instance.
- The `DelegateSubject` is a consumer and producer, where received values are transformed by a custom transform function provided by the user.
- The `MergeSubject` is a fan-out consumer / fan-in producer, where the input is distributed to all subjects provided at construction, and the subjects' outputs are gathered into a single output for the downstream layer.
- The `DelegateConsumer` is a consumer where values are processed by a custom processing function provided by the user.
- The `DrainConsumer` is a consumer that simply consumes (and discards) all incoming values.
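The behaviour of `DelegateSubject` and `MergeSubject` can be approximated with plain channels, following the fan-out/fan-in pattern of the article. This is a function-based sketch, not the package's implementation: `gen`, `delegate`, `merge` and `fanOutFanIn` are hypothetical names, and it assumes that "distributed" means each value is handled by exactly one inner subject, as in the article's fan-out.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// gen emits the given values and closes its channel, like the article's source.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// delegate applies a user-supplied transform to each value, in the spirit of DelegateSubject.
func delegate(in <-chan int, transform func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- transform(v)
		}
	}()
	return out
}

// merge gathers several channels into one (fan-in) and closes the output
// once every input channel has been drained, as in the article's merge.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOutFanIn starts n workers that all read from the same input channel, so each
// value is handled by exactly one of them (fan-out), then merges their outputs.
// Arrival order depends on scheduling, so the result is sorted.
func fanOutFanIn(n int, nums ...int) []int {
	in := gen(nums...)
	square := func(v int) int { return v * v }
	workers := make([]<-chan int, n)
	for i := range workers {
		workers[i] = delegate(in, square)
	}
	var got []int
	for v := range merge(workers...) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(fanOutFanIn(2, 4, 9, 3)) // [9 16 81]
}
```

Because the merged output order is not deterministic, a downstream layer should not rely on it unless the merge step itself reorders values.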
Below is the sample from the article, reproduced with this package: a source at the top of the pipeline produces numbers, a layer squares the inbound values, and a sink prints the squared values.
```go
package main

import (
	"fmt"
	"pipeline"
)

func main() {
	// A pipeline carrying int values.
	p := pipeline.NewPipeline[int]()

	// Source: emits the given values, then stops.
	source := pipeline.NewItemsProducer(p, []int{4, 9})

	// Middle layer: squares each inbound value.
	subject := pipeline.NewDelegateSubject(p, func(input int) int {
		return input * input
	})

	// Sink: prints each squared value; returning true keeps it consuming.
	sink := pipeline.NewDelegateConsumer(func(input int) bool {
		fmt.Println(input)
		return true
	})

	p.RegisterSource(source)
	p.AddSubject(subject)
	p.RegisterSink(sink)

	// Blocks until all the input values are consumed.
	p.Run()
}
```

- Create a `Pipeline` of the desired type.
- Create a `Producer`; here the `ItemsProducer` is used for simplicity.
- Create a `Subject` that squares the inbound values.
- Create a `Consumer` that prints the inbound squared values.
- Register the `Producer` as source, add the `Subject` to the pipeline, and register the `Consumer` as sink.
- Run the pipeline. The `Run` function blocks until all the input values are consumed.
The pipeline can be interrupted in two ways.
The sink consumer can return false to indicate it is done. This stops consumption, but it does not signal the upstream layers that they should stop producing values. Therefore, when interrupting the flow that way, you should cancel the pipeline (by calling Cancel() on the Pipeline instance) after the Run() function returns.
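Why the extra Cancel() call matters can be shown with plain channels: once the sink stops reading, the producer is left blocked on its next send until something closes the "done" channel. In this sketch, `produce`, `consumeUntil` and `stopEarly` are hypothetical names, and closing `done` by hand stands in for calling Cancel() after Run() returns.

```go
package main

import (
	"fmt"
	"sync"
)

// produce emits nums unless done is closed first; wg tracks the goroutine's exit.
func produce(done <-chan struct{}, wg *sync.WaitGroup, nums ...int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return // cancelled: stop producing instead of blocking forever
			}
		}
	}()
	return out
}

// consumeUntil mimics a sink callback returning false: the value is processed,
// then reading stops, without telling the producer anything.
func consumeUntil(in <-chan int, process func(int) bool) []int {
	var got []int
	for v := range in {
		got = append(got, v)
		if !process(v) {
			break
		}
	}
	return got
}

// stopEarly stops the sink after the value 2, then cancels the upstream layer.
func stopEarly() []int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	got := consumeUntil(produce(done, &wg, 1, 2, 3, 4), func(v int) bool { return v < 2 })
	// The producer is (or soon will be) blocked trying to send 3. Closing done
	// plays the role of calling Cancel() once Run() has returned.
	close(done)
	wg.Wait() // would never return without close(done)
	return got
}

func main() {
	fmt.Println(stopEarly()) // [1 2]
}
```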
You can also interrupt the pipeline at any time by simply calling the Cancel() function on the Pipeline instance. This is why most of the provided types take a Pipeline instance at construction: they need to listen to its "done" channel to be notified when the pipeline is interrupted.
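Since Cancel() may be called at any time, and possibly from more than one place, closing the "done" channel through `sync.Once` is one way to make repeated calls safe (closing a channel twice panics). The sketch below assumes that design; `canceler`, `counter`, `double` and `takeThenCancel` are hypothetical names, not the package's types.

```go
package main

import (
	"fmt"
	"sync"
)

// canceler is a hypothetical stand-in for the Pipeline's cancellation part:
// a done channel that is closed at most once, however often Cancel is called.
type canceler struct {
	done chan struct{}
	once sync.Once
}

func newCanceler() *canceler { return &canceler{done: make(chan struct{})} }

func (c *canceler) Cancel() { c.once.Do(func() { close(c.done) }) }

// counter produces 0, 1, 2, ... forever, until done is closed.
func counter(c *canceler, wg *sync.WaitGroup) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-c.done:
				return
			}
		}
	}()
	return out
}

// double is a middle layer: it watches done while sending, and stops ranging
// once its upstream closes the input after seeing done.
func double(c *canceler, wg *sync.WaitGroup, in <-chan int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for v := range in {
			select {
			case out <- 2 * v:
			case <-c.done:
				return
			}
		}
	}()
	return out
}

// takeThenCancel reads n values, then cancels the whole chain.
func takeThenCancel(n int) []int {
	c := newCanceler()
	var wg sync.WaitGroup
	out := double(c, &wg, counter(c, &wg))
	var got []int
	for len(got) < n {
		got = append(got, <-out)
	}
	c.Cancel()
	c.Cancel() // harmless: sync.Once turns repeated calls into no-ops
	wg.Wait()  // returns once both layers have observed done and exited
	return got
}

func main() {
	fmt.Println(takeThenCancel(3)) // [0 2 4]
}
```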
It should be possible to pass a context.Context to the Run() function so that it is automatically shared among all elements of the pipeline, or to use some other mechanism based on context.Context.
More tests should be written, and guidelines derived from them, to help users avoid writing code that leaks goroutines.