The Stream Processing API for Go
- Window
- LengthWindow
- LengthBatchWindow
- TimeWindow
- TimeBatchWindow
- Selector
- EqualsType, NotEqualsType
- Equals, NotEquals
- LargerThan, LessThan
- Function
- Max, Min, Median
- Count, Sum, Average
- Cast
- As
- SelectAll, Select
- GroupBy
- Having
- View
- OrderBy, Limit
- First, Last
- Tool
- Builder
- Lexer
- Parser
go get github.com/itsubaki/goceptype LogEvent struct {
Time time.Time
Level int
Message string
}
// select count(*) from LogEvent.time(10sec) where Level > 2
w := NewTimeWindow(10*time.Second)
defer w.Close()
w.SetSelector(EqualsType{LogEvent{}})
w.SetSelector(LargerThanInt{"Level", 2})
w.SetFunction(Count{As: "count"})
go func() {
for {
events := <-w.Output()
if Newest(events).Int("count") > 10 {
// notification
}
}
}()
w.Input() <- LogEvent{time.Now(), 1, "this is text log."}type MyEvent struct {
Name string
Value int
}
// select Name as n, Value as v
// from MyEvent.time(10msec)
// where Value > 97
// orderby Value DESC
// limit 10 offset 5
w := NewTimeWindow(10 * time.Millisecond)
defer w.Close()
w.SetSelector(EqualsType{MyEvent{}})
w.SetSelector(LargerThanInt{"Value", 97})
w.SetFunction(SelectString{"Name", "n"})
w.SetFunction(SelectInt{"Value", "v"})
w.SetView(OrderByInt{"Value", true})
w.SetView(Limit{10, 5})
go func() {
for {
fmt.Println(<-w.Output())
}
}()
for i := 0; i < 100; i++ {
w.Input() <-MyEvent{"name", i}
}// select avg(Value), sum(Value) from MyEvent.length(10)
w := NewLengthWindow(10)
defer w.Close()
w.SetSelector(EqualsType{MyEvent{}})
w.SetFunction(AverageInt{"Value", "avg(Value)"})
w.SetFunction(SumInt{"Value", "sum(Value)"})// type RuntimeEvent struct {
// Name string
// Value int
// }
b := NewStructBuilder()
b.SetField("Name", reflect.TypeOf(""))
b.SetField("Value", reflect.TypeOf(0))
s := b.Build()
// i.Value()
// -> RuntimeEvent{Name: "foobar", Value: 123}
// i.Pointer()
// -> &RuntimeEvent{Name: "foobar", Value: 123}
i := s.NewInstance()
i.SetString("Name", "foobar")
i.SetInt("Value", 123)
w.Input() <-i.Value()p := NewParser()
p.Register("MapEvent", MapEvent{})
query := "select * from MapEvent.length(10)"
statement, err := p.Parse(query)
if err != nil {
log.Println("failed.")
return
}
window := statement.New()
window.Input() <-MapEvent{map}
fmt.Println(<-window.Output())