Brainstorm/Discussion: Kafka as "persistent" source
Kafka is a persistent commit log. Because of this persistent nature we can use Kafka for event sourcing. Here is a nice article on the subject: https://dzone.com/articles/event-sourcing-with-kafka. It describes using RocksDB for state.
I think Kafka and Akka could have great synergy. There is just one catch, which the article already mentions: because Kafka uses files on disk to store the event stream, it is considered unwise to use a topic per entity unless you have few entities in the system. You would also need some administration of which entities exist (that isn't hard, as we can get the info using metadata).
Examples from other GitHub sources actually do use one topic per actor; a Java/Scala example is https://github.com/krasserm/akka-persistence-kafka, which has an EventTopicMapper, but I can't really tell whether it can map many entities into one topic, or whether it is meant to do that at all.
I don't think it is smart to make Kafka a source for a persistent actor at this moment, as you would need as many consumers as there are actors restoring state. From a performance point of view, Kafka topics are not meant to be read from the beginning over and over again.
To not get too abstract here: a simple event-sourced system could be a Users topic. We could define UserCreated, UserDisplaynameChanged, UserLoggedIn, and UserLoggedOut events, all in a single Users topic. Let's make the key a Guid (we all love Guids, don't we ;)).
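To make that concrete, here is a minimal sketch of what those event types could look like; the shapes and property names are my own assumptions, and serialization to the topic is left out:

```csharp
using System;

// Events for the Users topic; the Kafka message key is the user's Guid.
public interface IUserEvent
{
    Guid UserId { get; }
}

public sealed class UserCreated : IUserEvent
{
    public Guid UserId { get; }
    public string DisplayName { get; }
    public UserCreated(Guid userId, string displayName)
    {
        UserId = userId;
        DisplayName = displayName;
    }
}

public sealed class UserDisplaynameChanged : IUserEvent
{
    public Guid UserId { get; }
    public string NewDisplayName { get; }
    public UserDisplaynameChanged(Guid userId, string newDisplayName)
    {
        UserId = userId;
        NewDisplayName = newDisplayName;
    }
}

public sealed class UserLoggedIn : IUserEvent
{
    public Guid UserId { get; }
    public UserLoggedIn(Guid userId) => UserId = userId;
}

public sealed class UserLoggedOut : IUserEvent
{
    public Guid UserId { get; }
    public UserLoggedOut(Guid userId) => UserId = userId;
}
```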
What we would need:
- a mapping from topic to topic coordinator, in our case UserCoordinator?
- a partition mapping: which set of users is part of a particular partition
- a trigger for when an actor has caught up in the topic it is in; presuming multiple partitions, some actors would be synced sooner than others
- I think the system should stash commands as long as the actor is still being built (see the sketch after this list)
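A minimal sketch of that stash-until-caught-up idea, using Akka.NET's IWithUnboundedStash; the Synced marker message is an assumption (sent by the coordinator once the topic has caught up), and IUserEvent is the interface from the events sketch above:

```csharp
using Akka.Actor;

// Hypothetical marker the coordinator sends once the topic has caught up.
public sealed class Synced
{
    public static readonly Synced Instance = new Synced();
    private Synced() { }
}

public class UserActor : ReceiveActor, IWithUnboundedStash
{
    public IStash Stash { get; set; }

    public UserActor() => Restoring();

    private void Restoring()
    {
        // While restoring, apply replayed events and stash everything else.
        Receive<IUserEvent>(ApplyEvent);
        Receive<Synced>(_ =>
        {
            Stash.UnstashAll(); // deliver the buffered commands in order
            Become(Active);
        });
        ReceiveAny(_ => Stash.Stash());
    }

    private void Active()
    {
        // New events from the topic still update state...
        Receive<IUserEvent>(ApplyEvent);
        // ...and commands are handled only now that state is current.
        ReceiveAny(cmd => { /* handle command against current state */ });
    }

    private void ApplyEvent(IUserEvent evt) { /* update in-memory state */ }
}
```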
I don't think we need to worry much about other actors producing into the topics our subsystem subscribes to; that is up to the developer and has the same issues as persistent actors with overlapping persistence ids: system behaviour could get really strange. But as long as consumers stay subscribed to the topic, it will be a lot more robust than current persistent actors, which stop the subscription (in most implementations anyway)...
Kafka can perform at great speeds. I think creating snapshots could be done, but snapshots for multiple actors in one topic could be quite hard, so the scope of this discussion should be events/journal only for now.
I have already built actors based on topics, but I created this "issue" to discuss a generic solution we could offer to others in this repo, or anywhere else.
Below are my thoughts on how to do this with the simple Users topic. I created this discussion because I would like to hear what others think, so I don't miss important Akka or Kafka features.
- the implementation should be based on a base class (KafkaTopicEntityCoordinator); it would need a way to create child actors (Func<byte[], Props>, maybe?) and to transform the key into something equatable; I guess Func<T, IComparable> could do the trick
- an abstract Synced method we can Become
- the message key can be used to create a child actor name, simply by creating a string from the byte[] (ANSI encoding)
- the base class should keep a Dictionary<topic, HashSet<partition>> so it can tell all partition-dependent actors when their topic has reached EOF; those actors then get a trigger to Become(Synced)
- when all topics reach the end, the coordinator can call Become(Synced)
- when Synced, all actors can tell the producer associated with the coordinator base class about new events to persist
Based on these first thoughts, I guess we also need the PartitionEOF from Kafka as an event in the SourceStage. A rough sketch of the coordinator follows.
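This is only a sketch of how such a base class could hang together; EntityRecord and PartitionEof are hypothetical messages standing in for whatever the source stage would emit, and Synced is the marker from the stash sketch above:

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;

// Hypothetical messages emitted by the Kafka source stage.
public sealed class EntityRecord
{
    public string Topic { get; }
    public byte[] Key { get; }
    public object Event { get; }
    public EntityRecord(string topic, byte[] key, object evt)
    {
        Topic = topic; Key = key; Event = evt;
    }
}

public sealed class PartitionEof
{
    public string Topic { get; }
    public int Partition { get; }
    public PartitionEof(string topic, int partition)
    {
        Topic = topic; Partition = partition;
    }
}

public abstract class KafkaTopicEntityCoordinator : ReceiveActor
{
    private readonly Func<byte[], Props> _childProps;
    // Per topic: the partitions that still have to reach EOF.
    private readonly Dictionary<string, HashSet<int>> _pending;

    protected KafkaTopicEntityCoordinator(
        Func<byte[], Props> childProps,
        Dictionary<string, HashSet<int>> assignedPartitions)
    {
        _childProps = childProps;
        _pending = assignedPartitions;

        Receive<EntityRecord>(r => GetOrCreateChild(r.Key).Forward(r.Event));
        Receive<PartitionEof>(eof =>
        {
            if (_pending.TryGetValue(eof.Topic, out var partitions))
            {
                partitions.Remove(eof.Partition);
                if (partitions.Count == 0) _pending.Remove(eof.Topic);
            }
            if (_pending.Count == 0)
            {
                // Every partition of every topic reached EOF: we're caught up.
                foreach (var child in Context.GetChildren())
                    child.Tell(Synced.Instance);
                Become(SyncedBehavior);
            }
        });
    }

    // Message key -> child actor name. Hex instead of ANSI here, because
    // arbitrary bytes may not map to valid actor-name characters.
    private IActorRef GetOrCreateChild(byte[] key)
    {
        var name = BitConverter.ToString(key).Replace("-", "");
        var child = Context.Child(name);
        return child.Equals(ActorRefs.Nobody)
            ? Context.ActorOf(_childProps(key), name)
            : child;
    }

    // Concrete coordinators decide what "caught up" behaviour looks like.
    protected abstract void SyncedBehavior();
}
```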
I am going to implement something next week, as I would like to streamline some of my actors using the Kafka connector. I am using AMQP to talk to non-trusted remote ActorSystems (running on phones/desktops/etc.) and I am using Kafka as my event source (retention -1).
@AndreSteenbergen There are a lot of doubts about using Kafka as an event sourcing layer - I'm not convinced about this approach myself, especially in combination with actors, where stream-per-actor semantics are pretty important (as they will be used quite often during state replays).
However, I see Kafka as a bridge for actor cluster communication and a great source of events that multiple services subscribe to at different points in time to produce their own persistent micro-states (in their own storages). I think it's a major marketing point of Kafka right now, and indeed it fits that role well.
I think that maybe you could start from a vision/motivation - what problem do you aim to fix, what alternatives do we have right now, and how (and when) will yours be better?
I have been using Kafka as an event source for about 1.5 years now. It works quite well, actually; I did have some strange problems here and there, but that was me not reading the docs correctly. The only thing I needed to do was let go of the stream-per-actor mindset.
As an event source, Kafka ticks every box for the needs I see. It is a performant commit-only log for immutable objects.
I don't think state replays should be used when Kafka is the event source. Start from the beginning, read all events, and build state from them, or cache state (snapshots, if you like). I use a cache for data that is read infrequently and is large in memory; the fast data I use rarely exceeds 500 MB. I understand this way of event sourcing is only really possible when most of the active data can be kept in memory, but that's also the case when an in-memory data store such as a Redis cache is used as a persistence store.
My motivation: Kafka is already used as an event source - it's one of the use cases on their website, and there are quite a lot of sources about how to do it. Basically, what I am trying to fix is the gap between Kafka as an event source and Akka not knowing when to start processing commands, when using the NuGet package as a stream source.
I think the biggest concern is when actors are "allowed" to process commands (assuming a command needs the current state).
I am not trying to pour Kafka into a PersistentActor; I don't see that working, because of the stream-per-actor model. I am actually going for a coordinator with multiple actors: one Kafka source, many actors, and one coordinator to start other actors, perhaps even other coordinators.
If (or when) Akka.Streams for Kafka is ready, you can use PartitionHub, GroupBy, or construct a more dedicated GraphStage (i.e. a GraphStage equivalent of a sharding coordinator) to manage routing messages from Kafka to actors.
I use GroupBy per partition; it works well.
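Roughly like this - a sketch assuming the Akka.Streams.Kafka PlainSource API and Confluent.Kafka's ConsumeResult element type; the topic name, group id, and partition count are placeholders:

```csharp
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

var system = ActorSystem.Create("kafka-es");
var materializer = system.Materializer();

var settings = ConsumerSettings<string, byte[]>
    .Create(system, Deserializers.Utf8, Deserializers.ByteArray)
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("users-projection");

const int maxPartitions = 16; // upper bound on GroupBy substreams

KafkaConsumer.PlainSource(settings, Subscriptions.Topics("Users"))
    // One substream per partition, so per-partition ordering is preserved.
    .GroupBy(maxPartitions, record => record.Partition.Value)
    .To(Sink.ForEach<ConsumeResult<string, byte[]>>(record =>
    {
        // Route the record to the coordinator/child owning this key.
    }))
    .Run(materializer);
```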
Do you have a source where I can read up on that - PartitionHub, GroupBy, and how to create a GraphStage? http://getakka.net/articles/streams/stream-dynamic.html ? From the documentation, PartitionHub looks like it is not what I have in mind; it seems I would need to know the consuming actors up front.
I am looking to create something like a coordinator, e.g. UserCoordinator, with children (UserActor), one for each unique user in the Users topic.
@Horusiath How would I make a sharding coordinator without knowing the number of actors up front? In a dedicated GraphStage? BTW, I just created a pull request to allow partition EOFs to be handled elsewhere as well.
@AndreSteenbergen if you're talking about akka.cluster.sharding - there are no hard laws, but the common rule of thumb when using hash-based message extractors is to configure the maximum number of shards (not actors) to be 10x the maximum expected size of your cluster - so if you expect that your cluster can grow to 100 nodes, set the number of shards to 1000.
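For instance, with Akka.Cluster.Sharding's HashCodeMessageExtractor the shard count is fixed once in the extractor, while the number of entities stays unbounded; the UserEnvelope type here is a hypothetical carrier for the entity id:

```csharp
using Akka.Cluster.Sharding;

// Hypothetical envelope carrying the entity id alongside the payload.
public sealed class UserEnvelope
{
    public string UserId { get; }
    public object Payload { get; }
    public UserEnvelope(string userId, object payload)
    {
        UserId = userId;
        Payload = payload;
    }
}

public sealed class UserMessageExtractor : HashCodeMessageExtractor
{
    // 1000 shards: ~10x an expected maximum cluster size of 100 nodes.
    public UserMessageExtractor() : base(1000) { }

    public override string EntityId(object message)
        => (message as UserEnvelope)?.UserId;

    public override object EntityMessage(object message)
        => ((UserEnvelope)message).Payload;
}
```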
You can read more about cluster sharding here:
Akka.Streams.Kafka has been moved out of Alpakka into its own GitHub repo.
Closing this issue. Please re-open it in https://github.com/akkadotnet/Akka.Streams.Kafka if you're still having this problem.