Yolean/kubernetes-kafka

Regular broker crashes since 5.0

flqw opened this issue · 19 comments

flqw commented

I'm having some serious issues with my Kafka cluster since upgrading to 5.0 / Kafka 2.1.0. The cluster ran completely stable for the last 6 months, but since the upgrade it crashes every few days in regular operation, and within a few minutes if I put heavy load on it. Unfortunately I can't downgrade, as the Kafka upgrade guide states there were some incompatible changes to the internal consumer offsets schema. I also have a hard time tracing the issue, but it always has the same symptoms:

The consumers/producers in all my client services suddenly produce a lot of errors like

Got error produce response with correlation id 3827806 on topic-partition ..., retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION

and

Failed to commit stream task 1_15 due to the following error: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {...=OffsetAndMetadata{offset=28547576, leaderEpoch=null, metadata=''}}

while that happens, the broker logs show that the connection to a certain node fails (the node is always different):

WARN [Controller id=0, targetBrokerId=0] Connection to node 0 (kafka-0.broker.kafka.svc.cluster.local/10.24.6.64:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
 WARN [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=3] Error in response for fetch request...

while the affected node (in this case node 0) logs the following:

java.lang.IllegalStateException: No entry found for connection X
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
	at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:92)
	at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
	at scala.Option.foreach(Option.scala:257)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

The only solution I've found so far is to stop all my services, scale the brokers down to zero and back up again, effectively restarting everything.
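
Roughly what I run for that, in case it matters (just a sketch; the namespace and statefulset names assume this repo's default manifests):

# Scale the broker statefulset down to zero and wait for all broker pods to terminate
kubectl -n kafka scale statefulset kafka --replicas=0
# ...then bring the 5 brokers back up
kubectl -n kafka scale statefulset kafka --replicas=5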

When starting back up, the affected node also restores a ton of corrupted index files (...Found a corrupted index file corresponding to log file...). The other nodes start up normally.

The cluster consists of 5 brokers and runs on GCE. I also tried up/downgrading the Kubernetes versions, but that didn't help, so my guess is it's either an issue with Kafka 2.1.0 itself or something to do with the upgrade to Java 11. Any ideas?

Thanks.

This indeed sounds serious. I agree it's unlikely to depend on the Kubernetes version. What kubernetes-kafka version were you running before the upgrade?

It's my understanding that Kafka 2.1 was tested with Java 11 before release. I didn't see any info on which Java 11 build though.

I'd really like to know more about this issue. How about building a new Kafka 2.1.0 image on Java 8 and just switching the image? I suppose that even if you can't downgrade Kafka due to the persistence formats, you can still run Kafka on a different JVM.
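
The switch could be as simple as something like this (a sketch only; the image tag is a placeholder and the container name "broker" is an assumption based on this repo's manifests):

# Build a Kafka 2.1.0 image on a Java 8 base and push it to a registry you control
docker build -t YOUR_REGISTRY/kafka:2.1.0-jre8 .
docker push YOUR_REGISTRY/kafka:2.1.0-jre8
# Point the broker statefulset at the new image and let it roll
kubectl -n kafka set image statefulset/kafka broker=YOUR_REGISTRY/kafka:2.1.0-jre8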

How much generic searching, outside this repo, have you done for this type of kafka error?

Got one more idea, but you might have tried it already. Even if 2.1.0 was tested with Java 11, maybe it wasn't tested with docker resource limits. Have you tried removing the resource limits? Maybe hitting those results in different JVM behavior than a regular memory shortage.
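
If you want to try that without editing the manifests, something like this should drop the limits in place (a sketch; it assumes the broker is the first container in the statefulset and that a limits block is actually set):

# Remove resources.limits from the broker container; the statefulset will roll the pods
kubectl -n kafka patch statefulset kafka --type=json \
  -p '[{"op":"remove","path":"/spec/template/spec/containers/0/resources/limits"}]'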

Except for the memory limits, this setup shouldn't be that different from running Kafka on Debian virtual machines. I'm curious whether you've tried to find help in the wider Kafka community.

Possibly relevant: what type of persistent volumes do you have?

flqw commented

I'm using standard HDD PVs through the automatic provisioning by GKE, 1 TB per node. I think I already tried without resource limits, but the nodes never use more than 1.5 GB of RAM, even when giving them a lot more to play with. I'll try with a Java 8 based image and report back as soon as I know more, thanks for the suggestions.
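
For anyone wanting to double-check the same on their cluster, this is roughly how I looked it up (names assume this repo's defaults):

# Show the size and storage class of each broker/zookeeper volume claim
kubectl -n kafka get pvc
# Show which provisioner and disk type each storage class maps to
kubectl get storageclass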

I actually upgraded from 2.0 to 2.1 using this repo's v5.0.3 tag. I can see in my logs too that I have

[2019-01-09 10:32:13,241] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(dc.3.metrics.json.v1-0 -> org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest$PartitionData@da44319) (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: No entry found for connection 0
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
	at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:92)
	at kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:314)
	at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:179)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:110)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

The timing of these logs (2019-01-09 10:32:13), and that you @flqw say the error stopped when you effectively restarted the entire cluster, makes me wonder: did you do a restart as part of the upgrade?

I followed the rolling upgrade steps found here, which I interpreted as requiring at least two full rolling restarts of the brokers: one when setting inter.broker.protocol.version to the currently running version (2.0 for me) but with the new image (provided by this repo), and one when setting inter.broker.protocol.version to the new version (2.1).
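
In config terms the two passes boiled down to roughly this (a sketch of the relevant server.properties override, not the exact manifest changes):

# Pass 1: roll out the 2.1.0 image while pinning the old inter-broker protocol
inter.broker.protocol.version=2.0
# Pass 2: once every broker runs 2.1.0, bump the protocol and roll again
inter.broker.protocol.version=2.1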

I haven't seen any of these errors after my second restart, or in other words: I haven't seen these errors in the logs since I completely finished the upgrade. Disclaimer: I have only run 2.1 for approximately 2 days, and I wouldn't say at all that I have high loads right now.

I should also point out, given @solsson's comment, that I do not have any resource limits on my brokers! Lots of monitoring though 😉

flqw commented

Yep, of course I did restart. The crashes have happened frequently since I did the upgrade. It seems like it's some issue that occurs after a certain amount of messages/data is sent to the brokers, as it happens maybe every few days in regular operation with varying amounts of data, but can be provoked by starting heavy workloads. I just started a heavy workload test on a Java 8 based image I forked from @solsson.

I think I already tried without resource limits, but the nodes never use more than 1.5 GB of RAM, even when giving them a lot more to play with.

I don't understand this comment.

I just started a heavy workload test on a Java 8 based image

If you get positive results I'd still like to see if the original image without any resource requests+limits makes any difference. Reverting to Java 8 sort of defeats the purpose of the quite early upgrade to Kafka 2.1.0, and we might not be able to reproduce the issue later.

The link to the Java 8 based image didn't work. I assume it's https://github.com/flqw/dockerfiles/commit/d49c0e97fcec2081654dad1b4d1713b44d9af553.

flqw commented

Yes, that's the change.

I think I already tried without resource limits, but the nodes never use more than 1.5 GB of RAM, even when giving them a lot more to play with.

I don't understand this comment.

I don't see how it's a memory issue, because the reported memory usage of the container never goes above 1.5 GB of RAM; that's with Java 8, without resource limits, and with a memory limit of 8 GB.

@flqw A question about your traffic: When you say heavy load, is that both consumer and producer or just one of them?

flqw commented

It's both, but more consumer-heavy than producer-heavy. A typical scenario is about 60 MB/s read and 10 MB/s write per node, so 300 MB/s read and 60 MB/s write in total.

So far the Java 8 based nodes are running without any hiccups in regular operation and in a few high load scenarios.

I think this finding should be reported as a Kafka issue. Did you do any searches for this error outside the Kubernetes space?

flqw commented

I had another crash with the Java 8 image, same order of events. I found a failed readiness probe on one of the zookeeper nodes before the crash, so I also switched the zk nodes to Java 8; maybe there's an issue with zk and Java 11? If it reoccurs I'll report a Kafka issue.

Did you find any interesting logs on the failing zk node?

flqw commented

I guess I found the issue; at least everything is running normally again. After a longer debugging session I found that the zookeeper latencies (mntr) were insanely high (averages around 1 s, max beyond 30 s) and traced the issue back to disk performance.

GCE disk performance scales with disk size. For a regional SSD as provisioned by GKE through the zookeeper storage class, 1 GB of storage gives, according to the docs, a whopping 30 IOPS and 0.48 MB/s of throughput. That just didn't cut it anymore for my workload. Although not needed storage-wise, I increased the zookeeper disks to 50 GB, giving me 1500 IOPS and 24 MB/s of throughput. After that the latency averages dropped to 0 and everything behaved much better (shutting down a broker node no longer crashed everything, but triggered a leader re-election and distributed the load to the other brokers as it should).
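
The change itself was basically just the requested volume size; one way to do it looks roughly like this (a sketch only; the label selector assumes this repo's zookeeper manifests, and the storage class has to allow volume expansion):

# Grow the zookeeper data volumes from 1Gi to 50Gi; on GCE, IOPS and throughput scale with size
# (1 GB SSD ~ 30 IOPS / 0.48 MB/s, 50 GB SSD ~ 1500 IOPS / 24 MB/s)
for pvc in $(kubectl -n kafka get pvc -l app=zookeeper -o name); do
  kubectl -n kafka patch "$pvc" -p '{"spec":{"resources":{"requests":{"storage":"50Gi"}}}}'
done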

Maybe that should be mentioned somewhere around the zookeeper service, as it just defaults to 1 GB.

Great find @flqw. Did you find messages in the zookeeper logs that others can look for? Would be quite valuable if we try to document the caveat.

flqw commented

The key indicator should be the latency stats from the zookeeper mntr command, but I also found some related logs that look like this:

WARN fsync-ing the write ahead log in SyncThread:4 took 1024ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)
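
For anyone wanting to check the same latency stats, this is roughly how I queried them (the pod name is just an example from this repo's zookeeper statefulsets, and it assumes nc is available in the container; otherwise run the command from any pod that can reach the client port):

# Dump zookeeper's mntr stats and keep the latency lines (values are in milliseconds)
kubectl -n kafka exec pzoo-0 -- sh -c 'echo mntr | nc localhost 2181' | grep latency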

I think that instead of documenting stuff (who reads docs before they've learned the hard way that they should have? I don't), we could work on a PR with some tooling to help with load testing. Do you have advice on how to add load on zookeeper? I'm guessing that simply writing more to Kafka won't always convert to zk load. A complementary strategy I can think of is to patch on a sidecar that produces dummy IO against the data volume, to slow it down. One advantage would be that we'd be able to test/break zoo and pzoo independently. Alternatively, use zk clients to stress one zk pod at a time.
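
The dummy-IO sidecar could be as simple as a container sharing the data volume and looping something like this (just a sketch; the mount path is an assumption, the numbers are arbitrary, and oflag=dsync needs GNU dd):

# Burn disk IOPS/throughput on the zookeeper data volume so real fsyncs get slow
while true; do
  dd if=/dev/zero of=/var/lib/zookeeper/data/io-noise bs=1M count=100 oflag=dsync 2>/dev/null
  rm -f /var/lib/zookeeper/data/io-noise
done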