harlow/kinesis-consumer

support for multiple consumers across shards

bryanpaluch opened this issue · 8 comments

Do you have any plans that would allow scaling this out across multiple processes which access multiple (but different) shards? I believe the checkpoint package would need an interface for locking access to a shard.

Hi @bryanpaluch, there is the option to use ScanShard to scan a single shard:
https://github.com/harlow/kinesis-consumer/blob/master/consumer.go#L156-L159

In this case, each process would need to specify the shardID before kicking off the scan:

c, err := consumer.New(*stream)
if err != nil {
  log.Fatalf("consumer error: %v", err)
}

shardID := "SOME_AWS_SHARD_ID"

err = c.ScanShard(context.TODO(), shardID, func(r *consumer.Record) bool {
  fmt.Println(string(r.Data))
  return true // continue scanning
})
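As a sketch of the explicit-assignment approach, each process could derive its own shard IDs from the full shard list plus its index and the total number of processes. `shardsForProcess` is a hypothetical helper. Fetching the shard list itself (for example via the Kinesis ListShards API) is assumed rather than shown.

```go
package main

import (
	"fmt"
	"sort"
)

// shardsForProcess is a hypothetical helper (not part of kinesis-consumer).
// It deterministically picks the shards that process `index` out of
// `total` processes should scan, so each process can compute its own
// subset from the same shard list without talking to the others.
func shardsForProcess(shardIDs []string, index, total int) []string {
	// Sort a copy so every process sees the shards in the same order.
	sorted := append([]string(nil), shardIDs...)
	sort.Strings(sorted)

	var mine []string
	for i, id := range sorted {
		if i%total == index {
			mine = append(mine, id)
		}
	}
	return mine
}

func main() {
	shards := []string{
		"shardId-000000000003", "shardId-000000000000",
		"shardId-000000000002", "shardId-000000000001",
	}
	// Process 0 of 2 would start one ScanShard per returned ID.
	fmt.Println(shardsForProcess(shards, 0, 2)) // [shardId-000000000000 shardId-000000000002]
	fmt.Println(shardsForProcess(shards, 1, 2)) // [shardId-000000000001 shardId-000000000003]
}
```

The drawback is that the assignment is static. If a process dies or the stream is resharded, nothing moves its shards elsewhere until the processes are reconfigured.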

Checkpointing should work as expected, since Scan simply kicks off a ScanShard for each shard in its own goroutine.

I created a placeholder issue for the idea of a consumer group (#36), but I haven't found a real need for it yet, so I'm not sure what the timeframe on that would be.

+1, it would be nice for consumers to lock the shard they are reading from without having to explicitly assign shards to consumers.

> I believe the checkpoint package would need an interface for locking access to a shard.

Trying to unravel the desired functionality here a little: it seems like there are two ways to consume the stream:

  1. Consumer scans all shards
  2. Consumer scans some arbitrary number of shards

In the latter case there would need to be some kind of coordination between the consumers so they don't pick up the same shards. This would be akin to a "consumer group", which would need to store some state and be able to auto-balance shards across consumers (I believe the AWS KCL libraries support this functionality).
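One possible balancing rule for such a group is sketched below. It assumes some shared store records the current owner of each shard. The rule spreads shards so that per-consumer counts differ by at most one, and it keeps a shard where it is whenever its owner still has room. `rebalance` is a hypothetical function, not an API of kinesis-consumer or the KCL. It also ignores resharding details such as finishing a parent shard before reading its children.

```go
package main

import (
	"fmt"
	"sort"
)

// rebalance is a hypothetical consumer-group balancing sketch. owners maps
// shard -> current owner (missing or dead owners mean "unowned"); the
// result assigns every shard to a live consumer, with per-consumer counts
// differing by at most one and existing owners kept where they have room.
func rebalance(owners map[string]string, shards, consumers []string) map[string]string {
	shards = append([]string(nil), shards...)
	consumers = append([]string(nil), consumers...)
	sort.Strings(shards)
	sort.Strings(consumers)

	result := make(map[string]string, len(shards))
	if len(consumers) == 0 {
		return result
	}

	// Every consumer gets n/k shards; the first n%k (sorted) get one extra.
	capacity := make(map[string]int, len(consumers))
	base, extra := len(shards)/len(consumers), len(shards)%len(consumers)
	for i, c := range consumers {
		capacity[c] = base
		if i < extra {
			capacity[c]++
		}
	}

	count := make(map[string]int, len(consumers))
	var pending []string
	// Pass 1: keep shards on live owners that still have capacity.
	// A dead owner has no capacity entry (0), so its shards go to pending.
	for _, s := range shards {
		if o, ok := owners[s]; ok && count[o] < capacity[o] {
			result[s] = o
			count[o]++
		} else {
			pending = append(pending, s)
		}
	}
	// Pass 2: hand the remaining shards to consumers with spare room.
	for _, s := range pending {
		for _, c := range consumers {
			if count[c] < capacity[c] {
				result[s] = c
				count[c]++
				break
			}
		}
	}
	return result
}

func main() {
	shards := []string{"s0", "s1", "s2", "s3"}
	// One instance currently owns everything; a second one joins.
	owners := map[string]string{"s0": "a", "s1": "a", "s2": "a", "s3": "a"}
	fmt.Println(rebalance(owners, shards, []string{"a", "b"})) // map[s0:a s1:a s2:b s3:b]
}
```

Shards owned by a consumer that is no longer in the list are treated as unowned and handed to the survivors.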

+1 for consumer group support. Auto-balancing shards across consumers would be awesome. Snippet from the KCL docs on scaling:

The following example illustrates how the KCL helps you handle scaling and resharding:

For example, suppose your application is running on one EC2 instance and processing one Kinesis data stream that has four shards. This one instance has one KCL worker and four record processors (one record processor for every shard). These four record processors run in parallel within the same process.

Next, if you scale the application to use another instance, you have two instances processing one stream that has four shards. When the KCL worker starts up on the second instance, it load-balances with the first instance, so that each instance now processes two shards.

If you then decide to split the four shards into five shards, the KCL again coordinates the processing across instances: one instance processes three shards, and the other processes two shards. A similar coordination occurs when you merge shards.
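The shard counts in that scenario follow from spreading shards as evenly as possible across workers. Here is a small sketch of that arithmetic. `fairShares` is a hypothetical helper, not KCL code.

```go
package main

import "fmt"

// fairShares returns how many shards each of `instances` workers would
// process if shards are spread as evenly as possible: everyone gets
// shards/instances, and the first shards%instances workers get one more.
func fairShares(shards, instances int) []int {
	if instances <= 0 {
		return nil
	}
	counts := make([]int, instances)
	for i := range counts {
		counts[i] = shards / instances
		if i < shards%instances {
			counts[i]++
		}
	}
	return counts
}

func main() {
	fmt.Println(fairShares(4, 1)) // [4]   one instance, four shards
	fmt.Println(fairShares(4, 2)) // [2 2] a second instance joins
	fmt.Println(fairShares(5, 2)) // [3 2] four shards split into five
}
```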

@harlow and @vincent6767, I would like to help with this feature. Do you already have an interface in mind, or any other design considerations?

Twitch's kinsumer library does this; it might give some ideas.

yeoji commented

Are there any updates on this? I saw in the README that this is being worked on. Any idea when it will be available?

Hi @yeoji, I'm afraid we lost traction on it. I'll remove that notice from the README for the time being.