The Kafka operator is a process that automatically manages the creation and deletion of Kafka topics, as well as their number of partitions, replication factor, and properties.
The operator monitors Kubernetes ConfigMap resources labeled with `config=kafka-topic`. From those ConfigMaps, it extracts the information about the Kafka topic to create or update. This check runs periodically, every 60 seconds by default.
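For example, the ConfigMaps the operator will act on in the current project can be listed with a label selector:

```sh
# List the topic ConfigMaps the operator manages in the current project
oc get configmap -l config=kafka-topic
```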
Start the Kafka operator within a project. The simplest way to do this in OpenShift is:

```sh
oc process -f src/main/openshift/kafka-operator-template.yaml | oc create -f -
```
If your Zookeeper instance is not available at `zookeeper:2181`, specify its address when using `oc process`:

```sh
oc process -v ZOOKEEPER_BOOTSTRAP=yourzookeeperaddressandport -f src/main/openshift/kafka-operator-template.yaml | oc create -f -
```
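To check that the operator started correctly, you can follow its log. The resource name `kafka-operator` below is an assumption about what the template creates; adjust it to whatever `oc process` actually produced:

```sh
# Stream the operator's log (DeploymentConfig name is an assumption)
oc logs -f dc/kafka-operator
```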
Kubernetes should be configured to run only one instance of the operator. Running several instances should not cause problems, but it results in redundant interactions with the Zookeeper and Kafka clusters.
The operator must run under a service account that has view rights on ConfigMaps in the project where it runs. The Zookeeper and Kafka clusters must be accessible from the operator's namespace.
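If the service account lacks that permission, the `view` role can be granted to it. A minimal sketch, assuming the operator runs under the project's `default` service account (the template may define a dedicated one):

```sh
# Grant read access to ConfigMaps and other resources in the current project
oc policy add-role-to-user view -z default
```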
The operator is configured using environment variables or system properties:
| Environment variable | System property | Description | Default value |
|---|---|---|---|
| ZOOKEEPER_BOOTSTRAP | zookeeper.bootstrap | Address of the Zookeeper cluster | zookeeper:2181 |
| DEFAULT_REPL_FACTOR | default.repl.factor | Default replication factor to use when one is not specified in the ConfigMap | 2 |
| REFRESH_INTERVAL | refresh.interval | Interval in seconds between two actions of the operator | 60 |
| ENABLE_TOPIC_DELETE | enable.topic.delete | If set to true, activates topic deletion | false |
| IMPORT_TOPICS | import.topics | If set to true, existing topics in the Kafka cluster are imported and managed by the operator | false |
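These variables can be changed on a running deployment with `oc set env`, which typically triggers a redeployment. The DeploymentConfig name `kafka-operator` is an assumption about the template:

```sh
# Enable topic deletion and shorten the check interval on the running operator
oc set env dc/kafka-operator ENABLE_TOPIC_DELETE=true REFRESH_INTERVAL=30
```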
The following environment variables also influence behavior, but cannot be set using the OpenShift template as it is:
| Environment variable | System property | Description | Default value |
|---|---|---|---|
| JMX_METRICS | jmx.metrics | If set to true, metrics are exposed as JMX beans | true |
| LOG_METRICS | log.metrics | If set to true, metrics are logged to the console | false |
| LOG_METRICS_INTERVAL | log.metrics.interval | Interval in seconds between metrics logs | 60 |
| OPENTRACING_ENABLED | opentracing.enabled | Activates the experimental OpenTracing feature | false |
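When running the operator directly rather than through the template, the same settings can be passed as JVM system properties. This is only a sketch; the jar name is illustrative:

```sh
# Hypothetical local run with metrics logging enabled every 30 seconds
java -Dlog.metrics=true -Dlog.metrics.interval=30 -jar kafka-operator.jar
```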
Define a ConfigMap resource in the project where the operator is running. For example, create a file `my-kafka-topic.yaml` with the following content:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-kafka-topic
  labels:
    config: kafka-topic
data:
  num.partitions: "20"
  repl.factor: "3"
  properties: |
    retention.ms=1000000
```
Note that the name of the file is not important. Once the file has been created, create the resource. For example, in OpenShift:

```sh
oc create -f my-kafka-topic.yaml
```
During the next periodic check, the operator will read the ConfigMap and create a topic called `my-kafka-topic` with 20 partitions, a replication factor of 3, and the topic's `retention.ms` property set to 1000000. Note that all topic properties are represented as a single ConfigMap key. If the Kafka topic name contains capital letters or underscores (which are not allowed in ConfigMap names), you can use the `name` key in the ConfigMap's `data` to specify the correct name:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: underscore-kafka-topic
  labels:
    config: kafka-topic
data:
  name: Underscore_Kafka_Topic
  num.partitions: "20"
  repl.factor: "3"
  properties: |
    retention.ms=1000000
```
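You can check what the operator actually created with the standard Kafka tooling, for example with the `kafka-topics.sh` script shipped with Kafka (the `--zookeeper` form shown here applies to Zookeeper-based Kafka versions; adjust the address to your cluster):

```sh
# Describe the topic the operator created, directly against Zookeeper
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic Underscore_Kafka_Topic
```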
The Kafka operator only manages topics that are defined in ConfigMaps. If a topic's ConfigMap is deleted, the operator will no longer manage that topic. Conversely, if a new ConfigMap is created for an already existing topic, the operator will start managing it.
Default behavior:

- if `num.partitions` is not provided, the number of partitions is set to the number of brokers
- if `repl.factor` is not provided, the `DEFAULT_REPL_FACTOR` value is used
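For illustration, a minimal ConfigMap that relies on the replication-factor default could set only the number of partitions (whether a missing `properties` key is accepted is an assumption; the earlier examples always include one):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: defaulted-kafka-topic
  labels:
    config: kafka-topic
data:
  num.partitions: "10"
```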
The operator will not accept ConfigMaps that modify topics whose names start with a double underscore (`__`); those are considered internal Kafka topics.
An existing Kafka topic can be modified by updating its ConfigMap. For example, we can set the topic's accepted compression method by updating our ConfigMap to:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-kafka-topic
  labels:
    config: kafka-topic
data:
  num.partitions: "20"
  repl.factor: "3"
  properties: |
    retention.ms=1000000
    compression.type=producer
```
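The change takes effect once the updated ConfigMap is pushed to the cluster, for example:

```sh
oc apply -f my-kafka-topic.yaml
```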
Known limitations:

- the number of partitions can only be increased
- modifying the replication factor is not supported at the moment
To enable deletion of existing Kafka topics, the operator must be run with `ENABLE_TOPIC_DELETE` set to `true`, and topic deletion must be enabled in the Kafka cluster. If that is the case, a topic can be deleted by setting its ConfigMap's annotation `alpha.topic.kafka.nb/deleted` to `true`. For example, to delete the topic we have defined, change its ConfigMap as follows:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-kafka-topic
  labels:
    config: kafka-topic
  annotations:
    alpha.topic.kafka.nb/deleted: 'true'
data:
  num.partitions: "20"
  repl.factor: "3"
  properties: |
    retention.ms=1000000
```
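Equivalently, the annotation can be added directly to the existing ConfigMap from the command line:

```sh
# Mark the ConfigMap so the operator deletes the corresponding topic
oc annotate configmap my-kafka-topic alpha.topic.kafka.nb/deleted=true
```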
During the next periodic check, the operator will delete the specified topic. Note that, as with all other operations, this cannot be undone.
The operator can import existing Kafka topics from the brokers and start managing them. This is done by starting the operator with the environment variable `IMPORT_TOPICS` or the system property `import.topics` set to `true`. At startup, this imports all existing topics whose names don't start with a double underscore (`__`). For each one, the operator creates a ConfigMap whose name is the name of the topic (with underscores replaced by dashes), and whose data contains the true name, number of partitions, replication factor, and properties if any have been set. Its annotation `alpha.topic.kafka.nb/generated` is set to the time of import.
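Based on this description, a ConfigMap generated for an imported topic named `imported_topic` might look roughly like the sketch below; the timestamp format and sample values are assumptions:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: imported-topic                # topic name with underscores replaced by dashes
  labels:
    config: kafka-topic
  annotations:
    alpha.topic.kafka.nb/generated: '2018-06-01T12:00:00Z'   # time of import (format assumed)
data:
  name: imported_topic                # the true topic name
  num.partitions: "12"
  repl.factor: "2"
  properties: |
    retention.ms=604800000
```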
Planned improvements:

- HOSA Metrics creation
- Kafka cluster creation
- Investigate using watch instead of poll