go-service-framework
This Go package provides primitives for building a Go service: a manager, a worker pool, and a poller.
Features
- Manager: Manages gRPC, HTTP, and background services for the service.
- Worker Pool: An all-purpose worker pool that parallelizes tasks. Worker pools can be chained together, and these chains can be used to handle complex data flows.
- Poller: A struct that polls a blockchain and feeds the data through an ETL pipeline using chained worker pools.
Installation
To install the package, you can use the go get command:
go get github.com/coherentdevs/go-service-framework
Usage
Here is a generic Go service utilizing the framework to run a poller:
package main

import (
	"net/http"

	"github.com/coherentdevs/go-service-framework/manager"
	"github.com/coherentdevs/go-service-framework/poller"
	"github.com/coherentdevs/go-service-framework/pool"
)

// The initialize* helpers used below are application-specific and are not
// part of the framework; they are omitted here for brevity.

func main() {
	mgr := manager.New(manager.WithoutGracefulShutdown())

	// Load environment configuration, logging and other preliminaries
	cfg := initializeConfig(mgr.Logger())
	nodeClient := initializeNodeClient(cfg, mgr.Logger())
	dataCache := initializeCache(cfg, mgr.Logger())
	dataConnector := initializeDataConnector(mgr.Context(), cfg, mgr.Logger())

	// The throttler rate-limits the fetcher pool
	tt := pool.NewThrottler(cfg.FetcherPoolThrottleBandwidth, cfg.FetcherPoolThrottleDuration)

	// Three chained pools: fetcher -> accumulator -> writer
	wp1 := pool.NewWorkerPool(
		"fetcher",
		pool.WithOutputChannel(),
		pool.WithThrottler(tt),
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.FetcherPoolBandwidth),
	)
	wp2 := pool.NewWorkerPool(
		"accumulator",
		pool.WithOutputChannel(),
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.AccumulatorPoolBandwidth),
	)
	wp3 := pool.NewWorkerPool(
		"writer",
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.WriterPoolBandwidth),
	)

	driver := initializeDriver(cfg.Poller.Blockchain, nodeClient, dataConnector, mgr.Logger())
	p := poller.New(
		&cfg.Poller,
		driver,
		poller.WithCache(dataCache),
		poller.WithFetchPool(wp1),
		poller.WithAccumulatePool(wp2),
		poller.WithWritePool(wp3),
		poller.WithLogger(mgr.Logger()),
		poller.WithMetrics(mgr.Metrics()),
	)

	srv := initializeServer(p, mgr.Logger())
	apiSrv := http.Server{
		Addr:    cfg.HttpServePort,
		Handler: srv.Router(),
	}

	// Register background services
	mgr.RegisterBackgroundSvc("fetcher pool", wp1.Start, wp1.Stop)
	mgr.RegisterBackgroundSvc("accumulator pool", wp2.Start, wp2.Stop)
	mgr.RegisterBackgroundSvc("writer pool", wp3.Start, wp3.Stop)
	mgr.RegisterBackgroundSvc("poller", p.Start, p.Stop)
	mgr.RegisterBackgroundSvc("throttler", tt.Start, tt.Stop)

	// Register HTTP servers
	mgr.RegisterHttpServer("api", &apiSrv)

	// Wait for interrupt signal
	mgr.WaitForInterrupt()
}
In this code:
- The manager.New() function is used to create a new instance of the manager object, which controls the life cycle of all services and servers.
- The poller.New() function is used to create a new poller object, which fetches, accumulates, and writes data.
- The mgr.RegisterBackgroundSvc() function is used to register background services with the manager. These services will start when the manager starts and stop when the manager stops.
- The mgr.RegisterHttpServer() function is used to register HTTP servers with the manager. These servers will start when the manager starts and stop when the manager stops.
- The mgr.WaitForInterrupt() function makes the main function wait for an interrupt signal before stopping all services and servers.
Documentation
Below are details about key components of the package.
Manager
Manages gRPC, HTTP, and background services for the service. It coordinates the different types of services running in a Go application.
Worker Pool
The worker pool is an all-purpose worker pool that parallelizes tasks. Worker pools can be chained together, and these chains can be used to handle complex data flows. Refer to the code in the worker_pool.go file in the pool directory.
It includes methods to manage the worker pool lifecycle (Start, Stop, FlushAndRestart) and methods to handle jobs and groups (PushJob, PushGroup, SetInputFeed, SetGroupInputFeed).
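To make the idea of chaining concrete, here is a minimal sketch that uses only goroutines and channels, where the output channel of one stage is the input of the next. It does not use the pool package: stage and runPipeline are hypothetical names, and the real WorkerPool API (PushJob, SetInputFeed, and so on) may behave differently.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// stage runs `workers` goroutines that apply fn to every value read from in.
// It returns a channel carrying the results, which is closed once the input
// is drained and all workers have finished. (Hypothetical helper.)
func stage[I, O any](workers int, in <-chan I, fn func(I) O) <-chan O {
	out := make(chan O)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline chains two stages: the first squares each input and the second
// formats it as a label. Results are sorted because parallel workers do not
// preserve input order.
func runPipeline(inputs []int) []string {
	src := make(chan int)
	go func() {
		defer close(src)
		for _, n := range inputs {
			src <- n
		}
	}()

	squared := stage(4, src, func(n int) int { return n * n })
	labeled := stage(2, squared, func(n int) string { return fmt.Sprintf("block-%d", n) })

	var results []string
	for s := range labeled {
		results = append(results, s)
	}
	sort.Strings(results)
	return results
}

func main() {
	fmt.Println(runPipeline([]int{3, 1, 2})) // prints [block-1 block-4 block-9]
}
```

Each stage has its own worker count, which loosely mirrors giving each pool its own bandwidth in the usage example.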
Poller
A module that polls a blockchain and feeds the data through an ETL pipeline using chained worker pools. It is used for ETLing blockchain data, utilizing worker pools to optimize efficiency and speed.
Refer to the code in the poller.go file in the poller directory.
It has modes that determine what the poller does on each iteration of its main routine's loop; the mode is chosen based on the distance of the cursor from the chain tip and the success or failure of the previous iteration.
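As a rough illustration of mode selection, the sketch below picks a mode from the cursor's distance to the chain tip and the outcome of the previous iteration. The mode names, the threshold parameter, and the selectMode function are all hypothetical; the actual modes and rules are defined in poller.go.

```go
package main

import "fmt"

// mode is a hypothetical enumeration of poller behaviors.
type mode int

const (
	modeBackfill mode = iota // far behind the chain tip: catch up in bulk
	modeTip                  // at or near the chain tip: follow new blocks
	modeRetry                // previous iteration failed: retry before advancing
)

// String makes modes print by name.
func (m mode) String() string {
	switch m {
	case modeBackfill:
		return "backfill"
	case modeTip:
		return "tip"
	case modeRetry:
		return "retry"
	}
	return "unknown"
}

// selectMode chooses a mode for the next iteration. tipThreshold is an
// assumed tuning knob: a cursor more than tipThreshold blocks behind the
// chain tip is treated as "far behind".
func selectMode(cursor, chainTip uint64, prevFailed bool, tipThreshold uint64) mode {
	if prevFailed {
		return modeRetry
	}
	// The chainTip > cursor guard avoids unsigned underflow in the subtraction.
	if chainTip > cursor && chainTip-cursor > tipThreshold {
		return modeBackfill
	}
	return modeTip
}

func main() {
	fmt.Println(selectMode(100, 1000, false, 10))
	fmt.Println(selectMode(995, 1000, false, 10))
	fmt.Println(selectMode(100, 1000, true, 10))
}
```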
Drivers for specific chains can be found at https://github.com/coherentdevs/evm-etl