set default clientId throws InstanceAlreadyExistsException
PrabaharanK opened this issue · 9 comments
Hi Team,
I'm using the Greenwich.RC2 Spring Cloud version for my project. I tried to set the `client.id` for the Kafka binder and got the exception below.
How do I set the `client.id` property? I tried setting it at the bindings level but it had no effect. I've shared my application.yml as well.
```
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client1-0
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_161]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_161]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_161]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) ~[kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) [kafka-clients-2.0.1.jar:na]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:164) [spring-kafka-2.2.6.RELEASE.jar:2.2.6.RELEASE]
```
```yaml
spring:
  cloud:
    stream:
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost
                      autoCreateTopics: true
                      configuration:
                        default:
                          key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                          value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                        auto.offset.reset: latest
                        client.id: client1
      bindings:
        INPUT1:
          destination: INPUT_TOPIC
          group: INPUT_GROUP1
          binder: kafka1
        INPUT2:
          destination: INPUT_TOPIC2
          group: INPUT_GROUP1
          binder: kafka1
        OUTPUT1:
          destination: OUTPUT_TOPIC
          group: INPUT_GROUP1
          binder: kafka1
```
Why are you using an old release candidate? The current Greenwich version is SR6: https://spring.io/projects/spring-cloud#learn
You can't set the client id that way.
You can add a `ListenerContainerCustomizer` `@Bean`:

```java
@Bean
public ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory<?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientIdPrefix(...);
}
```
Hi Gary, thank you. I updated the version to Greenwich.SR6.
I tried the code below with no luck; the clientId field is still empty. I verified that the bean method is invoked during startup.
```java
@Bean
public ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory<?, ?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientId("client1");
}
```
These are the properties I can see in my console:
```
2020-07-27 22:39:03.730 INFO 10612 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = INPUT_GROUP1
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
```
Sorry, I made a typo in the generic type; also you must set a unique prefix for each binding:
```java
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientId(destinationName + "." + group);
}
```
By the way, it is not good practice to put multiple input bindings in the same group - a rebalance on one consumer will force an unnecessary rebalance on the other(s).
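For illustration, the separate-groups advice applied to the earlier bindings might look like the following sketch (the second group name is made up for the example):

```yaml
spring:
  cloud:
    stream:
      bindings:
        INPUT1:
          destination: INPUT_TOPIC
          group: INPUT_GROUP1      # each input binding gets its own group
          binder: kafka1
        INPUT2:
          destination: INPUT_TOPIC2
          group: INPUT_GROUP2      # hypothetical separate group, so a rebalance on INPUT2 does not disturb INPUT1
          binder: kafka1
```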
Hi Gary, the clientId is still empty.
```java
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, destinationName, group) -> container.getContainerProperties().setClientId(destinationName + "." + group);
}
```
Then I tried the code below as well; the application started successfully and there was no trace of the exception in the console.
```java
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, destinationName, group) -> {
        if (true) throw new NullPointerException("Test");
        container.getContainerProperties().setClientId(destinationName + "." + group);
    };
}
```
I'm using:

```xml
<dependency>
    <!-- Import dependency management from Spring Boot -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
```
I'm having the same issue here. I'm trying to set up a client-id in order to have clear logs when we start up the app and one cluster isn't available.
Currently the client-ids of the AdminClient beans are the generic ones, prefixed with `adminclient-` and suffixed with a number `n`. An example log (from `adminclient-2`):
```
2021-05-10 18:27:16.461 INFO 41669 --- [| adminclient-2] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-2] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1620685636461) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
```
I would like to change the client id so it's clear which cluster is down. This log can be reproduced with only one cluster (you can use `nc -l -k 9092` to simulate a cluster that accepts connections but times out).
I can provide a minimal app if required (I'll work on it tomorrow).
I've tried the solution above without success. I also tried to set it up through several yaml properties without success.
We can set the value using the `spring.kafka.admin.client-id` property, but then it gets set on every admin client for both clusters, so I'm back to square one.
Note that if the cluster is responding, the consumer client ids are set up from the `group` value.
Here is a project that can be used to show this specific case: https://github.com/mborgraeve/kafka-binder-client-id-sample
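One possible direction, sketched here under assumptions and not verified against that sample project: spring-kafka's `KafkaAdmin` takes a plain config map, so an explicit `client.id` per cluster could be set by defining the beans yourself. The bean name, bootstrap address, and client-id values below are invented for illustration, and the binder may still create its own internal admin clients that this does not reach.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class AdminClientIdConfig {

    // Hypothetical: a KafkaAdmin for one cluster with an explicit client.id,
    // so timeout logs identify which cluster is unreachable.
    @Bean
    public KafkaAdmin cluster1Admin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
        config.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-cluster1");
        return new KafkaAdmin(config);
    }
}
```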
> Sorry, I made a typo in the generic type; also you must set a unique prefix for each binding:
>
> ```java
> @Bean
> public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
>     return (container, destinationName, group) -> container.getContainerProperties().setClientId(destinationName + "." + group);
> }
> ```
>
> By the way, it is not good practice to put multiple input bindings in the same group - a rebalance on one consumer will force an unnecessary rebalance on the other(s).
I did not understand what you are saying here. What's the harm in giving the same group id to different topics? Does it really cause rebalancing? Did I misunderstand? If so, can you explain why? During a rebalance, is only the group id taken into account? @garyrussell
Your answer: "By the way, it is not good practice to put multiple input bindings in the same group - a rebalance on one consumer will force an unnecessary rebalance on the other(s)."
Thank you @garyrussell. What you said worked; I was really able to customize the client ids. However, I have a small problem. I have 20 topics and I follow the logs. Different client ids are assigned to the consumers of my 20 topics, as I want. However, 5 of the consumers have a problem afterwards: the clientId log is printed for them again, the clientId I assigned is overridden, and new, different clientIds are assigned for those 5 consumers. I don't understand why it behaved like this; I wonder if it picks up a new clientId while rebalancing. Why does this code work for some consumers and not others? Why is a different clientId suddenly assigned? I'm doing something wrong, but I don't know what.
@PrabaharanK did you solve this?