krasserm/akka-persistence-kafka

Write messages

giena opened this issue · 2 comments

The current asyncWriteMessages looks like this:
def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = {
  val sends = messages.groupBy(_.persistenceId).map {
    case (pid, msgs) => writerFor(pid).ask(messages)(writeTimeout)
  }
  Future.sequence(sends).map(_ => ())
}

I think each writer should be sent only its own group (msgs) rather than the whole batch (messages), like this:
def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = {
  val sends = messages.groupBy(_.persistenceId).map {
    case (pid, msgs) => writerFor(pid).ask(msgs)(writeTimeout)
  }
  Future.sequence(sends).map(_ => ())
}

Am I mistaken?

Ouch, that's really a bug, good catch. It just didn't surface because a message batch sent by akka-persistence only contains messages with the same persistenceId. Do you want to create a pull request? Maybe along with a test that demonstrates that the bug is fixed?
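A test along those lines could start from something like the sketch below. It is not the journal's actual code: `Msg` is a hypothetical stand-in for `PersistentRepr`, and the two helper functions only model which messages each per-persistenceId writer would receive, without any actors or Kafka involved. It shows that the two variants agree for a batch with a single persistenceId and diverge as soon as a batch mixes persistenceIds.

```scala
// Hypothetical stand-in for PersistentRepr: only the field that matters here.
final case class Msg(persistenceId: String, payload: String)

object GroupingDemo {
  // Models the buggy variant: every writer is handed the whole batch.
  def buggy(messages: Seq[Msg]): Map[String, Seq[Msg]] =
    messages.groupBy(_.persistenceId).map { case (pid, _) => pid -> messages }

  // Models the fixed variant: every writer is handed only its own group.
  def fixed(messages: Seq[Msg]): Map[String, Seq[Msg]] =
    messages.groupBy(_.persistenceId).map { case (pid, msgs) => pid -> msgs }

  def main(args: Array[String]): Unit = {
    val single = Seq(Msg("a", "1"), Msg("a", "2"))
    val mixed  = Seq(Msg("a", "1"), Msg("b", "2"), Msg("a", "3"))

    // One persistenceId per batch: both variants send the same messages.
    assert(buggy(single) == fixed(single))

    // Mixed batch: the buggy variant sends writer "b" all three messages.
    assert(buggy(mixed)("b").size == 3)
    assert(fixed(mixed)("b") == Seq(Msg("b", "2")))

    println("grouping checks passed")
  }
}
```

A real regression test would presumably replace the helper functions with the journal itself and assert on what each persistenceId's writer ends up storing.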

I've created a pull request: #25
It doesn't include a test, though.