Spark streaming suddenly started consuming Kafka data in Event Hubs from the very beginning
Guodong-Wang-prog opened this issue · 3 comments
Feature Requests:
Sometimes Spark streaming applications suddenly start to consume data from the very beginning. After analyzing the application log, it appears there is an internal error in the communication between the Kafka client and the Event Hubs Kafka endpoint, which may cause this issue. Please check the Spark Event Hubs Kafka connector SDK and provide a solution.
- What issue are you trying to solve?
The Spark streaming application starts to consume data for half of the partitions from the very beginning.
- How do you want to solve it?
Provide a resolution that avoids consuming data from the very beginning.
For the first failed rebalance, we need to understand why there are two member IDs (perhaps equivalent to the consumer group concept?) trying to connect to Kafka, each holding half of the partitions. This causes half of the partitions to be lost temporarily.
Can we use Kafka offset management to avoid this issue? If so, how do we configure it?
- What is your use case for this feature?
Sometimes the Spark streaming application starts consuming data from the very beginning, which causes duplicate data and delays consumption of the latest data.
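On the offset question above: with Spark Structured Streaming, the Kafka source tracks its progress in the query checkpoint rather than in Kafka consumer-group offsets, and the `startingOffsets` option is only honored on the very first run, when the checkpoint is empty. A minimal sketch against the `ehConf` dict shown later in this issue; the per-partition offset values here are hypothetical:

```python
import json

ehConf = {}  # stand-in for the real option dict shown later in this issue

# Honored only when the checkpoint directory is empty (first run):
ehConf["startingOffsets"] = "latest"

# Or pin explicit per-partition starting offsets (hypothetical values):
starting = {"warriorehagentmetric": {"0": 1024, "1": 2048}}
ehConf["startingOffsets"] = json.dumps(starting)
```

Once the query has a checkpoint, restarting from that checkpoint (rather than deleting it) is what prevents re-reading from the beginning; `startingOffsets` no longer applies.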
Bug Report:
Normally, when there is a rebalance in Kafka, only consumers come and go; there should be no impact on the connection with the Kafka server.
In this case, however, when we analyzed the Spark streaming application logs, we found that for the first failed rebalance there are two member IDs (perhaps equivalent to the consumer group concept?) trying to connect to Kafka, each taking half of the partitions:
First Rebalance failed:
"
22/08/22 14:49:03 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
22/08/22 14:49:03 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Lost previously assigned partitions warriorehagentmetric-7, warriorehagentmetric-40, warriorehagentmetric-73, warriorehagentmetric-15, warriorehagentmetric-48, warriorehagentmetric-81, warriorehagentmetric-24, warriorehagentmetric-57, warriorehagentmetric-82, warriorehagentmetric-32, warriorehagentmetric-65, warriorehagentmetric-90, warriorehagentmetric-8, warriorehagentmetric-41, warriorehagentmetric-66, warriorehagentmetric-16, warriorehagentmetric-49, warriorehagentmetric-74, warriorehagentmetric-25, warriorehagentmetric-50, warriorehagentmetric-83, warriorehagentmetric-0, warriorehagentmetric-33, warriorehagentmetric-58, warriorehagentmetric-91, warriorehagentmetric-9, warriorehagentmetric-34, warriorehagentmetric-67, warriorehagentmetric-17, warriorehagentmetric-42, warriorehagentmetric-75, warriorehagentmetric-18, warriorehagentmetric-51, warriorehagentmetric-84, warriorehagentmetric-1, warriorehagentmetric-26, warriorehagentmetric-59, warriorehagentmetric-92, warriorehagentmetric-2, warriorehagentmetric-35, warriorehagentmetric-68, warriorehagentmetric-10, warriorehagentmetric-43, warriorehagentmetric-76, warriorehagentmetric-19, warriorehagentmetric-52, warriorehagentmetric-85, warriorehagentmetric-27, warriorehagentmetric-60, warriorehagentmetric-93, warriorehagentmetric-3, warriorehagentmetric-36, warriorehagentmetric-69, warriorehagentmetric-11, warriorehagentmetric-44, warriorehagentmetric-77, warriorehagentmetric-86, warriorehagentmetric-20, warriorehagentmetric-53, warriorehagentmetric-94, warriorehagentmetric-28, warriorehagentmetric-61, warriorehagentmetric-70, warriorehagentmetric-4, warriorehagentmetric-37, warriorehagentmetric-78, warriorehagentmetric-12, warriorehagentmetric-45, warriorehagentmetric-54, 
warriorehagentmetric-87, warriorehagentmetric-21, warriorehagentmetric-62, warriorehagentmetric-95, warriorehagentmetric-29, warriorehagentmetric-38, warriorehagentmetric-71, warriorehagentmetric-5, warriorehagentmetric-46, warriorehagentmetric-79, warriorehagentmetric-13, warriorehagentmetric-22, warriorehagentmetric-55, warriorehagentmetric-88, warriorehagentmetric-30, warriorehagentmetric-63, warriorehagentmetric-6, warriorehagentmetric-39, warriorehagentmetric-72, warriorehagentmetric-14, warriorehagentmetric-47, warriorehagentmetric-80, warriorehagentmetric-23, warriorehagentmetric-56, warriorehagentmetric-89, warriorehagentmetric-31, warriorehagentmetric-64
22/08/22 14:49:03 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] (Re-)joining group
22/08/22 14:49:03 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Group coordinator warriordata-eventhub-namespace-cu-us-1.servicebus.windows.net:9093 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
22/08/22 14:49:03 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Rebalance failed.
kafkashaded.org.apache.kafka.common.errors.DisconnectException
After this rebalance failed, there are two member IDs, each of which holds half of the partitions (48), and we can see that only partitions 48-95 remain after assignment:
22/08/22 14:49:07 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Finished assignment for group at generation 18: {warriordata-eventhub-namespace-cu-us-1.servicebus.windows.net:c:spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0:I:consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1-48fcc06cf767456e801d323f9a5b33bb=Assignment(partitions=[warriorehagentmetric-0, warriorehagentmetric-1, warriorehagentmetric-2, warriorehagentmetric-3, warriorehagentmetric-4, warriorehagentmetric-5, warriorehagentmetric-6, warriorehagentmetric-7, warriorehagentmetric-8, warriorehagentmetric-9, warriorehagentmetric-10, warriorehagentmetric-11, warriorehagentmetric-12, warriorehagentmetric-13, warriorehagentmetric-14, warriorehagentmetric-15, warriorehagentmetric-16, warriorehagentmetric-17, warriorehagentmetric-18, warriorehagentmetric-19, warriorehagentmetric-20, warriorehagentmetric-21, warriorehagentmetric-22, warriorehagentmetric-23, warriorehagentmetric-24, warriorehagentmetric-25, warriorehagentmetric-26, warriorehagentmetric-27, warriorehagentmetric-28, warriorehagentmetric-29, warriorehagentmetric-30, warriorehagentmetric-31, warriorehagentmetric-32, warriorehagentmetric-33, warriorehagentmetric-34, warriorehagentmetric-35, warriorehagentmetric-36, warriorehagentmetric-37, warriorehagentmetric-38, warriorehagentmetric-39, warriorehagentmetric-40, warriorehagentmetric-41, warriorehagentmetric-42, warriorehagentmetric-43, warriorehagentmetric-44, warriorehagentmetric-45, warriorehagentmetric-46, warriorehagentmetric-47]), 
warriordata-eventhub-namespace-cu-us-1.servicebus.windows.net:c:spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0:I:consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1-76da0d8c99214d319b5a18ca6173d7f3=Assignment(partitions=[warriorehagentmetric-48, warriorehagentmetric-49, warriorehagentmetric-50, warriorehagentmetric-51, warriorehagentmetric-52, warriorehagentmetric-53, warriorehagentmetric-54, warriorehagentmetric-55, warriorehagentmetric-56, warriorehagentmetric-57, warriorehagentmetric-58, warriorehagentmetric-59, warriorehagentmetric-60, warriorehagentmetric-61, warriorehagentmetric-62, warriorehagentmetric-63, warriorehagentmetric-64, warriorehagentmetric-65, warriorehagentmetric-66, warriorehagentmetric-67, warriorehagentmetric-68, warriorehagentmetric-69, warriorehagentmetric-70, warriorehagentmetric-71, warriorehagentmetric-72, warriorehagentmetric-73, warriorehagentmetric-74, warriorehagentmetric-75, warriorehagentmetric-76, warriorehagentmetric-77, warriorehagentmetric-78, warriorehagentmetric-79, warriorehagentmetric-80, warriorehagentmetric-81, warriorehagentmetric-82, warriorehagentmetric-83, warriorehagentmetric-84, warriorehagentmetric-85, warriorehagentmetric-86, warriorehagentmetric-87, warriorehagentmetric-88, warriorehagentmetric-89, warriorehagentmetric-90, warriorehagentmetric-91, warriorehagentmetric-92, warriorehagentmetric-93, warriorehagentmetric-94, warriorehagentmetric-95])}
Notifying assignor about the new Assignment(partitions=[warriorehagentmetric-48, warriorehagentmetric-49, warriorehagentmetric-50, warriorehagentmetric-51, warriorehagentmetric-52, warriorehagentmetric-53, warriorehagentmetric-54, warriorehagentmetric-55, warriorehagentmetric-56, warriorehagentmetric-57, warriorehagentmetric-58, warriorehagentmetric-59, warriorehagentmetric-60, warriorehagentmetric-61, warriorehagentmetric-62, warriorehagentmetric-63, warriorehagentmetric-64, warriorehagentmetric-65, warriorehagentmetric-66, warriorehagentmetric-67, warriorehagentmetric-68, warriorehagentmetric-69, warriorehagentmetric-70, warriorehagentmetric-71, warriorehagentmetric-72, warriorehagentmetric-73, warriorehagentmetric-74, warriorehagentmetric-75, warriorehagentmetric-76, warriorehagentmetric-77, warriorehagentmetric-78, warriorehagentmetric-79, warriorehagentmetric-80, warriorehagentmetric-81, warriorehagentmetric-82, warriorehagentmetric-83, warriorehagentmetric-84, warriorehagentmetric-85, warriorehagentmetric-86, warriorehagentmetric-87, warriorehagentmetric-88, warriorehagentmetric-89, warriorehagentmetric-90, warriorehagentmetric-91, warriorehagentmetric-92, warriorehagentmetric-93, warriorehagentmetric-94, warriorehagentmetric-95])
22/08/22 14:49:07 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Adding newly assigned partitions: warriorehagentmetric-73, warriorehagentmetric-69, warriorehagentmetric-48, warriorehagentmetric-81, warriorehagentmetric-77, warriorehagentmetric-86, warriorehagentmetric-57, warriorehagentmetric-82, warriorehagentmetric-53, warriorehagentmetric-94, warriorehagentmetric-65, warriorehagentmetric-90, warriorehagentmetric-61, warriorehagentmetric-70, warriorehagentmetric-66, warriorehagentmetric-78, warriorehagentmetric-49, warriorehagentmetric-74, warriorehagentmetric-54, warriorehagentmetric-87, warriorehagentmetric-50, warriorehagentmetric-83, warriorehagentmetric-62, warriorehagentmetric-95, warriorehagentmetric-58, warriorehagentmetric-91, warriorehagentmetric-71, warriorehagentmetric-67, warriorehagentmetric-79, warriorehagentmetric-75, warriorehagentmetric-55, warriorehagentmetric-88, warriorehagentmetric-51, warriorehagentmetric-84, warriorehagentmetric-63, warriorehagentmetric-59, warriorehagentmetric-92, warriorehagentmetric-72, warriorehagentmetric-68, warriorehagentmetric-80, warriorehagentmetric-76, warriorehagentmetric-56, warriorehagentmetric-89, warriorehagentmetric-52, warriorehagentmetric-85, warriorehagentmetric-64, warriorehagentmetric-60, warriorehagentmetric-93
"
Due to this, we found half of the partitions were gone shortly after this error:
"
22/08/22 14:49:17 WARN KafkaMicroBatchStream: Set(warriorehagentmetric-29, warriorehagentmetric-22, warriorehagentmetric-47, warriorehagentmetric-13, warriorehagentmetric-6, warriorehagentmetric-33, warriorehagentmetric-42, warriorehagentmetric-36, warriorehagentmetric-19, warriorehagentmetric-37, warriorehagentmetric-3, warriorehagentmetric-30, warriorehagentmetric-23, warriorehagentmetric-14, warriorehagentmetric-24, warriorehagentmetric-41, warriorehagentmetric-7, warriorehagentmetric-0, warriorehagentmetric-18, warriorehagentmetric-44, warriorehagentmetric-2, warriorehagentmetric-27, warriorehagentmetric-45, warriorehagentmetric-38, warriorehagentmetric-4, warriorehagentmetric-11, warriorehagentmetric-31, warriorehagentmetric-32, warriorehagentmetric-17, warriorehagentmetric-8, warriorehagentmetric-26, warriorehagentmetric-15, warriorehagentmetric-35, warriorehagentmetric-1, warriorehagentmetric-20, warriorehagentmetric-10, warriorehagentmetric-12, warriorehagentmetric-46, warriorehagentmetric-21, warriorehagentmetric-39, warriorehagentmetric-40, warriorehagentmetric-5, warriorehagentmetric-34, warriorehagentmetric-25, warriorehagentmetric-16, warriorehagentmetric-43, warriorehagentmetric-9, warriorehagentmetric-28) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true".
"
For the next failed rebalance, we can see there is just one member ID, which holds all the partitions:
"
22/08/22 16:06:23 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1, groupId=spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0] Finished assignment for group at generation 21: {warriordata-eventhub-namespace-cu-us-1.servicebus.windows.net:c:spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0:I:consumer-spark-kafka-source-e42dd0f3-b8dd-4c8e-8379-3e2bacb8f46b--2012452121-driver-0-1-771740afa48d4ffea50e4ecd3e650274=Assignment(partitions=[warriorehagentmetric-0, warriorehagentmetric-1, warriorehagentmetric-2, warriorehagentmetric-3, warriorehagentmetric-4, warriorehagentmetric-5, warriorehagentmetric-6, warriorehagentmetric-7, warriorehagentmetric-8, warriorehagentmetric-9, warriorehagentmetric-10, warriorehagentmetric-11, warriorehagentmetric-12, warriorehagentmetric-13, warriorehagentmetric-14, warriorehagentmetric-15, warriorehagentmetric-16, warriorehagentmetric-17, warriorehagentmetric-18, warriorehagentmetric-19, warriorehagentmetric-20, warriorehagentmetric-21, warriorehagentmetric-22, warriorehagentmetric-23, warriorehagentmetric-24, warriorehagentmetric-25, warriorehagentmetric-26, warriorehagentmetric-27, warriorehagentmetric-28, warriorehagentmetric-29, warriorehagentmetric-30, warriorehagentmetric-31, warriorehagentmetric-32, warriorehagentmetric-33, warriorehagentmetric-34, warriorehagentmetric-35, warriorehagentmetric-36, warriorehagentmetric-37, warriorehagentmetric-38, warriorehagentmetric-39, warriorehagentmetric-40, warriorehagentmetric-41, warriorehagentmetric-42, warriorehagentmetric-43, warriorehagentmetric-44, warriorehagentmetric-45, warriorehagentmetric-46, warriorehagentmetric-47, warriorehagentmetric-48, warriorehagentmetric-49, warriorehagentmetric-50, warriorehagentmetric-51, warriorehagentmetric-52, warriorehagentmetric-53, warriorehagentmetric-54, warriorehagentmetric-55, warriorehagentmetric-56, 
warriorehagentmetric-57, warriorehagentmetric-58, warriorehagentmetric-59, warriorehagentmetric-60, warriorehagentmetric-61, warriorehagentmetric-62, warriorehagentmetric-63, warriorehagentmetric-64, warriorehagentmetric-65, warriorehagentmetric-66, warriorehagentmetric-67, warriorehagentmetric-68, warriorehagentmetric-69, warriorehagentmetric-70, warriorehagentmetric-71, warriorehagentmetric-72, warriorehagentmetric-73, warriorehagentmetric-74, warriorehagentmetric-75, warriorehagentmetric-76, warriorehagentmetric-77, warriorehagentmetric-78, warriorehagentmetric-79, warriorehagentmetric-80, warriorehagentmetric-81, warriorehagentmetric-82, warriorehagentmetric-83, warriorehagentmetric-84, warriorehagentmetric-85, warriorehagentmetric-86, warriorehagentmetric-87, warriorehagentmetric-88, warriorehagentmetric-89, warriorehagentmetric-90, warriorehagentmetric-91, warriorehagentmetric-92, warriorehagentmetric-93, warriorehagentmetric-94, warriorehagentmetric-95])}
"
- Event Hub Configuration
ehConf = {"kafka.sasl.mechanism": "PLAIN",
          "kafka.security.protocol": "SASL_SSL",
          "kafka.session.timeout.ms": "300000",
          "kafka.max.poll.interval.ms": "600000"}
ehConf["subscribe"] = os.environ.get('EVENTHUB_NAME')
EventhubListen = dbutils.secrets.get(scope = 'warriortwkeyvaultprod', key = os.environ.get('EVENTHUB_CONNECT', 'EventhubListen'))
extract_server_name = EventhubListen[len("Endpoint=sb://"):EventhubListen.index(".servicebus.windows.net")]
ehConf["kafka.bootstrap.servers"] = f"{extract_server_name}.servicebus.windows.net:9093"
ehConf["kafka.sasl.jaas.config"] = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='{EventhubListen}';"
ehConf['maxOffsetsPerTrigger'] = int(os.environ.get("MAX_EVENT", sc.defaultParallelism*1000))
ehConf['failOnDataLoss'] = "false"
ehConf
Out[5]: {'kafka.sasl.mechanism': 'PLAIN', 'kafka.security.protocol': 'SASL_SSL', 'kafka.session.timeout.ms': '300000', 'kafka.max.poll.interval.ms': '600000', 'subscribe': 'warriorehagentmetric', 'kafka.bootstrap.servers': 'warriordata-eventhub-namespace-cu-us-1.servicebus.windows.net:9093', 'kafka.sasl.jaas.config': "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='[REDACTED]';", 'maxOffsetsPerTrigger': 768000, 'failOnDataLoss': 'false'}
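As an aside, the string slicing used above to extract the namespace assumes the connection string begins exactly with `Endpoint=sb://`. A slightly more defensive sketch (the helper name is hypothetical, not part of any SDK):

```python
import re

def namespace_from_connection_string(conn_str: str) -> str:
    """Extract the Event Hubs namespace from a connection string of the
    standard 'Endpoint=sb://<namespace>.servicebus.windows.net/...' form."""
    m = re.search(r"Endpoint=sb://([^.]+)\.servicebus\.windows\.net", conn_str)
    if m is None:
        raise ValueError("unrecognized Event Hubs connection string")
    return m.group(1)

# e.g. namespace_from_connection_string(
#     "Endpoint=sb://myns.servicebus.windows.net/;SharedAccessKeyName=...")
# -> "myns"
```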
- Actual behavior
When there is a rebalance or a failed rebalance, only consumers should be affected. There should be no impact on Kafka partitions or consumer groups, and there should not be two member IDs (perhaps equivalent to consumer groups?) trying to connect to the Kafka server. Yet that is exactly what happens in this issue.
- Expected behavior
Rebalancing is normal behavior, which should just cause consumers to come and go.
- Spark version
Databricks Runtime 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
- spark-eventhubs artifactId and version
The default version in Databricks Runtime 10.4 LTS; no spark-eventhubs artifact is installed explicitly.
Dear Eric @hmlam
This is a recurring critical issue from an S500 customer. Could you please take a look into this?
Here is how the customer reads and writes data:
Input source
from pyspark.sql.types import StringType  # needed for the cast below

instream = (spark
    .readStream
    .format("kafka")
    .options(**ehConf)
    .load())
instream = instream.withColumn("value", instream.value.cast(StringType()))
## Process JSON format
batchDF = agent_data_transform(spark, instream, agent_data_format)
Output sink
checkpoint_path = os.path.join(
    f"abfss://{cfg['checkpoint_container']}@{storage_name}.dfs.core.windows.net/",
    operating_system + log_suffix)

query = (batchDF.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime="1 minute")
    .start())
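To catch a reset like this early, one option is to compare each micro-batch's reported start offsets against the previous batch's end offsets from `StreamingQuery.lastProgress` and alert when they jump backwards. A minimal sketch; the helper name is hypothetical, the dict shapes follow the Kafka source's progress JSON (`{topic: {partition: offset}}`), and the polling/alerting wiring is left out:

```python
def offsets_went_backwards(prev_end, curr_start):
    """Return True if any partition's current start offset is behind the
    previously observed end offset. Both args: {topic: {partition: offset}},
    as found in lastProgress['sources'][n]['endOffset'] / ['startOffset']."""
    for topic, parts in curr_start.items():
        for part, off in parts.items():
            if off < prev_end.get(topic, {}).get(part, off):
                return True
    return False

# Example: partition "0" jumped from end offset 5000 back to 0 -> reset detected
# offsets_went_backwards({"t": {"0": 5000}}, {"t": {"0": 0}})  -> True
```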
This is not related to the EH Connector for Spark. We will discuss this in other channels.