🦋 work in progress
Data processing as streams with a bit of Go's reflection.
// Map cities to their weather using the wttr.in API.
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"github.com/gawen/go-flow"
)
type WttrinInfo struct {
city string
CurrentCondition []struct {
TemperatureC string `json:"temp_C"`
} `json:"current_condition"`
}
func main() {
// declare a flow
f := flow.F(context.Background())
// build a stream (`chan string`) with some cities.
cities := f.Pipe(1, 0, func(cities chan string) {
cities <- "Paris"
cities <- "San Francisco"
cities <- "New York"
// ...
})
// get weather info for each city. Request wttr.in with 8 parallel lanes.
wttrninInfos := cities.Pipe(8, 0, func(ctx context.Context, cities chan string, infos chan WttrinInfo) error {
for city := range cities {
url := fmt.Sprintf("https://wttr.in/%s?format=j1", city)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
buf, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
var info WttrinInfo
if err := json.Unmarshal(buf, &info); err != nil {
return err
}
info.city = city
infos <- info
}
return nil
})
// consume weather infos and print them.
wttrninInfos.Consume(1, func(infos chan WttrinInfo) {
for info := range infos {
fmt.Printf("%+v\n", info)
}
})
// wait for the flow to finish and panic if an error happened.
if err := f.Wait(); err != nil {
panic(err.Error())
}
}
The core method is
// Pipe `inputs` to `outputs` with `kernel`. Run parallely `laneCount` times.
func (*Flow) PipeN(
laneCount int, // instance count of running `kernel`. >=1.
outChannelBuf int, // outputs's channel buffer size. >=0.
outsCount int, // count of outputs. >=0.
inputs ...interface{}, // list of inputs. can be channels, array/slice, or *flow.Stream
kernel interface{}, // Kernel
) (outputs []*Stream)
A kernel is a method where the first arguments are the inputs given to PipeN
,
and the rest the outputs. It can have an optional context.Context
as first
argument, being the flow's context.Context
.
f.PipeN(
8, // run 8 kernels simulatenously
0, // outputs's channel won't have any buffer
1, // only one output expected
[]int{1,2,3}, // first input: an []int. will be translated to a chan int.
make(chan string), // second input: an chan string.
func( // define kernel
inp1 chan int, // first input
inp2 chan string, // second input
out chan float, // define one output, being a chan float
) {
// consume inp1 and inp2 and output something in out.
},
)
The inputs types are checked at runtime. If they do not match, go-flow
will panic.
Sugar methods are provided to ease building flows.
Flow.Consume
callsFlow.PipeN
with no expected output;Flow.Pipe
does with one expected output;Flow.Pipe2
does with 2 expected outputs;- etc...
A Stream
also provides the same methods, to chain calls and ease flow writing.
(see above example).
A kernel can return an optional error, which will kill the whole flow.
f := flow.F(context.Background())
f.RangeInt(0, 10, 1).Pipe(1, 0, func(in, out chan int) error {
for i := range in {
// dummy error
if i == 5 {
return errors.New("i don't like 5s")
}
out <- i
}
return nil
}).Consume(1, func(in chan int) {
for i := range in {
println(i)
}
})
if err := f.Wait(); err != nil {
panic(err.Error())
}
// will print:
// 0
// 1
// 2
// 3
// 4
// panic: i don't like 5s