Useful tools built on channels to speed up development, e.g. event bus, stream, promise, actor, parallel runner, and aggregator.
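- How to make Golang channels as smooth to use as Node.js streams
- How to batch-process messages with Golang channels
- How to get an async/await feel out of Golang channels
- Next time you need concurrent processing in Golang, use this template and you can't go wrong!
- Mastering Golang channels: the PubSub pattern in two hundred lines of code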
A simple utility for running tasks in parallel.
worker := func(ctx context.Context, input interface{}) (interface{}, error) {
    num := input.(int)
    return num + 1, nil
}
inputs := []interface{}{1, 2, 3, 4, 5}
outputs, err := channelx.RunInParallel(context.Background(), inputs, worker, 4)
For more examples, see parallel_runner_test.go.
A utility that implements the PubSub pattern.
logger := channelx.NewConsoleLogger()
eventBus := channelx.NewEventBus(logger, 4,4,2, time.Second, 5 * time.Second)
handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
For more details, see event_bus_test.go#TestEventBus_Example.
A Go-style async/await. I call it Promise, although its API does not fully match the JavaScript Promise.
promise := NewPromise(func() (res interface{}, err error) {
    // do work asynchronously here
    return
}).Then(func(input interface{}) (interface{}, error) {
    // success handler, which also runs asynchronously
    return input, nil
}, func(err error) interface{} {
    // error handler, which also runs asynchronously
    return nil
})
// await: wait until it completes.
res, _ := promise.Done()
For more examples, see promise_test.go.
The Actor pattern, also known as Active Object, resembles Promise. The difference is that an Actor can be reused and processes its calls in FIFO order.
actor := NewActor(SetActorBuffer(0))
defer actor.Close()
// do some work asynchronously.
call := actor.Do(func() (interface{}, error) {
    time.Sleep(0 * time.Second)
    return 0, nil
})
// other synchronous work can be done here
// ......
// wait for the call to complete.
res, err := call.Done()
For more examples, see actor_test.go.
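The reuse and FIFO properties follow naturally from a single goroutine draining a job channel. The sketch below shows that mechanism under stated assumptions: `miniActor`, `miniCall`, and `job` are hypothetical names, and the real Actor may differ in how it handles buffering and shutdown.

```go
package main

// miniCall is the handle returned for each queued job.
type miniCall struct {
	done chan struct{} // closed once res and err are set
	res  interface{}
	err  error
}

// Done blocks until the job has run and returns its result.
func (c *miniCall) Done() (interface{}, error) {
	<-c.done
	return c.res, c.err
}

type job struct {
	fn   func() (interface{}, error)
	call *miniCall
}

// miniActor owns one goroutine that executes queued jobs in FIFO order.
type miniActor struct {
	queue   chan job
	stopped chan struct{}
}

func newMiniActor(buffer int) *miniActor {
	a := &miniActor{
		queue:   make(chan job, buffer),
		stopped: make(chan struct{}),
	}
	go func() {
		defer close(a.stopped)
		for j := range a.queue {
			j.call.res, j.call.err = j.fn()
			close(j.call.done)
		}
	}()
	return a
}

// Do queues fn; it blocks only while the queue has no free slot.
func (a *miniActor) Do(fn func() (interface{}, error)) *miniCall {
	c := &miniCall{done: make(chan struct{})}
	a.queue <- job{fn: fn, call: c}
	return c
}

// Close stops accepting jobs and waits for the queued ones to finish.
func (a *miniActor) Close() {
	close(a.queue)
	<-a.stopped
}
```

Since only one goroutine ever runs the jobs, state touched exclusively inside them needs no further synchronization.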
Stream works like a Node.js stream: it can be piped, and data flows through the pipe item by item.
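// Without the stream helper: channels and goroutines wired by hand.
var multipleChan = make(chan int, 4)
var minusChan = make(chan int, 4)
var harvestChan = make(chan int, 4)
defer close(multipleChan)
defer close(minusChan)
defer close(harvestChan)
// seed: feed 1..100 into the first stage
go func() {
    for i := 1; i <= 100; i++ {
        multipleChan <- i
    }
}()
// stage 1: four workers double each value
for i := 0; i < 4; i++ {
    go func() {
        for data := range multipleChan {
            minusChan <- data * 2
            time.Sleep(10 * time.Millisecond)
        }
    }()
}
// stage 2: subtract one from each value
go func() {
    for data := range minusChan {
        harvestChan <- data - 1
        time.Sleep(10 * time.Millisecond)
    }
}()
// harvest: sum the 100 results, then stop
var sum = 0
var index = 0
for data := range harvestChan {
    sum += data
    index++
    if index == 100 {
        break
    }
}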
// With the stream helper: the same pipeline expressed as a chain of pipes.
var sum = 0
NewChannelStream(func(seedChan chan<- Item, quitChannel chan struct{}) {
    for i := 1; i <= 100; i++ {
        seedChan <- Item{Data: i}
    }
    close(seedChan) // don't forget to close it
}).Pipe(func(item Item) Item {
    return Item{Data: item.Data.(int) * 2}
}).Pipe(func(item Item) Item {
    return Item{Data: item.Data.(int) - 1}
}).Harvest(func(item Item) {
    sum += item.Data.(int)
})
For more examples, see stream_test.go.
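The piping idea can be reduced to one reusable stage function whose output channel closes itself once its input is drained. The sketch below is a simplified illustration with `int` values; `seed`, `pipe`, and `harvest` are hypothetical helpers and not part of the library's API.

```go
package main

import "sync"

// seed emits 1..n and closes its channel when finished.
func seed(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// pipe starts `workers` goroutines that apply fn to every value from in.
// The returned channel is closed once in is drained, so stages chain cleanly.
func pipe(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait() // all workers have finished sending
		close(out)
	}()
	return out
}

// harvest drains the last stage and returns the sum of its values.
func harvest(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}
```

With these helpers, `harvest(pipe(pipe(seed(100), 4, double), 1, minusOne))` computes the same sum as the examples above, where `double` and `minusOne` are the obvious one-line functions. Because each stage closes its own output, the consumer needs no item counter to know when to stop.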
Aggregator suits scenarios where requests arrive one by one but handling them in batches is more efficient.
// YourKnownType, YourBatchHandler, and yourRequest are placeholder types or objects
batchProcess := func(items []interface{}) error {
    var arr []YourKnownType
    for _, item := range items {
        ykt := item.(YourKnownType)
        arr = append(arr, ykt)
    }
    return YourBatchHandler(arr)
}
aggregator := NewAggregator(batchProcess)
For more examples, see aggregator_test.go.