Knative + Kafka
Prerequisites
Assumes you have used try.openshift.com to create an OCP 4.2 cluster. Here is a quick video that illustrates that process.
CLI tools used:
- kubectl
- oc
- jq
- kafkacat
- siege
- watch
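A quick way to confirm these tools are on your PATH (a minimal sketch; trim the list to whatever you actually plan to use):

for t in kubectl oc jq kafkacat siege watch; do
  command -v "$t" >/dev/null || echo "missing: $t"
done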
Installation
Using the OCP 4.x Administration Console, find OperatorHub and install:
- Knative Serving
- Knative Eventing
- Knative Kafka
- Strimzi
You can check on your installed operators and their versions:
kubectl get csv
NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
knative-eventing-operator.v0.7.1   Knative Eventing Operator       0.7.1     knative-eventing-operator.v0.6.0   Succeeded
knative-kafka-operator.v0.7.1      Knative Apache Kafka Operator   0.7.1     knative-kafka-operator.v0.6.0      Succeeded
knative-serving-operator.v0.7.1    Knative Serving Operator        0.7.1     knative-serving-operator.v0.6.0    Succeeded
strimzi-cluster-operator.v0.13.0   Strimzi Apache Kafka Operator   0.13.0    strimzi-cluster-operator.v0.12.2   Succeeded
Note: I have also used the following versions. OpenShift Serverless pulls in Elasticsearch, Jaeger, and Kiali:
kubectl get csv
NAME                                        DISPLAY                          VERSION              REPLACES                            PHASE
elasticsearch-operator.4.3.1-202002032140   Elasticsearch Operator           4.3.1-202002032140                                       Succeeded
jaeger-operator.v1.13.1                     Jaeger Operator                  1.13.1                                                   Succeeded
kiali-operator.v1.0.9                       Kiali Operator                   1.0.9                kiali-operator.v1.0.8               Succeeded
knative-eventing-operator.v0.12.0           Knative Eventing Operator        0.12.0               knative-eventing-operator.v0.11.0   Succeeded
knative-kafka-operator.v0.12.1              Knative Apache Kafka Operator    0.12.1               knative-kafka-operator.v0.11.2      Succeeded
serverless-operator.v1.4.1                  OpenShift Serverless Operator    1.4.1                serverless-operator.v1.4.0          Succeeded
servicemeshoperator.v1.0.7                  Red Hat OpenShift Service Mesh   1.0.7                servicemeshoperator.v1.0.6          Succeeded
strimzi-cluster-operator.v0.15.0            Strimzi                          0.15.0               strimzi-cluster-operator.v0.14.0    Succeeded
Namespace/Project Setup
kubectl create namespace kafka
# make it "sticky"
kubectl config set-context --current --namespace=kafka
# check that it is set
kubectl config view --minify --output 'jsonpath={..namespace}'
# or use "oc" to see what the "sticky" namespace is
oc project
Create a Kafka cluster
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.1'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.1.0
    replicas: 3
  entityOperator:
    topicOperator: {}
    userOperator: {}
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3
EOF
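Strimzi can take a few minutes to roll out the brokers. A simple way to wait until the cluster reports Ready (assuming it was created in the kafka namespace as above):

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka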
Configure the Knative Eventing Kafka integration
Note: this only needs to be done once
apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  finalizers:
    - knative-kafka-openshift
  name: knative-kafka
  namespace: knative-eventing
spec:
  broker:
    defaultConfig:
      authSecretName: ''
      bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
      numPartitions: 10
      replicationFactor: 3
    enabled: true
  channel:
    authSecretName: ''
    authSecretNamespace: ''
    bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
    enabled: false
  high-availability:
    replicas: 1
  sink:
    enabled: false
  source:
    enabled: true
Note: the ".kafka" suffix in the bootstrapServers hostname is the namespace where the Kafka cluster was created; adjust it if your cluster lives in a different namespace.
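To apply the manifest, pipe it to kubectl like the other examples here, or save it to a file first (the knative-kafka.yaml filename below is just an example):

kubectl apply -f knative-kafka.yaml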
Verify that the KnativeKafka resource took effect
kubectl get crds | grep kafkasource
kafkasources.sources.eventing.knative.dev 2019-09-21T14:23:14Z
and
kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
broker-controller-66f988fb6c-6wk4t 1/1 Running 0 20h
eventing-controller-5c955d4694-btwx8 1/1 Running 0 20h
eventing-webhook-7f7bcb8447-27p9s 1/1 Running 0 20h
imc-controller-6ddf4477fd-bjjhh 1/1 Running 0 20h
imc-dispatcher-7676c44559-wzxg4 1/1 Running 0 20h
kafka-ch-controller-5497f498dc-vm8x7 1/1 Running 0 4h19m
kafka-controller-manager-544887898b-j654v 1/1 Running 0 4h20m
kafka-webhook-65d8bb899c-6nsmq 1/1 Running 0 4h19m
Create a Kafka topic
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 100
  replicas: 1
EOF
Test to see if the topic was created correctly
oc exec -n kafka -it my-cluster-zookeeper-0 -- /bin/bash
bin/kafka-topics.sh --zookeeper localhost:12181 --list
bin/kafka-topics.sh --zookeeper localhost:12181 --describe --topic my-topic
OR
kubectl exec -n kafka -it my-cluster-zookeeper-0 -- bin/kafka-topics.sh --zookeeper localhost:12181 --describe --topic my-topic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:my-topic PartitionCount:100 ReplicationFactor:1 Configs:message.format.version=2.3-IV1
Topic: my-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: my-topic Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 6 Leader: 2 Replicas: 2 Isr: 2
.
.
.
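Note: Kafka 3.0 removed the --zookeeper option from kafka-topics.sh, so against the 3.1.0 brokers defined above the commands may need to point at a broker instead of ZooKeeper. A sketch of the equivalent check, run inside a broker pod:

kubectl exec -n kafka -it my-cluster-kafka-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic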
Deploy a Knative Service
This is your "sink" that receives events
cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1"
        autoscaling.knative.dev/window: 16s
    spec:
      containers:
        - image: docker.io/burrsutter/myknativesink:1.0.1
          resources:
            requests:
              memory: "50Mi"
              cpu: "100m"
            limits:
              memory: "70Mi"
              cpu: "100m"
          livenessProbe:
            httpGet:
              path: /healthz
          readinessProbe:
            httpGet:
              path: /healthz
EOF
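You can confirm the Knative Service reports Ready and grab its URL (ksvc is the short name registered by Knative Serving):

kubectl get ksvc myknativesink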
If your pod is stuck in PENDING, check your events
kubectl get events --sort-by=.metadata.creationTimestamp
You likely need to add another worker node (OpenShift Console - Compute - MachineSets)
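If you prefer the CLI over the console, a sketch of scaling a MachineSet (substitute your own MachineSet name from the first command):

oc get machinesets -n openshift-machine-api
oc scale machineset <machineset-name> --replicas=3 -n openshift-machine-api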
Create the KafkaSource that connects my-topic to ksvc
cat <<EOF | kubectl apply -f -
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - my-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: myknativesink
EOF
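You can also check that the source reaches the Ready condition:

kubectl get kafkasource mykafka-source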
You can monitor the logs of kafkasource-mykafka-source to see if it has connectivity issues
stern kafkasource-mykafka-source
Scaling beyond 1 Pod
Kafka Spammer is a simple application that sends N messages into the topic as fast as it can.
Deploy
kubectl -n kafka run kafka-spammer \
  --image=quay.io/rhdevelopers/kafkaspammer:1.0.2
Exec into the Spammer
KAFKA_SPAMMER_POD=$(kubectl -n kafka get pod -l "run=kafka-spammer" \
  -o jsonpath='{.items[0].metadata.name}')

kubectl -n kafka exec -it $KAFKA_SPAMMER_POD -- /bin/sh
curl localhost:8080/1
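The trailing path segment is the N from the description above, i.e. the number of messages to send; bump it up to generate more load, for example:

curl localhost:8080/100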
Watch the Developer Topology view
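From the CLI, you can instead watch the sink pods scale out and back down using the watch tool from the prerequisites (assumes myknativesink is running in the kafka namespace set earlier):

watch kubectl get pods -n kafka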
Clean up
kubectl delete route kafka-producer
kubectl delete service kafka-producer
kubectl delete deployment kafka-producer
kubectl delete kafkasource mykafka-source
kubectl delete ksvc myknativesink
kubectl delete KafkaTopic my-topic
kubectl delete kafka my-cluster
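The spammer created with kubectl run above is not covered by the list; depending on your kubectl version, run created either a bare pod or a deployment:

kubectl -n kafka delete pod kafka-spammer
# or, on older kubectl versions that created a deployment:
kubectl -n kafka delete deployment kafka-spammer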