spring-projects/spring-integration-aws

Allow configuration of TaskExecutor in KclMessageDrivenChannelAdapter

xargsgrep opened this issue · 8 comments

Expected Behavior

Allow overriding/customizing the implementation of TaskExecutor in KclMessageDrivenChannelAdapter.

Current Behavior

Currently it uses a SimpleAsyncTaskExecutor and there does not seem to be any way to configure it to use a different TaskExecutor implementation.

Context

SimpleAsyncTaskExecutor does not use a thread pool and therefore keeps spawning new threads. This results in an extremely high number of threads created over the lifetime of the JVM and causes a native memory leak when using Java Flight Recorder with JDK 17.
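
To illustrate the difference described above, here is a minimal JDK-only sketch. It does not use Spring: the `threadPerTask` lambda is only a hypothetical stand-in for an executor that starts a new thread for every task, and the class and method names are made up for this example. It counts how many distinct threads end up running the same number of tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPerTaskDemo {

	// Runs `tasks` trivial tasks on the given executor and returns
	// how many distinct threads executed them.
	public static int distinctThreads(Executor executor, int tasks) throws InterruptedException {
		Set<Thread> seen = ConcurrentHashMap.newKeySet();
		CountDownLatch done = new CountDownLatch(tasks);
		for (int i = 0; i < tasks; i++) {
			executor.execute(() -> {
				seen.add(Thread.currentThread());
				done.countDown();
			});
		}
		done.await();
		return seen.size();
	}

	public static void main(String[] args) throws InterruptedException {
		// Stand-in for a non-pooling executor: one new thread per task.
		Executor threadPerTask = task -> new Thread(task).start();
		// Pooled executor: a fixed set of threads is reused for all tasks.
		ExecutorService pool = Executors.newFixedThreadPool(4);
		try {
			System.out.println("thread-per-task: " + distinctThreads(threadPerTask, 100));
			System.out.println("fixed pool:      " + distinctThreads(pool, 100));
		}
		finally {
			pool.shutdown();
		}
	}
}
```

With the thread-per-task executor the count grows with the number of tasks, while the fixed pool never uses more threads than its configured size.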

I'm not sure I understand the question. There is indeed a corresponding setter on the KclMessageDrivenChannelAdapter:

	public void setExecutor(TaskExecutor executor) {
		Assert.notNull(executor, "'executor' must not be null.");
		this.executor = executor;
	}

On the other hand only one thread is used from that SimpleAsyncTaskExecutor in the KclMessageDrivenChannelAdapter anyway:

	protected void doStart() {
		super.doStart();

		if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
			this.checkpointMode = CheckpointMode.batch;
			logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] "
					+ "because it does not make sense in case of [ListenerMode.batch].");
		}

		LifecycleConfig lifecycleConfig = this.config.lifecycleConfig().taskBackoffTimeMillis(this.consumerBackoff);
		RetrievalConfig retrievalConfig =
				this.config.retrievalConfig()
						.glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer);

		this.scheduler =
				new Scheduler(
						this.config.checkpointConfig(),
						this.config.coordinatorConfig(),
						this.config.leaseManagementConfig(),
						lifecycleConfig,
						this.config.metricsConfig(),
						this.config.processorConfig(),
						retrievalConfig);

		this.executor.execute(this.scheduler);
	}

So, it is started once and uses only that one thread.

What am I missing?

Sorry, I'm a bit new to spring boot. I did notice the setter but I couldn't find a way to invoke it before KclMessageDrivenChannelAdapter is started. How would I go about doing that?

As for the number of threads: what I have observed is that something is constantly creating new threads. At any given moment, there seem to be about as many of these threads as there are shards in the Kinesis stream. The excerpt below is from a thread dump of a long-running application in which the Kinesis binder is configured to use the KPL/KCL library.

"SimpleAsyncTaskExecutor-172710" #173020 prio=5 os_prio=0 cpu=0.54ms elapsed=0.99s tid=0x00007f3846a7aa90 nid=0x4c9f waiting on condition  [0x00007f3877dfc000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(java.base@17.0.6/Native Method)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.handleNoRecords(ProcessTask.java:282)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:166)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
	at java.util.concurrent.FutureTask.run(java.base@17.0.6/FutureTask.java:264)
	at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)

We can see above that some instance of SimpleAsyncTaskExecutor has created more than 172k threads over its lifetime. Since KclMessageDrivenChannelAdapter only uses one thread, do you have any idea what could be creating all these threads?

"the same number of threads as the number of shards of the kinesis stream"

Well, that's possible, but it is already out of the KclMessageDrivenChannelAdapter's scope, since those threads would no longer be managed by Spring.
The adapter accepts a Kinesis stream and delegates everything to the KCL.
I can imagine a growing number of SimpleAsyncTaskExecutor threads only if you have that many binder destinations.
But that is not possible with KCL, since it does not accept a specific shard.

It is probably time for you to explain more what you do with your Spring Cloud Stream application and how it is configured.

Either way Spring Cloud Stream provides a hook to customize this kind of situation via ConsumerEndpointCustomizer:

	@Bean
	ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> kclEndpointCustomizer(TaskExecutor executor) {
		return (endpoint, destinationName, group) -> endpoint.setExecutor(executor);
	}

Thanks for the info! As for our application - we have a spring boot 3.0.5 app with a single kinesis binder (consumer only) and two kafka binder destinations. Spring cloud version is 2022.0.2. Relevant configuration is below.

spring:
  cloud:
    stream:
      bindings:
        dispatch-in-0:
          content-type: application/x-protobuf;charset=UTF-8
          binder: kinesis
          destination: <destination>
          group: <group>
          consumer:
            header-mode: none
            auto-startup: true

        dispatch-out-0:
          content-type: avro/binary+twilio
          destination: <destination>
          binder: kafka
          producer:
            # we are using a custom key extractor bean as a workaround for a spring.cloud.stream bug which makes all
            # bindings to use the out-0 partition-key-expression.
            #           partition-key-expression: payload.correlationSid
            partition-key-extractor-name: kafkaPartitionKeyExtractor
            partition-count: <partitionCount>

        dispatch-out-1:
          content-type: avro/binary+twilio
          destination: <destination>
          binder: kafka
          producer:
            partition-key-extractor-name: kafkaPartitionKeyExtractor
            partition-count: <partitionCount>

      kinesis:
        binder:
          auto-create-stream: false
          kpl-kcl-enabled: true

Hopefully that helps.

OK. So, try to customize that executor as you have asked originally.

However I still don't understand how that KCL logic may pull so many threads from a SimpleAsyncTaskExecutor...

I tried out the ConsumerEndpointCustomizer and it worked. I set the executor to a ThreadPoolTaskExecutor with a fixed number of threads and I no longer see an unbounded number of threads being created. Thanks for the help!
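
For anyone landing here later, a minimal sketch of that setup might look like the following. It is not the exact configuration from our application: the class name, bean names, thread name prefix, and pool size of 8 are placeholders, and the import paths are the ones I believe apply to the versions mentioned above, so verify them against your dependencies.

```java
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class KclExecutorConfiguration {

	// Fixed-size pool: core and max are equal, so no extra threads are created.
	// The size (8) is a placeholder; choose it with the shard count in mind.
	@Bean
	ThreadPoolTaskExecutor kclTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(8);
		executor.setMaxPoolSize(8);
		executor.setThreadNamePrefix("kcl-consumer-");
		return executor;
	}

	// Replaces the adapter's default executor before the binding is started.
	@Bean
	ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> kclEndpointCustomizer(
			ThreadPoolTaskExecutor kclTaskExecutor) {
		return (endpoint, destinationName, group) -> endpoint.setExecutor(kclTaskExecutor);
	}
}
```

Since the thread dump showed roughly one concurrent thread per shard, a pool smaller than the number of shards handled by one instance would probably leave work waiting in the executor's queue, so the size is worth checking under real load.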

Good. So, let's treat this as a fix for the issue!