Saturation in the number of records produced towards Kafka
Lidhoria opened this issue · 11 comments
At most 70k records per second are produced towards Kafka, as the JSON output thread reaches 99% CPU utilization. How can this be improved?
Hi,
can you describe your OS, startup configuration, etc.? How is the collector deployed? Does it run on, for example, the same server as Kafka?
By the way, this previous issue might help you. #20
Lukas
The OS we are using is CentOS.
The startup configuration is startup.xml.
Ipfixcol and Kafka are running on different machines, i.e. VMs.
We collect IPFIX data at ipfixcol2 on UDP and send it to Kafka in JSON format.
Can you be a little more specific? Based on the information you have provided, I have not learned much to help you.
a) By startup configuration, I meant its contents and in particular the configuration of the JSON output module. The name of the file is irrelevant.
b) Have you tried any of the procedures mentioned in the previously linked issue #20? If so, did any of them help, and by how much? What throughput are you targeting?
c) If running in a virtual machine, how many cores does the machine with IPFIXcol2 have? In the case of a simple configuration (TCP/UDP -> JSON), at least 4-6 cores (threads) are required for optimal functionality.
Lukas
Does the JSON output performance depend on the number of IPFIX elements defined in the system and user directories of libfds? Our incoming flows also contain elements defined in subtemplate lists, along with many non-standard IPFIX elements. Does this increase the processing latency?
(A)
<outputPlugins>
<output>
<name>JSON output</name>
<plugin>json-kafka</plugin>
<verbosity>info</verbosity>
<params>
<tcpFlags>formatted</tcpFlags>
<timestamp>unix</timestamp>
<protocol>raw</protocol>
<ignoreUnknown>true</ignoreUnknown>
<nonPrintableChar>false</nonPrintableChar>
<detailedInfo>true</detailedInfo>
<numericNames>false</numericNames>
<splitBiflow>false</splitBiflow>
<outputs>
<kafka>
<name>kafka</name>
<brokers>broker1:9092,broker2:9092,broker3:9092</brokers>
<blocking>false</blocking>
<partition>unassigned</partition>
<topic>ipfix-output</topic>
<property>
<key>compression.codec</key>
<value>lz4</value>
</property>
<property>
<key>linger.ms</key>
<value>1000</value>
</property>
<property>
<key>batch.num.messages</key>
<value>1000000</value>
</property>
</kafka>
</outputs>
</params>
</output>
</outputPlugins>
(B) The previous issue did not help. We are sending at least half a million flows to ipfixcol, but the JSON output is getting saturated.
(C) Our ipfixcol VM is configured with 72 cores.
The more complicated and structured an IPFIX record is, the longer it takes to convert and send it. Thus, parameters that shorten the JSON record, such as setting <numericNames> to true, should have a positive performance impact.
In your case, parallelizing the output data processing might help. The collector can differentiate probes by ODID (Observation Domain ID). So if you are able to differentiate probes by ODID, it is possible to create multiple instances of the output module, each of which processes only a portion of the traffic.
Let's say you have 3 probes where each has a different ODID 1-3. In that case, you can create up to 3 different instances of the JSON plugin, each handling traffic from one probe. The <odidOnly> parameter (see documentation) is used here:
<outputPlugins>
<output>
<name>JSON output 1</name>
<plugin>json</plugin>
<odidOnly>1</odidOnly> <!-- Only traffic from exporters with ODID 1 is processed by the plugin -->
<params> ... </params>
</output>
<output>
<name>JSON output 2</name>
<plugin>json</plugin>
<odidOnly>2</odidOnly> <!-- Only traffic from exporters with ODID 2 is processed by the plugin -->
<params> ... </params>
</output>
<output>
<name>JSON output 3</name>
<plugin>json</plugin>
<odidOnly>3</odidOnly> <!-- Only traffic from exporters with ODID 3 is processed by the plugin -->
<params> ... </params>
</output>
</outputPlugins>
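If there are many ODIDs, the repeated <output> blocks can be generated instead of written by hand. The following Python sketch is only an illustration: the function name `build_output_plugins` is made up, the instance names simply follow the "JSON output N" pattern above, and the <params> element is left empty as a placeholder for the real plugin parameters.

```python
import xml.etree.ElementTree as ET


def build_output_plugins(odids, plugin="json"):
    """Return an <outputPlugins> element with one <output> per ODID."""
    root = ET.Element("outputPlugins")
    for odid in odids:
        output = ET.SubElement(root, "output")
        ET.SubElement(output, "name").text = f"JSON output {odid}"
        ET.SubElement(output, "plugin").text = plugin
        # Only traffic from exporters with this ODID reaches the instance.
        ET.SubElement(output, "odidOnly").text = str(odid)
        # Placeholder: the real <params> content has to be filled in.
        ET.SubElement(output, "params")
    return root


if __name__ == "__main__":
    plugins = build_output_plugins(range(1, 4))
    ET.indent(plugins)  # pretty-printing helper, needs Python 3.9+
    print(ET.tostring(plugins, encoding="unicode"))
```

The printed fragment would still have to be pasted into a full configuration file next to the <inputPlugins> section.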
By the way, how many probes/exporters are you receiving data from? Do they have a different ODID configured or are you able to configure it?
Lukas
By the way, have you tried stress testing your Kafka cluster by inserting e.g. JSON records corresponding to the converted IPFIX records? What is its throughput, how many records per second did the cluster process and how did you measure the performance?
Lukas
We have stress tested the Kafka cluster with the same kind of JSON records, and the ingestion rate in Kafka easily reaches millions of records per second.
The throughput stagnates at 70 KPS even with multiple separate ODID-based JSON outputs.
With the dummy output plugin we got a throughput of around 55-60 KPS.
There are more than 25 probes.
Hi,
Is it possible to integrate the highly scalable user-level TCP stack on DPDK called mTCP for facilitating communication between ipfixcol and Kafka over TCP specifically for transmitting JSON records?
Hi,
the transfer of messages is performed by the librdkafka library. I am not sure whether it is possible to modify the data transfer method on the collector side. However, I believe this should be a last resort. I wonder how you came to the conclusion that TCP transfer is slowing down communication with the cluster?
I decided to try the current collector with conversion to Kafka according to the following steps:
- Install Java
sudo dnf install java-latest-openjdk
- Download Kafka
mkdir kafka && cd kafka
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
- Run Zookeeper (default configuration)
bin/zookeeper-server-start.sh config/zookeeper.properties
- Run Kafka server in another terminal (default configuration)
bin/kafka-server-start.sh config/server.properties
- Create a topic
bin/kafka-topics.sh --create --topic ipfix --bootstrap-server localhost:9092
- Run IPFIXcol2 (configuration below)
ipfixcol2 -c tcp2kafka.xml
- Send IPFIX sample data in an infinite loop
ipfixsend2 -i ipfixcol2/doc/data/ipfix/example_flows.ipfix -t TCP
Everything was running on my work computer in a virtual machine with Oracle Linux 9. The computer itself doesn't have a particularly powerful processor (Intel Core i7-10700T, 35W TDP), but I managed to achieve a speed of 500k records/s as shown below.
...
INFO: JSON output: STATS: successful deliveries: 496388, failures: 0
INFO: JSON output: STATS: successful deliveries: 501710, failures: 0
INFO: JSON output: STATS: successful deliveries: 496391, failures: 0
INFO: JSON output: STATS: successful deliveries: 494617, failures: 0
INFO: JSON output: STATS: successful deliveries: 498162, failures: 0
INFO: JSON output: STATS: successful deliveries: 496393, failures: 0
INFO: JSON output: STATS: successful deliveries: 492843, failures: 0
INFO: JSON output: STATS: successful deliveries: 496390, failures: 0
INFO: JSON output: STATS: successful deliveries: 528300, failures: 0
INFO: JSON output: STATS: successful deliveries: 498162, failures: 0
...
Content of IPFIXcol2 configuration called `tcp2kafka.xml`
<ipfixcol2>
<!-- Input plugins -->
<inputPlugins>
<input>
<name>TCP collector</name>
<plugin>tcp</plugin>
<params>
<!-- Listen on port 4739 -->
<localPort>4739</localPort>
<!-- Bind to all local addresses -->
<localIPAddress></localIPAddress>
</params>
</input>
</inputPlugins>
<!-- Output plugins -->
<outputPlugins>
<output>
<name>JSON output</name>
<plugin>json</plugin>
<verbosity>info</verbosity>
<params>
<!-- JSON format parameters -->
<tcpFlags>formatted</tcpFlags>
<timestamp>formatted</timestamp>
<protocol>formatted</protocol>
<ignoreUnknown>true</ignoreUnknown>
<ignoreOptions>true</ignoreOptions>
<nonPrintableChar>true</nonPrintableChar>
<octetArrayAsUint>true</octetArrayAsUint>
<numericNames>false</numericNames>
<splitBiflow>false</splitBiflow>
<detailedInfo>false</detailedInfo>
<templateInfo>false</templateInfo>
<!-- Output methods -->
<outputs>
<kafka>
<name>Send to Kafka</name>
<brokers>127.0.0.1</brokers>
<topic>ipfix</topic>
<blocking>true</blocking>
<partition>unassigned</partition>
<!-- Zero or more additional properties -->
<property>
<key>compression.codec</key>
<value>lz4</value>
</property>
</kafka>
</outputs>
</params>
</output>
</outputPlugins>
</ipfixcol2>
I am therefore unable to reproduce your problem. Can you try to follow my steps and tell me what results you achieve?
We used the sample example_flows.ipfix with ipfixsend2 against a Kafka cluster made up of VMs on different physical machines:
Jun 28 16:25:49 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 270688, failures: 0
Jun 28 16:25:50 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 316308, failures: 0
Jun 28 16:25:51 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 225064, failures: 0
Jun 28 16:25:52 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 215940, failures: 0
Jun 28 16:25:53 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 285895, failures: 0
Jun 28 16:25:54 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 325431, failures: 0
Jun 28 16:25:55 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 222025, failures: 0
Jun 28 16:25:56 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 225063, failures: 0
Jun 28 16:25:57 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 298061, failures: 0
Jun 28 16:25:58 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 249395, failures: 0
Jun 28 16:25:59 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 349765, failures: 0
Jun 28 16:26:00 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 307181, failures: 0
Jun 28 16:26:01 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 270687, failures: 0
Jun 28 16:26:02 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 191610, failures: 0
Jun 28 16:26:03 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 218983, failures: 0
Jun 28 16:26:04 2023 pid[3637972]: INFO: JSON output: STATS: successful deliveries: 212899, failures: 0
However, when we run ipfixcol2 with our actual IPFIX UDP ingress traffic, which has hundreds of IEs with nested fields such as subTemplateMultiList and subTemplateList, we get a throughput of about 70K records per second:
Jun 28 16:52:00 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 59496, failures: 0
Jun 28 16:52:01 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 74911, failures: 0
Jun 28 16:52:02 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 57292, failures: 0
Jun 28 16:52:03 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 53844, failures: 0
Jun 28 16:52:04 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 52563, failures: 0
Jun 28 16:52:05 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 74751, failures: 0
Jun 28 16:52:06 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 67952, failures: 0
Jun 28 16:52:07 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 70969, failures: 0
Jun 28 16:52:08 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 72187, failures: 0
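To compare runs like the two above by more than eyeballing, the STATS lines can be summarized with a few lines of Python. This is a minimal sketch: the regular expression is derived only from the log lines quoted in this thread, the function name `summarize` is made up, and the mean equals records per second only under the assumption that the plugin prints one STATS line per second, as the timestamps suggest.

```python
import re
import statistics

# Pattern taken from the log lines quoted in this thread.
STATS_RE = re.compile(r"STATS: successful deliveries: (\d+), failures: (\d+)")


def summarize(lines):
    """Return (mean, min, max) of 'successful deliveries' over STATS lines."""
    delivered = [int(m.group(1)) for line in lines if (m := STATS_RE.search(line))]
    if not delivered:
        raise ValueError("no STATS lines found")
    return statistics.mean(delivered), min(delivered), max(delivered)


if __name__ == "__main__":
    sample = [
        "Jun 28 16:52:00 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 59496, failures: 0",
        "Jun 28 16:52:01 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 74911, failures: 0",
        "Jun 28 16:52:02 2023 pid[3652480]: INFO: JSON output: STATS: successful deliveries: 57292, failures: 0",
    ]
    mean, low, high = summarize(sample)
    print(f"mean={mean:.0f} min={low} max={high}")
```

Lines that do not match the pattern are skipped, so the whole collector log can be passed in unfiltered.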
I suggest you try the following steps:
1) Increase the maximum system UDP buffer size
Since you are receiving very large packets over an unreliable protocol (UDP), you need to make sure that the maximum system network buffer size is large enough. In the default system configuration, it is usually too small for flow data reception. If a large amount of flow data arrives at once, the collector is not able to receive them all and some of them are discarded.
The UDP plugin documentation describes the procedure. Basically, you need to call the following command and possibly set this value permanently (not provided in the instructions) in the system so that it remains even after a system restart.
sysctl -w net.core.rmem_max=16777216
This command sets the maximum network buffer size to 16 MB. By the way, on my Oracle Linux 9 the default system configuration is only 0.25 MB, which is too small. You can check the actual value in your system e.g. using cat /proc/sys/net/core/rmem_max.
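Whether the new limit is actually in effect can also be checked from a process's point of view, independently of the collector. The Python sketch below requests a 16 MB receive buffer on a throwaway UDP socket and reads back what the kernel granted. The function name `effective_rcvbuf` is made up, and the comparison against twice the requested size relies on Linux behavior (the kernel doubles the value set via SO_RCVBUF and caps the request at net.core.rmem_max); on other systems the numbers may be reported differently.

```python
import socket


def effective_rcvbuf(requested=16 * 1024 * 1024):
    """Request a UDP receive buffer and return the size the kernel reports."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, requested)
        return sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)


if __name__ == "__main__":
    requested = 16 * 1024 * 1024
    granted = effective_rcvbuf(requested)
    print(f"requested {requested} bytes, kernel reports {granted} bytes")
    # Linux-specific assumption: the reported value is double the granted
    # size, and the request is capped at net.core.rmem_max.
    if granted < 2 * requested:
        print("the request was probably capped by net.core.rmem_max")
```

If the cap message appears after running the sysctl command, the setting most likely did not take effect on that machine.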
Try making the above adjustment and run the throughput tests again. Maybe the problem is not on the transmitting side but on the receiving side of the collector.
2) Adjust the conversion of IPFIX records to JSON
Judging by your description, your flow records are extremely long and atypical. Using more than 100 IEs in one flow record is very unusual. Consider whether you really need to send all fields and possibly modify the exporter configuration.
On the collector side, you can only reduce the length of the resulting JSON by adjusting the formatting and leaving most of the fields in numeric form. Specifically, the following are relevant switches in the plugin configuration:
<tcpFlags>raw</tcpFlags>
<timestamp>unix</timestamp>
<protocol>raw</protocol>
<numericNames>true</numericNames>
If even this does not help, there is probably only one last option...
3) Run an independent collector for each probe
Ideally, create a custom XML collector configuration for each probe/exporter. In other words, if you have 25 probes/exporters, you will have 25 collector instances that will have almost identical configuration and will only differ in the UDP listening port.
On the exporter side, you must change the destination port to which each exporter sends data. For example, the first exporter sends data to a collector with destination port 4000, the second exporter sends data to port 4001, etc.
<ipfixcol2>
<inputPlugins>
<input>
<name>UDP collector</name>
<plugin>udp</plugin>
<params>
<localPort>4000</localPort> <!-- e.g. 4000, 4001, 4002, 4003,... -->
<localIPAddress></localIPAddress>
</params>
</input>
</inputPlugins>
<outputPlugins>
<!-- same for all instances -->
</outputPlugins>
</ipfixcol2>
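Since the instances differ only in the listening port, the configuration files can be generated from one template. The Python sketch below is an illustration only: the file naming scheme `collector_<port>.xml` and the function name `render_configs` are made up, and the <outputPlugins> section is left as the same placeholder as above.

```python
# Template mirrors the configuration above; only the port varies.
TEMPLATE = """<ipfixcol2>
<inputPlugins>
<input>
<name>UDP collector</name>
<plugin>udp</plugin>
<params>
<localPort>{port}</localPort>
<localIPAddress></localIPAddress>
</params>
</input>
</inputPlugins>
<outputPlugins>
<!-- same for all instances -->
</outputPlugins>
</ipfixcol2>
"""


def render_configs(count, base_port=4000):
    """Return {filename: XML text} with one config per probe/exporter."""
    return {
        f"collector_{base_port + i}.xml": TEMPLATE.format(port=base_port + i)
        for i in range(count)
    }


if __name__ == "__main__":
    # 25 probes/exporters -> ports 4000..4024
    for name in render_configs(25):
        print(name)
```

Each rendered text would be written to its file and one collector started per file with ipfixcol2 -c, as in the test setup earlier in this thread.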
Try each step to determine its impact on the number of records sent to the Kafka cluster. I will be glad to hear the results of your measurements. I hope this helps.
Lukas