kubernetes-retired/contrib

Kafka Statefulset external access issue

LauraMoraB opened this issue · 12 comments

Hello,
I have deployed Kafka and ZooKeeper in a Kubernetes cluster using StatefulSets with three replicas on a SoftLayer machine, using this configuration (deployment and image), but changing the Kafka version to 1.0.
Now I am having problems producing/consuming messages from outside the Kubernetes cluster, although everything works fine when I run producers/consumers within the cluster. I have seen some issues regarding this problem, but none of them resolved it for me.

I have this set up:

NAME         READY     STATUS    RESTARTS
po/kafka-0   1/1       Running   0
po/kafka-1   1/1       Running   0
po/kafka-2   1/1       Running   0
po/zk-0      1/1       Running   0
po/zk-1      1/1       Running   0
po/zk-2      1/1       Running   0

NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)
svc/kafka-hs    ClusterIP   None            <none>        9092/TCP
svc/kafka-svc   NodePort    10.97.218.58    <none>        9093:30182/TCP
svc/zk-hs       ClusterIP   None            <none>        2888/TCP,3888/TCP
svc/zk-svc      ClusterIP   10.111.37.187   <none>        2181/TCP

I have two services:

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: cdc1
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: cdc1
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    name: external
    nodePort: 30182
  selector:
    app: kafka

I have set these environment variables for Kafka:

 - name: KAFKA_ADVERTISED_LISTENERS
   value: "INTERNAL_PLAINTEXT://kafka-hs:9092,EXTERNAL_PLAINTEXT://MACHINE-IP:30182"
 - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
   value: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
 - name: KAFKA_LISTENERS
   value: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:30182"
 - name: KAFKA_INTER_BROKER_LISTENER_NAME
   value: "INTERNAL_PLAINTEXT"

And when I try to produce a message from outside the cluster, I get the following error:
2018-04-27 13:27:01 WARN NetworkClient:241 - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.

I set the bootstrap-server configuration in the producer to machine-IP:30182

Any help will be really appreciated! Thanks!

juv commented

So I guess you have multiple brokers that you expect to be exposed externally?
If yes, you will need the following:

  1. a headless service for inter-broker communication within your Kafka cluster
  2. a service that exposes an external IP for each pod you have -- have a look here

Make sure that your brokers expose the correct advertised IP to ZooKeeper, for example with zookeeper-shell.sh your-zookeeper-svc-name.whatever:2181 get /kafka/brokers/ids/0

This is also relevant to understand how the client bootstrapping process works.
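To make the bootstrapping point concrete: the bootstrap/broker-list address is only used for the initial metadata request; every subsequent connection goes to whatever address each broker advertises. A toy sketch of this two-step flow (no real Kafka involved; both addresses are made up):

```shell
# Step 1: the bootstrap address serves only the first metadata request.
BOOTSTRAP="203.0.113.10:30182"
# Step 2: the client then connects to the address the broker registered
# in ZooKeeper; if that is cluster-internal, external clients fail.
ADVERTISED="kafka-0.kafka-hs:9092"
echo "metadata request -> $BOOTSTRAP"
echo "produce/fetch    -> $ADVERTISED"
```

This is why a reachable bootstrap address alone is not enough: the advertised listeners must also be resolvable and reachable from the client.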

Yes, I was missing a service exposed for each broker. After adding each service and some configuration I could access the cluster externally.

I used port 9093 for each service and then the headless service at port 9092

In order to work correctly, I had to change some specific configurations:

  1. In the services configuration I had to add the selector:

selector:
  statefulset.kubernetes.io/pod-name: <pod-name>

  2. In the Kafka configuration I added these listener properties:
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
advertised.listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093
listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093
inter.broker.listener.name=INTERNAL_PLAINTEXT

Thanks for the information!

@LauraMoraB Can you share how you were able to accomplish external access in more depth? I have that headless service, am utilizing the same listener settings that you have here, and have opened up NodePorts as an additional service, but am unable to reach the NodePorts or their IPs. What additional steps did you end up taking? Thanks.

Hi @ralucas , I'm sorry I took so long.

I configured a NodePort service for each broker, like this:

apiVersion: v1
kind: Service
metadata:
  name: <service-name>
spec:
  ports:
  - port: 9093
    targetPort: 9092
    name: kafka-port
    protocol: TCP
  selector:
    statefulset.kubernetes.io/pod-name: <pod-name>
  type: NodePort

As an image I used https://github.com/kubernetes/contrib/tree/master/statefulsets/kafka, but I changed the OS to Alpine and the Kafka version to 1.0.0 (in case this helps provide more info).

And the Statefulset Configuration:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        image: <image>
        ports:
        - containerPort: 9093
        - containerPort: 9092
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /etc/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
          --override advertised.listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093 \
          --override listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093 \
          --override inter.broker.listener.name=INTERNAL_PLAINTEXT \
        ...
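As an aside, the broker.id override above relies on shell parameter expansion: ${HOSTNAME##*-} strips the longest prefix ending in '-', leaving the StatefulSet ordinal. A quick demonstration with a hypothetical pod name:

```shell
# StatefulSet pods are named <statefulset-name>-<ordinal>, e.g. kafka-2.
HOSTNAME=kafka-2
# ##*- removes everything up to and including the last '-'.
echo "broker.id=${HOSTNAME##*-}"   # prints broker.id=2
```

This is what gives each replica a stable, unique broker id without any extra configuration.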
NAME          READY     STATUS    RESTARTS   AGE
pod/kafka-0   1/1       Running   0          4d
pod/kafka-1   1/1       Running   0          4d
pod/kafka-2   1/1       Running   0          4d
pod/zk-0      1/1       Running   0          4d
pod/zk-1      1/1       Running   0          4d
pod/zk-2      1/1       Running   0          4d

NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
service/broker-0   NodePort    <ip>         <none>        9093:30001/TCP      4d
service/broker-1   NodePort    <ip>         <none>        9093:30002/TCP      4d
service/broker-2   NodePort    <ip>         <none>        9093:30003/TCP      4d
service/kafka-hs   ClusterIP   None         <none>        9092/TCP            4d
service/zk-hs      ClusterIP   None         <none>        2888/TCP,3888/TCP   4d
service/zk-svc     ClusterIP   <ip>         <none>        2181/TCP            4d

This is all my configuration. If it still doesn't work for you, share the error you get when you try to reach the broker and I will try to provide more help.

@LauraMoraB, in the previous example I don't see any external IP to access the Kafka brokers from outside the cluster. In that case, what is the best way to achieve this?
Thanks

Hello @lalan7 ,

The exposed service is a NodePort service. This means it does not require an external IP; you just need to use the IP of your machine together with the nodePort. For example, to access the first broker, broker-0, you would use: <YourMachineIP>:30001

There is a really nice explanation on services in the Kubernetes official documentation page, see: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
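Putting the services above together: with per-broker nodePorts 30001-30003, broker N is reached at any node's IP plus port 3000(N+1). A sketch with a placeholder IP:

```shell
NODE_IP="203.0.113.10"    # placeholder: substitute any worker node's IP
BROKER=0                  # broker ordinal
BOOTSTRAP="${NODE_IP}:$((30001 + BROKER))"
echo "$BOOTSTRAP"         # prints 203.0.113.10:30001
# usage, e.g.: kafka-console-producer.sh --broker-list "$BOOTSTRAP" --topic test
```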

@LauraMoraB Can you share your zookeeper config too?
For me, when I try to use a producer from outside the cluster, it says it can't connect to kafkaPodHostname:9092, even though telnet to the node IP and nodePort works fine.

I run the producer from a Docker container outside the Kafka cluster, as follows:
./kafka-console-producer.sh --broker-list nodeIP:nodePort --topic mytopic

and get error similar to following one:
[2018-11-07 10:57:00,374] WARN [Producer clientId=console-producer] Error connecting to node kafka-2.kafka-svc.kafka-ns.svc.cluster.local:9092 (id: 2 rack: null) (org.apache.kafka.clients.NetworkClient)
java.io.IOException: Can't resolve address: kafka-2.kafka-svc.kafka-ns.cluster.local:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)

And if you have the whole Kafka & ZooKeeper config somewhere on GitHub, please share those details too :)

Thank you.
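One note on the error above: the unresolvable address is not the --broker-list address you passed, but the broker's advertised listener, returned in the metadata response. Splitting it shows the client is being handed a cluster-internal DNS name, which only resolves inside the cluster; the usual fix is to advertise an externally reachable host:port on the external listener. A small sketch of the split:

```shell
# The address taken from the error message (a cluster-internal DNS name).
ADV="kafka-2.kafka-svc.kafka-ns.svc.cluster.local:9092"
host="${ADV%:*}"   # drop the shortest ':*' suffix -> the hostname
port="${ADV#*:}"   # drop the shortest '*:' prefix -> the port
echo "host=$host port=$port"
```

Anything ending in .svc.cluster.local like this host is only resolvable by cluster DNS, which is why telnet to the node IP works but the producer still fails.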

On which port was the Kafka server started, 9093 or 9092? If possible, can you share your YAML files or upload them to GitHub? Thanks


@LauraMoraB I have used the same configuration that you suggested, but when I try to write to a topic from outside the Kubernetes cluster, I get the following error.

./kafka-console-producer.sh --broker-list xxxxxx:30524 --topic spr
testing
[2018-12-21 06:49:08,976] ERROR Error when sending message to topic spr with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Can you please help here?

I am also facing the same issue. Inter-broker communication and producer/consumer both work fine from within the cluster, but when starting a producer from outside the cluster, the TCP connection to Kafka is not established.

Now I am using a NodePort but get the warnings below, which confuse me. I assumed the NodePort would forward traffic directly to the pod on that node, but as shown it load-balances across the brokers:

WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 1 (/10.244.2.117:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:44,344] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 2 (/10.244.0.79:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:47,350] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 1 (/10.244.2.117:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:50,356] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 0 (/10.244.1.151:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient