
Gleamold


Gleamold is a high-performance, efficient distributed execution system that is also simple, generic, flexible, and easy to customize.

Gleamold is built in Go, and the user-defined computation can be written in Go, Lua, Unix pipe tools, or any streaming program.

It is convenient to write logic in Lua, but Lua is optional. Go is also supported with a little extra effort.

High Performance

  • Pure Go mappers and reducers have high performance and concurrency.
  • The optional LuaJIT also has high performance, comparable to C, Java, or Go. It processes data in a streaming fashion, without context switches between Go and Lua.
  • Data flows through memory, optionally to disk.
  • Multiple map reduce steps are merged together for better performance.

Memory Efficient

  • Gleamold does not have the GC problems that plague other systems. Each executor runs in a separate OS process, and its memory is managed by the OS. One machine can host many more executors.
  • Gleamold master and agent servers are memory efficient, consuming about 10 MB memory.
  • Gleamold tries to automatically adjust the required memory size based on data size hints, avoiding the trial-and-error of manual memory tuning.

Flexible

  • The Gleamold flow can run standalone or distributed.
  • Adjustable between in-memory mode and OnDisk mode.

Easy to Customize

  • The Go code is much simpler to read than Scala, Java, or C++.
  • The optional LuaJIT FFI library can easily invoke C functions, for even more performance or to reuse existing C libraries (see the sketch after this list).
  • (future) Write SQL with UDF written in Lua.
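
As a hypothetical illustration of the FFI point above (assuming the embedded LuaJIT exposes its standard ffi module, which this document does not confirm), a Lua step could call libc's strlen directly:

package main

import (
	"github.com/chrislusf/gleamold/flow"
)

func main() {

	// compute each word's length by calling libc's strlen through LuaJIT FFI
	flow.New().TextFile("/etc/passwd").FlatMap(`
		function(line)
			return line:gmatch("%w+")
		end
	`).Map(`
		function(word)
			local ffi = require("ffi")
			-- pcall swallows the duplicate-declaration error on rows after the first
			pcall(ffi.cdef, "size_t strlen(const char *s);")
			return word, tonumber(ffi.C.strlen(word))
		end
	`).Printlnf("%s %d").Run()
}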

One Flow, Multiple Ways to Execute

Gleamold code defines the flow, specifying each dataset (vertex) and computation step (edge), building up a directed acyclic graph (DAG). There are multiple ways to execute the DAG.

The default way is to run locally. This works in most cases.
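
Here is a minimal sketch, using only calls that appear in the examples below: the flow, and hence the DAG, is defined once, and the arguments to Run() choose how it executes.

package main

import (
	"github.com/chrislusf/gleamold/flow"
)

func main() {

	// one DAG: datasets are the vertices, computation steps such as FlatMap are the edges
	f := flow.New().TextFile("/etc/passwd").FlatMap(`
		function(line)
			return line:gmatch("%w+")
		end
	`).Printlnf("%s")

	// execute the DAG locally, the default
	f.Run()

	// the same DAG runs on a cluster by passing an option instead, e.g.
	// f.Run(distributed.Option()) with "github.com/chrislusf/gleamold/distributed" imported
}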

The sections below mostly cover the distributed mode.

Distributed Mode

The distributed mode involves several components: Master, Agent, Executor, and Driver.

Gleamold Driver

  • The Driver is the program users write. It defines the flow and talks to the Master, Agents, and Executors.

Gleamold Master

  • The Master is a single server that collects resource information from the Agents.
  • It stores transient resource information and can be restarted.
  • When the Driver program starts, it asks the Master for available Executors on Agents.

Gleamold Agent

  • Agents run on any machine that can run computations.
  • Agents periodically send resource usage updates to the Master.
  • When the Driver program has Executors assigned, it talks to the Agents to start the Executors.
  • Agents also manage the datasets generated by each Executor.

Gleamold Executor

  • Executors are started by Agents. Each Executor reads inputs from external sources or previous datasets, processes them, and outputs a new dataset.

Dataset

  • The datasets are managed by Agents. By default, the data flows only through memory and the network, never touching the slow disk.
  • Optionally, the data can be persisted to disk.

By keeping the data in memory, the flow has back pressure and naturally supports stream computation.

Standalone Example

Word Count

Word Count by Pure Go

Basically, you need to register the Go functions first. Registration returns a mapper or reducer function id, which we then pass to the flow.

package main

import (
	"strings"

	"github.com/chrislusf/gleamold/flow"
	"github.com/chrislusf/gleamold/gio"
)

var (
	MapperTokenizer = gio.RegisterMapper(tokenize)
	MapperAddOne    = gio.RegisterMapper(addOne)
	ReducerSum      = gio.RegisterReducer(sum)
)

func main() {

	gio.Init() // required for pure go map reduce, place it right after main() starts

	flow.New().TextFile("/etc/passwd").
		Mapper(MapperTokenizer). // invoke the registered "tokenize" mapper function.
		Mapper(MapperAddOne).    // invoke the registered "addOne" mapper function.
		ReducerBy(ReducerSum).   // invoke the registered "sum" reducer function.
		Sort(flow.OrderBy(2, true)).
		Printlnf("%s %d").Run()
}

// tokenize splits a line into alphanumeric words and emits each one.
func tokenize(row []interface{}) error {
	line := string(row[0].([]byte))
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

// addOne emits each word paired with an initial count of 1.
func addOne(row []interface{}) error {
	word := string(row[0].([]byte))
	gio.Emit(word, 1)
	return nil
}

// sum combines two partial counts; the counts arrive here as uint64.
func sum(x, y interface{}) (interface{}, error) {
	return x.(uint64) + y.(uint64), nil
}
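
To try this standalone example locally (assuming it is saved as word_count.go):

> go run word_count.go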

A more complete example is here: https://github.com/chrislusf/gleamold/blob/master/examples/word_count_in_go/word_count_in_go.go

Word Count by LuaJIT

LuaJIT can greatly simplify the code. Here is the full source code (not a snippet) for word count:

package main

import (
	"github.com/chrislusf/gleamold/flow"
)

func main() {

	flow.New().TextFile("/etc/passwd").FlatMap(`
		function(line)
			return line:gmatch("%w+")
		end
	`).Map(`
		function(word)
			return word, 1
		end
	`).ReduceBy(`
		function(x, y)
			return x + y
		end
	`).Printlnf("%s,%d").Run()
}

Word Count by Unix Pipe Tools

Another way to do something similar:

package main

import (
	"github.com/chrislusf/gleamold/flow"
)

func main() {

	flow.New().TextFile("/etc/passwd").FlatMap(`
		function(line)
			return line:gmatch("%w+")
		end
	`).Pipe("sort").Pipe("uniq -c").Printlnf("%s").Run()
}

Join Two CSV Files

Assume file "a.csv" has fields "a1, a2, a3, a4, a5" and file "b.csv" has fields "b1, b2, b3". We want to join the rows where a1 = b2, and the output format should be "a1, a4, b3".

package main

import (
	. "github.com/chrislusf/gleamold/flow"
	"github.com/chrislusf/gleamold/gio"
	"github.com/chrislusf/gleamold/plugins/csv"
)

func main() {

	gio.Init()

	f := New()
	a := f.Read(csv.New("a.csv")).Select(Field(1,4)) // a1, a4
	b := f.Read(csv.New("b.csv")).Select(Field(2,3)) // b2, b3

	a.Join(b).Printlnf("%s,%s,%s").Run()  // a1, a4, b3

}

Parallel Execution

Unix pipes are easy for sequential processing, but limited for fan-out, and even more limited for fan-in.

With Gleamold, fan-in and fan-out parallel pipes become very easy.

This example gets a list of file names, partitions them into 3 groups, and then processes them in parallel.

// word_count.go
package main

import (
	"log"
	"path/filepath"

	"github.com/chrislusf/gleamold/flow"
)

func main() {

	fileNames, err := filepath.Glob("/Users/chris/Downloads/txt/en/ep-08-*.txt")
	if err != nil {
		log.Fatal(err)
	}

	flow.New().Strings(fileNames).Partition(3).PipeAsArgs("cat $1").FlatMap(`
      function(line)
        return line:gmatch("%w+")
      end
    `).Map(`
      function(word)
        return word, 1
      end
    `).ReduceBy(`
      function(x, y)
        return x + y
      end
    `).Printlnf("%s\t%d").Run()

}

Distributed Computing

Setup Gleamold Cluster Locally

Start a gleamold master and several gleamold agents:

# start "gleamold master" on a server
> go get github.com/chrislusf/gleamold/distributed/gleamold
> gleamold master --address=":45326"

# start up "gleamold agent" on some different servers or ports
# on a different server, remember to install LuaJIT and also copy the MessagePack.lua file
> gleamold agent --dir=2 --port 45327 --host=127.0.0.1
> gleamold agent --dir=3 --port 45328 --host=127.0.0.1

Setup Gleamold Cluster on Kubernetes

Start a gleamold master and several gleamold agents:

kubectl apply -f k8s/

Change Execution Mode

After the flow is defined, Run() can execute it in different ways: local mode, distributed mode, or planner mode.

  f := flow.New()
  ...
  // local mode
  f.Run()

  // distributed mode
  import "github.com/chrislusf/gleamold/distributed"
  f.Run(distributed.Option())
  f.Run(distributed.Option().SetMaster("master_ip:45326"))

  // distributed planner mode to print out logic plan
  import "github.com/chrislusf/gleamold/distributed"
  f.Run(distributed.Planner())

Write Mapper Reducer in Go

LuaJIT is easy, but sometimes we really need to write in Go. It is a bit more involved, but not much. Gleamold lets us write simple Go code containing the mapper or reducer logic and automatically sends it to the Gleamold agents to execute. See https://github.com/chrislusf/gleamold/wiki/Write-Mapper-Reducer-in-Go
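
A minimal sketch of the idea, combining only the pieces already shown above (gio.RegisterMapper, gio.Init, and distributed.Option); see the wiki page for the exact mechanics:

package main

import (
	"strings"

	"github.com/chrislusf/gleamold/distributed"
	"github.com/chrislusf/gleamold/flow"
	"github.com/chrislusf/gleamold/gio"
)

// register the mapper at package load time, so agents can find it by id
var MapperUpper = gio.RegisterMapper(toUpper)

func main() {

	gio.Init() // on an agent, this switches the same binary into executor mode

	flow.New().TextFile("/etc/passwd").
		Mapper(MapperUpper).
		Printlnf("%s").
		Run(distributed.Option()) // ship the registered mapper to the agents
}

func toUpper(row []interface{}) error {
	gio.Emit(strings.ToUpper(string(row[0].([]byte))))
	return nil
}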

Status

Gleamold is just getting started. Here are a few todo items. Any help is welcome!

  • Add new plugins to read external data.
  • Add schema support for each dataset.
  • Support using SQL as a flow step, similar to LINQ.
  • Add windowing functions similar to Apache Beam/Flink.
  • Add dataset metadata for better caching of often re-calculated data.

Help Especially Needed Now:

  • Go implementation to read Parquet files.

Please start using it and give feedback. Help is needed, and anything is welcome. Small things count: fixing documentation, adding a logo, adding a Docker image, blogging about it, sharing it, etc.

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.