English | 简体中文
go-stash is a high performance, free and open source server-side data processing pipeline that ingests data from Kafka, processes it, and then sends it to Clickhouse.
```shell
cd stash && go build stash.go
```

- With binary

```shell
./stash -f etc/config.yaml
```
The config.yaml example is as follows:
```yaml
Clusters:
- Input:
    Kafka:
      Name: go-stash
      Log:
        Mode: file
      Brokers:
      - "172.16.48.41:9092"
      - "172.16.48.42:9092"
      - "172.16.48.43:9092"
      Topic: ngapplog
      Group: stash
      Conns: 3
      Consumers: 10
      Processors: 60
      MinBytes: 1048576
      MaxBytes: 10485760
      Offset: first
  Filters:
  - Action: drop
    Conditions:
    - Key: status
      Value: 503
      Type: contains
    - Key: type
      Value: "app"
      Type: match
      Op: and
  - Action: remove_field
    Fields:
    - message
    - source
    - beat
    - fields
    - input_type
    - offset
    - "@version"
    - _score
    - _type
    - clientip
    - http_host
    - request_time
  Output:
    Clickhouse:
      Addr:
      - "127.0.0.1:9000"
      Auth:
        Database: default
        Username: default
        Password:
      Table: example
      Columns:
      - Col1
      - Col2
      - Col3
```
The Kafka input parameters:

```yaml
Conns: 3
Consumers: 10
Processors: 60
MinBytes: 1048576
MaxBytes: 10485760
Offset: first
```

- `Conns`: the number of connections to Kafka. It depends on the number of CPU cores and is usually <= the number of CPU cores.
- `Consumers`: the number of consumer threads opened per connection. The total is Conns * Consumers, which should not exceed the total number of partitions of the topic; for example, if the topic has 30 partitions, keep Conns * Consumers <= 30.
- `Processors`: the number of threads that process data. Depending on the number of CPU cores, it can be increased appropriately; the recommended setting is Conns * Consumers * 2 or Conns * Consumers * 3, e.g. 60 or 90.
- `MinBytes`/`MaxBytes`: the size range of a data block fetched from Kafka, 1M~10M by default. If network and IO are good, it can be set higher.
- `Offset`: optional values are `first` and `last`, and the default is `last`; `first` reads the topic from the beginning, while `last` starts from the latest offset.
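The sizing rules above can be sketched as a couple of small helpers (hypothetical names for illustration, not part of go-stash):

```go
package main

import "fmt"

// recommendedProcessors applies the rule of thumb above:
// Processors = Conns * Consumers * factor, with factor 2 or 3.
func recommendedProcessors(conns, consumers, factor int) int {
	return conns * consumers * factor
}

// consumersFitPartitions checks that Conns * Consumers does not
// exceed the topic's partition count.
func consumersFitPartitions(conns, consumers, partitions int) bool {
	return conns*consumers <= partitions
}

func main() {
	conns, consumers := 3, 10
	// with a 30-partition topic, 3 * 10 consumers exactly fit
	fmt.Println(consumersFitPartitions(conns, consumers, 30))
	// recommended Processors: 3 * 10 * 2
	fmt.Println(recommendedProcessors(conns, consumers, 2))
}
```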
The Filters configuration:

```yaml
Filters:
- Action: drop
  Conditions:
  - Key: k8s_container_name
    Value: "-rpc"
    Type: contains
  - Key: level
    Value: info
    Type: match
    Op: and
- Action: remove_field
  Fields:
  - message
  - _source
  - _type
  - _score
  - _id
  - "@version"
  - topic
  - index
  - beat
  - docker_container
  - offset
  - prospector
  - source
  - stream
- Action: transfer
  Field: message
  Target: data
```

- `drop`: data that meets the conditions is removed during processing and is not written to the output.
- `Conditions`: each condition specifies a `Key` field and a `Value` to compare against; `Type` can be `contains` (substring) or `match` (exact match).
- `Op`: the operator joining the conditions, either `and` or `or`.
- `remove_field`: the fields to be removed are simply listed under `Fields`.
- `transfer`: renames a field; for example, the `message` field can be redefined as a `data` field.
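The three filter actions can be pictured on a single event, treated here as a string map (a sketch with made-up helper names; the real filter code lives inside go-stash):

```go
package main

import (
	"fmt"
	"strings"
)

// event is one log record flowing through the pipeline.
type event = map[string]string

// matches implements the two condition types described above:
// "contains" does a substring check, "match" an exact comparison.
func matches(e event, key, value, typ string) bool {
	v, ok := e[key]
	if !ok {
		return false
	}
	if typ == "contains" {
		return strings.Contains(v, value)
	}
	return v == value // "match"
}

// drop reports whether the event satisfies all conditions (Op: and).
func drop(e event) bool {
	return matches(e, "k8s_container_name", "-rpc", "contains") &&
		matches(e, "level", "info", "match")
}

func main() {
	e := event{"k8s_container_name": "user-rpc", "level": "info", "message": "hello"}
	// both conditions hold, so this event would be dropped
	fmt.Println(drop(e))

	// transfer: redefine the message field as a data field
	e["data"] = e["message"]
	delete(e, "message")

	// remove_field: delete the listed keys
	for _, f := range []string{"offset", "beat"} {
		delete(e, f)
	}
	fmt.Println(e["data"])
}
```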
The Clickhouse output parameters:

- Optional parameters:
  - the size of the bulk submitted in each batch, 15M by default;
  - the grace period, 15s by default, used to finish processing the remaining consumed data within 15s after the program closes and exits gracefully.
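The bulk-size behaviour can be sketched as a tiny buffer that flushes once the accumulated payload crosses the threshold, with a final flush on shutdown (hypothetical names; the real writer and its 15M default live inside go-stash):

```go
package main

import "fmt"

// bulkBuffer accumulates rows and flushes when the byte size of the
// pending batch reaches max (15M by default in the description above).
type bulkBuffer struct {
	max     int
	pending []string
	size    int
	flushed int // number of flushes, for illustration
}

func (b *bulkBuffer) add(row string) {
	b.pending = append(b.pending, row)
	b.size += len(row)
	if b.size >= b.max {
		b.flush()
	}
}

func (b *bulkBuffer) flush() {
	if len(b.pending) == 0 {
		return
	}
	// here the real pipeline would submit the batch to Clickhouse
	b.pending = b.pending[:0]
	b.size = 0
	b.flushed++
}

func main() {
	b := &bulkBuffer{max: 10}
	for i := 0; i < 7; i++ {
		b.add("row") // 3 bytes each; the threshold fires on the 4th row
	}
	// graceful shutdown: flush whatever is left within the grace period
	b.flush()
	fmt.Println(b.flushed)
}
```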
The Clickhouse output supports the following column types:

- Float32, Float64
- Int8, Int16, Int32, Int64
- UInt8, UInt16, UInt32, UInt64
- IPv4, IPv6
- Bool, Boolean
- Date, Date32, DateTime
- UUID
- String