Headers change breaking old schedule data
bobbyrauchenberg opened this issue · 8 comments
We have recently added support for headers to the KMS. This means the schema has evolved to add `headers: Map[String, Array[Byte]]` to the `Schedule` and `ScheduleEvent` objects (see below).
Schedule messages written to the KMS use the Avro binary format, which doesn't use the schema registry. Schedule messages that were written before this change (currently all of the messages on the schedules topic) can no longer be decoded:
```scala
case class Schedule(time: OffsetDateTime,
                    topic: String,
                    key: Array[Byte],
                    value: Option[Array[Byte]],
                    headers: Map[String, Array[Byte]])

case class ScheduleEvent(delay: FiniteDuration,
                         inputTopic: String,
                         outputTopic: String,
                         key: Array[Byte],
                         value: Option[Array[Byte]],
                         headers: Map[String, Array[Byte]])
```
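For reference, the evolved Avro record would look roughly like the following. This is a sketch, not the generated schema: the exact encodings (e.g. how an Avro library such as avro4s maps `OffsetDateTime`) are assumptions. The pre-header `SchemaV1` is the same record without the `headers` field.

```json
{
  "type": "record",
  "name": "Schedule",
  "fields": [
    {"name": "time", "type": "string"},
    {"name": "topic", "type": "string"},
    {"name": "key", "type": "bytes"},
    {"name": "value", "type": ["null", "bytes"]},
    {"name": "headers", "type": {"type": "map", "values": "bytes"}}
  ]
}
```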
We could fix this by:
a) "Fix" the data - but this could be quite involved given we'd need to regenerate all schedule data: it would involve a purge of the split and normalized topics and a full refresh from GZ.
b) Implement headers differently - e.g. putting the headers for the scheduled message on the Kafka message the KMS writes, rather than in the payload (the body of the message). This would fix the issue without extra decoding steps or a data fix, but is arguably a less elegant and less encapsulated way of representing the headers associated with the message the scheduler writes.
c) Declare a schema for the original pre-header version. This `SchemaV1` can be used to derive the pre-header schema. We can then decode against the new schema and, if this fails (for messages created pre-headers), fall back to decoding against the base version (I've written a test for this and it works).
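The fallback in c) can be sketched as follows. This is a minimal illustration, not the KMS code: `decodeV2` and `decodeV1` are hypothetical stand-ins for the real Avro binary decoders, and here they just inspect a leading tag byte so the sketch runs without an Avro dependency.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical decoded shapes: V2 carries headers, V1 (pre-headers) does not.
final case class ScheduleV1(topic: String, key: Array[Byte])
final case class ScheduleV2(topic: String, key: Array[Byte], headers: Map[String, Array[Byte]])

object FallbackDecoder {
  // Stand-ins for the real Avro binary decoders; the tag-byte check is
  // purely so this sketch is self-contained and runnable.
  def decodeV2(bytes: Array[Byte]): Try[ScheduleV2] =
    if (bytes.headOption.contains(2: Byte))
      Success(ScheduleV2("some-topic", bytes.tail, Map("trace-id" -> Array[Byte](1))))
    else Failure(new IllegalArgumentException("not encoded with the new schema"))

  def decodeV1(bytes: Array[Byte]): Try[ScheduleV1] =
    if (bytes.headOption.contains(1: Byte))
      Success(ScheduleV1("some-topic", bytes.tail))
    else Failure(new IllegalArgumentException("not encoded with the pre-header schema"))

  // Try the new schema first; for pre-header messages fall back to the
  // base schema and default headers to empty.
  def decode(bytes: Array[Byte]): Try[ScheduleV2] =
    decodeV2(bytes).orElse(decodeV1(bytes).map(v1 => ScheduleV2(v1.topic, v1.key, Map.empty)))
}
```

With this shape, new messages decode directly, old messages decode via the fallback with empty headers, and genuinely corrupt payloads still surface as a `Failure`.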
After talking to @paoloambrosio-skyuk, we're going to go with c): declare a schema for the original pre-header version.
PR for this: #78
@bobbyrauchenberg can this be closed now, or are you guys still verifying? I don't think the PR was correctly linked (Closes #bla).
KMS 0.21.0 is constantly restarting in both dev GCP and DC/OS with the following error after loading topics:
Might be worth adding a docker-compose test to the PR build?
```json
{
  "@timestamp": "2019-08-12T14:43:39.936Z",
  "@version": "1",
  "message": "Reader stream has died",
  "logger_name": "com.sky.kms.actors.SchedulingActor",
  "thread_name": "kafka-message-scheduler-akka.actor.default-dispatcher-31",
  "level": "ERROR",
  "level_value": 40000,
  "stack_trace": "java.util.concurrent.TimeoutException: No elements passed in the last 5 minutes.
    at akka.stream.impl.Timers$Idle$$anon$3.onTimer(Timers.scala:110)
    at akka.stream.stage.TimerGraphStageLogic.onInternalTimer(GraphStage.scala:1544)
    at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1(GraphStage.scala:1533)
    at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1$adapted(GraphStage.scala:1533)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:452)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:481)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:581)
    at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
    at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
    at akka.kamon.instrumentation.ActorMonitors$$anon$1.$anonfun$processMessage$1(ActorMonitor.scala:134)
    at kamon.Kamon$.withContext(Kamon.scala:120)
    at akka.kamon.instrumentation.ActorMonitors$$anon$1.processMessage(ActorMonitor.scala:134)
    at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorInstrumentation.scala:45)
    at akka.actor.ActorCell.invoke(ActorCell.scala:574)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at kamon.executors.Executors$InstrumentedExecutorService$$anon$7.run(Executors.scala:270)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)",
  "sourceThread": "kafka-message-scheduler-akka.actor.default-dispatcher-25",
  "akkaSource": "akka://kafka-message-scheduler/user/scheduling-actor",
  "sourceActorSystem": "kafka-message-scheduler",
  "akkaTimestamp": "14:43:39.918UTC"
}
```
That error is an idle timeout, so it means the KMS is getting no data. Are transformer / adapter running in those environments and creating data? I know we scaled a lot of things down / switched stuff off.
That idle timeout only applies to topic loading. So there is data to load, but it isn't receiving it, or it is getting stuck when trying to process it.
Could it be worth reverting to an earlier KMS - the pre-headers version (0.19?) - and seeing if that still works?