Knative + Kafka

Prerequisites

Assumes you have used try.openshift.com to create an OCP 4.2 cluster.

CLI tools used:

  • kubectl

  • oc

  • jq

  • kafkacat

  • siege

  • watch

  • stern

Installation

Using the OCP 4.x Administration Console, find OperatorHub and install the following operators:

  1. Knative Serving

  2. Knative Eventing

  3. Knative Kafka

  4. Strimzi

(Screenshots: OperatorHub and the Installed Operators view in the OpenShift Console)

You can check on your installed operators and their versions:

kubectl get csv
NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
knative-eventing-operator.v0.7.1   Knative Eventing Operator       0.7.1     knative-eventing-operator.v0.6.0   Succeeded
knative-kafka-operator.v0.7.1      Knative Apache Kafka Operator   0.7.1     knative-kafka-operator.v0.6.0      Succeeded
knative-serving-operator.v0.7.1    Knative Serving Operator        0.7.1     knative-serving-operator.v0.6.0    Succeeded
strimzi-cluster-operator.v0.13.0   Strimzi Apache Kafka Operator   0.13.0    strimzi-cluster-operator.v0.12.2   Succeeded
Note
I have also used the following versions; installing OpenShift Serverless pulls in Elasticsearch, Jaeger, and Kiali as dependencies:
kubectl get csv
NAME                                        DISPLAY                          VERSION              REPLACES                            PHASE
elasticsearch-operator.4.3.1-202002032140   Elasticsearch Operator           4.3.1-202002032140                                       Succeeded
jaeger-operator.v1.13.1                     Jaeger Operator                  1.13.1                                                   Succeeded
kiali-operator.v1.0.9                       Kiali Operator                   1.0.9                kiali-operator.v1.0.8               Succeeded
knative-eventing-operator.v0.12.0           Knative Eventing Operator        0.12.0               knative-eventing-operator.v0.11.0   Succeeded
knative-kafka-operator.v0.12.1              Knative Apache Kafka Operator    0.12.1               knative-kafka-operator.v0.11.2      Succeeded
serverless-operator.v1.4.1                  OpenShift Serverless Operator    1.4.1                serverless-operator.v1.4.0          Succeeded
servicemeshoperator.v1.0.7                  Red Hat OpenShift Service Mesh   1.0.7                servicemeshoperator.v1.0.6          Succeeded
strimzi-cluster-operator.v0.15.0            Strimzi                          0.15.0               strimzi-cluster-operator.v0.14.0    Succeeded

Namespace/Project Setup

kubectl create namespace kafka

# make it "sticky"
kubectl config set-context --current --namespace=kafka

# check that the namespace is set
kubectl config view --minify --output 'jsonpath={..namespace}'

# or use "oc" to see what the "sticky" namespace is
oc project

Create the Kafka cluster

cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.1'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.1.0
    replicas: 3
  entityOperator:
    topicOperator: {}
    userOperator: {}
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3
EOF
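
Creating the cluster takes a few minutes; you can block until Strimzi reports it Ready:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka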

Configure Knative Eventing's Kafka integration

Note: this only needs to be done once

cat <<EOF | kubectl apply -f -
apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  finalizers:
    - knative-kafka-openshift
  name: knative-kafka
  namespace: knative-eventing
spec:
  broker:
    defaultConfig:
      authSecretName: ''
      bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
      numPartitions: 10
      replicationFactor: 3
    enabled: true
  channel:
    authSecretName: ''
    authSecretNamespace: ''
    bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
    enabled: false
  high-availability:
    replicas: 1
  sink:
    enabled: false
  source:
    enabled: true
EOF

Note: the ".kafka" in the bootstrapServers address is the namespace where the Kafka cluster was created; adjust it if your cluster runs in a different namespace.

Verify the KnativeKafka resource took effect

kubectl get crds | grep kafkasource
kafkasources.sources.eventing.knative.dev                   2019-09-21T14:23:14Z

and

kubectl get pods -n knative-eventing

NAME                                        READY   STATUS    RESTARTS   AGE
broker-controller-66f988fb6c-6wk4t          1/1     Running   0          20h
eventing-controller-5c955d4694-btwx8        1/1     Running   0          20h
eventing-webhook-7f7bcb8447-27p9s           1/1     Running   0          20h
imc-controller-6ddf4477fd-bjjhh             1/1     Running   0          20h
imc-dispatcher-7676c44559-wzxg4             1/1     Running   0          20h
kafka-ch-controller-5497f498dc-vm8x7        1/1     Running   0          4h19m
kafka-controller-manager-544887898b-j654v   1/1     Running   0          4h20m
kafka-webhook-65d8bb899c-6nsmq              1/1     Running   0          4h19m

Create a Kafka topic

cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 100
  replicas: 1
EOF
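
As a quick sanity check, you can ask for the KafkaTopic resource itself (the Topic Operator reconciles it into the cluster):

kubectl get kafkatopic my-topic -n kafka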

Test to see if the topic was created correctly

oc exec -n kafka -it my-cluster-kafka-0 -- /bin/bash

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

OR

kubectl exec -n kafka -it my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

Topic:my-topic	PartitionCount:100	ReplicationFactor:1	Configs:message.format.version=2.3-IV1
	Topic: my-topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: my-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 3	Leader: 2	Replicas: 2	Isr: 2
	Topic: my-topic	Partition: 4	Leader: 0	Replicas: 0	Isr: 0
	Topic: my-topic	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 6	Leader: 2	Replicas: 2	Isr: 2
.
.
.

Deploy a Knative Service

This is your "sink", the service that receives events.

cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1"
        autoscaling.knative.dev/window: 16s
    spec:
      containers:
      - image: docker.io/burrsutter/myknativesink:1.0.1
        resources:
          requests:
            memory: "50Mi"
            cpu: "100m"
          limits:
            memory: "70Mi"
            cpu: "100m"
        livenessProbe:
          httpGet:
            path: /healthz
        readinessProbe:
          httpGet:
            path: /healthz
EOF
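
Once applied, check that the Service becomes Ready and is assigned a URL:

kubectl get ksvc myknativesink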

If your pod is stuck in Pending, check your events:

kubectl get events --sort-by=.metadata.creationTimestamp

You likely need to add another worker node (OpenShift Console - Compute - MachineSets)

(Screenshot: MachineSets view)
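
You can also scale a MachineSet from the CLI; a minimal sketch, where <machineset-name> is a placeholder for one of the names in your cluster:

# list the MachineSets in your cluster
oc get machinesets -n openshift-machine-api

# scale one of them up by name
oc scale machineset <machineset-name> -n openshift-machine-api --replicas=2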

Create the KafkaSource that connects my-topic to ksvc

cat <<EOF | kubectl apply -f -
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092
  topics:
   - my-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: myknativesink
EOF
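
Verify the source becomes Ready:

kubectl get kafkasource mykafka-source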

You can monitor the logs of the kafkasource-mykafka-source pod to check for connectivity issues:

stern kafkasource-mykafka-source
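
If you do not have stern installed, plain kubectl works too; the pod name carries a generated suffix, so look it up by prefix (a minimal sketch):

kubectl logs -f $(kubectl get pods -o name | grep kafkasource-mykafka-source)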

Test
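
To generate a few test events, produce messages onto my-topic. One option, assuming the kcat (formerly kafkacat) image below, is a throwaway producer pod inside the cluster; watch the pods in another terminal to see the sink scale up from zero:

# one-off kafkacat producer pod (image name is an assumption; any kafkacat image works)
kubectl -n kafka run kafkacat -it --rm --restart=Never \
  --image=edenhill/kcat:1.7.1 -- \
  -b my-cluster-kafka-bootstrap:9092 -t my-topic -P
# type one message per line, then Ctrl-D to flush and exit

# in another terminal
watch kubectl get pods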

(Screenshots: the sink pod waiting, coming up, and handling one more message)

Scaling beyond 1 Pod

Kafka Spammer is a simple application that sends N messages as fast as it can.

Deploy

kubectl -n kafka run kafka-spammer \
--image=quay.io/rhdevelopers/kafkaspammer:1.0.2

Exec into the Spammer

KAFKA_SPAMMER_POD=$(kubectl -n kafka get pod -l "run=kafka-spammer" \
-o jsonpath='{.items[0].metadata.name}')
kubectl -n kafka exec -it $KAFKA_SPAMMER_POD -- /bin/sh
curl localhost:8080/1
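
The trailing path segment appears to be the message count, so a larger value drives more load. To see the sink scale beyond one pod, send a bigger batch and watch the pods by their Knative service label in another terminal:

# inside the spammer pod: send a larger batch
curl localhost:8080/100

# in another terminal: watch myknativesink scale out and back down
watch kubectl get pods -l serving.knative.dev/service=myknativesink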

Watch the Developer Topology view

(Screenshots: Developer Topology view and terminal view showing the sink scaling out)

Clean up

kubectl delete route kafka-producer
kubectl delete service kafka-producer
kubectl delete deployment kafka-producer
# kubectl run creates a bare pod on newer clients and a deployment on older ones
kubectl delete pod kafka-spammer --ignore-not-found
kubectl delete deployment kafka-spammer --ignore-not-found
kubectl delete kafkasource mykafka-source
kubectl delete ksvc myknativesink
kubectl delete kafkatopic my-topic
kubectl delete kafka my-cluster