Reactive Extensions for the Go Language
ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.
ReactiveX is an alternative approach to asynchronous programming, used in place of callbacks, promises, and deferreds. It is about processing streams of events or items, where an event is any occurrence or change within the system. A stream of events is called an Observable.
An operator is a function that defines an Observable: how and when it should emit data. The list of supported operators is available here.
The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
Let's see a concrete example with each box being an operator:
- We create a static Observable based on a fixed list of items using the Just operator.
- We define a transformation function (convert a circle into a square) using the Map operator.
- We filter each yellow square using the Filter operator.
In this example, the final items are sent to a channel, where they are available to a consumer. There are many ways to consume or produce data using RxGo. Publishing the results to a channel is only one of them.
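The three stages described above can be sketched with plain Go channels and goroutines. This is only an illustration of the pipeline idea, not the RxGo API: the `Shape` type and the `just`, `mapStage`, `filterStage`, and `runPipeline` functions are hypothetical names introduced for this example. It also assumes that "filter" means keeping only the yellow squares; invert the predicate for the opposite reading.

```go
package main

import "fmt"

// Shape is a hypothetical item type used only for this illustration.
type Shape struct {
	Kind  string // "circle" or "square"
	Color string
}

// just emits a fixed list of items on a channel, then closes it.
func just(items ...Shape) <-chan Shape {
	out := make(chan Shape)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// mapStage applies fn to every item received from in.
func mapStage(in <-chan Shape, fn func(Shape) Shape) <-chan Shape {
	out := make(chan Shape)
	go func() {
		defer close(out)
		for it := range in {
			out <- fn(it)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan Shape, keep func(Shape) bool) <-chan Shape {
	out := make(chan Shape)
	go func() {
		defer close(out)
		for it := range in {
			if keep(it) {
				out <- it
			}
		}
	}()
	return out
}

// runPipeline wires the three stages together and collects the final items.
func runPipeline(items ...Shape) []Shape {
	squares := mapStage(just(items...), func(s Shape) Shape {
		s.Kind = "square" // convert a circle into a square
		return s
	})
	yellow := filterStage(squares, func(s Shape) bool {
		return s.Color == "yellow"
	})
	var result []Shape
	for s := range yellow { // the consumer reads from the final channel
		result = append(result, s)
	}
	return result
}

func main() {
	items := []Shape{
		{Kind: "circle", Color: "red"},
		{Kind: "circle", Color: "yellow"},
		{Kind: "circle", Color: "green"},
	}
	for _, s := range runPipeline(items...) {
		fmt.Println(s.Color, s.Kind) // prints "yellow square"
	}
}
```

Because each stage here runs in a single goroutine over unbuffered channels, items leave the pipeline in the order they entered it.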
Each operator is a transformation stage. By default, everything is sequential. However, we can take advantage of multi-core CPUs by running multiple instances of the same operator, each instance being a goroutine connected to a common channel.
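As a rough sketch of what "multiple instances of the same operator" can look like in plain Go, the example below starts several goroutines that all read from one shared input channel and write to one shared output channel. It does not use the RxGo API; `parallelMap` and `squareAll` are hypothetical helpers written for this illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelMap starts `workers` goroutines running the same function.
// All of them read from the common input channel and write to a common
// output channel, which is closed once every worker has finished.
func parallelMap(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squareAll feeds values through parallelMap and returns the sorted results.
func squareAll(values []int, workers int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var results []int
	for v := range parallelMap(in, workers, func(v int) int { return v * v }) {
		results = append(results, v)
	}
	// With several workers the arrival order is not guaranteed,
	// so sort to get a stable result.
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```

The trade-off is ordering: once a stage runs on more than one goroutine, items may leave it in a different order than they arrived, which is why the sketch sorts before returning.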
The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.
go get -u github.com/reactivex/rxgo/v3
Let's create our first Observable and consume an item:
observable := rxgo.Interval(time.Second)
observable.SubscribeSync(func(v uint) {
	log.Println("Value ->", v)
}, func(err error) {
	log.Println("Error ->", err)
}, func() {
	log.Println("Complete!")
})
Package documentation: https://pkg.go.dev/github.com/reactivex/rxgo/v3
All contributions are very welcome! Be sure to check out the contributing guidelines first. Newcomers can take a look at ongoing issues and check for the help needed label.
Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the External Resources section.
Thanks to all the people who already contributed to RxGo!
- Announcing RxGo v2
- Why YoMo (an open-source streaming serverless framework for building low-latency edge computing applications) uses RxGo
- Go Cookbook from Packt - Reactive programming with RxGo (based on v1)
- Writing PizzaScript Lexer with RxGo
- Writing PizzaScript Parser with RxGo
- Reactive programming in Go
- Programación reactiva en Go (Spanish)
- Go 每日一库之 RxGo (Chinese)
- RxGo 入门 · 语雀 (Chinese)
A big thanks to JetBrains for supporting the project.