A KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK and using the same record aggregation format that the KPL uses.
```go
package main

import (
	"time"

	producer "github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	log "github.com/sirupsen/logrus"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})
	pr.Start()

	// Handle failures
	failures := pr.NotifyFailures()
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
```
The Producer supports aggregation based on a shard map. UserRecords are mapped to a shard using the md5 hash of the partition key, or a provided explicit hash key. Records mapped to the same shard are aggregated together.
By default, shard mapping is disabled. To use it, set Config.GetShards. This function is called on producer initialization to populate the shard map. You can optionally provide a refresh interval, Config.ShardRefreshInterval, to update the map periodically. Note that Puts to the Producer are blocked while it updates the shard map so that it can re-aggregate pending requests against the new map; it only blocks during the re-aggregation phase.

This package provides a ready-made GetShards function, GetKinesisShardsFunc, that uses an AWS client to call the ListShards API and fetch the shard list.
Note: at the time of writing, using the shard map feature adds significant overhead. Depending on the configuration and your record set, it can be more than 2x slower. Providing an explicit hash key for user records can reduce this considerably. See the benchmarks in producer_test.go for examples.
```go
package main

import (
	"time"

	producer "github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})
	pr.Start()

	// Handle failures
	failures := pr.NotifyFailures()
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 1000; i++ {
			pk := uuid.New().String()
			for j := 0; j < 5; j++ {
				err := pr.Put([]byte("foo"), pk)
				if err != nil {
					log.WithError(err).Fatal("error producing")
				}
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
```
You can optionally define a custom struct that implements the UserRecord interface and put it using Producer.PutUserRecord. The producer holds onto the reference in case of failures. Do not modify or reuse the reference after passing it to the producer until you receive it back in a failure record; otherwise data races may occur.
```go
package main

import (
	"encoding/json"
	"math/big"
	"time"

	producer "github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

type myExampleUserRecord struct {
	Id   string `json:"id"`
	Key  string `json:"key"`
	Val  string `json:"val"`
	data []byte `json:"-"`
}

func (r *myExampleUserRecord) PartitionKey() string      { return r.Id }
func (r *myExampleUserRecord) ExplicitHashKey() *big.Int { return nil }
func (r *myExampleUserRecord) Data() []byte              { return r.data }
func (r *myExampleUserRecord) Size() int                 { return len(r.data) }

func newMyExampleUserRecord(key, val string) (*myExampleUserRecord, error) {
	r := &myExampleUserRecord{
		Id:  uuid.New().String(),
		Key: key,
		Val: val,
	}
	data, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	r.data = data
	return r, nil
}

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})
	pr.Start()

	// Handle failures
	failures := pr.NotifyFailures()
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			record, err := newMyExampleUserRecord("foo", "bar")
			if err != nil {
				log.WithError(err).Fatal("error creating user record")
			}
			err = pr.PutUserRecord(record)
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}
```
producer.Config takes an optional logging.Logger implementation.
```go
customLogger := &CustomLogger{}

&producer.Config{
	StreamName:   "test",
	BacklogCount: 2000,
	Client:       client,
	Logger:       customLogger,
}
```
For example, using logrus:

```go
import (
	producer "github.com/achunariov/kinesis-producer"
	"github.com/achunariov/kinesis-producer/loggers"
	"github.com/sirupsen/logrus"
)

log := logrus.New()

&producer.Config{
	StreamName:   "test",
	BacklogCount: 2000,
	Client:       client,
	Logger:       loggers.Logrus(log),
}
```
kinesis-producer ships with three logger implementations:

- producer.Standard uses the standard library logger
- loggers.Logrus uses the logrus logger
- loggers.Zap uses the zap logger
MIT