Recovering from snapshot gives the incorrect index
danielhopkins opened this issue · 9 comments
On a clean topic everything (journaling and snapshotting) works correctly, but on recovery from a snapshot the plugin appears to get ahead of itself.
I'm getting the error:
kafka.common.OffsetOutOfRangeException: Request for offset 10724 but we only have log segments in the range 0 to 10723.
I'm running on an older version of Kafka, 0.8.0, so I'm not sure if that's an issue.
Any ideas?
Kafka 0.8.0 should be fine. Have you by chance experimented with killing and restarting nodes in a Kafka cluster or do you see this error with a single node installation of Kafka?
I'm running with a single-node installation of Kafka & ZooKeeper.
I'm seeing this in a couple of different ways, but it appears to be some kind of race. When I first spin up my application, I've seen my server request lastSequenceNr = "-1", but I can't reproduce it consistently.
With this config:
akka.persistence.journal.plugin = "kafka-journal"
akka.persistence.snapshot-store.plugin = "kafka-snapshot-store"
kafka-snapshot-store.zookeeper.connect = "127.0.0.1:2181/vo_kafka"
kafka-journal.zookeeper.connect = "127.0.0.1:2181/vo_kafka"
- Start the server.
- Persist a bunch of messages, enough to create snapshots.
- Restart the server.
- Send another message to the actor.
I start getting the index-out-of-range error.
I've seen that when testing failover in a cluster. When you restart a killed replica that has more messages in a partition than the current master has, Kafka drops these additional messages, which may result in snapshots that have no corresponding event sequence up to their sequence numbers. This causes the OffsetOutOfRangeException.
In your case, the reason is also a snapshot that has no corresponding event sequence up to the snapshot's sequence number, but I'm not sure yet how this is happening. Are you using persist() or persistAsync() in your persistent actor? Are you restarting the server while the application is writing or while it is idle? Can you share the code you're running, with some instructions on how to reproduce?
Anyway, I'm currently writing an automated repair procedure that deals with such situations. From a high-level perspective, it simply ignores all snapshots that have a higher sequence number than the last stored entry in the journal.
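In code, the selection would look roughly like this (a hypothetical sketch, not the actual plugin code; SnapshotMeta and selectSnapshot are made-up names):

object SnapshotRepair {
  case class SnapshotMeta(persistenceId: String, sequenceNr: Long)

  // Only snapshots fully covered by the journal are eligible; snapshots
  // with a sequence number beyond the journal's highest entry are ignored.
  def selectSnapshot(candidates: Seq[SnapshotMeta], highestJournalSeqNr: Long): Option[SnapshotMeta] =
    candidates.filter(_.sequenceNr <= highestJournalSeqNr).sortBy(_.sequenceNr).lastOption
}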
Sure, here's the actor with only a couple of things removed. The processing is done from a view, but I commented that out and still got the errors.
import akka.actor.ActorLogging
import akka.actor.Status.Success // assuming the Akka Status ack; scala.util.Success would fit the same call
import akka.persistence._

class DeliveryEndpointProducer extends PersistentActor with ActorLogging {
  override def persistenceId = "a-persistent-id-endpoint"

  val receiveRecover: Receive = {
    case s: SnapshotOffer  => // snapshot state intentionally ignored in this test actor
    case RecoveryCompleted => // RecoveryCompleted is a case object, so match it directly
    case m                 => log.info(s"Replay $lastSequenceNr")
  }

  override def postStop(): Unit = {
    saveSnapshot(1) // dummy snapshot payload, saved on every stop
    super.postStop()
  }

  val receiveCommand: Receive = {
    case _: SaveSnapshotSuccess => log.info("Snapshot complete")
    case _: SaveSnapshotFailure => log.error("Snapshot failed")
    case m => persist(m) { msg =>
      ack
      snapshotMaybe
    }
  }

  // save a (dummy) snapshot every autoSnapshot persisted messages
  val autoSnapshot = 1000
  def snapshotMaybe = if (lastSequenceNr % autoSnapshot == 0) saveSnapshot(1)
  def ack = sender() ! Success(())
}
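To reproduce, I run roughly the driver below against this actor (a hypothetical sketch; the system name and message counts are made up), kill the JVM once it has snapshotted, start it again, and send one more message:

import akka.actor.{ActorSystem, Props}

object Repro extends App {
  val system = ActorSystem("repro")
  val producer = system.actorOf(Props[DeliveryEndpointProducer], "producer")

  // First run: persist enough messages to cross the autoSnapshot threshold.
  (1 to 1500).foreach(i => producer ! s"msg-$i")

  // Second run, after killing and restarting the JVM: one more message is
  // enough, and recovery fails with the OffsetOutOfRangeException.
  // producer ! "one-more"
}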
I removed the saveSnapshot call from the postStop and haven't seen the error since. Could that be related?
I pulled in your latest commits; that might also have fixed the issue!
I removed the saveSnapshot call from the postStop and haven't seen the error since. Could that be related?
Ah yes, that could be the reason if you save a snapshot in postStop() and stop the actor with system.stop(...) or context.stop(...). I don't expect this to happen when you send the actor a PoisonPill.
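To illustrate the difference (a sketch with made-up names, assuming classic Akka actors):

import akka.actor.{ActorRef, ActorSystem, PoisonPill}

object Stopping {
  // system.stop/context.stop terminates the actor right after the message
  // it is currently processing; anything still in the mailbox is discarded.
  def abrupt(system: ActorSystem, ref: ActorRef): Unit = system.stop(ref)

  // A PoisonPill is enqueued like an ordinary message, so everything sent
  // before it is processed before the actor stops.
  def graceful(ref: ActorRef): Unit = ref ! PoisonPill
}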
I pulled in your latest commits; that might also have fixed the issue!
No, the fix isn't committed yet but should be ready soon, I hope.
The plugin does now automatically recover from these error conditions. Thanks again for the feedback.