/yomo-sink-tdengine-example

TDEngine 🙌 YoMo

Primary LanguageGoMIT LicenseMIT

yomo-integrate-with-TDengine

Integrate TDengine to YoMo

TDengine 🙌 YoMo. Demonstrates how to integrate TDengine to YoMo and store data to TDengine after stream processing.

About TDengine

TDengine is an open-sourced big data platform under GNU AGPL v3.0, designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. Besides the 10x faster time-series database, it provides caching, stream computing, message queuing and other functionalities to reduce the complexity and cost of development and operation.

For more information, please visit TDengine homepage

About YoMo

YoMo is an open-source Streaming Serverless Framework for building Low-latency Edge Computing applications. Built atop QUIC Transport Protocol and Functional Reactive Programming interface. makes real-time data processing reliable, secure, and easy.

1: Installing TDengine

$ sudo apt-get install -y gcc cmake build-essential git
$ git clone --depth 1 https://github.com/taosdata/TDengine.git
$ cd TDengine
$ git submodule update --init --recursive
$ mkdir debug && cd $_
$ cmake ..
$ cmake --build .
$ make install

TDengine officai installation page

2: Create database and table

$ taos
taos> create database yomo;
Query OK, 0 row(s) affected (0.004701s)

taos> use yomo;
Database changed.

taos> create table in not exists noise (ts timestamp, v float);
Query OK, 0 row(s) affected (0.011501s)

taos> insert into noise values ('2021-01-01 00:00:00', 41.1);
Query OK, 1 row(s) affected (0.002012s)

taos> select * from noise;
           ts            |           v          |
=================================================
 2021-01-01 00:00:00.000 |             41.10000 |
Query OK, 1 row(s) in set (0.004414s)

taos>

3: Integrate TDengine with YoMo

Start YoMo-Zipper

Configure YoMo-Zipper:

name: YoMoZipper 
host: localhost
port: 9000
sinks:
  - name: TDEngine
    host: localhost
    port: 9333

Start this zipper will listen on 9000 port, send data streams directly to 9333:

cd ./zipper && yomo wf run

Store data to TDengine

var url = "root:taosdata@/tcp(localhost:6030)/yomo"

func main() {
	log.Print("Starting YoMo Sink server: -> TDEngine")
	srv := quic.NewServer(&srvHandler{})
	err := srv.ListenAndServe(context.Background(), "0.0.0.0:9333")
	if err != nil {
		log.Printf("YoMo Sink server start failed: %s\n", err.Error())
	}
	select {}
}

type srvHandler struct {}

func (s *srvHandler) Listen() error {
	return nil
}

func (s *srvHandler) Read(qs quic.Stream) error {
	ch := y3.FromStream(qs).
		Subscribe(0x10).
		OnObserve(decode)

	go func() {
		for item := range ch {
			err := store(item)
			if err != nil {
				log.Printf("write to TDEngine error : %s", err.Error())
			} else {
				log.Printf("saved: %v", item)
			}
		}
	}()

	return nil
}

func decode(v []byte) (interface{}, error) {
	data, err := y3.ToFloat32(v)
	if err != nil {
		log.Printf("err: %s\n", err.Error())
	}
	return data, err
}

func store(v interface{}) error {
	db, err := sql.Open("taosSql", url)
	if err != nil {
		fmt.Printf("Open database error: %s\n", err)
		os.Exit(1)
	}
	defer db.Close()

	// Ensure 'noise' table exists
	sql := "CREATE TABLE IF NOT EXISTS noise (ts TIMESTAMP, v FLOAT)"
	_, err = db.Exec(sql)
	if err != nil {
		fmt.Printf("db.Exec error: %s\n", err)
	}

	// Insert data
	var val float32 = v.(float32)
	sql = "INSERT INTO noise VALUES (NOW, " + fmt.Sprintf("%f", val) + ")"
	_, err = db.Exec(sql)
	if err != nil {
		fmt.Printf("Insert error: %s\n", err)
	}

	return err
}

Start this YoMo-Sink, will save data to TDengine wherever data arrives

go run main.go

Emulate a data source for test

cd source && go run main.go

This will start a YoMo-Source, demonstrates a random float every 100ms to YoMo-Zipper

4. Verify TDengine

taos> use yomo;
Database changed.

taos> select * from noise;
           ts            |          v           |
=================================================
 2021-02-01 02:11:54.581 |              5.83000 |
 2021-02-01 02:14:19.372 |              5.83000 |
 2021-02-01 04:35:12.875 |             44.58845 |
 2021-02-01 04:35:12.963 |            157.36317 |
 2021-02-01 04:35:13.062 |             16.95439 |
 2021-02-01 04:35:13.163 |            180.45207 |
 2021-02-01 04:35:13.263 |             96.63864 |
 2021-02-01 04:35:13.364 |            134.08540 |
 2021-02-01 04:35:13.464 |             59.86330 |
 2021-02-01 04:35:13.565 |            197.74881 |
 2021-02-01 04:35:13.666 |            171.70944 |
 2021-02-01 04:35:13.765 |             36.40285 |