streamnative/pulsar-spark

[BUG] Failed to create producer: Namespace is being unloaded, cannot add topic

mathieudruart opened this issue · 7 comments

Describe the bug
A running Spark application stopped working after a micro-batch execution failed (HTTP 500 error from Pulsar). After the failure, the Spark application appears to keep trying to reconnect to the output topic, but it only gets "Could not get connection to broker: Namespace is being unloaded, cannot add topic".

Other consumers/producers connected to the same topic are still working correctly.

To Reproduce
Pulsar (2.5.0) and Spark (2.4.4) are deployed on the same kubernetes cluster. Pulsar is deployed with default mini-helm charts. Spark is deployed with Spark-Operator.

The Spark application is a streaming Dataset (input and output are Pulsar topics). The output topic name is built dynamically from information in the input message.
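
The logical plan in the stack trace below hints at how the output is built: `__topic` is `concat(objet, _, action)`, `__key` is the hourly event-time window cast to a string, and `value` is `sum(nb)`. The following is a minimal plain-Java sketch (no Spark involved) of only the per-record routing part; the class and method names are hypothetical, and it assumes that `objet = "AbsenceEnseignant"` and `action = "creation"` are what produce the failing topic seen in the logs. It also illustrates that the long CASE WHEN/CEIL window expression in the plan reduces to flooring the event time to the hour.

```java
import java.time.Instant;

// Hypothetical helper mirroring the logical plan from the issue:
//   __topic = concat(objet, "_", action)
//   window  = 1-hour tumbling window on __eventTime (3600000000 microseconds)
public class OutputRouting {
    static final long WINDOW_MICROS = 3_600_000_000L; // 1 hour, as in the plan

    // Output topic name derived from two input fields.
    static String outputTopic(String objet, String action) {
        return objet + "_" + action;
    }

    // The plan's CASE WHEN / CEIL expression (with startTime 0) is equivalent to
    // floor(t / w) * w; Math.floorDiv computes that exactly on longs.
    static long windowStartMicros(long eventTimeMicros) {
        return Math.floorDiv(eventTimeMicros, WINDOW_MICROS) * WINDOW_MICROS;
    }

    public static void main(String[] args) {
        String topic = outputTopic("AbsenceEnseignant", "creation");
        // Arbitrary sample event time, treated as UTC for illustration only.
        long t = Instant.parse("2020-01-31T19:35:20Z").toEpochMilli() * 1000;
        long start = windowStartMicros(t);
        System.out.println(topic); // AbsenceEnseignant_creation
        System.out.println(Instant.ofEpochMilli(start / 1000) + " - "
                + Instant.ofEpochMilli((start + WINDOW_MICROS) / 1000));
        // 2020-01-31T19:00:00Z - 2020-01-31T20:00:00Z
    }
}
```

With dynamic routing like this, every distinct `objet`/`action` pair yields a separate producer on a separate topic, which is why a single output topic (here `AbsenceEnseignant_creation`) can be stuck while the rest of the pipeline is healthy.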

The application worked correctly for hours, and then an error occurred (it seems to be a temporary error from Pulsar):

20/01/31 19:35:20 ERROR MicroBatchExecution: Query [id = 27bd0ffe-0c41-4a43-8d1b-2636549b6314, runId = 86420f14-ff24-4e1d-b2da-40627d6e5c6a] terminated with error
java.lang.RuntimeException: Failed to get last messageId for persistent://public/default/aom-emp
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.liftedTree1$1(PulsarMetadataReader.scala:203)
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.apply(PulsarMetadataReader.scala:197)
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.apply(PulsarMetadataReader.scala:195)
...
org.apache.spark.sql.streaming.StreamingQueryException: Failed to get last messageId for persistent://public/default/aom-emp
=== Streaming Query ===
Identifier: [id = 27bd0ffe-0c41-4a43-8d1b-2636549b6314, runId = 86420f14-ff24-4e1d-b2da-40627d6e5c6a]
Current Committed Offsets: {org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90: {"persistent://public/default/aom-emp":[8,-114,-123,11,16,6]}}
Current Available Offsets: {org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90: {"persistent://public/default/aom-emp":[8,-114,-123,11,16,6]}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [concat(objet#8, _, action#9) AS __topic#40, cast(window#24-T3660000ms as string) AS __key#41, sum(nb)#34L AS value#42L]
+- Aggregate [window#35-T3660000ms, objet#8, action#9], [window#35-T3660000ms AS window#24-T3660000ms, objet#8, action#9, sum(cast(nb#10 as bigint)) AS sum(nb)#34L]
+- Filter isnotnull(__eventTime#15-T3660000ms)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) as double) = (cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) THEN (CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 3600000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) as double) = (cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) THEN (CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(__eventTime#15-T3660000ms, TimestampType, LongType) - 0) as double) / cast(3600000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 3600000000) + 0) + 3600000000), LongType, TimestampType)) AS window#35-T3660000ms, objet#8, action#9, nb#10, __key#11, __topic#12, __messageId#13, __publishTime#14, __eventTime#15-T3660000ms]
+- EventTimeWatermark __eventTime#15: timestamp, interval 1 hours 1 minutes
+- StreamingExecutionRelation org.apache.spark.sql.pulsar.PulsarMicroBatchReader@6d62ae90, [objet#8, action#9, nb#10, __key#11, __topic#12, __messageId#13, __publishTime#14, __eventTime#15]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.lang.RuntimeException: Failed to get last messageId for persistent://public/default/aom-emp
at org.apache.spark.sql.pulsar.PulsarMetadataReader$$anonfun$fetchLatestOffsets$1.liftedTree1$1(PulsarMetadataReader.scala:203)
...
Caused by: org.apache.pulsar.shade.javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1098)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.access$700(JerseyInvocation.java:99)
... 50 more
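
The committed and available offsets in the query status above are raw byte arrays. Assuming the connector stores each offset as `MessageId.toByteArray()`, i.e. a serialized `MessageIdData` protobuf in which field 1 is the ledger ID and field 2 the entry ID, they can be decoded with a small varint reader. The sketch below is hypothetical helper code, not part of pulsar-spark; under that assumption, `[8,-114,-123,11,16,6]` decodes to ledger 180878, entry 6, for both the committed and the available offset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for protobuf messages that contain only varint fields
// (wire type 0), such as a MessageIdData with ledgerId (1) and entryId (2).
public class OffsetDecoder {

    // Returns field number -> value, in the order the fields appear.
    static Map<Integer, Long> decode(byte[] bytes) {
        Map<Integer, Long> fields = new LinkedHashMap<>();
        int[] pos = {0}; // mutable cursor shared with readVarint
        while (pos[0] < bytes.length) {
            long key = readVarint(bytes, pos);
            int fieldNumber = (int) (key >>> 3);
            int wireType = (int) (key & 0x7);
            if (wireType != 0) {
                throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
            fields.put(fieldNumber, readVarint(bytes, pos));
        }
        return fields;
    }

    // Base-128 varint: 7 payload bits per byte, least significant group first,
    // high bit set on every byte except the last.
    static long readVarint(byte[] bytes, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            if (pos[0] >= bytes.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            byte b = bytes[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    public static void main(String[] args) {
        byte[] offset = {8, -114, -123, 11, 16, 6}; // value from the query status
        Map<Integer, Long> f = decode(offset);
        System.out.println("ledgerId=" + f.get(1) + " entryId=" + f.get(2));
        // ledgerId=180878 entryId=6
    }
}
```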

After this error, the Spark application seems to retry the connection regularly, but it only gets these errors:

20/01/31 23:57:20 ERROR ClientCnx: [id: 0xce703541, L:/10.42.0.28:37144 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/01/31 23:57:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.195 s
20/02/01 01:42:20 ERROR ClientCnx: [id: 0x9c396ec5, L:/10.42.0.28:38560 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 01:42:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.191 s
20/02/01 16:30:20 ERROR ClientCnx: [id: 0x9e3cd9fd, L:/10.42.0.28:60278 - R:172.16.101.114/172.16.101.114:30002] Close connection because received internal-server error org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN BinaryProtoLookupService: [persistent://public/default/AbsenceEnseignant_creation] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded
20/02/01 16:30:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/default/0x70000000_0x80000000 is being unloaded -- Will try again in 0.193 s
20/02/02 16:51:21 WARN ClientCnx: [id: 0x035d4a85, L:/10.42.0.28:37146 - R:172.16.101.114/172.16.101.114:30002] Received error from server: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/02 16:51:21 ERROR ProducerImpl: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Failed to create producer: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/02 16:51:21 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation -- Will try again in 0.185 s
20/02/03 20:24:20 WARN ClientCnx: [id: 0x035d4a85, L:/10.42.0.28:37146 - R:172.16.101.114/172.16.101.114:30002] Received error from server: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/03 20:24:20 ERROR ProducerImpl: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Failed to create producer: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation
20/02/03 20:24:20 WARN ConnectionHandler: [AbsenceEnseignant_creation] [terrific-dragonfly-pulsar-0-109] Could not get connection to broker: Namespace is being unloaded, cannot add topic persistent://public/default/AbsenceEnseignant_creation -- Will try again in 0.198 s

Other producers and consumers have no issues connecting to the topic persistent://public/default/AbsenceEnseignant_creation.

Expected behavior
After a temporary Pulsar error, the streaming Spark application should successfully reconnect to the topic.

sijie commented

@yjshen can you help check the errors here?

I'll look into this.

Any progress on this bug? ping @yjshen

Any progress on this bug, @yjshen?

Any progress on this bug? @yjshen
We also hit this bug when using 2.6.3.

@wususu Spark 2.6.3 support has been dropped. Can you confirm whether this still happens with our latest release for Spark 3.2.x?

nlu90 commented

Closing due to inactivity.