Test failures for console producer/consumer with min.insync.replicas=2
Closed this issue · 40 comments
Running the Kafka tests, this one does not complete:
NAME READY STATUS RESTARTS AGE
kafkacat-4kvj4 3/3 Running 0 1h
produce-consume-kdd92 2/3 Running 0 1h
Logs from Testcase
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
....
Could it be that the producer does not generate enough data? There are no errors in the producer or consumer.
Using Kubernetes 1.8.5 on GKE.
@solsson Actually, I synced with latest recently and I started getting the same test failures, whereas I didn't with this repo as pulled on Nov 12, 2017.
Not sure if it's related, but I have a Java consumer that now keeps getting the coordinator marked as dead.
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] (Re-)joining group
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] Marking the coordinator kafka-1.broker.kafka.svc.cluster.local:9092 (id: 2147483646 rack: null) dead
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] Discovered coordinator kafka-1.broker.kafka.svc.cluster.local:9092 (id: 2147483646 rack: null)
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] (Re-)joining group
Previously, the identical java consumer worked fine. I haven't figured this out yet.
Kubernetes 1.8.4 on AWS via kops.
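A side note that may help when reading these coordinator logs: the id 2147483646 next to the coordinator is not corrupted metadata. Kafka consumer clients derive the coordinator's connection id as Integer.MAX_VALUE minus the broker id, so the coordinator connection is tracked separately from the regular connection to the same broker. A minimal sketch:

```python
INT32_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE, 2147483647

def coordinator_connection_id(broker_id: int) -> int:
    # Kafka clients map the group coordinator (an ordinary broker)
    # to a synthetic node id that cannot collide with a real broker id.
    return INT32_MAX - broker_id

# kafka-1 shows up as id 2147483646 in the log above;
# kafka-0 would show up as 2147483647
print(coordinator_connection_id(1))  # 2147483646
print(coordinator_connection_id(0))  # 2147483647
```

This matches the later logs in this thread too, where broker 0 appears as coordinator id 2147483647.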
Looks bad. I'm sorry about leaving master relatively untested. Could be the cause of #114 too. I've been caught up in not only holiday season but also #101 (comment).
I wrote these tests while working on new features, but now that they run constantly I find the readiness indication very useful. I will try to reproduce the above issues.
Hrm, even with the tests succeeding on the old d01c128, the yahoo kafka-manager addon is failing to do much of anything:
[info] k.m.a.KafkaManagerActor - baseZkPath=/kafka-manager
[info] o.a.z.ClientCnxn - Socket connection established to zookeeper.kafka.svc.cluster.local/100.70.176.112:2181, initiating session
[info] o.a.z.ClientCnxn - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
[info] play.api.Play - Application started (Prod)
[info] p.c.s.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:80
[info] o.a.z.ClientCnxn - Opening socket connection to server zookeeper.kafka.svc.cluster.local/100.70.176.112:2181. Will not attempt to authenticate using SASL (unknown error)
[info] o.a.z.ClientCnxn - Socket connection established to zookeeper.kafka.svc.cluster.local/100.70.176.112:2181, initiating session
[info] o.a.z.ClientCnxn - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
This is weird, since I can do this:
kubectl exec -it zoo-0 -n kafka -- /bin/bash
root@zoo-0:/opt/kafka# echo ruok | nc -w 1 -q 1 zookeeper.kafka.svc.cluster.local 2181
imokroot@zoo-0:/opt/kafka#
If I go to the actual kafka-manager (via dashboard /api/v1/namespaces/kafka/services/kafka-manager/proxy/):
Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]
Let's assume that the issue(s?) is caused by min.insync.replicas=2. #108 and #114 were both caused by this change.
I fail to reproduce the test issue. Kafka Manager works fine too. @allquantor Did you upgrade an existing cluster that used to have min.insync.replicas=1? Can you see any broker log messages like those in #108? I don't know if console-consumer and console-producer can produce debug output, like -d broker,topic does with kafkacat.
The test can be improved. min.insync.replicas can be overridden per producer, by setting acks explicitly. Also the topic creation job can be removed. It specifies --replication-factor=2; not sure how this works with min.insync.replicas. But the kafkacat test has the same topic creation, and looks ok.
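As a sketch of how those two settings relate (standard Kafka property names; the values here are illustrative, not this repo's config):

```properties
# server.properties (broker/topic level): with acks=all, a produce
# succeeds only once this many in-sync replicas have the write
min.insync.replicas=2

# producer config (per client): override durability explicitly;
# acks=1 waits for the leader only, while acks=all (or -1) respects
# min.insync.replicas and fails with NotEnoughReplicas when it can't be met
acks=1
```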
I noticed when doing e059690 that the default min.insync.replicas is 1. Thus the tests were probably unaffected by the new default of 2.
The change might still have affected Kafka internals, as in #114.
I'm seeing the same issue as @StevenACoffman, but with the stock kafka-console-consumer, both inside and outside the cluster: the consumer is stuck in a loop marking a coordinator, resolved with id = max(int32), as dead.
None that I could see on the server side, but then I'm fairly new to them - I've upped the logging and been tailing the kafka-request.log to not much avail so far.
Forgot to mention an important detail: the console works when using the old consumer, i.e., pointing to --zookeeper rather than --bootstrap-server. Client and server versions are aligned, as they are based on the same image.
Client-wise this is the block that keeps on repeating - note the node's id is lost between lines 2 & 3. I couldn't tell how relevant that is.
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending GroupCoordinator request to broker 192.168.1.21:32402 (id: 2 rack: null)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Received GroupCoordinator response ClientResponse(receivedTimeMs=1515507632283, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=251), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=192.168.1.21:32400 (id: 0 rack: null)))
2018-01-09 14:20:32 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Discovered coordinator 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Initiating connection to node 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:177 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Disabling heartbeat thread
2018-01-09 14:20:32 INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] (Re-)joining group
2018-01-09 14:20:32 DEBUG AbstractCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending JoinGroup ((type: JoinGroupRequest, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@21918021)) to coordinator 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Completed connection to node 2147483647. Fetching API versions.
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Initiating API versions fetch from node 2147483647.
2018-01-09 14:20:32 DEBUG NetworkClient:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Recorded API versions for node 2147483647: (Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0])
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Received successful JoinGroup response: org.apache.kafka.common.requests.JoinGroupResponse@35b15e13
2018-01-09 14:20:32 DEBUG ConsumerCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Performing assignment using strategy range with subscriptions {consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1=Subscription(topics=[clickstream_raw])}
2018-01-09 14:20:32 DEBUG ConsumerCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Finished assignment for group: {consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1=Assignment(partitions=[clickstream_raw-0])}
2018-01-09 14:20:32 DEBUG AbstractCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending leader SyncGroup to coordinator 192.168.1.21:32400 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0, generationId=63, memberId=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1, groupAssignment=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] SyncGroup failed:
2018-01-09 14:20:32 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Marking the coordinator 192.168.1.21:32400 (id: 2147483647 rack: null) dead
2018-01-09 14:20:32 DEBUG NetworkClient:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Manually disconnected from 2147483647. Removed requests: .
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending GroupCoordinator request to broker 192.168.1.21:32402 (id: 2 rack: null)
From the port numbers it looks like you get the "outside" listener (#78). I'm quite curious how that can happen in the tests. An ordering issue was resolved in 4c202f4. Might #99 (comment) be helpful to you too?
Same happens using the inside listener. I had attached the external client's logs because those are the only ones I've been able to configure for DEBUG logging. You can see below the console's output when used within my Minikube, v0.24.1.
Re. commit 4c202f4, my repo is pointing at bbed23e so I presume the fix applies? I haven't checked #99 yet - I was planning on rolling back to early Nov as @StevenACoffman had done and work from there.
root@pzoo-1:/opt/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server kafka-0.broker.kafka.svc.cluster.local:9092 --topic xxxx --from-beginning
[2018-01-10 10:39:26,845] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-0.broker.kafka.svc.cluster.local:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-77574
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-01-10 10:39:26,893] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-10 10:39:26,893] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-10 10:39:26,973] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Discovered coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:26,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-10 10:39:26,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,031] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Marking the coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,144] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Discovered coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,145] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
@solsson, just to confirm your comment above, #116 (comment), changing min.insync.replicas=1 whilst retaining default.replication.factor=3 clears these issues in my setup: produce-consume test fails & console consumer cannot use --bootstrap-server
@albamoro I am confused by your last comment:
clears these issues in my setup: produce-consume test fails & console consumer cannot use --bootstrap-server
Does changing those values cause the failing test issue or fix the issue?
Yup, sorry, thought it might not be entirely clear :) Changing it makes the issues go away. I haven't checked the implications of the acks setting; I'm currently using "all" from my producers.
I have tested the min.insync.replicas setting against https://github.com/wurstmeister/kafka-docker on docker-compose and the results are the same, i.e. with min.insync.replicas=2 the console consumer fails when using --bootstrap-server rather than zookeeper. I don't understand how @solsson's local environment could pass the tests with ISR=2. Otherwise I'd think we are either missing a trick with regards to the Kafka config or maybe we are inheriting a bug on wurstmeister's dockerfile as I understand it is the basis of yolean's image?
So I have three different kubernetes clusters, and in each I have done kubectl delete -f on each file in reverse order from how they were created, removed any lingering detached persistent volumes, and reinstalled everything from latest master, and I have different results from these tests. In one they succeed, and in two others they fail. On one of the failing clusters, switching to the #121 branch:
- producer
++ date --iso-8601=ns -u
+ echo '--- start produce-consume-z2vn2 2018-01-10T15:46:16,496865915+00:00 ---'
+ ./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
+ ./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume --producer-property acks=-1
+ tail -f /shared/produce.tmp
[2018-01-10 16:15:36,241] WARN [Producer clientId=console-producer] Got error produce response with correlation id 185 on topic-partition test-produce-consume-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2018-01-10 16:16:57,574] ERROR Error when sending message to topic test-produce-consume with key: null, value: 62 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-produce-consume-0: 1501 ms has passed since batch creation plus linger time
[2018-01-10 16:17:07,615] ERROR Error when sending message to topic test-produce-consume with key: null, value: 62 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-produce-consume-0: 1540 ms has passed since batch creation plus linger time
- consumer:
[2018-01-10 15:46:45,647] WARN [Consumer clientId=consumer-1, groupId=console-consumer-240] Error while fetching metadata with correlation id 2 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-01-10 16:14:43,887] WARN [Consumer clientId=consumer-1, groupId=console-consumer-240] Connection to node 2147483645 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
What is weird is that I can install the outside services, and kafkacat works fine from my laptop:
kafkacat -C -b $BOOTSTRAP -t $TOPIC -o -10
@StevenACoffman With outside services, are you bootstrapping using the address of all three brokers?
I have different results from these tests
Could it be that the bootstrap service (in combination with acks) is the culprit? It does introduce the kind of randomness that would explain different behavior in identical setups. If so, does this only affect Java clients? Have any of you spotted issues with the kafkacat test?
In 3.0 we used a single broker for bootstrap, to avoid errors for single-node setups. In 2.1 we used the full address to three brokers.
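Spelled out as client config, the two styles would look something like this (host names as used elsewhere in this thread):

```properties
# 3.0 style: one service name, chosen to avoid errors in
# single-node setups; DNS resolves it to some one broker
bootstrap.servers=bootstrap.kafka:9092

# 2.1 style: enumerate the brokers, so bootstrap survives
# any single broker being down
bootstrap.servers=kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092
```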
- ./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
I'm just curious, why didn't this print some topic metadata? Will it do on pod restart?
In my case, the kafkacat test always passed, regardless of the min.insync.replicas setting.
I wouldn't want to unnecessarily add to the confusion, but if the bootstrap service was to blame, why do I get the same error using docker-compose and the wurstmeister/kafka image?
By the way, it's very handy to do this:
kubectl exec -it zoo-0 -n kafka -- /bin/bash
Then you can just run commands inside the cluster like:
./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
When I diff the results, I get identical output except for one block. The passing test cluster:
Topic:test-produce-consume PartitionCount:1 ReplicationFactor:2 Configs:
Topic: test-produce-consume Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
The failing test cluster:
Topic:test-produce-consume PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test-produce-consume Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 2,0,1
@solsson Using current master bbed23e I adjusted min.insync.replicas to 1 (from 2), and set num.partitions to 5 instead of 1 (side note: we use 15 in our normal non-kubernetes infrastructure kafka servers). No other alterations were made.
With these new settings, in all three of my kubernetes clusters, I can repeatably, reliably tear down all the kubernetes resources from this repository (including lingering persistent volumes) and re-apply them with no problems. With min.insync.replicas=2 it sometimes works and sometimes does not. It never appears to recover when it doesn't work. 6 minutes is enough for all tests to pass if they are ever going to in my AWS kubernetes clusters.
By the way, the yahoo kafka-manager problem was my own doing. I had somehow gotten into the habit of incorrectly entering the kafka bootstrap service bootstrap.kafka:9092 instead of the correct zookeeper service zookeeper.kafka:2181 when adding the cluster in the kafka-manager. This went about as well as expected.
I appreciate your careful attention, patience and help during all of this. #122 was very helpful.
I'm not sure if the sporadic successful runs were attributable to changes in kafka internals because I occasionally did this:
kubectl exec -it zoo-0 -n kafka -- /bin/bash
#-- inside kube shell next
cd /opt/kafka/bin
./kafka-topics.sh --create --if-not-exists --zookeeper zookeeper.kafka:2181 --replication-factor 2 --partitions 15 --topic k8s-firehose --config retention.ms=6912500000
The topic that was created was not the same as test-produce-consume, yet at least one of the clusters that I performed that on had tests passing and had the variant results for:
./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
I went through and diffed our outside-of-kubernetes kafka server.properties with the one here, and I noticed two big differences. The first:
num.partitions=15 #you have 1
default.replication.factor=2 # you have 3
# we do not specify min.insync.replicas
The second is that we omit this section:
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
That last section makes me wonder if that was my real problem all along.
I can alter offsets.topic.replication.factor to set it to 3, but I wonder if the other two should also be altered in production.
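For reference, a production-leaning version of those internal topic settings might look like the following; the values follow the config comments quoted above rather than anything validated against this repo's tests:

```properties
# replicate the internal topics like any business-critical topic
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```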
Setting min.insync.replicas=2 makes a test or two fail, and min.insync.replicas=1 makes the tests pass.
Changing offsets.topic.replication.factor=3 and omitting transaction.state.log.replication.factor and transaction.state.log.min.isr seems like a good idea outside of minikube, but it doesn't affect the tests passing or failing with either min.insync.replicas value.
I have tested the min.insync.replicas setting against https://github.com/wurstmeister/kafka-docker on docker-compose and the results are the same, i.e. with min.insync.replicas=2 the console consumer fails when using --bootstrap-server rather than zookeeper.
Sorry I didn't pay enough attention to this finding. Maybe docker-compose can provide a better test case for min.insync.replicas. Can you link to a gist or pastebin with your compose file?
wurstmeister's dockerfile as I understand it is the basis of yolean's image?
In fact I think Confluent's image looks inspired by wurstmeister's. I started out with Confluent's image, but with Kubernetes we have the ConfigMap feature so there's less need for using environment variables with services that use an actual config file. I wanted an image that was as close as possible to downloading and installing from https://kafka.apache.org/. There's some background on this choice in #46.
This thread is very important. I'm considering reverting the change to min.insync.replicas. But if this repo is opinionated about anything, it's that Kafka can be used at small scale AND with business critical data. Two acks from three replicas sounds to me like an essential part of that, and I'd really like it to be the default for producers that don't explicitly configure acks.
I will try to find more time to research this myself. Thanks for your efforts @albamoro and @StevenACoffman.
I'm not sure if the sporadic successful runs were attributable to changes in kafka internals because I occasionally did this:
kubectl exec -it zoo-0 -n kafka -- /bin/bash
#-- inside kube shell next
cd /opt/kafka/bin
./kafka-topics.sh --create --if-not-exists --zookeeper zookeeper.kafka:2181 --replication-factor 2 --partitions 15 --topic k8s-firehose --config retention.ms=6912500000
This is interesting & could have some bearing on the question: my tests fail when isr=2 and I use the new, bootstrap-server based consumer. My topics are autocreated on producer request, so maybe creating them using --zookeeper does something differently?
@solsson I've left the docker-compose.yml here: https://gist.github.com/albamoro/a56ed9aff40a10c2580d678a55b2b5d9
You'll need to create the backbone network beforehand:
docker network create backbone
I start it using:
docker-compose up -d --scale kafka=3
I very much appreciate your efforts too, thank you all, including @allquantor for raising it.
@solsson I appreciate the goal of making something that works for developers and works in production.
We've used Kafka in production for 5 years at Ithaka. In production in just November, our 5-node kafka cluster had a topic that handled 30,419,294 messages per day, and another that was about half that. Lots of other busy topics too. We don't specify min.insync.replicas (so it defaults to 1) and it hasn't been a problem so far as we know. We do set num.partitions=15 and default.replication.factor=2.
I'm not sure min.insync.replicas=2 is necessary for production-level durability.
With that said, it sounds very nice in theory, and I'm very interested in understanding how you don't get the same results I do. If there's something else that needs to be turned on to make it work, then I'm all for it.
I've contemplated this while re-reading relevant parts of the kafka book... with acks=1 the leader doesn't know if any replica is up-to-date or not. This has worked well for us too, but so has min.insync.replicas=2 in QA for the last month or so.
Excerpts from Neha Narkhede, Gwen Shapira, and Todd Palino, "Kafka: The Definitive Guide":
"Only in-sync replicas are eligible to be elected as partition leaders in case the existing leader fails."
"Produced messages are considered “committed” when they were written to the partition on all its in-sync replicas (but not necessarily flushed to disk). Producers can choose to receive acknowledgments of sent messages when the message was fully committed, when it was written to the leader, or when it was sent over the network.
Messages that are committed will not be lost as long as at least one replica remains alive.
Consumers can only read messages that are committed."
"As we’ve seen, there are cases where even though we configured a topic to have three replicas, we may be left with a single in-sync replica. If this replica becomes unavailable, we may have to choose between availability and consistency. This is never an easy choice. Note that part of the problem is that, per Kafka reliability guarantees, data is considered committed when it is written to all in-sync replicas, even when all means just one replica and the data could be lost if that replica is unavailable."
In our case, as we grow with Kafka, I think it is important to require at least 1 successful leader->follower replication so that we catch cluster problems sooner rather than later. With Kubernetes loss of a node should be a non-event. It probably can be with commit="written to the leader" thanks to graceful shutdown, but I'd like higher consistency guarantees.
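The trade-off arithmetic behind that choice is simple enough to sketch (a toy illustration of the general rule, not Kafka code):

```python
def broker_losses_before_writes_fail(replication_factor: int,
                                     min_insync_replicas: int) -> int:
    # With acks=all, produces are accepted only while at least
    # min.insync.replicas replicas are in sync, so this many brokers
    # can drop out of the ISR before writes start being rejected.
    return replication_factor - min_insync_replicas

def broker_losses_before_data_loss_possible(min_insync_replicas: int) -> int:
    # A committed write lives on at least min.insync.replicas replicas,
    # so losing fewer brokers than that cannot lose committed data.
    return min_insync_replicas - 1

# replication.factor=3 with min.insync.replicas=2: one broker can fail
# and writes continue, and committed data survives one broker loss
print(broker_losses_before_writes_fail(3, 2))      # 1
print(broker_losses_before_data_loss_possible(2))  # 1
# with the default min.insync.replicas=1, writes tolerate two losses,
# but a single loss can already drop committed data
print(broker_losses_before_writes_fail(3, 1))      # 2
print(broker_losses_before_data_loss_possible(1))  # 0
```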
Thus we'll keep running with 2 and I will report any findings here or in referencing issues. Clients can always opt for acks=1 if/when we need workarounds. I think the test issues indicate a configuration problem, but I have been unable to track it down. It might be affected by auto.create.topics too. In order to investigate further, we've decided that monitoring is first priority. I will try to resume from #49 #93 #96 #112 and quite possibly investigate https://github.com/linkedin/Burrow. This'll have to take some calendar time.
With that said, it sounds very nice in theory, and I'm very interested in understanding how you don't get the same results I do. If there's something else that needs to be turned on to make it work, then I'm all for it.
@StevenACoffman Good summary of the path we're taking at Yolean :)
Actually there's been a readiness test for checking for under-replicated partitions since #95, that I had forgotten about :) Prometheus + #128 lets you alert on sum({name="UnderReplicatedPartitions"}) > 0, and the replicated-partitions test can be used to list those.
I still haven't investigated if there's a difference between auto-created topics and those created through the CLI.
@solsson Hi! I'm getting:
kubectl --namespace=test-kafka logs produce-consume-xqhbk producer
++ date --iso-8601=ns -u
+ echo '--- start produce-consume-xqhbk 2018-01-19T23:27:05,831153320+00:00 ---'
+ tail -f /shared/produce.tmp
+ ./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume
[2018-01-19 23:27:07,325] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
With:
kubectl --namespace=test-kafka get pods
NAME READY STATUS RESTARTS AGE
kafkacat-jtv2f 3/3 Running 0 12m
produce-consume-xqhbk 2/3 Running 0 12m
Can it be related to this issue?
Also seeing errors like this in Zookeeper:
[2018-01-19 23:08:41,004] INFO Defaulting to majority quorums (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-01-19 23:08:41,008] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,008] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,008] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,021] INFO Starting quorum peer (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2018-01-19 23:08:41,041] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO QuorumPeer communication is not secured! (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO quorum.cnxn.threads.size set to 20 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,044] INFO currentEpoch not found! Creating with a reasonable default of 0. This should only happen when you are upgrading your installation (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,076] INFO acceptedEpoch not found! Creating with a reasonable default of 0. This should only happen when you are upgrading your installation (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,087] INFO My election bind port: /0.0.0.0:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
[2018-01-19 23:08:41,095] INFO LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,095] INFO New election. My id = 5, proposed zxid=0x0 (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2018-01-19 23:08:41,098] WARN Cannot open channel to 1 at election address pzoo-0.pzoo:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.UnknownHostException: pzoo-0.pzoo
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:562)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(QuorumCnxManager.java:538)
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(FastLeaderElection.java:452)
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.run(FastLeaderElection.java:433)
at java.lang.Thread.run(Thread.java:748)
I consider reverting the change to min.insync.replicas
I don't think I consider that anymore :) This is slightly dated, but I found it useful for our persistent data use case: https://www.slideshare.net/gwenshap/kafka-reliability-when-it-absolutely-positively-has-to-be-there
In addition, when working with #134, I learned that clients can't specify a numeric `acks` count greater than 1; beyond that they can only choose to adhere to "all". That's an argument for `min.insync.replicas` > 1: clients (or topics) can always specify `0` or `1` to opt out.
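For reference, the opt-out looks like this on the client and topic side. This is a sketch using standard Kafka producer and topic config property names; whether you want these overrides is your call.

```properties
# Producer config: wait for all in-sync replicas, honoring the broker's
# min.insync.replicas (the durable choice when the broker sets it to 2).
acks=all

# Per-client opt-outs (uncomment one to trade durability for latency):
# acks=1    # leader ack only
# acks=0    # fire and forget

# Per-topic opt-out, applied as a topic-level config override
# (e.g. via kafka-configs.sh --alter --entity-type topics):
# min.insync.replicas=1
```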
Also I've tried to double-check that `min.insync.replicas=2` actually means leader + 1 follower, not 2 followers. It's only implicit in the Kafka docs, but here they write "The leader is considered one of the in-sync replicas.".
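My interpretation of that sentence, modeled as toy arithmetic (this is my reading of the rule, not Kafka source code): a produce with `acks=all` succeeds while the ISR size is at least `min.insync.replicas`, and the leader itself counts toward the ISR.

```shell
# Toy model of the acks=all acceptance rule (an interpretation, not Kafka code):
# a produce request succeeds while |ISR| >= min.insync.replicas,
# and the leader is a member of the ISR.
min_insync=2
replication_factor=3

for brokers_down in 0 1 2; do
  isr=$((replication_factor - brokers_down))  # leader + remaining in-sync followers
  if [ "$isr" -ge "$min_insync" ]; then
    echo "brokers_down=$brokers_down isr=$isr -> write accepted"
  else
    echo "brokers_down=$brokers_down isr=$isr -> NotEnoughReplicas"
  fi
done
```

Under this reading, replication factor 3 with `min.insync.replicas=2` tolerates one broker being down, which is exactly the leader-plus-one-follower semantics, not two followers.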
[2018-01-19 23:27:07,325] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
I've seen this too, frequently since #107. It might be normal with auto-created topics. But there's actually a timing issue at test start (which, I realized today, could be a cause of the original issue, @albamoro @allquantor @StevenACoffman). You never know which container gets to create the topic first: the consumer (which oddly is allowed to create topics), the producer, or the topic creation job. If it's the job you won't get any errors in producer or consumer, but on the other hand you get replication factor = 2, which is different from the default introduced in #107. I find kafka-manager (#83) really useful for looking at topic config.
@lenadroid The Zookeeper issue must be something else. Please create a separate issue.
I think this issue will conclude the 3.1 cycle, dubbed "The painstaking path to min.insync.replicas=2". Do you think #140 has fixed the issue(s) here?
I released 3.1.0 now. Will leave this issue open, until I get positive reports or until I find the time to go through current open issues and close inactive ones.
I've only had my three non-toy clusters running with this for 2 days, but so far it's been very solid, even with `min.insync.replicas=2`. This is very close to our out-of-Kubernetes cluster settings now.
Sounds good. Made all the difference for me to learn about these additional replication factor props. I will close this then.
For reference, we use this configuration in our Kafka clusters outside of kubernetes:
server.properties.txt
I have summarized the differences below. We have upgraded our clusters many times, so please don't assume these are still current best practices, even if they've proven fairly reliable for us. Aside from retention policies, I find some very intriguing differences and I wonder if you have any opinions on them @solsson
setting | Ithaka Kafka | Yolean | Default |
---|---|---|---|
num.network.threads | 2 | 3 | 3 |
num.io.threads | 2 | 8 | 8 |
socket.send.buffer.bytes | 1048576 | 102400 | 102400 |
socket.receive.buffer.bytes | 1048576 | 102400 | 102400 |
socket.request.max.bytes | 104857600 | 102400 | 104857600 |
num.partitions | 15 | 1 | |
default.replication.factor | 2 | 1 | |
offsets.topic.replication.factor | 3 | 3 | |
log.flush.interval.messages | 10000 | 9223372036854775807 | |
log.flush.interval.ms | 1000 | 9223372036854775807 | |
log.retention.hours | 4 | -1 | 168 |
log.retention.bytes | 104857600 | -1 | |
log.segment.bytes | 26214400 | 1073741824 | 1073741824 |
log.cleanup.interval.mins* | 1 | 5 | |
log.retention.check.interval.ms | 60000 | 300000 | 300000 |
auto.leader.rebalance.enable | true | true | |
group.initial.rebalance.delay.ms | 0 | 3000 | |
leader.imbalance.per.broker.percentage | 10 | 10 | |
leader.imbalance.check.interval.seconds | 3600 | 300 | |
zookeeper.connection.timeout.ms | 1000000 | 6000 | 6000 |
zookeeper.session.timeout.ms | 30000 | 6000 | |
kafka.metrics.polling.interval.secs | 60 | ? | |
\* `log.cleanup.interval.mins` has been renamed `log.retention.check.interval.ms`
For very important topics with settings that differ from the default, we have a few external programs that are constantly ensuring those topics exist and have the proper retention periods (e.g. 1 week instead of 4 hours).
We have considered flipping this and defaulting to retaining for 1 week and changing the topic settings for all topics not on the whitelist.
Wow, that's some differences for sure :) We haven't done any performance tuning, and I think we're on recent defaults.
With hindsight, and #140 fresh in memory, we shouldn't have started from the sample `server.properties` that ships with the Kafka distribution. We should have left `server.properties` completely empty instead, except for the things we care about: retention `-1` as the default, so topics are "persistent" out of the box, since we keep our primary records of domain data in Kafka. For example the essential `auto.leader.rebalance.enable` is `true` in this repo's setup too, but thanks to broker defaults.
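A broker config in that spirit could be as small as the fragment below. This is a sketch: the paths and the `broker.id` handling are assumptions (in this repo the id typically comes from the pod ordinal), and every property not listed falls back to the broker default.

```properties
# Minimal server.properties sketch: rely on broker defaults for everything
# except what we deliberately care about.
broker.id=0                      # assumed; usually derived per pod
log.dirs=/var/lib/kafka/data     # assumed path
zookeeper.connect=zookeeper:2181 # assumed service name

# Deliberate choices: durability and infinite retention by default
default.replication.factor=3
min.insync.replicas=2
log.retention.ms=-1
log.retention.bytes=-1
```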
I updated the table to include a column for defaults. Now that 3.1 is released, would removing default values (or commenting them out) be warranted here? That would allow documentation to highlight the reason for any divergence from defaults.
Our situation is that we initially set all values explicitly, and painstakingly performance-tuned when issues arose. However, as we upgraded multiple times and the recommended defaults changed, we didn't keep track of which values were intentional differences for our usage and which were outdated adherence to "best practices" specific to those older versions.
Part of the appeal of this project is it affords us the ability to cheaply experiment and revisit these settings.