onyx-platform/onyx

Feature request: Do not try to recover output checkpoints for plugins that don't use it

sundbry opened this issue · 3 comments

Many output plugins do not use checkpoint state. It would be nice if the system did not even bother to read or write these checkpoint files when they are going to be empty anyway. This should give us a couple of benefits:

  1. Anti-fragility in resuming jobs (especially when the output/structure of the job changes but the input stays the same)
  2. Reduced S3 load and cost

To keep the interface flexible, I imagine we could extend the plugin protocol so that the runtime can check which features a plugin supports, such as output checkpointing.
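A minimal sketch of what such a runtime feature check could look like. Every name here (`CheckpointFeatures`, `checkpoints-output?`, `uses-output-checkpoint?`, `recover-output`, `read-fn`) is hypothetical and is not part of Onyx's actual plugin API; `read-fn` stands in for whatever reads the checkpoint from S3 or ZooKeeper.

```clojure
;; Hypothetical names throughout: this is not Onyx's real plugin protocol.
(defprotocol CheckpointFeatures
  (checkpoints-output? [plugin]
    "Returns true when the plugin stores state in output checkpoints."))

(defn uses-output-checkpoint?
  "Plugins that do not implement CheckpointFeatures are assumed to use
  checkpoints, so existing plugins keep their current behaviour."
  [plugin]
  (if (satisfies? CheckpointFeatures plugin)
    (boolean (checkpoints-output? plugin))
    true))

;; An output plugin that opts out of checkpointing.
(defrecord StatelessOutput []
  CheckpointFeatures
  (checkpoints-output? [_] false))

;; An output plugin written before the feature check existed.
(defrecord LegacyOutput [])

(defn recover-output
  "Calls read-fn (a stand-in for the storage read) only when the plugin
  declares that it uses output checkpoints; otherwise returns nil."
  [plugin read-fn]
  (when (uses-output-checkpoint? plugin)
    (read-fn)))
```

With this shape, `(recover-output (->StatelessOutput) read-fn)` never touches storage, so a missing key cannot fail the resume, while `(recover-output (->LegacyOutput) read-fn)` still performs the read. Defaulting to "uses checkpoints" is one possible design choice; it trades some unnecessary reads for backwards compatibility.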

Thoughts?

(Typical stack trace when I resume a job and output checkpoint recovery fails because I changed the job definition)

ERROR 2019-05-23 09:23:29,085 service.data.job.core: {:message Onyx lifecycle exception,  :phase :lifecycle/recover-output}
com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: A07B7892DD6BE81F; S3 Extended Request ID: AOGX9hab+QAnQIuIk9C1gVVVSOPTZRUeBNkzRIbUE/vxnk7wlDS/OWqqquH/M9GNnWNUr4DWyF8=), S3 Extended Request ID: AOGX9hab+QAnQIuIk9C1gVVVSOPTZRUeBNkzRIbUE/vxnk7wlDS/OWqqquH/M9GNnWNUr4DWyF8=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
        at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
        at onyx.storage.s3$read_checkpointed_bytes.invokeStatic(s3.clj:102)
        at onyx.storage.s3$read_checkpointed_bytes.invoke(s3.clj:100)
        at onyx.storage.s3$eval49560$fn__49562$fn__49564.invoke(s3.clj:241)
        at onyx.storage.s3$eval49560$fn__49562.invoke(s3.clj:239)
        at clojure.lang.MultiFn.invoke(MultiFn.java:284)
        at onyx.peer.resume_point$read_checkpoint.invokeStatic(resume_point.clj:56)
        at onyx.peer.resume_point$read_checkpoint.invoke(resume_point.clj:51)
        at onyx.peer.resume_point$recover_output.invokeStatic(resume_point.clj:112)
        at onyx.peer.resume_point$recover_output.invoke(resume_point.clj:106)
        at onyx.peer.task_lifecycle$recover_output.invokeStatic(task_lifecycle.clj:486)
        at onyx.peer.task_lifecycle$recover_output.invoke(task_lifecycle.clj:479)
        at onyx.peer.task_lifecycle.TaskStateMachine.exec(task_lifecycle.clj:1070)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:550)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:540)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__43880.invoke(task_lifecycle.clj:1155)
        at clojure.core.async$thread_call$fn__11217.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

We actually see this issue in our long-running jobs even when no changes have been made to the job. The stack trace is almost identical, except that you're using S3 and we're using ZK.

I'm pretty sure ours is due to a task rebooting, but I haven't figured out why it happens, or why it consistently appears to be the same output task.

@jgerman it sounds like your issue is unrelated to missing lookups when loading checkpoints.

Check the back-pressure on your tasks. It is the most frustrating and hardest-to-identify cause of random peer timeouts. Despite the brief section in the user guide, Onyx 0.14 does not handle back-pressure all that well. When the preceding task is pushing data out faster than the downstream tasks can consume it, it can stop processing heartbeat messages.

If you have a lot of back-pressure, consider writing to an intermediate Kafka topic to decouple the producer from the consumer and absorb the difference in their speeds. Tuning the idle-sleeps in the config, as the user guide suggests, is not that effective in my experience.

http://www.onyxplatform.org/docs/user-guide/0.14.x/#backpressure

Thanks! Any suggestions for measuring back pressure? I've only looked at it indirectly, via consumer lag on our Kafka topics.
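For reference, the consumer-lag proxy mentioned above is, per partition, the log end offset minus the consumer group's committed offset. A minimal sketch, assuming the two offset maps have already been fetched from Kafka by some other means; the function name `consumer-lag` and the map shapes are hypothetical, and treating a partition with no committed offset as offset 0 is an assumption.

```clojure
;; Hypothetical helper: both arguments are maps of partition -> offset
;; that the caller has already obtained from Kafka.
(defn consumer-lag
  "Returns a map of partition -> lag, where lag is the end offset minus
  the committed offset, clamped at zero. A partition with no committed
  offset is treated as committed at 0 (an assumption)."
  [end-offsets committed-offsets]
  (into {}
        (map (fn [[partition end]]
               [partition (max 0 (- end (get committed-offsets partition 0)))]))
        end-offsets))

(consumer-lag {0 120, 1 80} {0 100, 1 80})
;; => {0 20, 1 0}
```

Lag that keeps growing on the topic feeding a task suggests that task is slower than its producer, but it only measures pressure at the Kafka boundary, not between tasks inside the job.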

I've definitely seen issues with checkpoints not being written, which I assumed was due to heartbeats (barriers?) not being processed.