snowflakedb/snowflake-kafka-connector

High lag with Snowflake Kafka Connector: Seeking advice on configuration tuning

Closed this issue · 13 comments

Hello team,

We are doing some load testing for our Kafka setup and seeing substantial lag when we try to send 2K events per second to Kafka, with an average message size of 85 KB. The lag is in the range of 3 to 4 million records.
So we are seeking advice on how to optimize our configuration to reduce this lag. Here are the details of our current setup:

Current Configuration:

connectReplica: "6"
buffer.count.records: "10000"
buffer.flush.time: "5"
buffer.size.bytes: "10000000"
offsetFlushIntervalMs: "10000"
tasksMax: "6"
snowflake.enable.schematization: "TRUE"
enable.streaming.client.optimization: "TRUE"
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
consumerMaxPollRec: "3000"
snowflake.ingestion.method: SNOWPIPE_STREAMING

We are using the Strimzi Kafka Connect framework.
Initially we had 1 topic with 3 partitions, and 3 Connect workers with one connect task each, for load testing at 1K events per second.
As per my understanding, in order to scale better we can add more partitions and more Connect workers so that more workers can read in parallel.
So currently we have 6 partitions on that same topic, and Kafka Connect has 6 worker pods running 6 tasks.
But that didn't seem to help with the lag.
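For reference, this is roughly how that setup maps onto our Strimzi resources (a sketch with illustrative names and only the relevant fields shown, not our exact manifests):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster                        # illustrative name
spec:
  replicas: 6                                  # 6 worker pods
  bootstrapServers: my-kafka-bootstrap:9092    # illustrative
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: snowflake-sink                         # illustrative name
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: com.snowflake.kafka.connector.SnowflakeSinkConnector
  tasksMax: 6                                  # one task per topic partition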

I also have a few doubts regarding some of the config params.
For buffer.count.records, buffer.flush.time, and buffer.size.bytes, my understanding is that if even one of these thresholds is breached, the connector is triggered to sink data to Snowflake.
So let's say we are producing 2K records per second; then in 5 seconds we would have 10K records in the Kafka topic.
If we set buffer.flush.time to 5 seconds and buffer.count.records to 10K, theoretically the connector should consume all records?
But I'm not sure how often the connector polls the Kafka topic for new data. If it happens every second, then maybe we can set consumerMaxPollRec to 2K so that we pull 2K records per poll. My guess is that the consumerMaxPollRec param specifies the maximum number of records that can be returned in one poll.
After a poll, the records are stored in the buffer and stay there until one of the buffer thresholds is breached. Once a threshold is breached, the connector sinks the data and commits the offset, and then the next poll is issued.
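For reference, these are the three thresholds I'm reasoning about, as currently set; whichever limit is hit first should trigger the flush:

buffer.flush.time: "5"            # flush at least every 5 seconds
buffer.count.records: "10000"     # ...or once 10K records are buffered
buffer.size.bytes: "10000000"     # ...or once ~10 MB is buffered

(If I'm computing this right, at ~85 KB per record the 10 MB byte limit would be hit after roughly 117 records, well before the count or time thresholds.)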
Is my understanding correct? Also would disabling enable.streaming.client.optimization help?

Hello @piyushbhojgude, thank you for using the Snowflake Connector! Your load is pretty high (~166 MB/s), but we test internally with even higher loads, so it's mainly a matter of hardware.

Parallelization with tasksMax is fine as long as it corresponds to the number of CPUs available to the Kafka Connect cluster. Do you have 6 available CPU cores, and are they at 100% utilization?

Regarding the internal connector buffer - I don't think playing with those settings would affect throughput much. They affect latency and memory consumption, but that's not really the problem here.

Regarding the last part - the connector itself does not poll the Kafka topic; that is done by the Kafka Connect framework. Playing with consumerMaxPollRec might help, but again, check resource utilization first. enable.streaming.client.optimization is recommended to be turned off for high-throughput scenarios, but it shouldn't affect throughput.
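If consumerMaxPollRec maps to the standard consumer override, a sketch of the connector-level settings would look like this (assuming your Connect worker's connector.client.config.override.policy permits client overrides):

consumer.override.max.poll.records: "2000"      # cap records returned per poll
enable.streaming.client.optimization: "false"   # off, as recommended for high throughput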

Hi @sfc-gh-mbobowski, thanks for replying. I have modified the test setup a bit. As of now, I'm using a topic with just 1 partition and setting tasksMax to 1.
My Kafka Connect pod doesn't have any requests or limits set at the moment, for either CPU or memory.
My current observations are as follows:
At any given time, the consumer is able to consume at most ~300 records per second.
The lag keeps increasing until the Kafka topic retention (size and time) kicks in, which triggers Kafka to delete records. That gave us the impression that the records were being consumed, whereas in reality the reduction in lag was due to Kafka deleting the records, not the consumer consuming them.
Also, for such high-load scenarios, I believe we would need to bump up the JVM options as well, like the heap size?

In order to increase connector performance, you first need to know what the bottleneck is. It could be the number of CPUs, the power of a single CPU, the heap size used by Kafka Connect, or even the network throughput between the machines.

@piyushbhojgude a few thoughts in addition to what @sfc-gh-mbobowski had already shared.

If you're in the testing phase, I would try the same load without schematization. You can expect things to be slower with schematization enabled, but performance will also differ significantly depending on whether you have a few columns or a lot of columns ending up in a wide table.

Are you stuck with JSON messages, or can you switch to Avro? You should see significantly better performance with Avro. You will take a hit using JSON, but generally speaking, 2K records per second isn't a very high rate of incoming volume, and processing 300 records per second feels low to me.

It's impossible for anyone to say whether you should bump up the maximum heap size for your JVM. When you're running your tests, you need to monitor your JVM's heap usage and the GC's performance. You could also just enable GC logging and analyze it after the fact. If your heap usage is consistently high and close to the maximum heap size, that might be a sign that you want to increase the maximum heap size. You'll also want to make sure that you're using the G1GC garbage collector. That should be the default starting with JDK 9, but I have seen cases where users on later JDK releases were still using something like CMS, which was causing performance issues.
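Since you're running on Strimzi, here's a minimal sketch of where the heap and GC logging settings would go, assuming Strimzi's jvmOptions syntax on the KafkaConnect resource (the sizes are placeholders, not recommendations):

spec:
  jvmOptions:
    "-Xms": "4g"              # placeholder initial heap
    "-Xmx": "4g"              # placeholder maximum heap
    gcLoggingEnabled: true    # emit GC logs you can analyze after the test
    "-XX":
      "UseG1GC": "true"       # G1 should already be the default on JDK 9+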

Hi @sfc-gh-wfateem and @sfc-gh-mbobowski,
Looks like the issue was that we had not set requests and limits for the Kafka Connect cluster.
I provided adequate CPU and memory resources for the Kafka Connect cluster by setting requests and limits, and observed consumption improve to up to 400 EPS for one pod. I believe that is the max that 1 pod could do.
So I went ahead and created 6 pods with 6 tasks and 6 partitions.
After that, overall consumption increased to 2500 EPS.
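For reference, this is the kind of resources block we added to the KafkaConnect spec (the values here are illustrative, not our exact numbers):

spec:
  resources:
    requests:
      cpu: "3"                # illustrative
      memory: 8Gi             # illustrative
    limits:
      cpu: "3"
      memory: 8Gi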
Yeah, we are currently going with JSON, but I'll keep this point in mind. I also had the same thought regarding schematization, so that's good to know.

Hey @sfc-gh-wfateem and @sfc-gh-mbobowski,
I believe having more workers than partitions doesn't help with scaling or increasing consumer throughput. That is, if I have 6 partitions and 12 workers, is it true that 6 workers would be idle while 6 would be running, even though we have 12 CPUs available? Is my understanding correct?

And lastly, while testing 2K EPS with an 85 KB event size, the max one consumer worker can consume is around 300 EPS, and I see that worker using ~2.5 CPUs. Is this the max one worker can do, or is there any way to further improve this throughput? My worry is that if this is the max, then for testing 10K EPS we may need to spin up 30 consumer pods and have 30 partitions on the topic.

Also, during one of the test runs, I observed that a couple of worker pods were not consuming fast enough, which led to overall consumer lag. For example, in the image below, two partitions account for almost 195K of the total 205K lag. I'm wondering what could cause this, since each worker has the same configuration.

@piyushbhojgude the term "worker" can mean different things; for example, it can mean a Kafka Connect node, which can in turn run multiple tasks depending on how many were assigned to it. I'm going to use the term "worker" to mean a single task. In Kafka Connect, your sink task acts as a Kafka consumer, so given you have N partitions it only makes sense to have M tasks such that M <= N. Any consumer starting from N+1 in the same consumer group is going to be idle.

It's difficult to say why you're only seeing a consumption rate of 300 messages per second. My suggestion is that you run a load test without schematization to verify the consumption rate.

It's also hard to say why two of your tasks lagged behind. You'll first want to determine whether they're just slow or not making any progress at all. You can look at the consumer offsets for those two partitions and see whether they're progressing. You will then want to look at the logs to see what those two tasks were reporting at the time the lag started to increase. If they're not making any progress at all, then you should take several Java thread dumps and look at how the sink task threads are doing over time.

Hey @sfc-gh-wfateem , thanks for the confirmation regarding the task and partitions.
I had a look at those 2 tasks, and they were consuming records, so it wasn't that they weren't making any progress. It's just that, compared to the other tasks, they weren't consuming records fast enough. For the other tasks I see the "Successfully called insertRows" logs every 2 seconds, whereas for these two tasks they are 3 seconds apart.
Sure, I'll try without schematization as well.

@piyushbhojgude I would look at the performance of the garbage collector on the JVM hosting the straggling tasks.
We have seen cases where users were on an older garbage collection policy and saw improvements once they switched to G1GC.

I would expect to see further improvements with some of the newer GC policies released with newer JDKs; for example, ZGC is another one to try if you're on JDK 15 or higher.
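With Strimzi, that would be a small change to the jvmOptions sketch above, assuming the Connect image is running on JDK 15 or later:

spec:
  jvmOptions:
    "-XX":
      "UseZGC": "true"        # ZGC is production-ready from JDK 15 onwards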

Hi @sfc-gh-wfateem, thanks for the update. We are already using G1GC.
Will try out ZGC!

Hey @piyushbhojgude,

Just checking in to see if you still need any help here.

Hey @sfc-gh-wfateem, thanks for all the inputs, I'm good for now. Will close the issue.