Azure/azure-event-hubs-spark

Azure Databricks job stops consuming event hub messages.

ameydeshpande18 opened this issue · 2 comments

We have an Azure Databricks job, triggered by Event Hubs, that runs continuously and stores device telemetry data in one of our Databricks tables.

Issue:
Data stopped being inserted into the table for all devices, even though the job was still in the running state (i.e., it stopped consuming Event Hubs messages). This has happened a couple of times.


We raised a ticket with Microsoft and, as they suggested, upgraded the Event Hubs Spark connector to com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22:
https://search.maven.org/artifact/com.microsoft.azure/azure-eventhubs-spark_2.12/2.3.22/jar

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-amqp-troubleshoot#connection-is-closed

Even after upgrading to the latest version, we are still facing the same issue: the Databricks job stops consuming the data.
The Microsoft Azure Databricks and Event Hubs teams checked the logs on their side and did not find any issue with their resources. They shared the driver logs below, which show connection timeouts reported by the Event Hubs package.

22/09/06 01:24:14 WARN RequestResponseOpener: requestResponseChannel.onClose error clientId[MF_cdba69_1660893297704], session[mgmt-session], link[mgmt], endpoint[$management], rrc[RRC_049fdd_1661896909024], error The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14
22/09/06 01:24:14 INFO SessionHandler: onSessionRemoteClose connectionId[mgmt-session], entityName[MF_cdba69_1660893297704], condition[Error{condition=null, description='null', info=null}]
22/09/06 01:24:14 INFO ConnectionHandler: onConnectionRemoteClose hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_cdba69_1660893297704], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 WARN MessagingFactory: onConnectionError messagingFactory[MF_cdba69_1660893297704], hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net], error[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 WARN MessagingFactory: onConnectionError messagingFactory[MF_cdba69_1660893297704], hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net], closing current connection
22/09/06 01:24:14 INFO BaseLinkHandler: onLinkLocalClose clientName[mgmt], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 INFO SessionHandler: onSessionLocalClose connectionId[mgmt-session], entityName[MF_cdba69_1660893297704], condition[Error{condition=amqp:connection:forced, description='The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14', info=null}]
22/09/06 01:24:14 INFO BaseLinkHandler: onLinkLocalClose clientName[mgmt], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 INFO ConnectionHandler: onConnectionLocalClose hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_cdba69_1660893297704], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 INFO ConnectionHandler: onConnectionUnbound hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_cdba69_1660893297704], state[CLOSED], remoteState[CLOSED]
22/09/06 01:24:14 INFO SessionHandler: onSessionFinal connectionId[MF_cdba69_1660893297704], entityName[mgmt-session], condition[amqp:connection:forced], description[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:14 INFO ConnectionHandler: onConnectionFinal hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_cdba69_1660893297704], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:2906f6d644304f56bc223563f30e2283_G0S1, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:14]
22/09/06 01:24:17 INFO ClusterLoadAvgHelper: Current cluster load: 2, Old Ema: 1.9999999536833557, New Ema: 1.9999999606308523
22/09/06 01:24:19 INFO BaseLinkHandler: onLinkRemoteClose clientName[mgmt], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO BaseLinkHandler: processOnClose clientName[mgmt], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO BaseLinkHandler: closeSession for clientName[mgmt], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO BaseLinkHandler: onLinkRemoteClose clientName[mgmt], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO BaseLinkHandler: processOnClose clientName[mgmt], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 WARN RequestResponseOpener: requestResponseChannel.onClose error clientId[MF_fa3275_1660893297543], session[mgmt-session], link[mgmt], endpoint[$management], rrc[RRC_6e716c_1661898110001], error The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19
22/09/06 01:24:19 INFO SessionHandler: onSessionRemoteClose connectionId[mgmt-session], entityName[MF_fa3275_1660893297543], condition[Error{condition=null, description='null', info=null}]
22/09/06 01:24:19 INFO ConnectionHandler: onConnectionRemoteClose hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_fa3275_1660893297543], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 WARN MessagingFactory: onConnectionError messagingFactory[MF_fa3275_1660893297543], hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net], error[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 WARN MessagingFactory: onConnectionError messagingFactory[MF_fa3275_1660893297543], hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net], closing current connection
22/09/06 01:24:19 INFO BaseLinkHandler: onLinkLocalClose clientName[mgmt], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO SessionHandler: onSessionLocalClose connectionId[mgmt-session], entityName[MF_fa3275_1660893297543], condition[Error{condition=amqp:connection:forced, description='The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19', info=null}]
22/09/06 01:24:19 INFO BaseLinkHandler: onLinkLocalClose clientName[mgmt], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO ConnectionHandler: onConnectionLocalClose hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_fa3275_1660893297543], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO ConnectionHandler: onConnectionUnbound hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_fa3275_1660893297543], state[CLOSED], remoteState[CLOSED]
22/09/06 01:24:19 INFO SessionHandler: onSessionFinal connectionId[MF_fa3275_1660893297543], entityName[mgmt-session], condition[amqp:connection:forced], description[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:19 INFO ConnectionHandler: onConnectionFinal hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], connectionId[MF_fa3275_1660893297543], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6462135b51674a9cbf4f48fe9e4de716_G3S2, SystemTracker:gateway5, Timestamp:2022-09-06T01:24:19]
22/09/06 01:24:20 INFO ClusterLoadAvgHelper: Current cluster load: 2, Old Ema: 1.9999999606308523, New Ema: 1.9999999665362245
22/09/06 01:24:20 WARN TaskSetManager: Lost task 0.0 in stage 3318011.0 (TID 16388549) (10.139.64.8 executor 126): java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]

Please investigate and let us know the possible causes as soon as possible, as this is leading to data loss for us.

This log isn't very useful because it doesn't cover the period before the connection was closed. The AMQP connection is closed after 5 minutes of inactivity (the 300000 milliseconds in the messages), so whatever went wrong happened before the excerpt shown here.
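To find the part of the driver log that actually matters, one can work backwards from the close events: the service only closes a connection after it has been inactive for more than 300000 ms, so the last traffic on each connection happened at least five minutes before its close. Below is a minimal Python sketch, assuming the driver log lines use the same `yy/MM/dd HH:mm:ss` prefix and `connectionId[...]` field as the excerpt in the issue; the `idle_windows` helper is hypothetical and the sample lines are abbreviated copies of that excerpt.

```python
import re
from datetime import datetime, timedelta

# Idle limit reported by the service in the log messages (300000 ms).
IDLE_LIMIT = timedelta(milliseconds=300000)

# Matches the timestamp prefix and the connectionId of an onConnectionRemoteClose line.
CLOSE_RE = re.compile(
    r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) INFO ConnectionHandler: "
    r"onConnectionRemoteClose .*?connectionId\[([^\]]+)\]"
)


def idle_windows(lines):
    """Map each remotely closed connection to (last_activity_bound, close_time).

    The connection was idle for the whole limit before it was closed, so the
    last traffic on it happened no later than close_time - IDLE_LIMIT.
    """
    windows = {}
    for line in lines:
        match = CLOSE_RE.match(line)
        if not match:
            continue  # not a remote-close event
        closed_at = datetime.strptime(match.group(1), "%y/%m/%d %H:%M:%S")
        conn_id = match.group(2)
        # Keep the earliest close event seen for each connection.
        if conn_id not in windows or closed_at < windows[conn_id][1]:
            windows[conn_id] = (closed_at - IDLE_LIMIT, closed_at)
    return windows


if __name__ == "__main__":
    sample = [
        "22/09/06 01:24:14 INFO ConnectionHandler: onConnectionRemoteClose "
        "hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], "
        "connectionId[MF_cdba69_1660893297704], errorCondition[amqp:connection:forced]",
        "22/09/06 01:24:19 INFO ConnectionHandler: onConnectionRemoteClose "
        "hostname[sim-india-prod-databricks-eventhub.servicebus.windows.net:5671], "
        "connectionId[MF_fa3275_1660893297543], errorCondition[amqp:connection:forced]",
    ]
    for conn, (last_ok, closed) in sorted(idle_windows(sample).items()):
        print(f"{conn}: last activity no later than {last_ok:%H:%M:%S}, "
              f"closed at {closed:%H:%M:%S}")
```

For the two connections in the excerpt, this points at roughly 01:19 as the latest moment worth inspecting, which is exactly the part of the log that is missing.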

I believe this is a known issue (and I'm surprised it isn't addressed here). The Spark config setting
spark.locality.wait
is not set long enough for the linked tasks to process the data.
We had the same issue, set ours to
spark.locality.wait 1800s
in the cluster's 'Spark config' section, and the problem went away.
See
https://github.com/Azure/azure-event-hubs-spark/blob/master/examples/multiple-readers-example.md#receivers-move-between-executor-nodes

and #586

Raising spark.locality.wait improves the locality level of the task executions: the scheduler waits longer for a slot on a task's preferred executor before falling back to a less local one.
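The effect of the setting can be illustrated with a deliberately simplified model of Spark's delay scheduling: one task, one preferred executor (the one that, per the linked example, already hosts the partition's Event Hubs receiver), and a single fallback level. This is an illustrative sketch, not Spark's actual scheduler code; `place_task`, `Placement`, and the executor names are hypothetical, and the fallback executor is assumed to be free immediately.

```python
from dataclasses import dataclass


@dataclass
class Placement:
    executor: str          # where the task ends up running
    start_delay: float     # seconds the task waited before launching
    receiver_reused: bool  # True if it ran next to its cached receiver


def place_task(preferred, busy_for, locality_wait, fallback="any-free-executor"):
    """Toy delay scheduling for a single task (hypothetical helper).

    preferred     -- executor that already hosts this partition's receiver
    busy_for      -- seconds until the preferred executor has a free slot
    locality_wait -- analogue of spark.locality.wait, in seconds
    """
    if busy_for <= locality_wait:
        # The preferred slot frees up before the wait expires: stay local.
        return Placement(preferred, busy_for, True)
    # The wait expires first: give up on locality and run elsewhere
    # (this model assumes the fallback executor is free right away).
    return Placement(fallback, locality_wait, False)


if __name__ == "__main__":
    # Preferred executor stays busy for 10 s, e.g. during a slow batch.
    # 3 s is Spark's default spark.locality.wait; 1800 s is the value used above.
    for wait in (3.0, 1800.0):
        print(wait, place_task("executor-126", busy_for=10.0, locality_wait=wait))
```

The trade-off is visible even in this toy model: a long wait keeps the task next to its receiver, but a task can be held back for up to the full wait if its preferred executor stays busy.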

Hope this helps.