

Primary language: Go. License: MIT.

stream

An experiment around building composable, streamable pipelines in Go.

The idea itself is not maximum performance but rather a simpler way to abstract channels, workers, context cancellation and dynamic data, mostly for ETL jobs. It relies heavily on interface{} to pass data around and makes heavy use of reflection in github.com/stdiopt/stream/strmutil.

It should be possible to build procs for serializing CSVs, querying databases, producing Parquet files, crawling URLs, consuming APIs, etc.

ProcFunc

A ProcFunc is the function signature used to chain transforms in a stream. A ProcFunc should block until it has no more messages to send; exiting early closes its internal channel, which stops the processors that follow it.

  • Each ProcFunc runs in its own goroutine
  • ProcFuncs should block until they have no more data to send
  • Consume and Send are cancelled when the context is done
type ProcFunc = func(p stream.Proc) error

The Proc interface:

type Proc interface {
	Context() context.Context
	Consume(func(interface{}) error) error
	Send(interface{}) error
}

Consume is a blocking method that receives messages from the previous processor and calls the given func for each message.

Send sends a value to the next processor.

A typical ProcFunc looks like this:

func(p stream.Proc) error {
	// Initialize things (open files, DB connections, whatever the processor
	// needs) before consuming.

	// Consume blocks, so calling it last holds the function open until there
	// is nothing left to consume. If the underlying context is cancelled
	// (due to a previous error or a timeout), Consume stops and returns.
	return p.Consume(func(v interface{}) error {
		// Do something with the consumed value; here it is passed through
		// unchanged as a placeholder for a real transformation.
		transformed := v
		return p.Send(transformed)
	})
}
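As a concrete instance of the template above, here is a hypothetical `double` ProcFunc that needs no setup, blocks in Consume and sends each int multiplied by two. Because Proc is an interface, a ProcFunc can be exercised without a full pipeline; the sketch assumes a hypothetical in-memory `sliceProc` that feeds a fixed input and records what is sent. Neither name comes from the library, and the sketch declares its own copy of the Proc interface to stay self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// Proc mirrors the interface described above.
type Proc interface {
	Context() context.Context
	Consume(func(interface{}) error) error
	Send(interface{}) error
}

// double is a hypothetical ProcFunc: it sends every consumed int times two
// and fails on any other type, which stops the Consume loop.
func double(p Proc) error {
	return p.Consume(func(v interface{}) error {
		n, ok := v.(int)
		if !ok {
			return fmt.Errorf("double: expected int, got %T", v)
		}
		return p.Send(n * 2)
	})
}

// sliceProc is a hypothetical in-memory Proc: Consume replays input and
// Send appends to sent.
type sliceProc struct {
	input []interface{}
	sent  []interface{}
}

func (s *sliceProc) Context() context.Context { return context.Background() }

func (s *sliceProc) Consume(fn func(interface{}) error) error {
	for _, v := range s.input {
		if err := fn(v); err != nil {
			return err
		}
	}
	return nil
}

func (s *sliceProc) Send(v interface{}) error {
	s.sent = append(s.sent, v)
	return nil
}

func main() {
	p := &sliceProc{input: []interface{}{1, 2, 3}}
	err := double(p)
	fmt.Println(p.sent, err)
}
```

Because every value is an interface{}, type checks like the one in `double` happen at run time; returning an error from the Consume callback is what lets a bad value stop the processor.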

Usage

package main

import (
	"fmt"

	"github.com/stdiopt/stream"
)

func main() {
	l := stream.Line(
		produce,
		consume,
	)
	if err := stream.Run(l); err != nil {
		fmt.Println("err:", err)
	}
}
func produce(p stream.Proc) error {
	for i := 0; i < 10; i++ {
		if err := p.Send(i); err != nil {
			return err
		}
	}
	return nil
}
func consume(p stream.Proc) error {
	return p.Consume(func(v interface{}) error {
		fmt.Println("Consuming:", v)
		return nil
	})
}

Example consuming an API:

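package main

import (
	"log"
	"os"

	"github.com/stdiopt/stream"
	"github.com/stdiopt/stream/strmutil"
)

func main() {
	err := stream.Run(
		strmutil.Value("https://randomuser.me/api/?results=100"), // just sends the string
		strmutil.HTTPGet(nil),
		strmutil.JSONParse(nil),
		strmutil.Field("results"),
		strmutil.Unslice(),
		strmutil.Field("picture.thumbnail"),
		// Download profile picture thumbnails concurrently with 32 workers.
		// HTTPDownload is not defined in this snippet; it is assumed to be
		// a ProcFunc constructor defined elsewhere in the example.
		stream.Workers(32, HTTPDownload(nil)),
		strmutil.JSONDump(os.Stdout),
	)
	if err != nil {
		log.Println("err:", err)
	}
}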

examples