Yolean/kubernetes-kafka

External service for kafka not working

juanli2 opened this issue · 76 comments

Hi,
I tried to use a NodePort-type service to expose the kafka service externally.
Attached is my service yml file.

And here's the service description:
$ kubectl describe svc kafka -n kafka
Name: kafka
Namespace: kafka
Labels:
Selector: app=kafka
Type: NodePort
IP: 100.73.225.207
Port: 9092/TCP
NodePort: 32093/TCP
Endpoints: 10.44.0.10:9092,10.44.0.11:9092,10.44.0.12:9092
Session Affinity: None
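
For reference, a rough reconstruction of what the attached NodePort service would contain, inferred from the describe output above (the attached file itself is not inlined in this thread, so treat this as an approximation):

apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
spec:
  type: NodePort
  selector:
    app: kafka
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 32093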

But when I tried using port 32093 externally to connect to the kafka service, it does not seem to work.

$ ./bin/kafka-console-producer.sh --broker-list ${master-ip}:32093 --topic test2
abc
[2016-11-18 15:26:58,157] ERROR Error when sending message to topic test2 with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test2-0 due to 1512 ms has passed since batch creation plus linger time

I'm pretty sure that the connection to the master-ip is working, and that port 32093 is listening in the cluster.

It works for zookeeper, but I'm not sure why kafka is not working.

The example kafka service 30service.yml doesn't use NodePort, so it will only be accessible within the cluster. The port will be 9092, but it is not exposed on the nodes.

If you modified the service, what kubernetes environment do you use? Does it require a LoadBalancer type service? This repository has only focused on kafka access within the cluster.

Have you tried any of the pods in https://github.com/Yolean/kubernetes-kafka/tree/master/test ?

Access from within the pod is OK. I just want to test exposing the port externally. It's in my local lab, not in the cloud, so I use the NodePort type. It works well for the zookeeper NodePort, but it fails when accessed from outside.

What's your suggestion if I want to expose port 9092 externally?

[2016-11-21 15:09:38,373] WARN [console-consumer-72367_JUANLI2-M-G38Q-1479712160584-889927b7-leader-finder-thread], Failed to find leader for Set(test1-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test1)] from broker [ArrayBuffer(BrokerEndPoint(1,kafka-1.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(2,kafka-2.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(0,kafka-0.broker.kafka.svc.cluster.local,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more

It seems I need a LoadBalancer for those brokers.

A kafka client that supports broker redirection will work in this scenario, just map the three internal broker endpoints to the one used in your connection string.

Having the same issue. Were you able to figure out how to expose the kafka service @juanli2 using NodePort?

I'm not sure the kafka protocol allows such a thing. See an ongoing investigation in #21. I think consumers must be able to address each broker separately, though it can get the list of brokers from one or more "bootstrap" names. Kubernetes services are inherently round-robin.

I was able to get around this using broker redirection in no-kafka. What happens is that upon connection to a broker, it sends a list of known brokers which then replace whatever endpoint you used to connect with. These are the kubernetes cluster endpoints, however, which will not be the same as what the consumer is connecting with. The broker redirection feature lets you map the external endpoint with the internal one.

Ahh right on, let me try that, and will report back!

@donstaheli Have you tried that setup with a topic that has fewer replicas than you have brokers?

No, I haven't. What is the use case demanding that can't be met with partitioning?

It's more of a gotcha than a use case. In #21 we suspect that when consumers can't resolve individual brokers (internal or external services alike) they will not know how to locate a topic unless it lives on the broker they happen to hit.

I don't think a client lib can solve that, because k8s services are round-robin. There's a discussion about that in kubernetes/kubernetes#36415.

Sorry for the late update, but I looked at broker redirection in no-kafka and this is not what I'm looking for. So, my question is similar to @juanli2's. Let me explain. I used the repo to create the zookeeper and Kafka statefulsets. I was successful creating everything as advertised. I am able to access Kafka by either hitting the broker-0.broker.... and so forth or by using kafka.default.svc.... I then went ahead and modified the 30service.yml with type: NodePort and attempted to hit this service externally to the k8s cluster, and I get the same error @juanli2 reported. If I understand nodeport services correctly, this should work? Btw, my k8s cluster is not in a cloud environment. Any ideas?

I think the no-kafka docs on remapping confirm my position that Kafka isn't meant to be used through a round-robin load balancer. It might work within a cluster though, where you can - depending on config - resolve the addresses to individual brokers that you get from your initial connection through the service.

You could try one service + NodePort per kafka broker, together with remapping.
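
As a rough illustration of that suggestion (not from this repo: the kafka-broker-id pod label is an assumption, since a StatefulSet does not create per-pod labels by itself, and concrete variants of this appear later in the thread), one such per-broker service could look like:

apiVersion: v1
kind: Service
metadata:
  name: broker-0-external
  namespace: kafka
spec:
  type: NodePort
  selector:
    app: kafka
    kafka-broker-id: "0"    # hypothetical label that would have to be added to the kafka-0 pod
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 32400

With remapping, a client would then use node-address:32400 wherever the cluster-internal kafka-0 address is advertised.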

Guys,

have you figured out how to connect to kafka using AWS classic ELB?

I receive
kafkacat -b %DNS-NAME%:9092 -L
% ERROR: Failed to acquire metadata: Local: Timed out

I have TCP mapping on nginx ingress of port 9092 to service kafka/kafka:9092

data:
"2181": kafka/zookeeper:2181
"9092": kafka/kafka:9092
kind: ConfigMap

Also zookeeper is not accessible from ELB.

Internally it works fine:
./bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic test2 --create --partitions 1 --replication-factor 3
./bin/kafka-console-producer.sh --broker-list kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092 --topic test2
./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic test2 --from-beginning

thanks for any help

Has anyone ever got this working with an external service?

I don't think so. With a single broker it might be straightforward. With >1 you'd probably have to expose each broker externally and set advertised.listeners individually.

So I managed to get messages coming in, using a setup like this:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer

And changing 50kafka.yml (note I am using your 0.11 branch of the project)

          ./bin/kafka-server-start.sh
          config/server.properties
          --override log.retention.hours=-1
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
          --override default.replication.factor=3
          --override num.partitions=3

I can connect externally and produce messages, but for some reason the Kafka cluster under-replicates topics and I get a lot of this in the Kafka logs:

(kafka.server.ReplicaFetcherThread)
[2017-07-20 11:47:56,940] INFO [ReplicaFetcherThread-0-0]: Retrying leaderEpoch request for partition san1-2 as the leader reported an error: UNKNOWN_TOPIC_OR_PARTITION 

Do you have any ideas why this might be happening?

Ben

I guess brokers won't be able to resolve each other at the .elb.amazonaws.com name. Maybe you could add the local name as well to advertised.listeners? Using ${HOSTNAME##*-} to make it specific per broker pod. I think producers will be ok even if they only find one of the listeners.

BOSH!

That was it, but instead of adding it to advertised.listeners I used listeners (listeners is for internal brokers), so it now looks like this:

          ./bin/kafka-server-start.sh
          config/server.properties
          --override log.retention.hours=-1
          --override auto.leader.rebalance.enable=true
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
          --override listeners=PLAINTEXT://${HOSTNAME##*-}:9092
          --override default.replication.factor=3
          --override num.partitions=3

Output from sangrenel before (note the error rate):

2017/07/20 11:59:35 [ topic: logs ]
> Messages: 31Mb/sec @ 13501 msgs/sec. | error rate 47.41%
> Batches: 27.00 batches/sec. | 2.15903s p99 | 547.176ms HMean | 332.936ms Min | 2.167833s Max
332.936ms - 516.425ms --------------------------------------------------
516.425ms - 699.915ms -
699.915ms - 883.405ms -
883.405ms - 1.066895s -
1.066895s - 1.250384s -
1.250384s - 1.433874s --------
1.433874s - 1.617364s -
1.617364s - 1.800854s -------------------------------
1.800854s - 1.984343s -
1.984343s - 2.167833s -

After:

> Messages: 75Mb/sec @ 32701 msgs/sec. | error rate 0.00%
> Batches: 65.40 batches/sec. | 536.672ms p99 | 376.556ms HMean | 335.445ms Min | 552.701ms Max
2017/07/20 13:48:59 [ topic: san2 ]
335.445ms - 357.171ms ----------
357.171ms - 378.897ms --------------------------------------------------
378.897ms - 400.622ms ----------------
400.622ms - 422.348ms -
422.348ms - 444.073ms -
444.073ms - 465.799ms -
465.799ms - 487.524ms -
487.524ms - 509.25ms -
509.25ms - 530.975ms --
530.975ms - 552.701ms --

So the only thing left is to make the advertised.listeners get its value from the service (which I can work out).

Nice!

Why is it --override listeners=PLAINTEXT://${HOSTNAME##*-}:9092? I would have guessed instead --override listeners=PLAINTEXT://kafka-${HOSTNAME##*-}:9092.

two of the Kafka brokers kept dying....

Note that in the kafka-011 branch I'm experimenting with resource limits, and they frequently cause OOMKilled. I paused that work to do solsson/dockerfiles#5 before I try more tuning.

Got it working 😄

Changed the command: to:

          --override log.retention.hours=-1
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
          --override advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname):9093"
          --override listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
          --override inter.broker.listener.name=INTERNAL_PLAINTEXT
          --override auto.create.topics.enable=true # Just our internal config
          --override auto.leader.rebalance.enable=true # Just our internal config
          --override num.partitions=3 # Just our internal config
          --override default.replication.factor=3 # Just our internal config

You then need to change the ports: to this:

        ports:
        - containerPort: 9092
        - containerPort: 9093
          hostPort: 9093

Finally you need to open up security groups if needed. You can then connect using ec2-blah.us-blah.com:9093 without dropping any messages:

> Messages: 50Mb/sec @ 21901 msgs/sec. | error rate 0.00%
> Batches: 43.80 batches/sec. | 979.979ms p99 | 545.219ms HMean | 341.991ms Min | 1.080252s Max
341.991ms - 415.817ms --
415.817ms - 489.643ms --------------------------------------------------
489.643ms - 563.469ms -------------------------------------
563.469ms - 637.295ms ------------------------------
637.295ms - 711.121ms ----------
711.121ms - 784.947ms ----------
784.947ms - 858.774ms ---
 858.774ms - 932.6ms -
 932.6ms - 1.006426s -
1.006426s - 1.080252s -

So this is just for AWS, but for anything else just change this line: EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093 to get a routable hostname.
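
For example (purely illustrative: kafka-${HOSTNAME##*-}.example.com stands in for any DNS name that already routes to the node running that broker), the override could become:

          --override advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://kafka-${HOSTNAME##*-}.example.com:9093"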

Thanks,

Ben

I'd like to make an "addon" PR with this, but let me get the 0.11 branch ready for merge first.

What kind of access control do you plan on using? I'm curious if it can affect the choice of listeners.

For us at the moment, PLAINTEXT, but we will be moving towards a more secure method soon(ish)

Hi @solsson, as I understand it you have added @BenjaminDavison's code, so now there is no need to add all the --override options he used; modifying 10broker-config.yml is enough.
Is that correct?
Can you please explain how I can connect to the kafka cluster using an AWS load balancer?
Appreciated

Is my configuration correct?

listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093

advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"

listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT

Because I get exception:

2017-07-31T13:42:34.268980421Z java.lang.IllegalArgumentException: Error creating broker listeners from '"INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"': Unable to parse "INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092 to a broker endpoint

you have added @BenjaminDavison's code, so now

Until now we've only collaborated through this issue. This means I can't really explain how. The documentation is above 😄.

there is no need to add all the --override options he used; modifying 10broker-config.yml is enough.

Correct. #30 has been merged to master now. The above work was done prior to that change though, hence the overrides.

In a few days I will have done some more touchup of kafka 0.11 and rolled it out to our QA cluster. After that I might look into new features.

In case someone would like to start on an addon-style PR, there are different places for config with different purpose:

  • --override can still be used, but only for static values (since bfe7e31)
  • Config file changes in 10broker-config.yml
  • Tricks applied through init.sh. This is where I suggest the wget used for AWS name lookup is placed (a rough sketch follows below).

The reason for introducing init.sh is that I can't see any maintainable way to do things like #41 with bash one-liners embedded in bash -c commands.
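
As a rough sketch of what that init.sh step could look like (an assumption-laden illustration, not the repo's actual file: it presumes the AWS metadata endpoint is reachable from the pod and that server.properties ships with a #init#outside-host#init# placeholder inside its advertised.listeners line):

# Fill in this broker's public hostname during the init container run.
OUTSIDE_HOST=$(wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname)
if [ -z "$OUTSIDE_HOST" ]; then
  echo "Outside (cluster-external) host lookup failed; leaving the placeholder untouched"
else
  # Replace the assumed placeholder with the looked-up hostname
  sed -i "s|#init#outside-host#init#|${OUTSIDE_HOST}|" /etc/kafka/server.properties
fi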

Hi @solsson, thank you for the answer. Can you upload the Dockerfile with init.sh as well?
It would be helpful.

There is no dockerfile with init.sh. It lives in the ConfigMap. https://github.com/Yolean/kubernetes-kafka/blob/master/10broker-config.yml#L7

The init container(s) can be any image really, with any entrypoint, as noted in my ongoing work with rack awareness: e919456#diff-5b024340a1fdf98f2a6e4be02b8c4d30R17
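
For illustration only, wiring that script in as an init container could look roughly like this (the image is a placeholder, and the mount paths follow the StatefulSet posted further down in this thread):

      initContainers:
      - name: init-config
        # placeholder image: anything with bash (and kubectl, if init.sh labels pods) works
        image: your-registry/kafka-init:latest
        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
        volumeMounts:
        - name: configmap            # the ConfigMap that carries init.sh and server.properties
          mountPath: /etc/kafka-configmap
        - name: config               # emptyDir shared with the broker container
          mountPath: /etc/kafka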

Thank you @solsson, got it working.
In server.properties you need to add inter.broker.listener.name as well.

Now that https://github.com/Yolean/kubernetes-kafka/releases/tag/v2.0.0 is released, should we try to make a PR based on the success stories above? What's the current status on these solutions?

If it isn't asking too much, do you guys have the yaml that got this working?

@yevgeniyo can you post your yaml? It was pre-2.0.0 but I think there were no more changes to the core setup after that point.

Hi guys @cairesvs, @solsson, please find it here....
kafka-external.zip

Thanks @solsson and @yevgeniyo!

Right now I'm fighting against the EXTERNAL_HOSTNAME because I don't have a public ip.

I got this error when running the command bin/kafka-console-producer.sh --broker-list my-load-balancer.us-east-1.elb.amazonaws.com:9093 --topic topicName

[2017-08-17 18:15:52,059] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,208] WARN Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,258] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-17 18:15:52,405] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Do you guys have any insight?

I managed to get the producer to send messages from outside by changing the EXTERNAL_HOSTNAME to the AWS ELB.
But the consumer still has problems receiving the messages.
Running the command bin/kafka-console-consumer.sh --bootstrap-server my-load-balancer.us-east-1.elb.amazonaws.com:9093 --topic topicName --from-beginning sometimes receives the messages but not always. I saw that you guys are using the old consumer with the --zookeeper flag; has anyone tested with --bootstrap-server?

Yup, unfortunately it seems like the AWS ELB causes intermittent failures on both producer and consumer 😢
On the producer I got a verbose error:

[2017-08-17 21:22:22,940] WARN Got error produce response with correlation id 185 on topic-partition caires-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,175] WARN Got error produce response with correlation id 187 on topic-partition caires-0, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,408] WARN Got error produce response with correlation id 189 on topic-partition caires-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2017-08-17 21:22:23,644] ERROR Error when sending message to topic caires with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
lxcid commented

@cairesvs I'm facing similar issue as you described.

My Kafka pods expose a node port and advertise both the internal and public hostnames. (They listen on different ports.)

Kafka seems to be able to return the right listeners based on the port connected to.

But it seems that accessing kafka via the external listener causes the intermittent issue you described. I tried using the internal listener and things seem to go well. Need to test more.

I've been messing with this for a full day and my intuition is that something is wrong when using external listeners.

Will update more when I have more information.

lxcid commented

I have solved my problem.

For me, the issue was that I was using a nodeport service and trying to target a specific node. That didn't work, because the nodeport sends the traffic to kube-proxy first, and kube-proxy ends up sending the traffic to all the selected pods in a round-robin fashion.

This caused the intermittent issues I observed. I had totally misunderstood how a nodeport service works in this case.

So please note that exposing each node that hosts a kafka pod with a public ip/hostname is important. Depending on which port kafka receives the traffic on, it will only advertise the endpoints with the same port (that's how it distinguishes internal from external traffic). Kafka will let the client know which broker has the partition/replica, and the client will fetch from it itself.

Thus, I don't foresee load balancing working very well.
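
In broker config terms (a minimal sketch using the INSIDE/OUTSIDE naming that also appears in the config dump further down; the hostnames and the external port are placeholders), that behaviour corresponds to something like:

listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://kafka-0.broker.kafka.svc.cluster.local:9092,OUTSIDE://public-host-of-this-node.example.com:32400
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
inter.broker.listener.name=INSIDE

A client that reaches the broker on the OUTSIDE port is handed back only the OUTSIDE addresses, and one that connects on 9092 only the INSIDE ones.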

Understanding this helped me nail my issue. This thread helped a lot. Thanks everyone.

So please note that exposing each node that hosts a kafka pod with a public ip/hostname is important.

By "node" you mean each broker, I suppose? The conclusion is similar to that in #21.

I've started to investigate replication to a different cluster. Kafka best practices state that you place the replication mechanism (MirrorMaker etc) in the destination cluster, to avoid data loss in case of network partition. It would be interesting to investigate a pattern for exposing a single NodePort/LoadBalancer to consumers only. I think this already works in the special case where a topic has a number of replicas equal to the number of brokers.

I think I have a similar problem; kubernetes is installed with lxd on localhost.

When I run this inside the kafka-0 container:

$ kafka-console-producer.sh  --topic test --broker-list localhost:9092
[2017-09-15 06:17:11,870] WARN Error while fetching metadata with correlation id 1 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,046] WARN Error while fetching metadata with correlation id 5 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,265] WARN Error while fetching metadata with correlation id 6 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,593] WARN Error while fetching metadata with correlation id 7 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,856] WARN Error while fetching metadata with correlation id 8 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 06:17:12,999] WARN Error while fetching metadata with correlation id 9 : {dese3sdfsdf3=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

my config is

        advertised.host.name = null
	advertised.listeners = OUTSIDE://kafka-0.kafka.kafka-cluster.svc.cluster.local:9094,INSIDE://:9092
	advertised.port = null
	alter.config.policy.class.name = null
	authorizer.class.name = 
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = 0
	broker.id.generation.enable = true
	broker.rack = null
	compression.type = producer
	connections.max.idle.ms = 600000
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 3
	delete.records.purgatory.purge.interval.requests = 1
	delete.topic.enable = true
	fetch.purgatory.purge.interval.requests = 1000
	group.initial.rebalance.delay.ms = 0
	group.max.session.timeout.ms = 300000
	group.min.session.timeout.ms = 6000
	host.name = 
	inter.broker.listener.name = INSIDE
	inter.broker.protocol.version = 0.11.0-IV2
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
	listeners = OUTSIDE://:9094,INSIDE://:9092
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
      log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /kafka/kafka-logs-kafka-0
	log.flush.interval.messages = 9223372036854775807
	log.flush.interval.ms = null
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.flush.start.offset.checkpoint.interval.ms = 60000
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.format.version = 0.11.0-IV2
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = -1
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides = 
	message.max.bytes = 1000012
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 20
	num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 1440
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 1
	offsets.topic.segment.bytes = 104857600
	port = 9092
	principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
	producer.purgatory.purge.interval.requests = 1000
	queued.max.requests = 500
	quota.consumer.default = 9223372036854775807
	quota.producer.default = 9223372036854775807
	quota.window.num = 11
	quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 10000
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.enabled.mechanisms = [GSSAPI]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism.inter.broker.protocol = GSSAPI
	security.inter.broker.protocol = PLAINTEXT
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = null
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
	transaction.state.log.load.buffer.size = 5242880
	transaction.state.log.min.isr = 1
	transaction.state.log.num.partitions = 50
	transaction.state.log.replication.factor = 1
	transaction.state.log.segment.bytes = 104857600
	transactional.id.expiration.ms = 604800000
	unclean.leader.election.enable = false
	zookeeper.connect = zk-0.zk-svc.kafka-cluster.svc.cluster.local:2181,zk-1.zk-svc.kafka-cluster.svc.cluster.local:2181,zk-2.zk-svc.kafka-cluster.svc.cluster.local:2181
	zookeeper.connection.timeout.ms = 6000
	zookeeper.session.timeout.ms = 6000
	zookeeper.set.acl = false
	zookeeper.sync.time.ms = 2000

@lxcid can you share your solution?

Also trying to figure out how to go about exposing the brokers externally. Wrote up a SO question for some help from the Kubernetes community: https://stackoverflow.com/questions/46456239/how-to-expose-an-headless-service-for-a-statefulset-externally-in-kubernetes

Found this to be very useful: kubernetes/kubernetes#44103

Trying to wrap my head around how to do this as well

If I understand correctly, the root of the problem is that Kafka requires a 1:1 mapping
kafka-{0, …, n}.broker.kafka.svc.cluster.local:9092 <-> some-external-address-to-broker-{0, …, n}?

Is this a correct understanding of the "current state of the world" for solving this problem:

Hey all,

Just wondering if anyone has been able to get this working via an ELB and/or ingress endpoint? Unfortunately the NodePort method will not work in my case as nodes do not have external addresses.

I would be alright manually setting a hostname (either the dynamically-generated ELB hostname or otherwise), however this does not appear to be working currently in my initial tests using kafkacat. I used yevgeniyo's zip above (https://github.com/Yolean/kubernetes-kafka/files/1231420/kafka-external.zip) and tried simply modifying the export EXTERNAL_HOSTNAME line in init.sh under 10broker-config.yml to point to an ELB created previously. However, there appears to be a communication error when testing producing and consuming via the external endpoint. I'm wondering if this is due to the brokers giving the client an internal-only set of hostnames to connect to (or something like that). Unfortunately I don't have much in terms of error logs at the moment.

If anyone has got this working, I'd greatly appreciate some advice.

Thanks!

EDIT: One possibility I'm thinking about is, if each broker needs to be addressed by the client individually, whether setting up a service-per-broker and associating either a LoadBalancer or ingress with each would solve the apparent issue of having a single intermediary load balancer. This would obviously not be as dynamically scalable, since an increase in the number of broker replicas would also require additional services and/or ingresses, however it could be an acceptable tradeoff.

comdw commented

I've been struggling through this problem also, though not with an ELB, more just getting an n-node kafka cluster fully accessible externally. @nadirabid - your own posted solution to your SO question is pretty much where I've ended up too. Internally, all the brokers can run on port 9092; externally they all need different port numbers (I use 3209${KAFKA_BROKER_ID}) so that I can use a separate nodeport service per broker, which allows access to each broker externally without any port clashes when broker pods end up on the same k8s node.

So the only remaining "manual" part of the cluster scaling process is those individual services needed per broker. Wouldn't it be nice if this was somehow possible through a service template within the stateful set definition?

All in all much better than my earlier conclusion that we wouldn't be able to use a statefulset and would instead have to manage a separate deployment per broker in the cluster!

getting an n-node kafka cluster fully accessible externally

I find that a good scope for this repo. Load Balancers and Ingress are cluster specific, but if NodePort works it will probably help those cases too.

There are many interesting insights in this thread. What are our options?

  • Service templates (@comdw did you have a specific idea here?)
  • Individual service manifests that address each broker (I guess that's not possible atm; would involve the wider community)
  • Expose all these ports on every broker, then use some config+init trick to listen only on one of them.
  • Some kind of stateless TCP level proxy that routes consistently between ports and brokers.

More? What's your conclusion on listeners/advertised.listeners and listener.security.protocol.map?

We ended up going with pretty much the same solution as I figured out earlier on.

But, we use pod affinity to make sure we don't end up with Kafka instances on the same box (good practice anyway) and that solves clashing ports for us.

Ben

comdw commented

@solsson, I'm new to all this, but the StatefulSet already allows you to dictate that you want a persistent volume per instance, so why not a nodeport service per instance. This would require support specifically in kubernetes though and it might be too much of a niche thing.

I added kubectl label pods ${HOSTNAME} podname=${HOSTNAME} so maybe we could use another kubectl command here to add the nodeport service (guess it would need to check if it already exists etc, and they would never get deleted except manually)?

My services example for 3 broker cluster:

# Currently have to manually scale this up to cater for the number of brokers you want in your clusters
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-0
spec:
  ports:
  - port: 32090
    targetPort: 32090
    nodePort: 32090
  type: "NodePort"
  selector:
    podname: kafka-0
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-1
spec:
  ports:
  - port: 32091
    targetPort: 32091
    nodePort: 32091
  type: "NodePort"
  selector:
    podname: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: brokerexternal-2
spec:
  ports:
  - port: 32092
    targetPort: 32092
    nodePort: 32092
  type: "NodePort"
  selector:
    podname: kafka-2

I guess @BenjaminDavison you are saying you don't need external services at all if you use pod affinity (the containerPort: 9093 line opens this port externally on each node without an explicit service?). I'm currently only using minikube so can't really test this - externally I've only been able to expose ports 30000-32000.

@comdw Your suggestion matches quite closely what I did in #78, but I wasn't happy with the tests so I didn't announce it here. Working on tests now :)

comdw commented

Great, let me know if there's something to try out.

@comdw Yup that's correct.

comdw commented

Bah, looks like Kafka schema registry doesn't support the listener security protocol map

No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL_PLAINTEXT=kafkaserver-0:32090, INTERNAL_PLAINTEXT=kafka-0.broker.default.svc.cluster.local:9092}]

Maybe it doesn't need to? It should always be on the inside, I guess. Can we simply rearrange the list?

Edit: ah, just saw that that error can't be from the PR. The PR keeps PLAINTEXT for inside, hence schema registry might be unaffected.

comdw commented

Sorry yes, that was slightly off topic, as it looks like just a schema registry limitation at the moment. I've worked around it for now by configuring my listeners as "PLAINTEXT" and "EXTERNAL_PLAINTEXT".

comdw commented

... however the separate external/named listeners approach will not work for all Kafka clients that are external to the k8s cluster. E.g. Kafka Tool - it's able to obtain the Kafka cluster info via ZK, but errors trying to load topics and consumers, complaining that it can't reach the broker. I suspect this is because it's trying to connect specifically to the listener named "PLAINTEXT" as discovered from ZK (rather like schema registry does, but that's ok if it's running in the same k8s cluster), which can't be accessed from outside. Kafka Tool worked with a single node and one advertised listener address:port that was reachable both inside and outside.

My own producer/consumer clients running outside the cluster do work with a broker/bootstrap config that initially talks to any broker, and somehow the response is clever enough to only advertise the external addresses at that point. I'm still not confident I fully understand what's going on in these different cases though!

I could perhaps rework configuration to go back to using the same FQDN and port internally and externally when I have time to experiment.

as discovered from ZK

I think that type of client is deprecated. Maybe the introduction of multiple listeners was a reason for such a deprecation. I guess the host used as bootstrap must be matched to one of the listener types internally, in order to return broker hosts of the same kind. Also I don't see a reason why Zookeeper should have an external service.

The only reason I saw the need to expose zookeeper was because we're using Sarama for the broker clients and there's no way to create topics from that client. So we were going to use the kafka tools (which last I checked did need access to Zookeeper).

We've gone with a different approach entirely (a REST based service) which allows us more control over that process. Exposing Zookeeper outside of Kubernetes seems to be fraught with security issues that I'd rather not entertain.

If you're working on this and don't understand why your hostPort is not appearing in netstat, and you're provisioning your cluster using kops in an internal network, you might be running into kubernetes/kops#3132. This is if you're using Calico, Flannel or any of those.

Edit: Calico actually now enables portmap by default, so it works.

Fixed with the merge of #78.

Along these lines, I've been thinking that it would be quite easy to set up kafka access over one kubectl port-forward per broker.

I solved it in the following way, using port-forward and without having to expose kafka to the outside world (a tiny modification of the solsson/kafka setup):

My init.sh ConfigMap

      OUTSIDE_HOST=localhost
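      # Note: with OUTSIDE_HOST hard-coded, the $? test below always takes the else branch
      # (a plain assignment exits with status 0); in the stock init.sh this check presumably
      # guards a node-address lookup command instead.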
      if [ $? -ne 0 ]; then
        echo "Outside (i.e. cluster-external access) host lookup command failed"
      else
        OUTSIDE_PORT=3240${KAFKA_BROKER_ID}
        sed -i "s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|" /etc/kafka/server.properties
        ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT"
      fi

The broker exposes 9094, and you then get OUTSIDE addresses for your nodes with ports 3240 + the broker ID, so in a 3-node setup this would be 32400, 32401, 32402.

Then I create 4 port-forwards:

kubectl -n kafka port-forward pod/kafka-0 9094
kubectl -n kafka port-forward pod/kafka-0 32400:9092
kubectl -n kafka port-forward pod/kafka-1 32401:9092
kubectl -n kafka port-forward pod/kafka-2 32402:9092

This way, when the client asks a broker at localhost:9094 for the list of nodes, it will say: Hey, I have 3 nodes, their addresses are as follows:

localhost:32400
localhost:32401
localhost:32402

Which is correct, because port-forward maps them correctly from your local machine. And you're ready to debug.

The producer is set to localhost:9094. Consumers are set to the high ports, with a corresponding client id equal to the last digit of the high port (client-0 to 32400, client-1 to 32401, client-2 to 32402), just so I can make sense of which one is which. I used 3 consumers to test group consumption and it works fine. To be honest I just want to debug my code and then run everything inside on the standard 9092, but it took me way too much time to solve this (thanks to this thread as well). The inside/outside setup is very cool, but I think many people are confused or don't know that the bootstrap broker setting is just used to get a list of nodes, and that that list is produced from how you launch the kafka nodes themselves. Inside/outside mapped to different ports, 9092 (regular) and 9094 (outside), is a good way to have a dual node list available. It would be beneficial to document the [outside] ability so people can write and debug code against kafka from their local workstations.

@styk-tv Thanks for sharing this. I've also struggled with Kafka's listener model. I've set up mirror-maker using docker-compose with the 9092 ports by mapping brokers' kubernetes names to local containers. See https://gist.github.com/solsson/22e773ab148596ec39d54b7cc153ee67. Like your solution it is based on kubectl port-forward, which is complicated in containers until kubernetes/kubernetes#46517 gets merged.

With #172 it could be easier. Using Updating Broker Configs, #78 can probably be implemented (and customized) in a way that is persistent in ZooKeeper and takes precedence over server.properties.

is there a branch with external broker endpoints?

@hamshif #195 and the issue it references sum up the situation quite well, I think.

@solsson thanks for the answer
I followed the instructions in the outside-services Readme on local docker kubernetes and couldn't implement the dynamic externalIP part, because the local docker node has no externalIP attribute!

On the same deployment I have another service configured similarly which I can access from a browser and curl, so I suspect that it might be a Kafka-related issue and not a kubernetes one.

curl -s http://localhost:30120/sso/secured/healthcheck/status
UP%

➜ ~ kubectl describe svc sso
Name:                     sso
Namespace:                default
Labels:                   app=sso
Annotations:              <none>
Selector:                 app=sso,app_group=rtp,tier=backend
Type:                     NodePort
IP:                       10.104.249.237
LoadBalancer Ingress:     localhost
Port:                     sso-debug  8000/TCP
TargetPort:               8000/TCP
NodePort:                 sso-debug  30121/TCP
Endpoints:                10.1.3.104:8000
Port:                     sso  8080/TCP
TargetPort:               8080/TCP
NodePort:                 sso  30120/TCP
Endpoints:                10.1.3.104:8080
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

➜ ~ kubectl describe svc outside-0 --namespace kafka
Name:                     outside-0
Namespace:                kafka
Labels:                   <none>
Annotations:              kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"outside-0","namespace":"kafka"},"spec":{"ports":[{"nodePort":32400,"port":3240...
Selector:                 app=kafka,kafka-broker-id=0
Type:                     NodePort
IP:                       10.100.78.46
LoadBalancer Ingress:     localhost
Port:                     <unset>  32400/TCP
TargetPort:               9094/TCP
NodePort:                 <unset>  32400/TCP
Endpoints:                10.1.3.100:9094
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

➜ ~

The services are the same pattern

You can see a full description here:

This port acts differently to curl
➜ ~ curl http://localhost:32400
curl: (52) Empty reply from server
➜ ~ curl http://localhost:32406
curl: (7) Failed to connect to localhost port 32406: Connection refused
➜ ~

but a producer can't connect

I deployed to GKE too on different nodes to no avail

Hi,

I have a problem exposing my kafka broker to the public using a classic load balancer.

Here is my startup command

kafka-server-start.sh /etc/kafka/server.properties \
--override broker.id=${HOSTNAME##*-} \
--override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
--override advertised.listeners="EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname):9092,INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:29092" \
--override advertised.host.name="$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname)" \
--override advertised.port=9092 \
--override listeners=INTERNAL_PLAINTEXT://0.0.0.0:29092,EXTERNAL_PLAINTEXT://0.0.0.0:9092 \
--override inter.broker.listener.name=INTERNAL_PLAINTEXT \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override zookeeper.connect=pzoo:2181 \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=10080 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=1 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=80000 \
--override request.timeout.ms=80000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=false \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=2.0-IV1 \
--override log.retention.hours=-1 \
--override log.dirs=/var/lib/kafka/data/topics \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override zookeeper.connection.timeout.ms=6000

My StatefulSet manifest:

---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  ports:
  - port: 29092
    name: internal
  # [podname].broker.kafka.svc.cluster.local
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-persistent-storage
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fasta
---
kind: Service
apiVersion: v1
metadata:
  name: kafka-svc
  annotations:
    dns.alpha.kubernetes.io/external: kafka.preprod.example.com
    service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags: "Name=kafka, Environment=preprod"
spec:
  externalTrafficPolicy: Local
  ports:
  - port: 9092
  selector:
    app: kafka
  type: LoadBalancer
  sessionAffinity: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "broker"
  replicas: 1
  updateStrategy:
    type: OnDelete
  template:
    metadata:
      labels:
        app: kafka
      annotations:
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
      - name: init-config
        image: hub.example.com/kafka:v1
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
        volumeMounts:
        - name: configmap
          mountPath: /etc/kafka-configmap
        - name: config
          mountPath: /etc/kafka
      containers:
      - name: broker
        image: hub.example.com/kafka-preprod:v8
        env:
        - name: KAFKA_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
        - name: JMX_PORT
          value: "5555"
        ports:
        - name: inside
          containerPort: 29092
        - name: outside
          containerPort: 9092
        - name: jmx
          containerPort: 5555
        command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint']
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
        volumeMounts:
        - name: config
          mountPath: /etc/kafka
        - name: kafka-persistent-storage
          mountPath: /var/lib/kafka/data
      imagePullSecrets:
        - name: nexus
      volumes:
      - name: configmap
        configMap:
          name: broker-config
      - name: config
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-persistent-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: fasta
      resources:
        requests:
          storage: 10Gi

The command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint'] entry points to a bash script which contains the kafka startup command with all the above-mentioned override options.

Now when I try to consume messages from the external DNS name kafka.preprod.example.com:9092, I get the error:

kafkacat -C -b kafka.preprod.example.com:9092 -t testing -v        Thu Nov 22 13:22:32 2018
% ERROR: Local: Broker transport failure: ec2-11-222-33-444.us-east-2.compute.amazonaws.com:9092/0: Connect to ipv4#11.222.33.444:9092 failed: Operation timed out (after 75477ms in state CONNECT)

and when I try to publish a message, I get an error:

kafkacat -P -b kafka.preprod.example.com:9092 -t testing                                  Thu Nov 22 13:19:15 2018
%4|1542873132.444|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873138.451|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873144.470|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests

Also, I do not see any errors in my kafka logs.

(kafka.server.KafkaConfig)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,105] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,136] INFO Loading logs. (kafka.log.LogManager)
[2018-11-22 07:48:26,199] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,204] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,233] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,235] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 63 ms (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,256] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,257] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,272] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,273] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 4 ms (kafka.log.Log)
[2018-11-22 07:48:26,278] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,279] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,295] INFO Logs loading complete in 159 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,309] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,311] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,608] INFO Awaiting socket connections on 0.0.0.0:29092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,640] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,652] INFO [SocketServer brokerId=0] Started 2 acceptor threads (kafka.network.SocketServer)
[2018-11-22 07:48:26,678] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,680] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,686] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,697] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2018-11-22 07:48:26,746] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,750] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,751] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ec2-18-219-76-229.us-east-2.compute.amazonaws.com,9092,ListenerName(EXTERNAL_PLAINTEXT),PLAINTEXT), EndPoint(kafka-0.broker.ashokleyland.svc.cluster.local,29092,ListenerName(INTERNAL_PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,845] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,847] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,849] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,856] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,872] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,893] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,913] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,922] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-11-22 07:48:26,938] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:3000,blockEndProducerId:3999) by writing to Zk with path version 4 (kafka.coordinator.transaction.ProducerIdManager)
[2018-11-22 07:48:26,991] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:26,998] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-11-22 07:48:26,998] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:27,111] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2018-11-22 07:48:27,139] INFO [SocketServer brokerId=0] Started processors for 2 acceptors (kafka.network.SocketServer)
[2018-11-22 07:48:27,153] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,153] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,158] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2018-11-22 07:48:27,224] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions testing-5,testing-0,testing-2,testing-1,testing-3,testing-4 (kafka.server.ReplicaFetcherManager)
[2018-11-22 07:48:27,237] INFO Replica loaded for partition testing-5 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,240] INFO [Partition testing-5 broker=0] testing-5 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,256] INFO Replica loaded for partition testing-2 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,256] INFO [Partition testing-2 broker=0] testing-2 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,258] INFO Replica loaded for partition testing-3 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,259] INFO [Partition testing-3 broker=0] testing-3 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,262] INFO Replica loaded for partition testing-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,264] INFO [Partition testing-0 broker=0] testing-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,267] INFO Replica loaded for partition testing-4 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,267] INFO [Partition testing-4 broker=0] testing-4 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,269] INFO Replica loaded for partition testing-1 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,269] INFO [Partition testing-1 broker=0] testing-1 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,282] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-11-22 07:58:26,894] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

I also tried first creating a LoadBalancer service and then using its DNS name as an advertised.listener. Then too it would throw the same error.

If anybody could help me, it would be really great. Kindly let me know if more details are required.

Thanks!

@ridicule777 Does this differ from the other cases of name resolution outside the cluster that have been discussed in this repo several times? Clients must be able to access all kafka brokers on individual listener addresses known to kafka.
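
A generic way to verify that (a debugging suggestion, not something from the thread) is to ask the bootstrap address for metadata and check which broker host:port pairs come back, since those are what the client will actually connect to:

kafkacat -L -b kafka.preprod.example.com:9092

If any advertised address in that listing (such as the ec2-*.compute.amazonaws.com:9092 one in the error above) is unreachable from the client, produce and consume requests for partitions led by that broker will time out.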