Allow configuration of TaskExecutor in KclMessageDrivenChannelAdapter
xargsgrep opened this issue · 8 comments
Expected Behavior
Allow overriding/customizing the implementation of TaskExecutor in KclMessageDrivenChannelAdapter.
Current Behavior
Currently it uses a SimpleAsyncTaskExecutor, and there does not seem to be any way to configure it to use a different TaskExecutor implementation.
Context
SimpleAsyncTaskExecutor does not use a thread pool and therefore keeps spawning new threads. This results in an extremely high number of threads created over the lifetime of the JVM and causes a native memory leak when using Java Flight Recorder with JDK 17.
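To illustrate the difference between spawning a thread per task and reusing a pool, here is a JDK-only sketch. It does not use Spring: the hypothetical `threadPerTask()` executor is only a rough stand-in for an unbounded SimpleAsyncTaskExecutor, and it is compared against a fixed pool from `Executors.newFixedThreadPool`. The class and method names are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadCreationDemo {

    // Rough stand-in (an assumption, not Spring code) for an unbounded
    // SimpleAsyncTaskExecutor: every submitted task gets a brand-new thread.
    static Executor threadPerTask() {
        AtomicInteger counter = new AtomicInteger();
        return task -> new Thread(task, "ThreadPerTask-" + counter.incrementAndGet()).start();
    }

    // Runs `tasks` short tasks and returns how many distinct threads executed them.
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has recorded its thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads.size();
    }

    static int distinctThreadsPerTask(int tasks) {
        return distinctThreads(threadPerTask(), tasks);
    }

    static int distinctThreadsFixedPool(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            return distinctThreads(pool, tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task: " + distinctThreadsPerTask(1_000) + " threads");
        System.out.println("fixed pool of 4: " + distinctThreadsFixedPool(4, 1_000) + " threads");
    }
}
```

The first count equals the number of tasks (one new thread each), while the pooled count stays at or below the pool size because the same threads are reused.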
I'm not sure I understand the question. There is indeed a corresponding setter on the KclMessageDrivenChannelAdapter:
public void setExecutor(TaskExecutor executor) {
    Assert.notNull(executor, "'executor' must not be null.");
    this.executor = executor;
}
On the other hand, the KclMessageDrivenChannelAdapter uses only one thread from that SimpleAsyncTaskExecutor anyway:
protected void doStart() {
    super.doStart();
    if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
        this.checkpointMode = CheckpointMode.batch;
        logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] "
                + "because it does not make sense in case of [ListenerMode.batch].");
    }
    LifecycleConfig lifecycleConfig = this.config.lifecycleConfig().taskBackoffTimeMillis(this.consumerBackoff);
    RetrievalConfig retrievalConfig =
            this.config.retrievalConfig()
                    .glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer);
    this.scheduler =
            new Scheduler(
                    this.config.checkpointConfig(),
                    this.config.coordinatorConfig(),
                    this.config.leaseManagementConfig(),
                    lifecycleConfig,
                    this.config.metricsConfig(),
                    this.config.processorConfig(),
                    retrievalConfig);
    this.executor.execute(this.scheduler);
}
So, it is started once and uses only that one thread.
What am I missing?
Sorry, I'm a bit new to Spring Boot. I did notice the setter, but I couldn't find a way to invoke it before the KclMessageDrivenChannelAdapter is started. How would I go about doing that?
As for the number of threads: what I have observed is that something is constantly creating new threads. At any given time, there seem to be about as many of these threads as there are shards in the Kinesis stream. The following is from a thread dump of a long-running application in which the Kinesis binder is configured to use the KPL/KCL library.
"SimpleAsyncTaskExecutor-172710" #173020 prio=5 os_prio=0 cpu=0.54ms elapsed=0.99s tid=0x00007f3846a7aa90 nid=0x4c9f waiting on condition [0x00007f3877dfc000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(java.base@17.0.6/Native Method)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.handleNoRecords(ProcessTask.java:282)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:166)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
	at java.util.concurrent.FutureTask.run(java.base@17.0.6/FutureTask.java:264)
	at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)
We can see above that some instance of SimpleAsyncTaskExecutor has created more than 172k threads over its lifetime. Since KclMessageDrivenChannelAdapter only uses one thread, do you have any idea what could be creating all these threads?
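To confirm from inside the JVM that some executor keeps creating threads, one option is to sample the JVM-wide counter `ThreadMXBean.getTotalStartedThreadCount()` and read the numeric suffix of live thread names. Spring's SimpleAsyncTaskExecutor names its threads with its short class name plus an increasing per-instance counter (as in `SimpleAsyncTaskExecutor-172710`), so the highest suffix roughly tracks how many threads that instance has started. This is a minimal JDK-only sketch; the class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class ThreadGrowthProbe {

    // Returns the numeric suffix of a thread name such as "SimpleAsyncTaskExecutor-172710",
    // or -1 if the name does not start with the prefix or the suffix is not a number.
    static long parseSequence(String threadName, String prefix) {
        if (!threadName.startsWith(prefix)) {
            return -1;
        }
        try {
            return Long.parseLong(threadName.substring(prefix.length()));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Highest sequence number among the threads that are alive right now.
    static long highestLiveSequence(String prefix) {
        long max = -1;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            max = Math.max(max, parseSequence(t.getName(), prefix));
        }
        return max;
    }

    // Total number of threads started in this JVM since it launched.
    static long totalStartedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        return mx.getTotalStartedThreadCount();
    }

    public static void main(String[] args) throws InterruptedException {
        long before = totalStartedThreads();
        Thread.sleep(1_000); // sampling window (an arbitrary choice)
        long after = totalStartedThreads();
        System.out.println("threads started during window: " + (after - before));
        System.out.println("highest live SimpleAsyncTaskExecutor sequence: "
                + highestLiveSequence("SimpleAsyncTaskExecutor-"));
    }
}
```

In a real application the two samples would be taken periodically (for example from a scheduled job or an actuator-style endpoint) rather than from `main`; a steadily rising started-thread count alongside a climbing suffix points at the executor that names its threads with that prefix.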
Regarding "the same number of threads as the number of shards of the kinesis stream":
Well, that's possible, but it is already outside the scope of KclMessageDrivenChannelAdapter, since none of those threads should come from Spring any more.
The adapter accepts a Kinesis stream and delegates everything to the KCL.
I can imagine a growing number of SimpleAsyncTaskExecutor threads only if you have that many binder destinations.
But that is not possible with the KCL, since it does not accept a specific shard.
It is probably time for you to explain in more detail what your Spring Cloud Stream application does and how it is configured.
Either way, Spring Cloud Stream provides a hook for customizing this kind of thing via ConsumerEndpointCustomizer:
@Bean
ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> kclEndpointCustomizer(TaskExecutor executor) {
    return (endpoint, destinationName, group) -> endpoint.setExecutor(executor);
}
Thanks for the info! As for our application: we have a Spring Boot 3.0.5 app with a single Kinesis binder destination (consumer only) and two Kafka binder destinations. The Spring Cloud version is 2022.0.2. The relevant configuration is below.
spring:
  cloud:
    stream:
      bindings:
        dispatch-in-0:
          content-type: application/x-protobuf;charset=UTF-8
          binder: kinesis
          destination: <destination>
          group: <group>
          consumer:
            header-mode: none
            auto-startup: true
        dispatch-out-0:
          content-type: avro/binary+twilio
          destination: <destination>
          binder: kafka
          producer:
            # we are using a custom key extractor bean as a workaround for a spring.cloud.stream bug which makes all
            # bindings use the out-0 partition-key-expression.
            # partition-key-expression: payload.correlationSid
            partition-key-extractor-name: kafkaPartitionKeyExtractor
            partition-count: <partitionCount>
        dispatch-out-1:
          content-type: avro/binary+twilio
          destination: <destination>
          binder: kafka
          producer:
            partition-key-extractor-name: kafkaPartitionKeyExtractor
            partition-count: <partitionCount>
      kinesis:
        binder:
          auto-create-stream: false
          kpl-kcl-enabled: true
Hopefully that helps.
OK. So, try to customize that executor as you originally asked.
However, I still don't understand how that KCL logic could pull so many threads from a SimpleAsyncTaskExecutor...
I tried out the ConsumerEndpointCustomizer and it worked. I set the executor to a ThreadPoolTaskExecutor with a fixed number of threads, and I no longer see an unbounded number of threads being created. Thanks for the help!
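For reference, a configuration along those lines might look like the sketch below. It assumes the Kinesis binder (with spring-integration-aws) and Spring Cloud Stream are on the classpath; the bean names, the `kcl-` thread-name prefix and the pool size of 8 are hypothetical placeholders. Since the thread dump suggested roughly one busy thread per shard, a pool much smaller than the shard count might slow processing, so the size is an assumption to tune rather than a recommendation.

```java
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class KclExecutorConfig {

    // Fixed-size pool (core == max): at most 8 pooled threads exist at a time and are reused.
    // The size and name prefix are placeholder assumptions; adjust for your stream.
    // As a Spring bean, the executor is initialized and shut down by the container.
    @Bean
    ThreadPoolTaskExecutor kclTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("kcl-");
        return executor;
    }

    // Hands the pooled executor to every KclMessageDrivenChannelAdapter the binder creates,
    // before the adapter is started.
    @Bean
    ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> kclEndpointCustomizer(
            ThreadPoolTaskExecutor kclTaskExecutor) {
        return (endpoint, destinationName, group) -> endpoint.setExecutor(kclTaskExecutor);
    }
}
```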
Good. So, let's treat this as a fix for the issue!