kinesis-producer

An aggregated records producer for Amazon Kinesis

A KPL-like batch producer for Amazon Kinesis built on top of the official AWS SDK for Go, using the same record aggregation format as the KPL (Kinesis Producer Library).
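On the wire, each aggregated record uses the KPL envelope: a 4-byte magic prefix (0xF3 0x89 0x9A 0xC2), a protobuf-encoded AggregatedRecord body, and a trailing 16-byte MD5 checksum of that body. A minimal sketch of the framing step (the payload here is a stand-in; the library serializes the real protobuf body for you):

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// kplMagic is the 4-byte prefix that marks a KPL aggregated record.
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// frame wraps a serialized AggregatedRecord payload in the KPL
// envelope: magic bytes, payload, then an MD5 checksum of the payload
// that consumers verify during deaggregation.
func frame(payload []byte) []byte {
	sum := md5.Sum(payload)
	out := make([]byte, 0, len(kplMagic)+len(payload)+md5.Size)
	out = append(out, kplMagic...)
	out = append(out, payload...)
	return append(out, sum[:]...)
}

func main() {
	framed := frame([]byte("serialized-protobuf-stand-in"))
	fmt.Printf("framed %d bytes, magic %x\n", len(framed), framed[:4])
}
```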

Example

package main

import (
	"time"

	log "github.com/sirupsen/logrus"
	"github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

Shard Mapping

The Producer supports aggregation based on a shard map. UserRecords are mapped to a shard using the MD5 hash of the Partition Key, or a provided Explicit Hash Key. Records mapped to the same shard are aggregated together.

By default, shard mapping is disabled. To use it, set Config.GetShards; this function is called on producer initialization to populate the shard map. You can optionally provide a refresh interval, Config.ShardRefreshInterval, to update the map periodically. Note that Puts to the Producer block while it re-aggregates pending requests against the new map; they block only during that re-aggregation phase.

This package provides a GetShards function GetKinesisShardsFunc that uses an AWS client to call the ListShards API to get the shard list.

Note At the time of writing, using the shard map feature adds significant overhead. Depending on the configuration and your record set, this can be more than 2x slower. Providing an explicit hash key for user records can help reduce this by quite a bit. Take a look at the benchmarks in producer_test.go for examples.

Example

package main

import (
	"time"

	log "github.com/sirupsen/logrus"
	"github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 1000; i++ {
			pk := uuid.New().String()
			for j := 0; j < 5; j++ {
				err := pr.Put([]byte("foo"), pk)
				if err != nil {
					log.WithError(err).Fatal("error producing")
				}
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

UserRecord interface

You can optionally define a custom struct that implements the UserRecord interface and put it using Producer.PutUserRecord. The producer holds on to the reference in case of failures. Do not modify or reuse the record after passing it to the producer until you receive it back in a failure record; otherwise data races may occur.

Example

package main

import (
	"encoding/json"
	"math/big"
	"time"

	log "github.com/sirupsen/logrus"
	"github.com/achunariov/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
)

type myExampleUserRecord struct {
	Id   string `json:"id"`
	Key  string `json:"key"`
	Val  string `json:"val"`
	data []byte `json:"-"`
}

func (r *myExampleUserRecord) PartitionKey() string      { return r.Id }
func (r *myExampleUserRecord) ExplicitHashKey() *big.Int { return nil }
func (r *myExampleUserRecord) Data() []byte              { return r.data }
func (r *myExampleUserRecord) Size() int                 { return len(r.data) }

func newMyExampleUserRecord(key, val string) (*myExampleUserRecord, error) {
	r := &myExampleUserRecord{
		Id:  uuid.New().String(),
		Key: key,
		Val: val,
	}
	data, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	r.data = data
	return r, nil
}

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:           "test",
		BacklogCount:         2000,
		Client:               client,
		GetShards:            producer.GetKinesisShardsFunc(client, "test"),
		ShardRefreshInterval: 5 * time.Second,
	})

	pr.Start()

	failures := pr.NotifyFailures()

	// Handle failures
	go func() {
		for r := range failures {
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			record, err := newMyExampleUserRecord("foo", "bar")
			if err != nil {
				log.WithError(err).Fatal("error creating user record")
			}
			err = pr.PutUserRecord(record)
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger

customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}
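The CustomLogger above is left undefined. For reference, here is a sketch of what an implementation can look like. The LogValue and Logger definitions are reproduced locally for illustration only; they mirror the upstream a8m/kinesis-producer Logger (Info and Error with variadic log values), so treat the exact method set as an assumption and check logging.go in this fork before relying on it:

```go
package main

import "log"

// LogValue and Logger are reproduced here for illustration; the real
// definitions live in the package (see logging.go) and may differ.
type LogValue struct {
	Name  string
	Value interface{}
}

type Logger interface {
	Info(msg string, values ...LogValue)
	Error(msg string, err error, values ...LogValue)
}

// CustomLogger forwards producer logs to the standard library logger.
type CustomLogger struct{}

func (CustomLogger) Info(msg string, values ...LogValue) {
	log.Println("INFO:", msg, values)
}

func (CustomLogger) Error(msg string, err error, values ...LogValue) {
	log.Println("ERROR:", msg, err, values)
}

func main() {
	var l Logger = CustomLogger{}
	l.Info("producer started", LogValue{Name: "stream", Value: "test"})
}
```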

Using logrus

import (
	"github.com/sirupsen/logrus"
	producer "github.com/achunariov/kinesis-producer"
	"github.com/achunariov/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.Standard uses the standard library logger
  • loggers.Logrus uses the logrus logger
  • loggers.Zap uses the zap logger

License

MIT