External service for kafka not working
juanli2 opened this issue · 76 comments
Hi,
I tried to use a NodePort type to expose the kafka service out.
Attached my service yml file.
And here's the service description:
$ kubectl describe svc kafka -n kafka
Name: kafka
Namespace: kafka
Labels:
Selector: app=kafka
Type: NodePort
IP: 100.73.225.207
Port: 9092/TCP
NodePort: 32093/TCP
Endpoints: 10.44.0.10:9092,10.44.0.11:9092,10.44.0.12:9092
Session Affinity: None
But when I tried using port 32093 externally to connect to the kafka service, it doesn't seem to work.
$ ./bin/kafka-console-producer.sh --broker-list ${master-ip}:32093 --topic test2
abc
[2016-11-18 15:26:58,157] ERROR Error when sending message to topic test2 with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test2-0 due to 1512 ms has passed since batch creation plus linger time
I'm pretty sure that the connection to the master-ip is working, and port 32093 is listening in the cluster.
This works for zookeeper, but I'm not sure why it doesn't for kafka.
The example kafka service 30service.yml
doesn't use NodePort, so it will only be accessible within the cluster. The port will be 9092, but not exposed on nodes.
If you modified the service, what kubernetes environment do you use? Does it require a LoadBalancer type service? This repository has only focused on kafka access within the cluster.
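For context, a NodePort variant of that service (a sketch assembled from the describe output earlier in this thread, not the repo's file) would look roughly like this; as the rest of the thread shows, this alone doesn't give working broker access, since clients get redirected to the advertised broker addresses:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    nodePort: 32093   # must be within the cluster's NodePort range
  selector:
    app: kafka
```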
Have you tried any of the pods in https://github.com/Yolean/kubernetes-kafka/tree/master/test ?
Access from within the pod is OK; I just want to test exposing the port externally. It's in my local lab, not in a cloud, so I use the NodePort type. It works well for the zookeeper NodePort, but it fails when accessed from outside.
What's your suggestion if I want to expose port 9092 externally?
[2016-11-21 15:09:38,373] WARN [console-consumer-72367_JUANLI2-M-G38Q-1479712160584-889927b7-leader-finder-thread], Failed to find leader for Set(test1-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test1)] from broker [ArrayBuffer(BrokerEndPoint(1,kafka-1.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(2,kafka-2.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(0,kafka-0.broker.kafka.svc.cluster.local,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
It seems I need a LoadBalancer for those brokers.
A kafka client that supports broker redirection will work in this scenario, just map the three internal broker endpoints to the one used in your connection string.
Having the same issue. Were you able to figure out how to expose the kafka service @juanli2 using NodePort?
I'm not sure the kafka protocol allows such a thing. See an ongoing investigation in #21. I think consumers must be able to address each broker separately, though they can get the list of brokers from one or more "bootstrap" names. Kubernetes services are inherently round-robin.
I was able to get around this using broker redirection in no-kafka. What happens is that upon connection to a broker, it sends a list of known brokers which then replace whatever endpoint you used to connect with. These are the kubernetes cluster endpoints, however, which will not be the same as what the consumer is connecting with. The broker redirection feature lets you map the external endpoint with the internal one.
Ahh right on, let me try that, and will report back!
@donstaheli Have you tried that setup with a topic that has fewer replicas than you have brokers?
No, I haven't. What is the use case demanding that can't be met with partitioning?
It's more of a gotcha than a use case. In #21 we suspect that when consumers can't resolve individual brokers (internal or external services alike) they will not know how to locate a topic unless it lives on the broker they happen to hit.
I don't think a client lib can solve that, because k8s services are round-robin. There's a discussion about that in kubernetes/kubernetes#36415.
Sorry for the late update, but I looked at broker redirection in no-kafka and this is not what I'm looking for. So my question is similar to @juanli2's. Let me explain. I used the repo to create the zookeeper and kafka statefulsets. I was successful creating everything as advertised. I am able to access kafka either by hitting broker-0.broker.... and so forth or by using Kafka.default.svc.... I then went ahead and modified 30service.yml with type: NodePort and attempted to hit this service externally to the k8s cluster, and I get the same error @juanli2 reported. If I understand NodePort services correctly, this should work? Btw, my k8s cluster is not in a cloud environment. Any ideas?
I think the no-kafka docs on remapping confirm my position that Kafka isn't meant to be used through a round-robin load balancer. It might work within a cluster though, where you can - depending on config - resolve the addresses to individual brokers that you get from your initial connection through the service.
You could try one service + NodePort per kafka broker, together with remapping.
Guys,
have you figured out how to connect to kafka using AWS classic ELB?
I receive
kafkacat -b %DNS-NAME%:9092 -L
% ERROR: Failed to acquire metadata: Local: Timed out
I have a TCP mapping on the nginx ingress from port 9092 to service kafka/kafka:9092:
kind: ConfigMap
data:
  "2181": kafka/zookeeper:2181
  "9092": kafka/kafka:9092
Also zookeeper is not accessible from ELB.
Internally it works fine:
./bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic test2 --create --partitions 1 --replication-factor 3
./bin/kafka-console-producer.sh --broker-list kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092 --topic test2
./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic test2 --from-beginning
thanks for any help
Has anyone ever got this working with an external service?
I don't think so. With a single broker it might be straightforward. With >1 you'd probably have to expose each broker externally and set advertised.listeners individually.
So I managed to get messages coming in, using a setup like this:
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer
And changing 50kafka.yml (note I am using your 0.11 branch of the project)
./bin/kafka-server-start.sh
config/server.properties
--override log.retention.hours=-1
--override log.dirs=/var/lib/kafka/data/topics
--override broker.id=${HOSTNAME##*-}
--override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
--override default.replication.factor=3
--override num.partitions=3
I can connect externally and produce messages, but for some reason the Kafka cluster under replicates topics and I get a lot of this in the Kafka logs:
[2017-07-20 11:47:56,940] INFO [ReplicaFetcherThread-0-0]: Retrying leaderEpoch request for partition san1-2 as the leader reported an error: UNKNOWN_TOPIC_OR_PARTITION (kafka.server.ReplicaFetcherThread)
Do you have any ideas why this might be happening?
Ben
I guess brokers won't be able to resolve each other at the .elb.amazonaws.com name. Maybe you could add the local name as well to advertised.listeners? Using ${HOSTNAME##*-} to make it specific per broker pod. I think producers will be ok even if they only find one of the listeners.
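As an aside, the ${HOSTNAME##*-} expansion used throughout this thread is plain POSIX parameter expansion: it strips the longest prefix matching *-, leaving the StatefulSet ordinal. A quick sketch:

```shell
# StatefulSet pods get ordinal hostnames: kafka-0, kafka-1, kafka-2, ...
HOSTNAME=kafka-2              # example value; inside the pod this is already set
BROKER_ID="${HOSTNAME##*-}"   # strip the longest prefix ending in '-'
echo "$BROKER_ID"             # prints: 2
```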
BOSH!
That was it, but instead of using advertised listeners I used listeners (listeners is for internal brokers), so it now looks like this:
./bin/kafka-server-start.sh
config/server.properties
--override log.retention.hours=-1
--override auto.leader.rebalance.enable=true
--override log.dirs=/var/lib/kafka/data/topics
--override broker.id=${HOSTNAME##*-}
--override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
--override listeners=PLAINTEXT://${HOSTNAME##*-}:9092
--override default.replication.factor=3
--override num.partitions=3
Output from sangrenel before (note the error rate):
2017/07/20 11:59:35 [ topic: logs ]
> Messages: 31Mb/sec @ 13501 msgs/sec. | error rate 47.41%
> Batches: 27.00 batches/sec. | 2.15903s p99 | 547.176ms HMean | 332.936ms Min | 2.167833s Max
332.936ms - 516.425ms --------------------------------------------------
516.425ms - 699.915ms -
699.915ms - 883.405ms -
883.405ms - 1.066895s -
1.066895s - 1.250384s -
1.250384s - 1.433874s --------
1.433874s - 1.617364s -
1.617364s - 1.800854s -------------------------------
1.800854s - 1.984343s -
1.984343s - 2.167833s -
After:
> Messages: 75Mb/sec @ 32701 msgs/sec. | error rate 0.00%
> Batches: 65.40 batches/sec. | 536.672ms p99 | 376.556ms HMean | 335.445ms Min | 552.701ms Max
2017/07/20 13:48:59 [ topic: san2 ]
335.445ms - 357.171ms ----------
357.171ms - 378.897ms --------------------------------------------------
378.897ms - 400.622ms ----------------
400.622ms - 422.348ms -
422.348ms - 444.073ms -
444.073ms - 465.799ms -
465.799ms - 487.524ms -
487.524ms - 509.25ms -
509.25ms - 530.975ms --
530.975ms - 552.701ms --
So the only thing left is to make advertised.listeners get its value from the service (which I can work out).
Nice!
Why is it --override listeners=PLAINTEXT://${HOSTNAME##*-}:9092? I would have guessed --override listeners=PLAINTEXT://kafka-${HOSTNAME##*-}:9092 instead.
Two of the Kafka brokers kept dying...
Note that in the kafka-011 branch I'm experimenting with resource limits, and they frequently cause OOMKilled. I paused that work to do solsson/dockerfiles#5 before I try more tuning.
Got it working!
Changed the command: to:
--override log.retention.hours=-1
--override log.dirs=/var/lib/kafka/data/topics
--override broker.id=${HOSTNAME##*-}
--override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
--override advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"
--override listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
--override inter.broker.listener.name=INTERNAL_PLAINTEXT
--override auto.create.topics.enable=true # Just our internal config
--override auto.leader.rebalance.enable=true # Just our internal config
--override num.partitions=3 # Just our internal config
--override default.replication.factor=3 # Just our internal config
You then need to change the ports: to this:
ports:
- containerPort: 9092
- containerPort: 9093
  hostPort: 9093
Finally you need to open up security groups if needed. You can then connect using ec2-blah.us-blah.com:9093 without dropping any messages:
> Messages: 50Mb/sec @ 21901 msgs/sec. | error rate 0.00%
> Batches: 43.80 batches/sec. | 979.979ms p99 | 545.219ms HMean | 341.991ms Min | 1.080252s Max
341.991ms - 415.817ms --
415.817ms - 489.643ms --------------------------------------------------
489.643ms - 563.469ms -------------------------------------
563.469ms - 637.295ms ------------------------------
637.295ms - 711.121ms ----------
711.121ms - 784.947ms ----------
784.947ms - 858.774ms ---
858.774ms - 932.6ms -
932.6ms - 1.006426s -
1.006426s - 1.080252s -
So this is just for AWS, but for anything else just change this line: EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093 to get a routable hostname.
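To illustrate, the advertised.listeners value that results from the snippet above can be assembled like this (the EC2 hostname here is a made-up placeholder; in the real command it comes from the metadata-service wget):

```shell
HOSTNAME=kafka-0                                           # pod hostname (set automatically in the pod)
EXTERNAL_HOSTNAME=ec2-203-0-113-10.compute.amazonaws.com   # hypothetical placeholder for the wget result
ADVERTISED="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://${EXTERNAL_HOSTNAME}:9093"
echo "$ADVERTISED"
```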
Thanks,
Ben
I'd like to make an "addon" PR with this, but let me get the 0.11 branch ready for merge first.
What kind of access control do you plan on using? I'm curious if it can affect the choice of listeners.
For us at the moment, PLAINTEXT, but we will be moving towards a more secure method soon(ish)
Hi @solsson, as I understand it you have added @BenjaminDavison's code, so now there's no need to add all the --override options he did; one just needs to modify 10broker-config.yml.
Is that correct?
Can you please explain how I can connect to the kafka cluster using an AWS load balancer?
Appreciated.
Is my configuration correct?
listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
Because I get this exception:
2017-07-31T13:42:34.268980421Z java.lang.IllegalArgumentException: Error creating broker listeners from '"INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"': Unable to parse "INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092 to a broker endpoint
you have added code of @BenjaminDavison and now
Until now we've only collaborated through this issue. This means I can't really explain how. The documentation is above.
not need add all --override options which he did, just need to modify 10broker-config.yml.
Correct. #30 has been merged to master now. The above work was done prior to that change though, hence the --overrides.
In a few days I will have done some more touchup of kafka 0.11 and rolled it out to our QA cluster. After that I might look into new features.
In case someone would like to start on an addon-style PR, there are different places for config, with different purposes:
- --override can still be used, but only for static values (since bfe7e31)
- Config file changes in 10broker-config.yml
- Tricks applied through init.sh. This is where I suggest the wget used for the AWS name lookup is placed.
The reason for introducing init.sh is that I can't see any maintainable way to do things like #41 with bash one-liners embedded in bash -c commands.
Hi @solsson, thank you for the answer. Can you upload the Dockerfile with init.sh as well? It would be helpful.
There is no Dockerfile with init.sh; it lives in the ConfigMap: https://github.com/Yolean/kubernetes-kafka/blob/master/10broker-config.yml#L7
The init container(s) can be any image really, with any entrypoint, as noted in my ongoing work with rack awareness: e919456#diff-5b024340a1fdf98f2a6e4be02b8c4d30R17
Thank you @solsson, got it working.
In server.properties you need to add inter.broker.listener.name as well.
Now that https://github.com/Yolean/kubernetes-kafka/releases/tag/v2.0.0 is released, should we try to make a PR based on the success stories above? What's the current status on these solutions?
If it isn't asking too much, do you guys have the yaml that got this working?
@yevgeniyo can you post your yaml? It was pre-2.0.0 but I think there were no more changes to the core setup after that point.
Hi guys @cairesvs, @solsson, please find it here....
kafka-external.zip
Thanks @solsson and @yevgeniyo!
Right now I'm fighting with EXTERNAL_HOSTNAME because I don't have a public IP.
I got this error when running the command bin/kafka-console-producer.sh --broker-list my-load-balancer.us-east-1.elb.amazonaws.com:9093 --topic topicName
[2017-08-17 18:15:52,059] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,208] WARN Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,258] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,405] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Do you guys have any insight?
I managed to get the producer to send messages from outside by changing EXTERNAL_HOSTNAME to the AWS ELB.
But the consumer still has problems receiving the messages.
Running the command bin/kafka-console-consumer.sh --bootstrap-server my-load-balancer.us-east-1.elb.amazonaws.com:9093 --topic topicName --from-beginning sometimes receives the message, but not always. I saw that you guys are using the old consumer with the --zookeeper flag; has anyone tested with --bootstrap-server?
Yup, unfortunately it seems like the AWS ELB causes intermittent failures on both producer and consumer.
On the producer I got a verbose error:
[2017-08-17 21:22:22,940] WARN Got error produce response with correlation id 185 on topic-partition caires-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,175] WARN Got error produce response with correlation id 187 on topic-partition caires-0, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,408] WARN Got error produce response with correlation id 189 on topic-partition caires-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,644] ERROR Error when sending message to topic caires with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
@cairesvs I'm facing a similar issue to the one you described.
My Kafka pods expose a node port and advertise both the internal and public hostname (they listen on different ports).
Kafka seems to be able to return the right listeners based on the port connected to.
But it seems that accessing kafka via the external listener causes the intermittent issue you described. I tried using the internal listener and things seem to go well. Need to test more.
I've been messing with this for a full day and my intuition is that something is wrong when using external listeners.
Will update when I have more information.
I have solved my problem.
For me, the issue was that I was using NodePort and trying to target a specific node. That didn't work because NodePort sends the traffic to kube-proxy first, and kube-proxy ends up sending the traffic to all the selected pods in a round-robin fashion.
This caused the intermittent issues I observed. I totally misunderstood how a NodePort service works in this case.
So please note that exposing each node hosting a kafka pod with a public ip/hostname is important. Depending on which port kafka receives the traffic, it will only advertise the endpoint with the same port (that's how it figures out internal vs. external traffic). Kafka will let the client know which broker has the partition/replica and the client will fetch from it itself.
Thus, I don't foresee load balancing working very well.
Understanding this helped me nail my issue. This thread helped a lot. Thanks everyone.
So please note that exposing each node hosting a kafka pod with a public ip/hostname is important.
By "node" you mean each broker, I suppose? The conclusion is similar to that in #21.
I've started to investigate replication to a different cluster. Kafka best practices state that you place the replication mechanism (MirrorMaker etc.) in the destination cluster, to avoid data loss in case of network partition. It would be interesting to investigate a pattern for exposing a single NodePort/LoadBalancer to consumers only. I think this already works in the special case of a topic having a number of replicas equal to the number of brokers.
I think I have a similar problem; kubernetes is installed with lxd on localhost.
When I run this inside the kafka-0 container:
$ kafka-console-producer.sh --topic test --broker-list localhost:9092
[2017-09-15 06:17:11,870] WARN Error while fetching metadata with correlation id 1 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,046] WARN Error while fetching metadata with correlation id 5 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,265] WARN Error while fetching metadata with correlation id 6 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,593] WARN Error while fetching metadata with correlation id 7 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,856] WARN Error while fetching metadata with correlation id 8 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,999] WARN Error while fetching metadata with correlation id 9 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
My config is:
advertised.host.name = null
advertised.listeners = OUTSIDE://kafka-0.kafka.kafka-cluster.svc.cluster.local:9094,INSIDE://:9092
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 3
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = INSIDE
inter.broker.protocol.version = 0.11.0-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
listeners = OUTSIDE://:9094,INSIDE://:9092
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /kafka/kafka-logs-kafka-0
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.11.0-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 20
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = zk-0.zk-svc.kafka-cluster.svc.cluster.local:2181,zk-1.zk-svc.kafka-cluster.svc.cluster.local:2181,zk-2.zk-svc.kafka-cluster.svc.cluster.local:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
Also trying to figure out how to go about exposing the brokers externally. Wrote up a SO question for some help from the Kubernetes community: https://stackoverflow.com/questions/46456239/how-to-expose-an-headless-service-for-a-statefulset-externally-in-kubernetes
Found this to be very useful: kubernetes/kubernetes#44103
Trying to wrap my head around how to do this as well.
If I understand correctly, the root of the problem is that Kafka requires a 1:1 mapping:
kafka-{0, …, n}.broker.kafka.svc.cluster.local:9092 <-> some-external-address-to-broker-{0, …, n}?
Is this a correct understanding of the "current state of the world" for solving this problem:
- Tie the second address to ec2-instance-address-to-broker-{0, …, n} (#13 (comment) and #13 (comment)). This assumes one broker per node (?) and is therefore similar to the SO link shared by @nadirabid (https://stackoverflow.com/a/46468433/2171758) (?)
- Create one service per broker and then expose each one by one (similar to the kubernetes issue linked above)
- Other?
Hey all,
Just wondering if anyone has been able to get this working via an ELB and/or ingress endpoint? Unfortunately the NodePort method will not work in my case as nodes do not have external addresses.
I would be alright manually setting a hostname (either the dynamically-generated ELB hostname or otherwise), however this does not appear to be working currently in my initial tests using kafkacat. I used yevgeniyo's zip above (https://github.com/Yolean/kubernetes-kafka/files/1231420/kafka-external.zip) and tried simply modifying the export EXTERNAL_HOSTNAME line in init.sh under 10broker-config.yml to point to an ELB created previously. However, there appears to be a communication error when testing producing and consuming via the external endpoint. I'm wondering if this is due to the brokers giving the client an internal-only set of hostnames to connect to (or something like that). Unfortunately I don't have much in terms of error logs at the moment.
If anyone has got this working, I'd greatly appreciate some advice.
Thanks!
EDIT: One possibility I'm thinking about, if each broker needs to be addressed by the client individually, is whether setting up a service per broker and associating either a LoadBalancer or ingress with each would solve the apparent issue of having a single intermediary load balancer. This would obviously not be as dynamically scalable, since an increase in the number of broker replicas would also require additional services and/or ingresses, but it could be an acceptable tradeoff.
I've been struggling through this problem also, though not with an ELB; more just getting an n-node kafka cluster fully accessible externally. @nadirabid - your own posted solution to your SO question is pretty much where I've ended up too. Internally, all the brokers can run on port 9092; externally they all need different port numbers (I use 3209${KAFKA_BROKER_ID}) so that I can use a separate NodePort service per broker, which allows access to each separate broker externally without any port clashes when broker pods end up on the same k8s node.
So the only remaining "manual" part of the cluster scaling process is those individual services needed per broker. Wouldn't it be nice if this was somehow possible through a service template within the stateful set definition?
All in all much better than my earlier conclusion that we wouldn't be able to use a statefulset and would instead have to manage a separate deployment per broker in the cluster!
getting an n-node kafka cluster fully accessible externally
I find that a good scope for this repo. Load Balancers and Ingress are cluster specific, but if NodePort works it will probably help those cases too.
There are many interesting insights in this thread. What are our options?
- Service templates (@comdw did you have a specific idea here?)
- Individual service manifests that address each broker (I guess that's not possible atm; would involve the wider community)
- Expose all these ports on every broker, then use some config+init trick to listen only on one of them.
- Some kind of stateless TCP-level proxy that routes consistently between ports and brokers.
More? What's your conclusion on listeners/advertised.listeners and listener.security.protocol.map?
We ended up going with pretty much the same solution as I figured out earlier on.
But, we use pod affinity to make sure we don't end up with Kafka instances on the same box (good practice anyway) and that solves clashing ports for us.
Ben
@solsson, I'm new to all this, but the StatefulSet already allows you to dictate that you want a persistent volume per instance, so why not a nodeport service per instance. This would require support specifically in kubernetes though and it might be too much of a niche thing.
I added kubectl label pods ${HOSTNAME} podname=${HOSTNAME}, so maybe we could use another kubectl command here to add the NodePort service (I guess it would need to check if it already exists etc., and they would never get deleted except manually)?
My services example for a 3-broker cluster:

```yaml
# Currently have to manually scale this up to cater for the number of brokers you want in your cluster
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-0
spec:
  ports:
  - port: 32090
    targetPort: 32090
    nodePort: 32090
  type: "NodePort"
  selector:
    podname: kafka-0
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-1
spec:
  ports:
  - port: 32091
    targetPort: 32091
    nodePort: 32091
  type: "NodePort"
  selector:
    podname: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-2
spec:
  ports:
  - port: 32092
    targetPort: 32092
    nodePort: 32092
  type: "NodePort"
  selector:
    podname: kafka-2
```
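Since these per-broker services differ only in the broker index, they can also be generated rather than hand-maintained. A small sketch mirroring the manifests above (the service/pod names and the 3209x port scheme are assumptions from this thread); inspect the output before feeding it to kubectl:

```shell
# Generate one NodePort service manifest per broker, one port per broker id.
gen_broker_services() {
  local brokers=$1
  for i in $(seq 0 $((brokers - 1))); do
    cat <<EOF
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-$i
spec:
  type: NodePort
  ports:
  - port: 3209$i
    targetPort: 3209$i
    nodePort: 3209$i
  selector:
    podname: kafka-$i
EOF
  done
}

gen_broker_services 3
```

Usage would be something like `gen_broker_services 3 | kubectl apply -f -`, re-run whenever the replica count changes.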
I guess @BenjaminDavison you are saying you don't need external services at all if you use pod affinity (the containerPort: 9093 line opens this port externally on each node without an explicit service?). I'm currently only using minikube so can't really test this; externally I've only been able to expose ports in the 30000-32000 range.
Great, let me know if there's something to try out.
@comdw Yup that's correct.
Bah, looks like the Kafka schema registry doesn't support the listener security protocol map:
No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL_PLAINTEXT=kafkaserver-0:32090, INTERNAL_PLAINTEXT=kafka-0.broker.default.svc.cluster.local:9092}]
Maybe it doesn't need to? It should always be on the inside, I guess. Can we simply rearrange the list?
Edit: ah, just saw that that error can't be from the PR. The PR keeps PLAINTEXT for inside, hence schema registry might be unaffected.
Sorry yes, that was slightly off topic as looks like just a schema registry limitation at the moment. I've worked around it for now by configuring my listeners as "PLAINTEXT" and "EXTERNAL_PLAINTEXT".
... however the separate external/named listeners approach will not work for all Kafka clients that are external to the k8s cluster. E.g. Kafka Tool: it's able to obtain the Kafka cluster info via ZK, but errors out trying to load topics and consumers, complaining that it can't reach the broker. I suspect this is because it's trying to connect specifically to the listener named "PLAINTEXT" as discovered from ZK (rather like schema registry does, which is fine if it's running in the same k8s cluster), and that listener can't be accessed from outside. Kafka Tool worked with a single node and one advertised listener address:port that was reachable both inside and outside.
My own producer/consumer clients running outside the cluster do work with a broker/bootstrap config that initially talks to any broker; somehow the response is clever enough to advertise only the external addresses at that point. I'm still not confident I fully understand what's going on in these different cases though!
I could perhaps rework the configuration to go back to using the same FQDN and port internally and externally when I have time to experiment.
as discovered from ZK
I think that type of client is deprecated. Maybe the introduction of multiple listeners was a reason for such a deprecation. I guess the host used as bootstrap must be matched to one of the listener types internally, in order to return broker hosts of the same kind. Also I don't see a reason why Zookeeper should have an external service.
The only reason I saw the need to expose zookeeper was because we're using Sarama for the broker clients and there's no way to create topics from that client. So we were going to use the kafka tools (which last I checked did need access to Zookeeper).
We've gone with a different approach entirely (a REST based service) which allows us more control over that process. Exposing Zookeeper outside of Kubernetes seems to be fraught with security issues that I'd rather not entertain.
If you're working on this and don't understand why your hostPort is not appearing in netstat, and you're provisioning your cluster using kops in an internal network, you might be running into kubernetes/kops#3132. This applies if you're using Calico, Flannel or similar CNI plugins.
Edit: Calico actually now enables portmap by default, so it works.
Fixed with the merge of #78.
Along these lines I've been thinking that it would be quite easy to set up Kafka access with one kubectl port-forward per broker.
I solved it in the following way using port-forward and without having to expose kafka to outside world (tiny modification of the solsson/kafka setup):
My init.sh ConfigMap:

```shell
OUTSIDE_HOST=localhost
if [ $? -ne 0 ]; then
  echo "Outside (i.e. cluster-external access) host lookup command failed"
else
  OUTSIDE_PORT=3240${KAFKA_BROKER_ID}
  sed -i "s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|" /etc/kafka/server.properties
  ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT"
fi
```
The broker exposes 9094, and then you get OUTSIDE addresses of your nodes with port 3240 plus the broker ID, so in the case of a 3-node setup this would be 32400, 32401, 32402.
Then I create 4 port-forwards:
```shell
kubectl -n kafka port-forward pod/kafka-0 9094
kubectl -n kafka port-forward pod/kafka-0 32400:9092
kubectl -n kafka port-forward pod/kafka-1 32401:9092
kubectl -n kafka port-forward pod/kafka-2 32402:9092
```
This way, when a client asks a broker at localhost:9094 for the list of nodes, it will say: hey, I have 3 nodes, their addresses are as follows:
localhost:32400
localhost:32401
localhost:32402
Which is correct, because port-forward maps them to your local machine. And you're ready to debug.
The producer is set to localhost:9094. The consumers are set to the high ports, with a corresponding client id equal to the last digit of the port number (client-0 to 32400, client-1 to 32401, client-2 to 32402), just so I can make sense of which one is which. I used 3 consumers to test group consumption and it works fine. To be honest I just want to debug my code and then run everything inside on the standard 9092, but it took me way too much time to solve this (thanks to this thread as well). The inside/outside setup is very cool, but I think many people are confused or don't know that the broker setting is just for getting a list of nodes, and that list is produced by how you launch the Kafka nodes themselves. Inside/outside mapped on different ports, 9092 (regular) and 9094 (outside), is a good way to have a dual node list available. It would be beneficial to document the [outside] ability so people can write and debug code against Kafka from their local workstations.
@styk-tv Thanks for sharing this. I've also struggled with Kafka's listener model. I've set up mirror-maker using docker-compose with the 9092 ports by mapping brokers' kubernetes names to local containers. See https://gist.github.com/solsson/22e773ab148596ec39d54b7cc153ee67. Like your solution it is based on kubectl port-forward, which is complicated in containers until kubernetes/kubernetes#46517 gets merged.
With #172 it could be easier. Using Updating Broker Configs, #78 can probably be implemented (and customized) in a way that is persistent in ZooKeeper and takes precedence over server.properties.
is there a branch with external broker endpoints?
@solsson thanks for the answer
I followed the instructions in the outside-services Readme on local Docker Kubernetes, but couldn't implement the dynamic externalIP part because the local Docker node has no externalIP attribute!
On the same deployment I have another service configured similarly, which I can access from browser and curl, so I suspect that it might be a Kafka-related issue and not Kubernetes:
```
curl -s http://localhost:30120/sso/secured/healthcheck/status
UP%
```
```
$ kubectl describe svc sso
Name:                     sso
Namespace:                default
Labels:                   app=sso
Annotations:              <none>
Selector:                 app=sso,app_group=rtp,tier=backend
Type:                     NodePort
IP:                       10.104.249.237
LoadBalancer Ingress:     localhost
Port:                     sso-debug  8000/TCP
TargetPort:               8000/TCP
NodePort:                 sso-debug  30121/TCP
Endpoints:                10.1.3.104:8000
Port:                     sso  8080/TCP
TargetPort:               8080/TCP
NodePort:                 sso  30120/TCP
Endpoints:                10.1.3.104:8080
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

$ kubectl describe svc outside-0 --namespace kafka
Name:                     outside-0
Namespace:                kafka
Labels:                   <none>
Annotations:              kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"outside-0","namespace":"kafka"},"spec":{"ports":[{"nodePort":32400,"port":3240...
Selector:                 app=kafka,kafka-broker-id=0
Type:                     NodePort
IP:                       10.100.78.46
LoadBalancer Ingress:     localhost
Port:                     <unset>  32400/TCP
TargetPort:               9094/TCP
NodePort:                 <unset>  32400/TCP
Endpoints:                10.1.3.100:9094
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>
```
The services are the same pattern
This port acts differently to curl:

```
$ curl http://localhost:32400
curl: (52) Empty reply from server
$ curl http://localhost:32406
curl: (7) Failed to connect to localhost port 32406: Connection refused
```
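For what it's worth, "Empty reply from server" on 32400 is arguably the expected result: Kafka speaks its own binary protocol, so the TCP connection succeeds but no HTTP response comes back, whereas "Connection refused" on 32406 means nothing is listening at all. A plain TCP probe separates the two cases without involving HTTP; a minimal sketch using bash's built-in /dev/tcp (host and port here are placeholders):

```shell
# Prints "open" if a TCP connection can be established, "closed" otherwise.
# The subshell opens fd 3 on the socket and closes it again on exit.
tcp_check() {
  local host=$1 port=$2
  if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
    echo open
  else
    echo closed
  fi
}

tcp_check localhost 32400
```

"open" followed by a producer timeout would then point at wrong advertised.listeners rather than at the service/port wiring.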
but a producer can't connect
I deployed to GKE too on different nodes to no avail
Hi,
I have a problem exposing my kafka broker to external public using the classic load balancer.
Here is my startup command
kafka-server-start.sh /etc/kafka/server.properties \
--override broker.id=${HOSTNAME##*-} \
--override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
--override advertised.listeners="EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9092,INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:29092" \
--override advertised.host.name="$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname)" \
--override advertised.port=9092 \
--override listeners=INTERNAL_PLAINTEXT://0.0.0.0:29092,EXTERNAL_PLAINTEXT://0.0.0.0:9092 \
--override inter.broker.listener.name=INTERNAL_PLAINTEXT \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override zookeeper.connect=pzoo:2181 \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=10080 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=1 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=80000 \
--override request.timeout.ms=80000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=false \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=2.0-IV1 \
--override log.retention.hours=-1 \
--override log.dirs=/var/lib/kafka/data/topics \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override zookeeper.connection.timeout.ms=6000
My StatefulSet manifest:

```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  ports:
  - port: 29092
    name: internal
  # [podname].broker.kafka.svc.cluster.local
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-persistent-storage
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fasta
---
kind: Service
apiVersion: v1
metadata:
  name: kafka-svc
  annotations:
    dns.alpha.kubernetes.io/external: kafka.preprod.example.com
    service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags: "Name=kafka, Environment=preprod"
spec:
  externalTrafficPolicy: Local
  ports:
  - port: 9092
  selector:
    app: kafka
  type: LoadBalancer
  sessionAffinity: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "broker"
  replicas: 1
  updateStrategy:
    type: OnDelete
  template:
    metadata:
      labels:
        app: kafka
      annotations:
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
      - name: init-config
        image: hub.example.com/kafka:v1
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
        volumeMounts:
        - name: configmap
          mountPath: /etc/kafka-configmap
        - name: config
          mountPath: /etc/kafka
      containers:
      - name: broker
        image: hub.example.com/kafka-preprod:v8
        env:
        - name: KAFKA_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
        - name: JMX_PORT
          value: "5555"
        ports:
        - name: inside
          containerPort: 29092
        - name: outside
          containerPort: 9092
        - name: jmx
          containerPort: 5555
        command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint']
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
        volumeMounts:
        - name: config
          mountPath: /etc/kafka
        - name: kafka-persistent-storage
          mountPath: /var/lib/kafka/data
      imagePullSecrets:
      - name: nexus
      volumes:
      - name: configmap
        configMap:
          name: broker-config
      - name: config
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-persistent-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: fasta
      resources:
        requests:
          storage: 10Gi
```
command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint'] points to a bash script that runs the kafka startup command with all the above-mentioned override options.
Now when I try to consume messages from the external DNS kafka.preprod.example.com:9092, I get the error:
```
kafkacat -C -b kafka.preprod.example.com:9092 -t testing -v  Thu Nov 22 13:22:32 2018
% ERROR: Local: Broker transport failure: ec2-11-222-33-444.us-east-2.compute.amazonaws.com:9092/0: Connect to ipv4#11.222.33.444:9092 failed: Operation timed out (after 75477ms in state CONNECT)
```
and when I try to publish a message, I get an error:
```
kafkacat -P -b kafka.preprod.example.com:9092 -t testing  Thu Nov 22 13:19:15 2018
%4|1542873132.444|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873138.451|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873144.470|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests
```
Also, I do not see any errors in my kafka logs.
(kafka.server.KafkaConfig)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,105] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,136] INFO Loading logs. (kafka.log.LogManager)
[2018-11-22 07:48:26,199] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,204] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,233] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,235] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 63 ms (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,256] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,257] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,272] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,273] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 4 ms (kafka.log.Log)
[2018-11-22 07:48:26,278] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,279] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,295] INFO Logs loading complete in 159 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,309] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,311] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,608] INFO Awaiting socket connections on 0.0.0.0:29092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,640] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,652] INFO [SocketServer brokerId=0] Started 2 acceptor threads (kafka.network.SocketServer)
[2018-11-22 07:48:26,678] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,680] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,686] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,697] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2018-11-22 07:48:26,746] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,750] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,751] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ec2-18-219-76-229.us-east-2.compute.amazonaws.com,9092,ListenerName(EXTERNAL_PLAINTEXT),PLAINTEXT), EndPoint(kafka-0.broker.ashokleyland.svc.cluster.local,29092,ListenerName(INTERNAL_PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,845] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,847] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,849] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,856] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,872] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,893] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,913] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,922] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-11-22 07:48:26,938] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:3000,blockEndProducerId:3999) by writing to Zk with path version 4 (kafka.coordinator.transaction.ProducerIdManager)
[2018-11-22 07:48:26,991] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:26,998] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-11-22 07:48:26,998] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:27,111] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2018-11-22 07:48:27,139] INFO [SocketServer brokerId=0] Started processors for 2 acceptors (kafka.network.SocketServer)
[2018-11-22 07:48:27,153] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,153] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,158] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2018-11-22 07:48:27,224] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions testing-5,testing-0,testing-2,testing-1,testing-3,testing-4 (kafka.server.ReplicaFetcherManager)
[2018-11-22 07:48:27,237] INFO Replica loaded for partition testing-5 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,240] INFO [Partition testing-5 broker=0] testing-5 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,256] INFO Replica loaded for partition testing-2 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,256] INFO [Partition testing-2 broker=0] testing-2 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,258] INFO Replica loaded for partition testing-3 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,259] INFO [Partition testing-3 broker=0] testing-3 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,262] INFO Replica loaded for partition testing-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,264] INFO [Partition testing-0 broker=0] testing-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,267] INFO Replica loaded for partition testing-4 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,267] INFO [Partition testing-4 broker=0] testing-4 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,269] INFO Replica loaded for partition testing-1 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,269] INFO [Partition testing-1 broker=0] testing-1 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,282] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-11-22 07:58:26,894] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
I also tried first creating a load balancer service and then using its DNS name as an advertised.listener. Then too it would throw the same error.
If anybody could help me, it would be really great. Kindly let me know if more details are required.
Thanks!
@ridicule777 Does this differ from the other cases of name resolution outside cluster that have been discussed in this repo several times? Clients must be able to access all kafka brokers on individual listener addresses known to kafka.
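One client-side sanity check along these lines is to dump metadata with kafkacat -L -b kafka.preprod.example.com:9092 and verify that every advertised broker address it prints (not just the bootstrap address) resolves and connects from the client's network. A rough sketch that pulls the broker addresses out of such a dump; the sample metadata below is fabricated for illustration, not captured from this setup:

```shell
# Extract "host:port" for each broker line in `kafkacat -L` style output.
extract_brokers() {
  sed -n 's/^ *broker [0-9]* at \(.*\)$/\1/p'
}

# Fabricated sample of the metadata section kafkacat -L prints:
sample='Metadata for all topics (from broker -1: kafka.preprod.example.com:9092/bootstrap):
 2 brokers:
  broker 0 at ec2-11-222-33-444.us-east-2.compute.amazonaws.com:9092
  broker 1 at ec2-11-222-33-445.us-east-2.compute.amazonaws.com:9092'

echo "$sample" | extract_brokers
```

Each extracted host:port can then be tested with ping/nc from the client machine; a timeout against any of them reproduces exactly the kafkacat error above.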