Resume point AssertionError: Assert failed: (= slot-migration :direct) for {:mode :initialize}
sundbry opened this issue · 0 comments
I'm getting assertion failures when I try to submit a job with an initial resume point. I can't add {:slot-migration :direct} to the resume point map either, since the job then fails validation.
{:message Onyx lifecycle exception, :lifecycle #:lifecycle{:task :all, :calls :app.data.core/standard-lifecycle-calls, :doc Standard lifecycles including error handling}, :phase :lifecycle/recover-input}
java.lang.AssertionError: Assert failed: (= slot-migration :direct)
at onyx.peer.resume_point$recover_input.invokeStatic(resume_point.clj:118)
at onyx.peer.resume_point$recover_input.invoke(resume_point.clj:114)
at onyx.peer.task_lifecycle$recover_input.invokeStatic(task_lifecycle.clj:447)
at onyx.peer.task_lifecycle$recover_input.invoke(task_lifecycle.clj:442)
at onyx.peer.task_lifecycle.TaskStateMachine.exec(task_lifecycle.clj:1075)
at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:554)
at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:544)
at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__46871.invoke(task_lifecycle.clj:1162)
at clojure.core.async$thread_call$fn__11025.invoke(async.clj:441)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The job spec looks like this:
{:workflow [[:telemetry-events :telemetry-handler] ...]
 :catalog
 [{:onyx/name :telemetry-events
   :onyx/plugin :onyx.plugin.kafka/read-messages
   :onyx/type :input
   :onyx/medium :kafka
   :kafka/topic "telemetry"
   :kafka/receive-buffer-bytes 65536
   :kafka/zookeeper "my.zookeeper.host:2181"
   :kafka/offset-reset :earliest
   :kafka/deserializer-fn :app.core/deserialize-json
   :kafka/wrap-with-metadata? false
   :onyx/batch-timeout 50
   :onyx/n-peers 2
   :onyx/batch-size 100
   :onyx/doc "Reads messages from a Kafka topic"}
  ...]
 :lifecycles ...
 :flow-conditions ...
 :task-scheduler :onyx.task-scheduler/balanced
 :resume-point {:telemetry-events {:input {:mode :initialize}}}}
I'm not sure what I did wrong. A resume point example in onyx-examples would also be very helpful.
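
A minimal sketch of why the assertion might fire, assuming that recover-input destructures :slot-migration from the :input entry of the resume point and asserts on it unconditionally. This is inferred only from the assertion message in the trace, not checked against the Onyx source. check-resume-mapping and assertion-fails? are hypothetical stand-ins, not Onyx functions.

```clojure
;; Hypothetical stand-in for the check reported at resume_point.clj:118.
;; Only the asserted expression from the stack trace is mirrored here;
;; the real recover-input is not reproduced.
(defn check-resume-mapping
  [{:keys [slot-migration] :as resume-mapping}]
  ;; With no :slot-migration key, slot-migration is nil and the assert throws.
  (assert (= slot-migration :direct))
  resume-mapping)

;; Returns true when the assertion throws for the given map, false otherwise.
(defn assertion-fails?
  [resume-mapping]
  (try
    (check-resume-mapping resume-mapping)
    false
    (catch AssertionError _ true)))

;; The :input entry from the job spec above.
(assertion-fails? {:mode :initialize})
;; => true

;; The same entry with the key the assertion expects.
(assertion-fails? {:mode :initialize :slot-migration :direct})
;; => false
```

If that assumption holds, a resume point in :initialize mode has no way to satisfy the check unless the validation schema also accepts :slot-migration, which matches the validation failure described above.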