akka/akka-persistence-jdbc

Slick queue size not configurable

The default configuration has no setting for the Slick queue size, and there is no way to set it in code.

Versions used

akka-persistence-jdbc: 5.0.1
Slick: 3.3.3
Akka: 2.6.15

Expected Behavior

The Slick queue size should be configurable.

Actual Behavior

Under heavy load, the Slick executor rejects tasks with a RejectedExecutionException.

Relevant logs

java.util.concurrent.RejectedExecutionException: Task slick.basic.BasicBackend$DatabaseDef$$anon$4@635e1ea4 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@41e532bf[Running, pool size = 20, active threads = 3, queued tasks = 10006, completed tasks = 1962633]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2057)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:827)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
    at slick.util.AsyncExecutor$$anon$1$$anon$4.execute(AsyncExecutor.scala:161)
    at slick.basic.BasicBackend$DatabaseDef.scheduleSynchronousStreaming(BasicBackend.scala:302)
    at slick.basic.BasicBackend$DatabaseDef.scheduleSynchronousStreaming$(BasicBackend.scala:300)
    at slick.jdbc.JdbcBackend$DatabaseDef.scheduleSynchronousStreaming(JdbcBackend.scala:37)
    at slick.basic.BasicBackend$DatabaseDef.streamSynchronousDatabaseAction(BasicBackend.scala:295)
    at slick.basic.BasicBackend$DatabaseDef.streamSynchronousDatabaseAction$(BasicBackend.scala:293)
    at slick.jdbc.JdbcBackend$DatabaseDef.streamSynchronousDatabaseAction(JdbcBackend.scala:37)
    at slick.basic.BasicBackend$DatabaseDef.slick$basic$BasicBackend$DatabaseDef$$runInContextInline(BasicBackend.scala:240)
    at slick.basic.BasicBackend$DatabaseDef.runInContextSafe(BasicBackend.scala:148)
    at slick.basic.BasicBackend$DatabaseDef.runInContext(BasicBackend.scala:142)
    at slick.basic.BasicBackend$DatabaseDef.runInContext$(BasicBackend.scala:141)
    at slick.jdbc.JdbcBackend$DatabaseDef.runInContext(JdbcBackend.scala:37)
    at slick.basic.BasicBackend$DatabaseDef$$anon$1.subscribe(BasicBackend.scala:118)
    at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:148)
    at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:776)
    at akka.actor.Actor.aroundPreStart(Actor.scala:548)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:548)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:716)
    at akka.actor.ActorCell.create(ActorCell.scala:644)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:514)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:536)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
    at akka.dispatch.Mailbox.run(Mailbox.scala:230)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
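
For anyone trying to see the failure mode in isolation: the top frames of the trace are the JDK's ThreadPoolExecutor rejecting work through AbortPolicy. The sketch below uses only plain JDK classes and is not Slick's actual executor. The class name, method name, and the small sizes (2 threads, queue of 3) are made up for illustration. It shows that once every thread is busy and the bounded queue is full, each further submission throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Slick or akka-persistence-jdbc.
public class BoundedQueueRejection {

    /**
     * Submits `tasks` blocking tasks to a fixed-size pool backed by a bounded
     * queue and returns how many submissions were rejected.
     */
    public static int countRejections(int threads, int queueSize, int tasks)
            throws InterruptedException {
        // Every task waits on this latch, so no thread frees up while we submit.
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.AbortPolicy()); // throws when saturated
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Same exception type as in the trace above.
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        // 2 tasks run, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println(countRejections(2, 3, 10)); // prints 5
    }
}
```

With a larger queue capacity the same load is accepted instead of rejected, which is why the queue size needs to be tunable for bursty workloads.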

Reproducible Test Case

Can be reproduced with a Gatling load test.