A simple helper for quickly starting to process big files, tables, logs, etc. This code has been developed and maintained by Ven since May 2015.
You need to create a struct implementing this interface:
AggregatorTask interface {
	Source(chan interface{})
	GetBlank() AggregatorTask
	Map(interface{}) error
	Reduce(AggregatorTask) error
}
Source(chan interface{})
The source of your data. Just read it and push it into the channel.
GetBlank() AggregatorTask
Returns a new, prepared struct. Each goroutine uses it as a local cache and later passes it to Reduce to merge into the current struct. It must not be a pointer to the current struct.
Map(interface{}) error
Processes one item of your data.
Reduce(AggregatorTask) error
Merges a goroutine's local cache into the main struct.
package main

import (
	"fmt"

	"github.com/2tvenom/aggregator"
)

type (
	entity struct {
		count uint64
	}
)

//Source of your data. Here you can download a file over HTTP, read from disk or from a table, whatever you want. Just push the items into the channel.
func (e *entity) Source(source chan interface{}) {
	var i uint64
	for i = 0; i < 100; i++ {
		source <- i
	}
}

//Return a blank struct (used as a goroutine's local cache)
func (e *entity) GetBlank() aggregator.AggregatorTask {
	return &entity{}
}

//Process one item of data
func (e *entity) Map(data interface{}) error {
	e.count += data.(uint64)
	return nil
}

//Merge a goroutine's local cache into the main struct
func (e *entity) Reduce(localCache aggregator.AggregatorTask) error {
	e.count += localCache.(*entity).count
	return nil
}

func main() {
	entityTask := &entity{}

	a := aggregator.NewAggregator()
	a.AddTask(entityTask)
	a.Start()

	fmt.Printf("%d\n", entityTask.count)
}
SetMaxGoRoutines(quantity int)
Number of goroutines for parallel data processing (default: 1)
SetMaxQueueLen(quantity int)
Maximum length of the source queue channel (default: 100)
SetMaxReduceQueueLen(quantity int)
Maximum length of the reduce queue (default: 10)
SetMaxEntityForReduce(quantity int)
How many entities to process before sending the local cache to reduce (default: 100)
Methods for retrieving error and progress counters:
CountReduceErrors()
CountPreProcessErrors()
CountMapErrors()
CountProcessed()