krasserm/akka-persistence-kafka

Use the new async Producer API

krasserm opened this issue · 51 comments

Hello,

I use Kafka in synchronous replication mode (ack=-1), and with the old synchronous producer the performance is not so good.
Do you think version 0.4 will be available soon?

I won't upgrade to the new producer for reasons explained in RBMHTechnology/eventuate#59. Briefly, the new producer doesn't support atomic writes of user-defined batches to a partition.

Following your comment on #28 I decided to take a look and try to implement this, but I have a question.
Is atomicity really necessary here? From what I understand of the docs, it appears that the ordering is preserved, so even if something goes wrong a consumer should see a valid journal (even if not the most up-to-date).

it appears that the ordering is preserved

@talp-traiana can you please point me to that location in the docs?

Well, they don't say it directly, but the [API documentation](http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send%28org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback%29) says this about the completion callbacks:

Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the following example callback1 is guaranteed to execute before callback2:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads

And to me that implies that the writes are performed in order (it could simply be wishful thinking on my part).

This is fine if there are no failures, but imagine the following situation: 4 events (e1, e2, e3 and e4) need to be written atomically. Internally, the producer adds e1 and e2 to batch1, and e3 and e4 to batch2. Writing of batch1 fails but writing of batch2 succeeds. You end up with an incomplete write, i.e. atomicity is broken.

More generally, under failures, you may end up with missing events in a sequence without being able to recover from that. That's the reason why the new producer API is a no-go for event-sourcing. Not sure if this has been fixed in the meantime, worth taking a closer look.

Yeah, I see your point... Oh, well, guess I'll keep digging.

Would it make sense to migrate to the Async producer, and just not support persistAll?

Sad to see this stale, so I did some digging. KafkaProducer as of 0.9 (http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) only supports send, which returns a Future. We could "block" on this, emulating the old sync API, but that is far from ideal. Ideally you would write all messages atomically as a batch. Kafka has a JIRA issue on this, https://issues.apache.org/jira/browse/KAFKA-1524, but it doesn't seem to have any activity (the last activity was in 2014).
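For illustration, a minimal sketch of that blocking workaround with the 0.9 producer API (broker address, serializer classes and acks setting are placeholders, not taken from this project):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all") // corresponds to ack=-1 with the old producer
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

// send() is asynchronous; blocking on the returned java.util.concurrent.Future
// emulates the old synchronous API, one record at a time
def syncSend(topic: String, partition: Int, payload: Array[Byte]): RecordMetadata =
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Integer.valueOf(partition), null, payload)).get()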

@Fristi thanks for your investigations. What we'd need is the ability to make atomic batch writes to a single topic partition, it doesn't need to be transactional across partitions or even topics. I'm pretty sure the new producer internally does that already, just didn't find the time to take a closer look. We would just need that to be part of the public API.

The Java driver seems to buffer the writes in the background. The protocol itself (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest) supports sending a MessageSet to a topic + partition, which is what we want.

@Fristi @krasserm it seems that the Java driver only supports the send() method for producing a single record, but in fact the driver batches all records internally and sends them to Kafka in batches. The Kafka protocol supports batch requests. The solution would be to use the Kafka protocol and implement a producer, which would be used in AsyncWriteJournal to send batches of PersistentRepr messages to Kafka. What do you think? @krasserm do you see any other problems?

it seems that the Java driver only supports the send() method for producing a single record, but in fact the driver batches all records internally and sends them to Kafka in batches

yeah, this is exactly what I described here :-) See also RBMHTechnology/eventuate#59

The Kafka protocol supports batch requests. The solution would be to use the Kafka protocol and implement a producer, which would be used in AsyncWriteJournal to send batches of PersistentRepr messages to Kafka. What do you think? @krasserm do you see any other problems?

No, except for the effort to implement the producer. We also need to be able to deal with failover scenarios etc. Maybe extending/patching the existing Java producer to offer a batch-write API is less effort.

@krasserm great, I am interested in supporting this backend, so it is a good time to start a discussion about how to implement this and meet the requirements of Akka Persistence Query ;) I will dig into the Kafka Java driver and check whether we can reuse some parts to implement the producer or just extend the Java driver.

The current implementation of akka-persistence-kafka creates a topic per persistenceId, always using partition 0, so below I have collected ideas on how to support the predefined queries:

  • EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery - we will replay events from the topic
  • EventsByTag and CurrentEventsByTag - with EventAdapters the user can define a tag (or tags); all tagged events will also be persisted to the tag's topic and we will replay them from this topic.
  • AllPersistenceIdsQuery and CurrentPersistenceIdsQuery - KafkaConsumer provides a listTopics() method, but it is not a good idea to use it. What about producing a single record for each PersistentRepr with sequenceNr == 0 to a topic named e.g. "persistenceIds" and supporting this query by replaying events from this topic?
ktoso commented

It should be noted that a general Akka Streams Kafka API has been developed very actively over here for quite a while: https://github.com/akka/reactive-kafka AFAIR it uses the "new" APIs, so it might be worth considering whether reusing and adapting it for the Persistence Query needs might be more feasible than a full reimplementation :)

@ktoso I totally agree that using reactive-kafka as the backend for Persistence Query is a good solution. Thanks to that, the only remaining need is to implement the producer :)

It would be great to have akka-persistence working on the latest Kafka version. I've pointed Kafka on Twitter (https://twitter.com/mark_dj/status/737334591795695616) to this topic in the hope they can help us out on this matter :) (maybe they've already got an issue on it (couldn't find it though) or are working on it).

ktoso commented

Uhm, it's a by design API breakage as far as I know, so I don't think Kafka maintainers will do something about it. Instead this project should use the new APIs, which is what Reactive Kafka does.

Thanks to that, the only remaining need is to implement the producer :)

Maybe I'm tired, but Reactive Kafka provides both sink and source (i.e. Consumer / Producer), so those can be reused to implement the semantics that Persistence Query would need :)

You're right, there is a ProducerStage in Reactive Kafka, but it cannot be used to implement AsyncWriteJournal because it does not support persisting atomic batches of events: it passes all elements from the stream to Kafka's producer, which consumes single records, whereas batching of those records is done internally by the Java client.

Uhm, it's a by design API breakage as far as I know

I am not sure if I understand you 😄 What API will break? To make my point clear, the Kafka protocol's produce request is described here (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest):

v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32

This allows writing a MessageSet (multiple messages in one go) instead of using Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback), which requires you to send each message one by one.

The Haskell Kafka client does this https://hackage.haskell.org/package/milena-0.5.0.1/docs/Network-Kafka-Producer.html

I am interested in supporting this backend ... I will dig into the Kafka Java driver and check whether we can reuse some parts to implement the producer or just extend the Java driver.

Really looking forward to discussing your findings @maciekciolek

I totally agree that using reactive-kafka as the backend for Persistence Query is a good solution. Thanks to that, the only remaining need is to implement the producer :)

+1,

It would be great to have akka-persistence working on the latest Kafka version.

This should actually work but requires using the old producer.

Uhm, it's a by design API breakage as far as I know, so I don't think Kafka maintainers will do something about it. Instead this project should use the new APIs ...

+1

Reactive Kafka provides both sink and source (i.e. Consumer / Producer), so those can be reused to implement the semantics that Persistence Query would need

The problem with the new Kafka producer is that atomic batch writes are required by Akka Persistence but the new producer (and hence Reactive Kafka) doesn't support them; producer clients don't have control over batch boundaries. Creating and submitting batches to a topic partition is however done internally by the new producer. Exposing it through an additional public API should be possible with reasonable effort, I think.
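Just to make that concrete, a purely hypothetical sketch of what such a public batch-write API could look like (no such method exists in the current producer; names and types are made up):

import scala.concurrent.Future
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

// hypothetical: all records of one call end up in a single MessageSet for one
// topic partition, so either all of them are written or none
trait BatchProducer[K, V] {
  def sendBatch(topic: String, partition: Int,
                records: Seq[ProducerRecord[K, V]]): Future[Seq[RecordMetadata]]
}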

I dug deeply into the problem and have some findings which need to be discussed.

To begin, let's clarify Kafka's design:

  1. Within each topic we can have a configured number of partitions, and the client is responsible for producing records to the selected partition.

  2. We can have a cluster of Kafka brokers, and those brokers will elect a leader for each partition.

  3. Writes and reads must go to the leader of the partition, otherwise the broker will reply with a NotLeaderForPartition error.

  4. The leadership is dynamic, so:

    ... you can't just configure each client with some static mapping file. Instead all Kafka brokers can answer a metadata request that describes the current state of the cluster: what topics there are, which partitions those topics have, which broker is the leader for those partitions, and the host and port information for these brokers.

  5. The Metadata Request defined in the protocol allows fetching metadata for all topics in the cluster or just for a provided list of topics.

  6. The Produce Request defined in the protocol allows producing a single batch of records across topics and partitions, but the client needs to ensure that this batch contains data only for partitions for which the target broker is the current leader, otherwise we can expect a NotLeaderForPartition error.

Finally, I was able to reuse some parts of the Java driver and make batch requests - so I can confirm that we are able to implement a producer which can make atomic writes of batches.

Problem 1:
Our goal is to ensure that each Persistent Actor gets a single topic (per persistenceId) in Kafka. So, in order to write records, we need to maintain topic metadata on the client side (as described in point 4) for each topic (in our case each persistenceId).

The Java client does a metadata update every 5 minutes (by default) or when any partition leadership change occurs (NotLeaderForPartition error).

Will it be okay if we cache the leader for each persistenceId (topic), which can result in huge memory usage? Maybe we should provide some LRU cache?

Problem 2:
In order to support AllPersistenceIdsQuery, my idea was to make an atomic write of a batch, e.g.:

[ 
(persistenceId1Topic, persistentRepr0, partition0), 
(persistenceId1Topic, persistentRepr1, partition0),
(persistenceId1Topic, persistentRepr2, partition0) 
... 
(persistenceId1Topic, persistentReprN, partition0), 
(allPersistenceIdsTopic, persistenceId1, partition0)  - if we notice that sequenceNr is equal to 0
]

This write ensures consistency between the topics persistenceId1Topic and allPersistenceIdsTopic and also ensures that AllPersistenceIdsQuery will always return the truth. But as I described in point 6, this is not possible in general, because the leader of persistenceId1Topic may not be the same as the leader of allPersistenceIdsTopic.

Do you have any ideas how to solve this problem?

Finally, I was able to reuse some parts of the Java driver and make batch requests - so I can confirm that we are able to implement a producer which can make atomic writes of batches.

Awesome news, thanks a lot for digging into it. Did you need to make some changes to the driver or was it possible with the existing API? Would you mind sharing some example code?

Will it be okay if we cache the leader for each persistenceId (topic), which can result in huge memory usage? Maybe we should provide some LRU cache?

I'm not sure if memory usage will get too high, as the number of topics is limited (a few 100 to a few 1000). Anyway, using an LRU cache sounds absolutely reasonable to me. For finding the leader of a topic, you can reuse the leaderFor methods of MetadataConsumer.
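A minimal sketch of such an LRU leader cache; the capacity and the leader lookup function are assumptions (the lookup could e.g. delegate to MetadataConsumer.leaderFor):

import java.util.{LinkedHashMap, Map => JMap}
import org.apache.kafka.common.Node

class LeaderCache(capacity: Int, lookupLeader: String => Node) {
  // access-ordered LinkedHashMap that evicts the least recently used topic
  private val cache = new LinkedHashMap[String, Node](capacity, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[String, Node]): Boolean =
      size() > capacity
  }

  def leaderFor(topic: String): Node = synchronized {
    Option(cache.get(topic)).getOrElse {
      val leader = lookupLeader(topic) // e.g. a metadata request for this topic
      cache.put(topic, leader)
      leader
    }
  }

  // drop a stale entry, e.g. after a NotLeaderForPartition error
  def invalidate(topic: String): Unit = synchronized { cache.remove(topic) }
}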

AllPersistenceIdsQuery ... Do you have any ideas how to solve this problem?

Kafka can give you a list of all topics. Using a naming convention for topics should solve the problem or am I missing something?

Awesome news, thanks a lot for digging into it. Did you need to make some changes to the driver or was it possible with the existing API? Would you mind sharing some example code?

I reused the API, but it still requires us to implement the new producer; the only parts we can reuse are the async network client and the protocol representation. To be honest, I am thinking about writing it from scratch using an Actor with TCP support. Example code

I'm not sure if memory usage will get too high, as the number of topics is limited (a few 100 to a few 1000). Anyway, using an LRU cache sounds absolutely reasonable to me. For finding the leader of a topic, you can reuse the leaderFor methods of MetadataConsumer.

Reusing MetadataConsumer is a good idea, but I need to ask - why did you say that the number of topics is limited? IMO the number of topics equals the number of persistenceIds, which in huge systems can be even a few million :) Am I missing something?

Kafka can give you a list of all topics. Using a naming convention for topics should solve the problem or am I missing something?

You are right, we can get the list of topics with the API, but it returns just the list. It is easy to turn the list into a stream - great, but in order to implement e.g. the allPersistenceIds query, we need to make cyclic queries and remember the topics we have already returned in order to push newly created topics to the allPersistenceIds stream. That's why I am thinking about creating a separate topic for all persistenceIds.

Great work @maciekciolek!

To be honest, I am thinking about writing it from scratch using an Actor with TCP support.

I have scodec code for the ProduceRequest/ProduceResponse in case you are interested :)

Edit: Ah I see you are using the Kafka Java API to achieve the same thing. Great stuff

@maciekciolek thanks a lot for sharing the code, will take a closer look soon.

IMO the number of topics equals the number of persistenceIds, which in huge systems can be even a few million :) Am I missing something?

The number of topics/partitions in Kafka is limited, you won't be able to have more than 100K topics or so (see also https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka). I've even experienced much stricter limits in the past.

From this perspective, akka-persistence-kafka has limited applicability but an Eventuate integration would definitely make sense as persistent actors can share event logs there, in the same way as producers and consumers can share a topic in a Kafka application. See also http://rbmhtechnology.github.io/eventuate/faq.html.

... That's why I am thinking about creating a separate topic for all persistenceIds.

I see. Still, I wouldn't add to this topic on the write side. Instead, a background job could periodically (or, when signaled) load the list of existing topics from Kafka, compute a diff and add new persistenceIds to the allPersistenceIds topic. That's a cleaner separation of the write side from the query side and maps much better to the write/read journal separation in Akka.
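A rough sketch of that background job, assuming journal topics share a common naming prefix (topic names, types and the scheduling of runOnce are assumptions):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class PersistenceIdsIndexer(
    consumer: KafkaConsumer[String, String],
    producer: KafkaProducer[String, String],
    allPersistenceIdsTopic: String,
    journalTopicPrefix: String) {

  private var known = Set.empty[String]

  // run periodically (or when signaled): diff the current topics against the
  // already published persistenceIds and append only the new ones
  def runOnce(): Unit = {
    val current = consumer.listTopics().keySet.asScala
      .filter(_.startsWith(journalTopicPrefix))
      .map(_.stripPrefix(journalTopicPrefix))
      .toSet
    (current -- known).foreach { persistenceId =>
      producer.send(new ProducerRecord[String, String](allPersistenceIdsTopic, persistenceId, persistenceId))
    }
    known = current
  }
}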

The number of topics/partitions in Kafka is limited, you won't be able to have more than 100K topics or so (see also https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka). I've even experienced much stricter limits in the past.

You are right, this is a limitation of Kafka :( So akka-persistence-kafka is a good backend only if the number of persistenceIds in your application stays below Kafka's topic limits.

... an Eventuate integration would definitely make sense as persistent actors can share event logs there, in the same way as producers and consumers can share a topic in a Kafka application. See also http://rbmhtechnology.github.io/eventuate/faq.html.

As far as I understand, this Kafka integration should implement EventLog from Eventuate to support this?

I see. Still, I wouldn't add to this topic on the write side. Instead, a background job could periodically (or, when signaled) load the list of existing topics from Kafka, compute a diff and add new persistenceIds to the allPersistenceIds topic. That's a cleaner separation of the write side from the query side and maps much better to the write/read journal separation in Akka.

Okay, this can be done because we have some "upper" limit on the possible number of topics.

Do you have any ideas how we can support the EventsByTag query? My first idea was to write an atomic batch of events to the persistenceIdTopic and the eventTopic, but that only works if the leader of those topic partitions is the same.

I think the new producer can still be used as long as the consumer used by the journal is "batch-aware." The producer could add metadata to each write (batch id, batch size) so that later the consumer could ignore incomplete batches during replay. This would probably make the offset arithmetic more complicated though.

Update: It turns out that this conversation is pretty active, so I should have scrolled up and read the updates before posting my thoughts.
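For what it's worth, a small sketch of that batch-metadata idea, assuming the id and size of the write batch are carried inside the serialized payload (pre-0.11 Kafka has no record headers; the envelope format is made up):

// each event carries the id and size of the batch it was written with
final case class BatchedEvent(batchId: String, batchSize: Int, payload: Array[Byte])

// during replay, drop all events belonging to batches that are not fully present
def dropIncompleteBatches(replayed: Seq[BatchedEvent]): Seq[BatchedEvent] = {
  val complete = replayed.groupBy(_.batchId).collect {
    case (id, events) if events.size == events.head.batchSize => id
  }.toSet
  replayed.filter(e => complete.contains(e.batchId))
}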

As far as I understand this Kafka integration should implement EventLog from Eventuate to support this?

Yes

Do you have any ideas how can we support EventsByTag query?

With another background job, processing from the persistenceId topics and generating tag topics. The main problem with this approach is that there might be many persistenceId topics to consume from for processing.

Alternatively, a write journal could write all its events to a single topic partition (or a few topic partitions, e.g. by hashing persistenceIds, for scaling writes) and then use this topic (these topics) to derive everything else:

  • all persistenceIds topic
  • persistenceId topics (used to recover individual persistent actors)
  • tag topics

This approach would also keep events written by a single journal instance in write-order (per partition) which can be very helpful when processing events for creating views (e.g. persistenceId topics or tag topics). Keeping only a write-order per persistenceId during initial writes (as done in most, if not all, existing akka-persistence journals, including this one) can make creating views rather difficult.

A similar approach is taken in Eventuate, for example, where an event log (written by many event-sourced actors) is indexed to allow efficient replay per actor id.
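A small sketch of how a write journal could route events in that scheme; the topic name and partition count are assumptions:

import org.apache.kafka.clients.producer.ProducerRecord

// all events of one journal go to a single topic; the partition is derived from
// the persistenceId, so each persistenceId stays in write-order within one partition
def journalRecord(persistenceId: String, event: Array[Byte],
                  journalTopic: String = "journal",
                  numPartitions: Int = 16): ProducerRecord[String, Array[Byte]] = {
  val partition = Integer.valueOf(math.abs(persistenceId.hashCode % numPartitions))
  new ProducerRecord[String, Array[Byte]](journalTopic, partition, persistenceId, event)
}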

The idea of creating a single topic (with partitions) for all events is great, but we should be aware of the following problems:

  1. Recovering a persistenceId from this topic could take a long time in huge systems because of skipping other events. Maybe creating a sufficient number of partitions could be the solution, but it can still cause some problems.
  2. Creating views based on this topic, e.g. per persistenceId, and recovering actors from them is not a good solution, because those views will only be eventually consistent with the main topic, which in some cases can cause events to be dropped.

To be honest, it seems that Kafka is a good backend only with a deployment of one broker and topics with a single partition 👍

The problems you see are easy to solve: you just need to store the processing progress of the main topic. Whenever you make an eventually consistent read from a derived topic, you complete the read by processing the (usually short) end of the main topic, starting from the progress marker. This pattern is implemented, for example, in Eventuate's Cassandra event log, and we have tested read-your-write consistency (among many other things) even under chaotic conditions and are using it successfully in production. Furthermore, this is a common pattern in log-centric architectures.
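A rough sketch of that read-completion pattern, assuming events are keyed by persistenceId and the derived topic's progress offset on the main topic is stored elsewhere; stopping on an empty poll is a simplification:

import scala.collection.JavaConverters._
import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def replay(consumer: KafkaConsumer[String, Array[Byte]],
           persistenceId: String,
           derivedTopic: TopicPartition,   // e.g. the per-persistenceId view topic
           mainTopic: TopicPartition,      // the journal's main topic partition
           progressOffset: Long): Vector[Array[Byte]] = {

  def readToEnd(tp: TopicPartition, from: Long): Vector[Array[Byte]] = {
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, from)
    var events = Vector.empty[Array[Byte]]
    var batch = consumer.poll(1000L).records(tp).asScala
    while (batch.nonEmpty) {
      events ++= batch.filter(_.key == persistenceId).map(_.value)
      batch = consumer.poll(1000L).records(tp).asScala
    }
    events
  }

  // 1. fast path: everything already indexed into the derived topic
  // 2. completion: the (usually short) tail of the main topic after the progress marker
  readToEnd(derivedTopic, 0L) ++ readToEnd(mainTopic, progressOffset)
}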

Following this pattern would also allow us to have a Kafka-based write journal combined with a read journal that uses another storage backend (Cassandra, HBase ... or whatever). IMO it doesn’t make much sense to use Kafka for a feature-rich read journal, except for maintaining allPersistenceIds and persistenceId topics.

Hope that helps :)

Thanks a lot for those ideas :) So, I would like to summarize the current ideas:

Batch Producer
We need to implement a producer which will support batch writes of events. It can be done by reusing parts of the existing producer or with an Akka Actor + TCP.

Topic per journal
The write journal will write all events to a partitioned topic, where the partitioning will be done by hashing persistenceIds. This ensures that we keep events in write-order per partition.

Recovery per persistenceId
This can be done in two ways:

  • reading events from the main topic partition and skipping events not belonging to the persistenceId - recovery will take longer than with e.g. a view per persistenceId
  • creating a view per persistenceId - but with this approach we will face Kafka's limit on the maximum number of topics, and it requires implementing the logic to switch from reading this view to reading the main topic

Hmm, maybe it can be configurable - the user will choose which strategy is suitable for him regarding the Kafka limitations. Recovery from the main topic could also be combined with storing snapshots, which will shorten the recovery time.

Read per persistenceId
Same solutions as described above

Read allPersistneceIds
A topic (view) should be created which will be fed by a background job. The job will read all events from the main topic and push new persistenceIds if the sequenceNr of an event is equal to 0.

Read byEventType
A topic (view) should be created per eventType, fed by a background job. The job will read all events from the main topic partitions and copy the tagged events. With this approach we can ensure that the eventsByType view contains events totally ordered per partition (and thus per persistenceId), but it will not ensure a total order of events across partitions.

The background job mentioned above should be started on each node which requests any of the query streams, e.g. allPersistenceIdsQuery or allEventsByTagQuery. It can also be made configurable to not start this job if the user does not want to. Those jobs will use the default Java consumer and will belong to the same groupId, which ensures that we will not read the same event more than once.
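A sketch of the consumer configuration for those background jobs; all instances share one group.id so the main topic's partitions are divided among them and no event is processed by more than one instance at a time (broker address, group name, deserializers and topic name are assumptions):

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "akka-persistence-kafka-view-builder") // shared by all job instances
props.put("enable.auto.commit", "false") // commit offsets only after the view write succeeded
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val viewBuilderConsumer = new KafkaConsumer[String, Array[Byte]](props)
viewBuilderConsumer.subscribe(Arrays.asList("journal")) // the main journal topic (assumed name)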

Am I correct? :)

@maciekciolek sounds good!

We have discussed a similar solution for the Akka Persistence Cassandra query implementation. I started implementing it, but never finished as we progressed with a simpler solution (although potentially missing some performance and correctness aspects) utilising materialised views. I think I gave the read side quite some thought, including some of the difficulties you mentioned such as consistent replay or handling of failure scenarios.

Do you want to split the work? I would be interested mostly in implementing the background job that precomputes the allPersistenceIds/eventsByTag "views", because I have spent quite some time on the concept already, but happy to help elsewhere too.

I would suggest creating a separate branch with update to Kafka 0.9 and then split the work and proceed in parallel.

@zapletal-martin thanks a lot for offering help here, highly appreciated. +1 on proceeding as you proposed. @maciekciolek thoughts?

@zapletal-martin thanks a lot for your proposal 👍 It's a great idea to split the work; then I will implement the producer and the AsyncWriteJournal for this new producer. If you finish the background jobs, then we can implement the read journal with reactive-kafka. Would it be possible to have a chat in order to talk about all the details?

@maciekciolek apologies for the delay. Happy to connect and discuss the details. What communication channel would be most convenient?

@zapletal-martin no problem, I am also a little short on time. I think the fastest way to start is Gitter - I was unable to find you there, can you join or provide another communication channel?

I have now joined Gitter, using my git name.

@maciekciolek @zapletal-martin I just created https://gitter.im/krasserm/akka-persistence-kafka for this repository. Would love to follow/participate in your discussions.

I hate to interrupt this great discussion but I can't quite understand the problems that @maciekciolek enumerates and the pattern that @krasserm proposes to solve them.

Recovering a persistenceId from this topic could take a long time in huge systems because of skipping other events. Maybe creating a sufficient number of partitions could be the solution, but it can still cause some problems.

Isn't this solved with Snapshots?

Creating views based on this topic, e.g. per persistenceId, and recovering actors from them is not a good solution, because those views will only be eventually consistent with the main topic, which in some cases can cause events to be dropped.

Aren't views always eventually consistent?
In what cases could the events be dropped?

you just need to store the processing progress of the main topic. Whenever you make an eventually consistent read from a derived topic, you complete the read by processing the (usually short) end of the main topic, starting from the progress marker.

I'm lost here.

I know that you're working on the akka-persistence-kafka plugin, so I understand if you can't spend time responding to this.
I will be following your progress.

@gabrielgiussi here are the answers to your questions:

Isn't this solved with Snapshots?

You are right, it can be solved with Snapshots. I was thinking about the situation where e.g. you have 1M persistent actors and each one contains only 10 events. If the user does not use snapshots, the naive topic-per-journal approach will result in bad performance. In other words, the journal will replay all events just to get those 10 events...

Aren't views always eventually consistent?
In what cases could the events be dropped?

Here I was talking about @krasserm's proposition to create a topic per persistenceId as a view of the main topic in order to provide faster actor recovery. Of course this view (topic) will only be eventually consistent with the main topic, so you will need some logic to read the latest events from the main topic.

I'm lost here

This should help you understand:

Recovery per persistenceId
This can be done in two ways:

  • reading events from the main topic partition and skipping events not belonging to the persistenceId - recovery will take longer than with e.g. a view per persistenceId
  • creating a view per persistenceId - but with this approach we will face Kafka's limit on the maximum number of topics, and it requires implementing the logic to switch from reading this view to reading the main topic
    Hmm, maybe it can be configurable - the user will choose which strategy is suitable for him regarding the Kafka limitations. Recovery from the main topic could also be combined with storing snapshots, which will shorten the recovery time.

As regards progress, I am working on a producer implementation from scratch with scodec and the new v1 Kafka protocol :)

@maciekciolek Thanks, it's much clearer now. Keep up the good work!

I created a working PoC of the background job that precomputes the "views", using Kafka's group rebalance to avoid duplicates in the resulting views after a failure between publishing and committing. It is a very general solution and will require lots of refactoring and tests, but it proves the concept: https://github.com/zapletal-martin/distributed-causal-stream-processing. Let me know if there is any progress and when this can be added.

Just found the new Kafka proposal for idempotent producers and transactional messaging. The idempotent producer is exactly what we need for using Kafka as an event-sourcing storage backend.

That would largely simplify the task at hand. It still seems to be in the proposal stage, however; is any information available about when it might be included in Kafka? Until then, it might still be possible to proceed with the above solution and keep extensibility and modularity in mind for a future replacement.

@zapletal-martin sorry for the late reply. I couldn't find a timeline for the idempotent producer. Seems this will take a while until it's ready. So 👍 to proceed as discussed before.

giena commented

Hi everyone,

It seems that the transactional producer has been available since Kafka 0.11.0. Is it still exactly what we need? Is anyone working on it?

I was thinking the same thing, and I am planning on looking into it.

giena commented

Hi,
I worked on it on our own fork at https://github.com/worldline-messaging/akka-persistence-kafka/

  • Use of the new KafkaProducer with transactions
  • No more use of the ZooKeeper API
  • KafkaJournalWriters have been discarded, because the new API is asynchronous.

Could you please test it and give me some feedback?
giena commented

A little status update on the Akka persistence plugin for Kafka with transactions:

  • Usage of Kafka 1.0.0
  • I had a lot of difficulties passing the optional TCK tests, especially the performance tests. Indeed, the systematic use of transactions is problematic when the batch size is small. In the case of the TCK, they use batch writes of 1 event (using persist rather than persistAsync). So, for each event, we start a transaction, we write a message and we commit, and that does not go well: I am unable to store 10000 messages in less than 10 seconds. This use case exists only for testing; it should not happen in a real log, or we would simply persist with persistAsync. So nothing serious in my opinion, especially since these tests are part of the TCK's optional tests.
    I have stopped my efforts for the moment; we have a functional version that passes the mandatory tests. Performance is degraded, but we got rid of deprecated APIs and the code is really simplified (a rough sketch of the per-batch transactional write path follows below).
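For reference, a rough sketch of that per-batch transactional write path against the 0.11+/1.0 producer API; transactional.id, serializers and broker address are assumptions, and consumers need isolation.level=read_committed to hide aborted batches:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("transactional.id", "akka-persistence-kafka-journal-1")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[String, Array[Byte]](props)
producer.initTransactions()

// one transaction per AsyncWriteJournal batch: the whole batch becomes visible
// atomically, or not at all - this is also why a batch size of 1 (persist) is slow
def writeBatch(records: Seq[ProducerRecord[String, Array[Byte]]]): Unit = {
  producer.beginTransaction()
  try {
    records.foreach(producer.send(_)) // sends are asynchronous within the transaction
    producer.commitTransaction()
  } catch {
    case e: Exception =>
      producer.abortTransaction() // nothing from this batch is exposed to consumers
      throw e
  }
}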

Could you please test it and give me some feedback?

#37