
Kafka Driver for WinCC OA (WinCCOA)

Contact email: hse-cen-co@cern.ch

REMUS WinCC OA Kafka Driver

1. Description

This is a generic WinCC OA driver used to stream and/or ingest data via Kafka. The user has the following possibilites:

  • Run a producer
  • Run one or several consumers (one consumer per topic)
  • Run both a producer and/or consumer(s)

2. Libraries

  • C++11 STL
  • WinCC OA API libraries
  • librdkafka under BSD2 (available as git submodule. Our working version commit is @ 8695b9d6)
  • cppkafka under BSD2 (available as git submodule. Our working version commit is @ df4eaa07)
  • cyrus-sasl-gssapi
  • boost
  • cmake
  • openssl-dev

2.1 Installation of libraries

librdkafka and cppkafka are open source libraries, available on Github. They are both built from source and added to this project as git sub-modules.

After cloning this repository run

make installLibs

This will perform the following:

1. Retrieve the `librdkafka` and `cppkafka` git submodules in the `libs` folder
2. Install `boost`, `cyrus-sasl-gssapi`, `openssl-dev` and `cmake` libraries (if needed)
3. Launch the build&install process for `librdkafka`
4. Launch the build&install process for `cppkafka`
NOTE: you need sudo rights

3. Compilation:

Project has a Makefile. Note that you can also set the PVSS_PROJ_PATH environment variable beforehand.

3.1 To build the driver


3.2 To install

make install PVSS_PROJ_PATH=<path_to_pvss_project_bin_folder>

3.3 Run

It can be run from the WinCCOA Console or from command line and it will require the config.kafka file:

./WINCCOAkafkaDrv -num <driver_number> -proj <project_name> +config config.kafka

4. Config file

The config.kafka file has to be present under the WinCCOA project folder config.

Here, producer and consumer configs can be specified (based on your own needs). These will be parsed by the driver at startup thanks to the PRODUCER.CONFIG. keyword for producer and the CONSUMER.CONFIG. keyword for the consumer(s).

Here is an example config file:

PRODUCER.CONFIG.metadata.broker.list = dbnile-kafka-a-8.cern.ch:9093,dbnile-kafka-b-8.cern.ch:9093,dbnile-kafka-c-8.cern.ch:9093
PRODUCER.CONFIG.security.protocol = SASL_SSL
PRODUCER.CONFIG.sasl.kerberos.service.name = kafka
PRODUCER.CONFIG.sasl.kerberos.principal = asavules
PRODUCER.CONFIG.sasl.kerberos.keytab = /home/asavules/Workspace/as.keytab
PRODUCER.CONFIG.group.id = test-consumer-group
PRODUCER.CONFIG.statistics.interval.ms = 60000
CONSUMER.CONFIG.metadata.broker.list = dbnile-kafka-a-8.cern.ch:9093,dbnile-kafka-b-8.cern.ch:9093,dbnile-kafka-c-8.cern.ch:9093
CONSUMER.CONFIG.security.protocol = SASL_SSL
CONSUMER.CONFIG.sasl.kerberos.service.name = kafka
CONSUMER.CONFIG.sasl.kerberos.principal = asavules
CONSUMER.CONFIG.sasl.kerberos.keytab = /home/asavules/Workspace/as.keytab
CONSUMER.CONFIG.statistics.interval.ms = 60000
CONSUMER.CONFIG.group.id = remustest_meas_consumer
CONSUMER.CONFIG.enable.auto.commit = true
CONSUMER.CONFIG.auto.offset.reset = latest

In this config file you can add other producer or consumer configuration entries(see Kafka Official Documentation).

4.1 Keytab (CERN Only)

To be able to access kafka, you will need to create a keytab.

Please refer to official user guide, section How to generate a keytab file: CERN Kafka Official User Guide

For example with NICE user jdoe:

    cern-get-keytab --user --keytab jdoe.keytab --login jdoe

Edit the config/config.kafka file in your WinCC OA project and update the CONSUMER.CONFIG.sasl.kerberos.keytab and/or PRODUCER.CONFIG.sasl.kerberos.keytab entry to point to your keytab file + the CONSUMER.CONFIG.sasl.kerberos.principal and/or PRODUCER.CONFIG.sasl.kerberos.principal to your NICE username.

For example:

CONSUMER.CONFIG.metadata.broker.list = dbnile-kafka-a-8.cern.ch:9093,dbnile-kafka-b-8.cern.ch:9093,dbnile-kafka-c-8.cern.ch:9093
CONSUMER.CONFIG.security.protocol = SASL_SSL
CONSUMER.CONFIG.sasl.kerberos.service.name = kafka
CONSUMER.CONFIG.sasl.kerberos.principal = jdoe
CONSUMER.CONFIG.sasl.kerberos.keytab = /home/jdoe/Workspace/jdoe.keytab
CONSUMER.CONFIG.statistics.interval.ms = 60000
CONSUMER.CONFIG.group.id = remustest_demo_consumer
CONSUMER.CONFIG.enable.auto.commit = true

5. WinCC OA Installation

Under the winccoa folder you will find the following files that you need to copy to your project in the corresponding paths:

  • dplist/kafka_driver_config.dpl : it contains internal driver & CONFIG_KAFKA DPs. Once you've successfully launched the driver in the WinCC project manangement, you can import it via the ASCII Manager(refer to the official WinCC OA Documentation).


* This is a specific dump of the REMUS redundant project DPs 
* The internal driver number in the dump is 14. If it's unavailable to you, try to modify the dump file directly. 

See 6.3 Driver configuration section for a brief descprition of relevant CONFIG_KAFKA DPEs.

6. Kafka Driver Technical Documentation

6.1 Main entry points

After the driver startup, the main entry points in the driver are:

  • kafkaHwService::writeData() -> WinCC to Driver communication

    This is how the kafka streaming is performed. Thanks to the addressing <TOPIC>$<KEY>[$<DEBOUNCING_TIMEFRAME>], the driver will be able to stream to the right topic.

  • kafkaHwService::workProc() -> Driver to WinCC communication

    This is how we push data to WinCC from Kafka. Thanks to the addressing <TOPIC>$<KEY>,the driver will be able to map the data ingested from the respective Kafka topic to the WinCC DPE.

Please refer to the WinCC documentation for more information on the WinCC OA API. For more info on the debouncing, see Remus RealTime Evolution - KAFKA presentation.

6.2 Addressing DPEs with the Kafka driver

6.2.1 Data Types

When the kafka driver pushes a DPE value to WinCC, a transformation takes place. See Transformations folder. We are currently supporting the following data types for the periphery address:

WinCC DataType Transformation class Periphery data type value
bool kafkaBoolTrans.cxx 1000 (TransUserType def in WinCC OA API)
uint8 kafkaUint8Trans.cxx 1001 (TransUserType + 1)
int32 kafkaInt32Trans.cxx 1002 (TransUserType + 2)
int64 kafkaInt64Trans.cxx 1003 (TransUserType + 3)
float kafkaFloatTrans.cxx 1004 (TransUserType + 4)
string kafkaStringTrans.cxx 1005 (TransUserType + 5)
time kafkaTimeTrans.cxx 1006 (TransUserType + 6)

6.2.2 For streaming

The periphery address data type has to be set to string (1005).

The direction in the periphery has to be set to OUT. The addressing is generic: <TOPIC>$<KEY>[$<DEBOUNCING_TIMEFRAME>]. The $DEBOUNCING_TIMEFRAME is optional and if not set then it is 0 by default.

  • Example 1


    Gives us :

    • The kafka topic: remustest_demo
    • The key key: MeasurementJSON123
    • Debouncing timeframe: 200 milliseconds
  • Example 2


    Gives us :

    • The kafka topic: remustest_demo
    • The key key: MeasurementJSON123
    • No debouncing timeframe

For more info on the addressing, see Remus RealTime Evolution - KAFKA presentation.

6.2.3 For data ingestion

The direction in the periphery address has to be set to IN. The addressing is generic: <TOPIC>$<KEY>.

For example


Gives us :
* The kafka topic: remustest_demo
* The key key: MeasurementJSON123

There is a ready-to-launch kafka consumer demo project available with full instructions here:

6.2.4 Adding a new transformation

To add a new transformation you need to do the following:

  • create a define in kafkaHWMappeer.hxx

      #define kafkaDrvDoubleTransType (TransUserType + 7)
  • handle the new transformation type in kafkaHWMapper::addDpPa()

  • implement the transformation type class. The important functions here are

    • ::toPeriph(...) for WinCC OA to Kafka driver transformation
    • ::toVar(...) for Kafka driver to WinCC OA transformation

6.3 Driver Configuration

Available via the WinCC OA CONFIG_KAFKA DataPoint, we have the following

Config DPE Direction Addressing Type Description
DebugLvl OUT DEBUGLVL INT32 Debug Level for logging. You can use this to debug issues. (default 0)
DebouncingThreadInterval OUT DEBOUNCINGTHREADINTERVAL INT32 In milliseconds. The sleep interval for the debouncing thread (default 50 ms)
MaxPollRecords OUT MAXPOLLRECORDS INT32 MAximum number of records should the consumer poll retrieve (default 1000)
IN.ProducerStatsDP IN PRODUCER_STATISTICS STRING This is where the driver periodically pushes statistics from kafka producer (json)
IN.ProducerAllBrokersDown IN PRODUCER_ALL_BROKERS_DOWN BOOL Set by the when all brokers are down for the producer.
IN.ConsumerStatsDP IN CONSUMER_STATISTICS STRING This is where the driver periodically pushes statistics from kafka consumer (json)

