onyx-platform/onyx

Resume point AssertionError: Assert failed: (= slot-migration :direct) for {:mode :initialize}

sundbry opened this issue · 0 comments

I'm getting assertion failures when I submit a job with an initial resume point. I can't add {:slot-migration :direct} to the resume point map either, because then it fails validation.

{:message Onyx lifecycle exception, :lifecycle #:lifecycle{:task :all, :calls :app.data.core/standard-lifecycle-calls, :doc Standard lifecycles including error handling}, :phase :lifecycle/recover-input}
java.lang.AssertionError: Assert failed: (= slot-migration :direct)
        at onyx.peer.resume_point$recover_input.invokeStatic(resume_point.clj:118)
        at onyx.peer.resume_point$recover_input.invoke(resume_point.clj:114)
        at onyx.peer.task_lifecycle$recover_input.invokeStatic(task_lifecycle.clj:447)
        at onyx.peer.task_lifecycle$recover_input.invoke(task_lifecycle.clj:442)
        at onyx.peer.task_lifecycle.TaskStateMachine.exec(task_lifecycle.clj:1075)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:554)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:544)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__46871.invoke(task_lifecycle.clj:1162)
        at clojure.core.async$thread_call$fn__11025.invoke(async.clj:441)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The job spec looks like this:

{:workflow [[:telemetry-events :telemetry-handler] ...]
 :catalog 
   [{:onyx/name :telemetry-events
     :onyx/plugin :onyx.plugin.kafka/read-messages
     :onyx/type :input
     :onyx/medium :kafka
     :kafka/topic "telemetry"
     :kafka/receive-buffer-bytes 65536
     :kafka/zookeeper "my.zookeeper.host:2181"
     :kafka/offset-reset :earliest
     :kafka/deserializer-fn :app.core/deserialize-json
     :kafka/wrap-with-metadata? false
     :onyx/batch-timeout 50
     :onyx/n-peers 2
     :onyx/batch-size 100
     :onyx/doc "Reads messages from a Kafka topic"}
    ...]
 :lifecycles ...
 :flow-conditions ...
 :task-scheduler :onyx.task-scheduler/balanced
 :resume-point {:telemetry-events {:input {:mode :initialize}}}}

I'm not sure what I did wrong. A resume point example in onyx-examples would also be very helpful.