spring-cloud/spring-cloud-stream-samples

set default clientId throws InstanceAlreadyExistsException

PrabaharanK opened this issue · 9 comments

Hi Team

I'm using the Greenwich.RC2 Spring Cloud release for my project. I tried to set the client.id property for the Kafka binder and got the exception below.

How do I set the client.id property? I also tried setting it at the bindings level, with no effect. My application.yml is shared below as well.


javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client1-0
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_161]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_161]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) ~[kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) [kafka-clients-2.0.1.jar:na]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:164) [spring-kafka-2.2.6.RELEASE.jar:2.2.6.RELEASE] 



spring:
  cloud:
    stream:
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost
                      autoCreateTopics: true
                      configuration:
                        default:
                          key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                          value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                        auto.offset.reset: latest
                        client.id: client1
      bindings:
        INPUT1:
          destination: INPUT_TOPIC
          group: INPUT_GROUP1
          binder: kafka1
        INPUT2:
          destination: INPUT_TOPIC2
          group: INPUT_GROUP1
          binder: kafka1
        OUTPUT1:
          destination: OUTPUT_TOPIC
          group: INPUT_GROUP1
          binder: kafka1


Why are you using an old release candidate? The current Greenwich version is SR6 https://spring.io/projects/spring-cloud#learn

You can't set the client id that way.
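The exception itself comes from JMX: as the stack trace shows, `AppInfoParser.registerAppInfo` registers an MBean named `kafka.consumer:type=app-info,id=client1-0` for each consumer. Because `client.id: client1` sits in the binder-wide `configuration`, the INPUT1 and INPUT2 consumers presumably both end up with `client1` plus the same `-0` index suffix, so the second registration is rejected. The JDK-only sketch below reproduces that collision with the platform MBean server; `AppInfoMBean`, `AppInfo` and `collides` are hypothetical stand-ins, not Kafka's classes.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class DuplicateClientIdDemo {

    /** Hypothetical management interface standing in for Kafka's app-info MBean. */
    public interface AppInfoMBean {
        String getClientId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    // Same naming pattern as in the stack trace: kafka.consumer:type=app-info,id=<clientId>
    static ObjectName appInfoName(String clientId) throws Exception {
        return new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
    }

    /**
     * Registers one app-info MBean per consumer, as each new consumer would,
     * and reports whether JMX rejected the second one as a duplicate.
     */
    public static boolean collides(String firstClientId, String secondClientId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName first = appInfoName(firstClientId);
        ObjectName second = appInfoName(secondClientId);
        try {
            server.registerMBean(new StandardMBean(new AppInfo(firstClientId), AppInfoMBean.class), first);
            server.registerMBean(new StandardMBean(new AppInfo(secondClientId), AppInfoMBean.class), second);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true; // the same exception type as in the issue
        } finally {
            // Unregister whatever was registered so the demo can run repeatedly.
            for (ObjectName name : new ObjectName[] {first, second}) {
                if (server.isRegistered(name)) {
                    server.unregisterMBean(name);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("client1-0 twice collides: " + collides("client1-0", "client1-0"));
        System.out.println("per-binding ids collide: "
                + collides("INPUT_TOPIC.INPUT_GROUP1-0", "INPUT_TOPIC2.INPUT_GROUP1-0"));
    }
}
```

Registering `client1-0` twice fails with `InstanceAlreadyExistsException`, while two distinct ids register cleanly, which is why each binding needs its own client id.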

You can add a ListenerContainerCustomizer @Bean and set the client id prefix on each container's properties:

@Bean
public ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory<?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientIdPrefix(...);
}

Hi Gary, thank you. I updated the version to Greenwich SR6.

I tried the code below with no luck: the clientId field is still empty. I verified that the bean method is invoked during startup.

@Bean
public ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory<?, ?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientId("client1");
}

These are the consumer properties logged in my console:

2020-07-27 22:39:03.730  INFO 10612 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = INPUT_GROUP1
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Sorry, I made a typo in the generic type; also you must set a unique prefix for each binding:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> container.getContainerProperties().setClientId(destinationName + "." + group);
}
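To see why the per-binding name avoids the collision, the sketch below applies two naming rules to the two input bindings from the YAML above, assuming one consumer per binding and the `-n` index suffix seen in `client1-0`. `ClientIdNaming`, `InputBinding` and `consumerIds` are hypothetical names for illustration; in the real application the id is set through the customizer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.BiFunction;

public class ClientIdNaming {

    /** Hypothetical holder for an input binding's destination and group. */
    public static final class InputBinding {
        final String destination;
        final String group;

        public InputBinding(String destination, String group) {
            this.destination = destination;
            this.group = group;
        }
    }

    /**
     * Builds the client id of every consumer: the naming rule gives the prefix and,
     * by assumption, the container appends "-0", "-1", ... per consumer.
     */
    public static List<String> consumerIds(List<InputBinding> bindings, int concurrency,
            BiFunction<String, String, String> naming) {
        List<String> ids = new ArrayList<>();
        for (InputBinding binding : bindings) {
            String prefix = naming.apply(binding.destination, binding.group);
            for (int i = 0; i < concurrency; i++) {
                ids.add(prefix + "-" + i);
            }
        }
        return ids;
    }

    public static boolean allUnique(List<String> ids) {
        return new HashSet<>(ids).size() == ids.size();
    }

    public static void main(String[] args) {
        List<InputBinding> inputs = Arrays.asList(
                new InputBinding("INPUT_TOPIC", "INPUT_GROUP1"),   // INPUT1
                new InputBinding("INPUT_TOPIC2", "INPUT_GROUP1")); // INPUT2

        List<String> shared = consumerIds(inputs, 1, (destination, group) -> "client1");
        List<String> perBinding = consumerIds(inputs, 1, (destination, group) -> destination + "." + group);

        System.out.println(shared + " unique=" + allUnique(shared));
        // prints [client1-0, client1-0] unique=false
        System.out.println(perBinding + " unique=" + allUnique(perBinding));
        // prints [INPUT_TOPIC.INPUT_GROUP1-0, INPUT_TOPIC2.INPUT_GROUP1-0] unique=true
    }
}
```

The same check also shows the limit of this rule: two bindings with the same destination and group would still produce identical ids.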

By the way, it is not good practice to put multiple input bindings in the same group - a rebalance on one consumer will force an unnecessary rebalance on the other(s).
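The point about shared groups can be illustrated with a toy model. It assumes the classic group protocol used by the kafka-clients 2.0.x in the stack trace, in which the coordinator tracks membership per `group.id` and a membership change makes every member of that group rejoin, whatever topic it subscribes to. This is a simplified simulation with hypothetical class names, not Kafka's coordinator code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: membership is keyed by group.id only, not by topic. */
public class GroupRebalanceModel {

    public static final class Member {
        final String clientId;
        final String topic;

        public Member(String clientId, String topic) {
            this.clientId = clientId;
            this.topic = topic;
        }

        @Override
        public String toString() {
            return clientId + "(" + topic + ")";
        }
    }

    private final Map<String, List<Member>> membersByGroup = new HashMap<>();

    /**
     * Adds a member to its group and returns every member of that group that
     * would have to rejoin, regardless of which topic each one consumes.
     */
    public List<Member> join(String groupId, Member newcomer) {
        List<Member> members = membersByGroup.computeIfAbsent(groupId, g -> new ArrayList<>());
        members.add(newcomer);
        return new ArrayList<>(members);
    }

    public static void main(String[] args) {
        GroupRebalanceModel shared = new GroupRebalanceModel();
        shared.join("INPUT_GROUP1", new Member("INPUT_TOPIC.INPUT_GROUP1-0", "INPUT_TOPIC"));
        // The INPUT_TOPIC consumer is disturbed by a consumer of a different topic.
        System.out.println("same group, rejoining: "
                + shared.join("INPUT_GROUP1", new Member("INPUT_TOPIC2.INPUT_GROUP1-0", "INPUT_TOPIC2")));

        GroupRebalanceModel separate = new GroupRebalanceModel();
        separate.join("INPUT_GROUP1", new Member("INPUT_TOPIC.INPUT_GROUP1-0", "INPUT_TOPIC"));
        // With its own group, the newcomer only affects itself.
        System.out.println("separate groups, rejoining: "
                + separate.join("INPUT_GROUP2", new Member("INPUT_TOPIC2.INPUT_GROUP2-0", "INPUT_TOPIC2")));
    }
}
```

Giving each input binding its own group keeps a rebalance on one topic's consumers from touching the others.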

Hi Gary, the clientId is still empty with this code:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> container.getContainerProperties().setClientId(destinationName + "." + group);
}

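I then tried the code below, which throws an exception inside the customizer. The application still started successfully and there was no trace of the exception in the console, so the customizer does not seem to be invoked at all.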

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, destinationName, group) -> {
            if(true) throw new NullPointerException("Test");
            container.getContainerProperties().setClientId(destinationName + "." + group);
        };
    }

I'm using this Spring Boot dependency management:

        <dependency>
            <!-- Import dependency management from Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.5.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

I'm having the same issue. I'm trying to set up a client id so that the logs clearly show which cluster is unavailable when the app starts.

Currently the admin clients get the generic client ids, prefixed with adminclient and suffixed with -n, where n is a number. For example (adminclient-2):

2021-05-10 18:27:16.461  INFO 41669 --- [| adminclient-2] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-2] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1620685636461) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.

I would like to change the client id so it's clear which cluster is down. This log can be reproduced with only one cluster (you can use nc -l -k 9092 to simulate a cluster accepting connections but timing out).
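If `nc` is not available, a similar unresponsive endpoint can be sketched in Java: a server socket that accepts TCP connections, keeps them open, and never writes a byte, so clients connect but time out waiting for a response. `SilentBroker` is a hypothetical helper, and unlike `nc` it does not print what clients send.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Accepts connections on a port and never answers, mimicking a hung broker. */
public class SilentBroker implements AutoCloseable {
    private final ServerSocket server;
    private final List<Socket> clients = new CopyOnWriteArrayList<>();

    public SilentBroker(int port) throws IOException {
        server = new ServerSocket(port); // port 0 picks a free ephemeral port
        Thread acceptor = new Thread(this::acceptLoop, "silent-broker");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int port() {
        return server.getLocalPort();
    }

    private void acceptLoop() {
        while (!server.isClosed()) {
            try {
                clients.add(server.accept()); // keep the connection open, never reply
            } catch (IOException e) {
                return; // server socket was closed
            }
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
        for (Socket client : clients) {
            client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        try (SilentBroker broker = new SilentBroker(9092)) {
            System.out.println("Accepting on port " + broker.port() + " without responding; Ctrl+C to stop");
            Thread.currentThread().join(); // block until the process is killed
        }
    }
}
```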

I can provide a minimal app if required (I'll work on it tomorrow).
I've tried the solution above without success. I also tried to set it up through several yaml properties without success.

We can set the value using the spring.kafka.admin.client-id property, but then it is applied to every admin client for both clusters, so I'm back to square one.

Note that if the cluster is responding, the consumer client ids are set up from the group value.

Here is a project that can be used to show this specific case: https://github.com/mborgraeve/kafka-binder-client-id-sample

@garyrussell I did not understand this part of your answer: "By the way, it is not good practice to put multiple input bindings in the same group - a rebalance on one consumer will force an unnecessary rebalance on the other(s)." What's the harm in giving the same group id to consumers of different topics? Does it really cause a rebalance? Is a rebalance scoped only by the group id? If I misunderstood, can you explain why?

Thank you @garyrussell. What you said worked, and I was able to customize the client ids. However, I have a small problem. I have 20 topics, and the logs show that the consumers of all 20 topics get the distinct client ids I assigned. Afterwards, though, 5 of those consumers log their client id again and it overrides the one I assigned: those 5 consumers get new, different client ids. I don't understand this behavior. Is the new client id picked up during a rebalance? Why does this code work for some consumers and not others, and why is a different client id suddenly assigned? I'm probably doing something wrong, but I don't know what.

@PrabaharanK did you solve this?