Azure/azure-cosmosdb-spark

Error writing - JSONObject['version'] not a string

Closed this issue · 5 comments

I'm loading log data into Databricks to filter and extract some events we're interested in. Everything looks good, but when I try to write the data to Cosmos DB, I get the error "org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 24, 10.139.64.5, executor 0): com.microsoft.azure.documentdb.DocumentClientException: org.json.JSONException: JSONObject["version"] not a string."

I'm not sure where "version" is coming from: it's not one of my column names, nor does "version" appear anywhere in the data I'm trying to save to the database.

This is on Azure Databricks 5.2 (Spark 2.4.0, Scala 2.11). I installed the connector from the Maven coordinate com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5.
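
For context, the write looks roughly like this (a minimal sketch; the endpoint, key, database, and collection names are placeholders, and the config keys follow the connector's documented write configuration):

# Placeholder connection values; only the write call itself matches the traceback below.
writeConfig = {
    "Endpoint": "https://<account>.documents.azure.com:443/",  # placeholder
    "Masterkey": "<master-key>",                               # placeholder
    "Database": "<database>",                                  # placeholder
    "Collection": "<collection>",                              # placeholder
    "Upsert": "true",
}

# Write to Cosmos DB from the records DataFrame
cleaned_records.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()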


Stack Trace:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-4178679052779286> in <module>()
      9 
     10 # Write to Cosmos DB from the records DataFrame
---> 11 cleaned_records.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    732             self.format(format)
    733         if path is None:
--> 734             self._jwrite.save()
    735         else:
    736             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o426.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 24, 10.139.64.5, executor 0): com.microsoft.azure.documentdb.DocumentClientException: org.json.JSONException: JSONObject["version"] not a string.
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.toDocumentClientException(DocumentBulkExecutor.java:1566)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportInternal(DocumentBulkExecutor.java:638)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.importAll(DocumentBulkExecutor.java:494)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.bulkImport(CosmosDBSpark.scala:309)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.savePartition(CosmosDBSpark.scala:467)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition(CosmosDBSpark.scala:356)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:189)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:183)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:869)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:869)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
	at org.apache.spark.scheduler.Task.run(Task.scala:112)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.json.JSONException: JSONObject["version"] not a string.
	at org.json.JSONObject.getString(JSONObject.java:658)
	at com.microsoft.azure.documentdb.JsonSerializable.getString(JsonSerializable.java:288)
	at com.microsoft.azure.documentdb.PartitionKeyDefinition.getVersion(PartitionKeyDefinition.java:64)
	at com.microsoft.azure.documentdb.CommonsBridgeInternal.isV2(CommonsBridgeInternal.java:28)
	at com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternalHelper.getEffectivePartitionKeyString(PartitionKeyInternalHelper.java:164)
	at com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal.getEffectivePartitionKeyString(PartitionKeyInternal.java:163)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.lambda$executeBulkImportAsyncImpl$1(DocumentBulkExecutor.java:855)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportAsyncImpl(DocumentBulkExecutor.java:853)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportInternal(DocumentBulkExecutor.java:587)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2274)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2318)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:961)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.save(CosmosDBSpark.scala:193)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.save(CosmosDBSpark.scala:501)
	at com.microsoft.azure.cosmosdb.spark.DefaultSource.createRelation(DefaultSource.scala:77)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:114)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:690)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:284)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.documentdb.DocumentClientException: org.json.JSONException: JSONObject["version"] not a string.
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.toDocumentClientException(DocumentBulkExecutor.java:1566)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportInternal(DocumentBulkExecutor.java:638)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.importAll(DocumentBulkExecutor.java:494)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.bulkImport(CosmosDBSpark.scala:309)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.savePartition(CosmosDBSpark.scala:467)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$saveFilePartition(CosmosDBSpark.scala:356)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:189)
	at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:183)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:869)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:869)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
	at org.apache.spark.scheduler.Task.run(Task.scala:112)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.json.JSONException: JSONObject["version"] not a string.
	at org.json.JSONObject.getString(JSONObject.java:658)
	at com.microsoft.azure.documentdb.JsonSerializable.getString(JsonSerializable.java:288)
	at com.microsoft.azure.documentdb.PartitionKeyDefinition.getVersion(PartitionKeyDefinition.java:64)
	at com.microsoft.azure.documentdb.CommonsBridgeInternal.isV2(CommonsBridgeInternal.java:28)
	at com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternalHelper.getEffectivePartitionKeyString(PartitionKeyInternalHelper.java:164)
	at com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal.getEffectivePartitionKeyString(PartitionKeyInternal.java:163)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.lambda$executeBulkImportAsyncImpl$1(DocumentBulkExecutor.java:855)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportAsyncImpl(DocumentBulkExecutor.java:853)
	at com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.executeBulkImportInternal(DocumentBulkExecutor.java:587)
	... 20 more

I got the exact same problem; looking at my DataFrame, I see no "version" column either.
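A quick way to confirm that (a minimal sketch, where df stands for the DataFrame being written):

print(df.columns)   # top-level column names; no "version" among them
df.printSchema()    # full schema, including any nested fields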

I have seen the same error at a customer site. It does not occur with azure-cosmosdb-spark_2.4.0_2.11-1.3.4, but it does occur with azure-cosmosdb-spark_2.4.0_2.11-1.3.5.
To avoid it while developing, uninstall 1.3.5 and install 1.3.4, as in the sketch below.
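
For example, outside Databricks the older coordinate can be pinned when the session is created (a sketch; on Databricks the same Maven coordinate is instead set through the cluster's library settings, and the app name below is a placeholder):

# Resolve the 1.3.4 connector from Maven at session startup.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("cosmosdb-write")  # placeholder app name
    .config("spark.jars.packages",
            "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.4")
    .getOrCreate()
)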

This is now resolved in 1.4.0.
The issue was happening with collections whose partition keys are larger than 100 bytes, and the SDK dependency for the connector needed to be updated. Please upgrade to 1.4.0 and let us know if you still encounter the issue.
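
For anyone wondering where "version" comes from: per the stack trace, it is not a field in the documents but part of the collection's partition key definition. Collections created with large partition keys carry a numeric version property, which the older bundled DocumentDB SDK tried to read as a string (PartitionKeyDefinition.getVersion calling JSONObject.getString). A sketch of the shape involved, with a placeholder key path:

# Partition key definition returned for a large-partition-key collection
# (placeholder path; the numeric "version" is what the old SDK choked on).
partition_key_definition = {
    "paths": ["/myPartitionKey"],  # placeholder partition key path
    "kind": "Hash",
    "version": 2,  # a number, not a string, so getString("version") throws
}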

This definitely fixed the problem I was having. Thank you!

Hi, when I use azure-cosmosdb-spark_2.4.0_2.11-1.4.0-uber.jar and try to connect, it fails with java.lang.IllegalArgumentException: Invalid JSON String: ''.