yomo-source-emqx-starter
EMQ X Broker
This project demonstrates how to connect to an EMQX MQTT broker and process data before storing it in FaunaDB, a serverless database. The code uses a public MQTT cloud broker, so you can run it directly without installing anything. The example computes a running SUM each time new data arrives, in Rx style.
About EMQX
EMQ X broker is a fully open source, highly scalable, highly available distributed MQTT messaging broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients.
Starting from the 3.0 release, EMQ X broker fully supports the MQTT V5.0 protocol specification and is backward compatible with MQTT V3.1 and V3.1.1. It also supports other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of EMQ X broker can scale to 10+ million concurrent MQTT connections on one cluster.
For more information, please visit the EMQ X homepage.
About YoMo
YoMo is an open-source Streaming Serverless Framework for building low-latency edge computing applications. Built atop the QUIC transport protocol and a Functional Reactive Programming interface, it makes real-time data processing reliable, secure, and easy.
1: Connect EMQX to YoMo
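var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	rb := msg.Payload()
	fmt.Printf("Received message: %s from topic: %s\n", rb, msg.Topic())
	// Parse the payload as a base-10 integer; skip messages that are not numbers
	p, err := strconv.ParseInt(string(rb), 10, 64)
	if err != nil {
		log.Printf("❌ Invalid payload %q: %v", rb, err)
		return
	}
	// Encode the value with the Y3 codec under key 0x10
	codec := y3.NewCodec(0x10)
	buf, err := codec.Marshal(p)
	if err != nil {
		log.Printf("❌ Encode %d failure with err: %v", p, err)
		return
	}
	// Send the encoded data over the QUIC stream
	if _, err := stream.Write(buf); err != nil {
		log.Printf("❌ Emit %s to yomo-zipper failure with err: %v", rb, err)
	} else {
		log.Printf("✅ Sending message: %v to yomo-zipper", buf)
	}
}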
To start the source, run: go run main.go
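The handler above assumes every MQTT payload is a base-10 integer. As a minimal sketch of that parsing step in isolation, the example below uses only the Go standard library. The helper name parsePayload is hypothetical and not part of this project; it illustrates rejecting non-numeric payloads rather than forwarding a zero value.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePayload is a hypothetical helper (not part of the starter project).
// It converts a raw MQTT payload such as []byte("42") into an int64 and
// returns an error instead of silently falling back to zero.
func parsePayload(payload []byte) (int64, error) {
	text := strings.TrimSpace(string(payload))
	n, err := strconv.ParseInt(text, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("payload %q is not a base-10 integer: %w", text, err)
	}
	return n, nil
}

func main() {
	// Sample payloads: two valid numbers and one invalid message.
	for _, raw := range [][]byte{[]byte("42"), []byte(" 7\n"), []byte("noise")} {
		n, err := parsePayload(raw)
		if err != nil {
			fmt.Println("skipped:", err)
			continue
		}
		fmt.Println("parsed:", n)
	}
}
```

Trimming whitespace first is a design choice for this sketch: some MQTT clients append a trailing newline, which strconv.ParseInt would otherwise reject.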
2: Create YoMo-Zipper
See ./zipper/workflow.yaml:
name: Service
host: localhost
port: 9999
flows:
  - name: Noise_Serverless
    host: localhost
    port: 4242
To start the yomo-zipper, run: cd zipper && yomo wf run
3: Write your data processing logic
We need to calculate the sum each time new data arrives; see ./flow/app.go
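const observedKey = 0x10
// decode converts the Y3-encoded bytes received from YoMo-Zipper into an int64
var decode = func(v []byte) (interface{}, error) {
	val, err := y3.ToInt64(v)
	if err != nil {
		fmt.Printf("err: %v\n", err)
		// Propagate the error instead of passing a zero value downstream
		return nil, err
	}
	return val, nil
}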
// sum adds each new value to the running total. For details on the Scan() operator, see http://reactivex.io/documentation/operators/scan.html
var sum = func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
	// The first element has no accumulator yet, so it becomes the initial total
	if acc == nil {
		return elem, nil
	}
	return acc.(int64) + elem.(int64), nil
}
// Handler processes the data in Rx style
func Handler(rxstream rx.RxStream) rx.RxStream {
	stream := rxstream.
		Subscribe(observedKey).
		OnObserve(decode).
		Scan(sum).
		StdOut()
	return stream
}
To start the flow, run: cd ./flow && yomo run
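To see what the Scan step in the flow produces, the sketch below reproduces the running-sum behavior with plain Go channels. It is an illustration only, not the YoMo rx implementation; the names scanSum and collect are hypothetical and exist only in this example.

```go
package main

import "fmt"

// scanSum mimics the Scan operator for int64 values: it emits the running
// total after each input element. This is a plain-Go illustration, not the
// YoMo rx implementation.
func scanSum(in <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		var acc int64
		for v := range in {
			acc += v
			out <- acc
		}
	}()
	return out
}

// collect feeds values through scanSum and gathers every emitted total.
func collect(values []int64) []int64 {
	in := make(chan int64)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var totals []int64
	for t := range scanSum(in) {
		totals = append(totals, t)
	}
	return totals
}

func main() {
	fmt.Println(collect([]int64{1, 2, 3, 4})) // prints [1 3 6 10]
}
```

Unlike a reduce, which yields a single final value, a scan emits one output per input, so the flow prints an updated sum every time a message arrives.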