Azure/spark-cdm-connector

java.lang.ArrayIndexOutOfBoundsException on parsing certain entities

Closed this issue · 3 comments

I have to parse CDM data coming out of an MS CRM system. With the code below I can parse only a few entities; for all the remaining ones I get an ArrayIndexOutOfBoundsException. Am I missing anything here?

Code tried:

appid = "XXXXX"
appkey = "YYYY"

tenantid = "ZZZZ"

storageAccountName = "XXX.XXX.core.windows.net"
container = "commondatXXXXXX"

from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime

testDF = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + "/model.json")
.option("appId", appid)
.option("appKey", appkey)
.option("tenantID", tenantid)
.option("entity", "queueitem")
.load())
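
The load() itself returns a DataFrame; the exception is only raised once rows are materialized with show(). For illustration, a quick check on the schema the connector resolves (plain Spark DataFrame API, nothing connector-specific):

# The schema is resolved from model.json before any rows are read
testDF.printSchema()
print(len(testDF.schema.fields))  # number of attributes declared for the entity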

Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 4 times, most recent failure: Lost task 0.3 in stage 31.0 (TID 83, 10.139.64.10, executor 0): java.lang.ArrayIndexOutOfBoundsException

Py4JJavaError Traceback (most recent call last)
in &lt;module&gt;()
22 .load())
23
---> 24 testDF.show(10)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
377 """
378 if isinstance(truncate, bool) and truncate:
--> 379 print(self._jdf.showString(n, 20, vertical))
380 else:
381 print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o1904.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 79, 10.139.64.10, executor 0): java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2842)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3462)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2571)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2571)
at org.apache.spark.sql.Dataset$$anonfun$56.apply(Dataset.scala:3446)
at org.apache.spark.sql.Dataset$$anonfun$56.apply(Dataset.scala:3441)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3441)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2571)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2785)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:265)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:302)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

Dear Team,

Thanks for the quick response. Yes, I am using the 0.17 version, and I will send the model.json file to your email asksparkcdm@microsoft.com. I am working against time to complete this CDM ingestion; any help would be really appreciated. Thanks.

Hi,
The connector reads the CDM entity in FAILFAST mode, i.e. it throws an exception when the number of columns in the CSV file does not match the number of attributes defined for the entity, which is what is happening in this case. Version 0.18.1 has been released and raises a descriptive error message when you read such an entity.

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
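
For context, this mirrors how plain Spark CSV reading with an explicit schema behaves in FAILFAST mode; a minimal sketch, with a made-up path and schema:

from pyspark.sql.types import StructType, StructField, StringType

# Entity metadata declares three attributes...
schema = StructType([
    StructField("queueitemid", StringType()),
    StructField("title", StringType()),
    StructField("statecode", StringType()),
])

# ...but suppose the exported CSV rows only carry two columns.
df = (spark.read
      .schema(schema)
      .option("mode", "FAILFAST")   # fail on malformed rows instead of padding with nulls
      .csv("/example/queueitem/*.csv"))  # illustrative path

df.show()  # raises as soon as a row whose column count differs from the schema is read

In the default PERMISSIVE mode Spark would pad the missing columns with null; FAILFAST surfaces the mismatch as an exception instead, which is the behavior the connector opts into.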