nats-io/nats-streaming-server

Support for message partitioning

Closed this issue · 12 comments

At our company, we're big fans of event sourcing and message streaming between our services. We use Kafka a lot, but also sometimes Google's Pub/Sub when it makes sense.

I've been reading (and listening) up on NATS (Streaming) since I read the CNCF announcement and am very intrigued by the project, the direction, the knowledge+experience of the contributors, and the (operational) simplicity that it currently has over a project like Kafka.

However, the one thing that seems to be missing in NATS right now is a good answer to message partitioning within a queue group. From what I read, messages are randomly assigned to consumers within a queue, and there's no way to deterministically dictate which messages end up at which consumer in a queue.

I understand that the order in which producers produce messages is the same order in which consumers consume them, but as soon as you switch to a group of consumers in a queue, and these consumers then produce a derivative message onto a separate topic/subject, you lose this ordering guarantee.

I read #373 (request for log compaction), and noticed this response:

NATS Streaming does not have a concept of message key, and would need to be able to handle sequence gaps in a message log, which it does not at the moment.

Message keys would, I believe, also be a requirement to support this use-case, so even though it's somewhat related to that request, I figured I'd create a separate issue for this one and ask whether this is something that is either 1) currently supported and I just haven't found it yet, 2) planned for the future, or 3) never going to be supported and I'm trying to fit a square peg in a round hole?

@JeanMertz There is no plan currently to support message partitioning with queue groups. As has been discussed in other GH issues or on Slack, you can somewhat achieve this by using specific channels. Instead of providing a key in your message, simply use a dedicated channel. Dumb example, but if you were to send to foo with key 1, 2, or 3, then why not have channels foo.1, etc., and have a single consumer on those sub-channels.
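For illustration only, here is a minimal sketch of that per-key channel idea using the Go streaming client (stan.go); the cluster ID, client ID, key value, and payload are made-up placeholders, not anything from this thread:

```go
package main

import (
	"fmt"
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// Cluster and client IDs are placeholders for this example.
	sc, err := stan.Connect("test-cluster", "key-demo-publisher")
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// Instead of attaching a key to the message, publish to a
	// per-key channel: foo.1, foo.2, foo.3, ...
	key := 2
	channel := fmt.Sprintf("foo.%d", key)
	if err := sc.Publish(channel, []byte("event payload")); err != nil {
		log.Fatal(err)
	}
}
```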

Thanks for the clear answer @kozlovic. I can see how what you propose can work technically, but I dread the management surrounding such a set-up.

Say we want to partition analytics events based on session uuids. There seem to be several steps involved in getting this to work:

  • We'd have to use hundreds of thousands of subjects, each namespaced to the uuid of a single session.
  • Furthermore, these subjects would be highly ephemeral, as a session can be as short as 10 seconds.
  • We can't know all the session uuids in advance, so we'd have to track that somehow, and spin up new consumers to listen to each new subject being created.
  • We'd then also have to have logic to terminate that consumer once no more events from that session are expected.
  • Furthermore, we have monitoring in place (currently on Kafka) that checks whether any consumer group lag is very high, which might signal a problem in the consumers; that monitoring would also have to be able to deal with this setup.
  • Finally, not all consumers are interested in this segmentation of messages and just want to see all the messages at once, which means using another subject (granted, it's only one more, so that's maybe less of a problem).

We could of course tackle this at the ingress of the data by calculating the subject of a message based on the session uuid and distributing to only 10 different subjects. That would reduce the complexity somewhat, but if we anticipate a traffic spike, we'd still need to do a lot of manual labor to increase the spread over more subjects, spin up more consumers, change our monitoring to also include those added subjects, etc.

Don't get me wrong, as I said before, I really like where NATS (Streaming) is going, and I can see it being a big part of our infrastructure in the future. But this is one area where Kafka makes a lot more sense right now, and I basically wanted to explore whether that means NATS isn't for our use-case, period, or whether it's not yet for our use-case and will get there eventually (or whether I'm approaching this wrongly and your solution does solve our problem nicely, I just haven't thought it through enough yet).

@JeanMertz I understand your concerns, and I always say that people should use the tool that best fits their needs. I won't force NATS Streaming on you if this is not the right product for you.

The way queue groups work in NATS Streaming is indeed very different from Kafka, and as I said, there is no plan with NATS Streaming as it is today to change that. There are talks about another approach to streaming in a different project, but there are no concrete plans at the moment. Moreover, I don't think that the queue behavior and partitioning is one of the areas that would actually change. So the answer is that I don't see that behavior changing for the foreseeable future. Sorry about that...

I won't force NATS Streaming on you if this is not the right product for you.

I understand that. I'm also aware that the project will do fine without this feature; it's an awesome product, and as you said, not all tools fit every job, so that's to be expected 👍

I had hoped to find a replacement for Kafka in NATS, as while Kafka is an awesome and reliable tool, it's not the simplest to manage, and it's not as "cloud native" as NATS is, i.e. it expects its nodes to be more durable than NATS does, which isn't a great fit for Kubernetes (although not impossible to get working either). The (Java) ecosystem around it also isn't as user-focused/friendly as one might hope, unfortunately. Besides that, it's a pretty big code-base with hundreds of use-cases, which makes it not as easy to get into while working on or with it, compared to a leaner tool like NATS (ironically, I get that if the feature I'm asking for had been implemented, NATS would also be a bit more complicated; add 10 more years, and you potentially end up where Kafka is today 😄)

Either way, good luck with the project, and I'm sure I'll peek in here from time to time to see if we have any use-cases that fit this tool 👍

jemc commented

@JeanMertz - I'm just a curious bystander who came in with the same question as you, but I wanted to speak briefly about the way you were talking about emulating partitions.

Say we want to partition analytics events based on session uuids. There seem to be several steps involved in getting this to work:

  • We'd have to use hundreds of thousands of subjects, each namespaced to the uuid of a single session.
  • Furthermore, these subjects would be highly ephemeral, as a session can be as short as 10 seconds.
  • We can't know all the session uuids in advance, so we'd have to track that somehow, and spin up new consumers to listen to each new subject being created.

Note that this isn't really how partitions work in Kafka. To emulate Kafka partitions, you'd need to implement a consistent hashing scheme in your producer and consumer clients.

That is, you wouldn't use the session uuid itself as a suffix for your topic/channel name - you'd use modular arithmetic on the hash of the session uuid to map each uuid to a pre-known, finite number of partitions. For example, if you wanted to have eight partitions, the math would be hash modulo 8, and your topics might look like this (with the numbers faked because I'm too lazy to do the real math on it):

session uuid                             hash         hash % 8   topic/channel name
"1e2852fd-00f0-4d0f-a45e-c649e26d16b9"   1633428562   7          "sessions.7"
"b0cb8fc0-a61b-11e8-98d0-529269fb1459"   7594634739   1          "sessions.1"
"b68ded0e-a61b-11e8-98d0-529269fb1459"   5000799124   3          "sessions.3"
"bb08b850-a61b-11e8-98d0-529269fb1459"   9787173343   0          "sessions.0"
"bf0464e0-a61b-11e8-98d0-529269fb1459"   3421657995   2          "sessions.2"

(Theoretically, when using UUIDs which are uniformly random, you could skip the hashing step as an optimization - the only reason it's there is to get a uniformly random distribution of numbers to do the modulo arithmetic on)

Just like with Kafka, you'd want to plan out the number of partitions ahead of time, because rebalancing them would be a pain (you'd have to copy all of the historical data you care about into the new "partitions").
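Purely as an illustration of that mapping, here is a minimal Go sketch; the eight-partition count, the "sessions." channel prefix, and the use of FNV-1a are arbitrary choices for the example, not anything prescribed by NATS Streaming or this thread:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// numPartitions is decided up front, as described above; rebalancing
// later means copying historical data into the new partitions.
const numPartitions = 8

// channelFor maps a session uuid to one of a fixed set of channels,
// e.g. "sessions.0" through "sessions.7".
func channelFor(sessionUUID string) string {
	h := fnv.New32a()
	h.Write([]byte(sessionUUID))
	return fmt.Sprintf("sessions.%d", h.Sum32()%numPartitions)
}

func main() {
	// Every event for the same session always lands on the same channel.
	fmt.Println(channelFor("1e2852fd-00f0-4d0f-a45e-c649e26d16b9"))
	fmt.Println(channelFor("b0cb8fc0-a61b-11e8-98d0-529269fb1459"))
}
```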

@kozlovic it seems like @jemc has come up with a reasonable solution; is this something that the NATS Streaming team has seen before / agrees with?

What's missing from @jemc's suggestion is how to spread the consumption of partitions across multiple nodes. The reason you want to partition in the first place is that the throughput of the stream is too high for a single node to handle. So, if you had 20 partitions, you would need 20 consumers, but perhaps for your regular load you don't need that many nodes, so you have 2 nodes consuming them, and you need some way to decide which partitions each node will handle. Then at some point you might have a spike in load (perhaps this is an ecommerce system and it's Black Friday), and you need to scale up the number of consuming nodes to 10 to handle the throughput (and ideally, this would be automatic). How does this happen? How do partitions get rebalanced and handed off from one node to another?

That's the hard part of partitioning: it's not the partitioning itself, it's the allocation of partitions to consumers. Kafka does this for you; it uses ZooKeeper to coordinate which nodes are consuming which partitions.

So the suggestion to just use specific channels for each partition is not a solution to this; it only solves the trivial part of the problem, not the real problem.
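To make the two parts of the problem concrete, here is a hedged sketch of only the trivial part: a static round-robin assignment of partitions to a fixed number of consumers. Everything dynamic (detecting failed consumers, rebalancing, scaling the consumer count up and down) is exactly what this does not cover; the numbers are placeholders:

```go
package main

import "fmt"

// assignPartitions statically maps partitions to consumers round-robin.
// This is the easy part: it says nothing about consumers joining,
// leaving, or failing, which is the coordination problem described above.
func assignPartitions(numPartitions, numConsumers int) map[int][]int {
	assignment := make(map[int][]int)
	for p := 0; p < numPartitions; p++ {
		c := p % numConsumers
		assignment[c] = append(assignment[c], p)
	}
	return assignment
}

func main() {
	// 20 partitions spread over 2 consumers: consumer 0 gets the even
	// partitions and consumer 1 the odd ones.
	fmt.Println(assignPartitions(20, 2))
}
```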

jemc commented

Yeah, to be clear, I wasn't putting it forth as a proposal - just trying to correct an inaccuracy in how partitioning was being described.

I work for a telecoms company, using NATS as our only communication layer between services, and we love NATS. We push thousands of messages per second through the cluster and it works very reliably.

To achieve partitioning, we implemented a hash on the senders so they send to "topic.<hash 0..N-1>", and decided up front that we have, say, N=10 partitions. Then we start 10 consumers to consume them. That is pretty much what some of you described above, and it works perfectly as long as we can keep all N consumers running. Each of those needs to know which number it is, which we handle with Docker Swarm, running one consumer per physical node.
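A rough sketch of what the consumer side of such a setup could look like with stan.go; the cluster ID, the "topic." channel prefix, the durable name, and the use of an environment variable to tell each instance its partition number are assumptions for illustration, not the poster's actual implementation:

```go
package main

import (
	"fmt"
	"log"
	"os"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// Each consumer instance is told which partition it owns, e.g. via
	// its environment ("0".."9" for N=10); placement is up to the orchestrator.
	partition := os.Getenv("PARTITION")

	sc, err := stan.Connect("test-cluster", "consumer-"+partition)
	if err != nil {
		log.Fatal(err)
	}

	channel := fmt.Sprintf("topic.%s", partition)
	_, err = sc.Subscribe(channel, func(m *stan.Msg) {
		log.Printf("partition %s: %s", partition, m.Data)
	}, stan.DurableName("partition-"+partition))
	if err != nil {
		log.Fatal(err)
	}

	// Block forever; a real consumer would handle signals and shut down cleanly.
	select {}
}
```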

The obvious issue came when some of the nodes broke: those partitions were no longer served, and service for those customers was down until we manually moved the consumer elsewhere.

The solution could be to automate, in Docker Swarm, the relocation of consumer[i] to another node; then it would just recover.

We, however, want more fine-grained horizontal scaling options, e.g. to use 20 partitions over 5 nodes and be able to run 4 consumers on each of the 5 nodes during peak, but just one consumer serving all 20 partitions when traffic drops.

Our solution is to have a service coordinate this and change the consumers to dynamically change the list of partitions they subscribe to... I suppose like ZooKeeper does for Kafka. So we plan to implement that as a generic service, have all consumers register with it, and then tell the consumers to serve one or more partitions (i.e. subscribe to a list of topic.N channels and unsubscribe when told). When only one consumer registers, it will be told to subscribe to all partitions. As more consumers join, the existing consumers will be told to let go of a subscription and the new consumers told to take over. If a consumer dies, its subscriptions are given to the others... that kind of thing.

That is our approach... we're still researching available candidates that do this with NATS; otherwise we'll attempt to develop it.
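For what it's worth, here is a hedged sketch of what the consumer side of such a coordinator protocol might look like with stan.go; the coordinator itself, the topic.N channel names, the durable names, and the apply() shape are all hypothetical, just to illustrate the subscribe/unsubscribe-on-command idea:

```go
package main

import (
	"fmt"
	"log"

	stan "github.com/nats-io/stan.go"
)

// partitionSet tracks which partition subscriptions this consumer holds,
// so a (hypothetical) coordinator can hand partitions to it or take them away.
type partitionSet struct {
	sc   stan.Conn
	subs map[int]stan.Subscription
}

// apply reconciles the current subscriptions with the list of partitions
// the coordinator has assigned to this consumer.
func (ps *partitionSet) apply(assigned []int) error {
	want := make(map[int]bool)
	for _, p := range assigned {
		p := p // capture loop variable for the closure (pre-Go 1.22 safety)
		want[p] = true
		if _, ok := ps.subs[p]; ok {
			continue // already serving this partition
		}
		sub, err := ps.sc.Subscribe(fmt.Sprintf("topic.%d", p), func(m *stan.Msg) {
			log.Printf("partition %d: %s", p, m.Data)
		}, stan.DurableName(fmt.Sprintf("topic-%d", p)))
		if err != nil {
			return err
		}
		ps.subs[p] = sub
	}
	// Let go of partitions that were reassigned elsewhere; Close() keeps the
	// subscription's durable state on the server, whereas Unsubscribe() would drop it.
	for p, sub := range ps.subs {
		if !want[p] {
			if err := sub.Close(); err != nil {
				return err
			}
			delete(ps.subs, p)
		}
	}
	return nil
}
```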

@jan-semmelink-takealot aren't you reinventing the kafka/redpanda rebalance logic? Wouldn't it be simpler to use redpanda in combination with nats, where all publishing goes to redpanda first and then to nats? From what I can tell, latency is quite low in redpanda.

@jan-semmelink-takealot have you got any coordination services yet?

I'm facing the same issue and will probably start with a more static setup.
We currently have 100 partitions and distribute them over N consumers. That allows us to change N without changing the hash output for the partition selection of single messages.

Changing the partition count creates at least some issues:

  • Increasing the partition count changes the hash results and needs additional workers.
  • Decreasing the partition count might leave topics unhandled.

It's also solvable with a coordinator, but that's quite some work to implement.
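As a rough illustration of that fixed-partition approach (a fixed set of 100 partitions hashed from the message, then distributed over N consumers), here is a minimal sketch; the hash function, key, and channel naming are assumptions, not the poster's actual code:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// numPartitions stays fixed at 100, so the message-to-partition mapping
// never changes when the number of consumers does.
const numPartitions = 100

// partitionOf maps a message key (e.g. a session uuid) to a fixed partition.
func partitionOf(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % numPartitions)
}

// consumerOf maps a partition to one of N consumers; N can change without
// affecting which partition (and hence which channel) a message lands on.
func consumerOf(partition, numConsumers int) int {
	return partition % numConsumers
}

func main() {
	p := partitionOf("session-1234")
	fmt.Println("partition:", p)
	fmt.Println("consumer with N=4:", consumerOf(p, 4))
	fmt.Println("consumer with N=7:", consumerOf(p, 7))
}
```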

Please note this issue is for NATS Streaming, which is now legacy, no longer supported, and replaced by JetStream, so the discussion should probably be moved to the nats-server repository instead.