java.lang.ArithmeticException: long overflow
Closed this issue · 6 comments
Hi,
I am writing to a CDM location with the file format set to parquet. The write succeeds, but when I read the same entity back and run a count on the dataframe, I get the error below.
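For reference, a minimal PySpark sketch of the failing flow. The connector option names vary by version, so `cdm_options` below is just a placeholder for the storage, manifest/entity, and credential options used in my environment:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder for the CDM connector options (storage account, manifest/entity,
# auth, and the parquet format setting) -- fill in for your environment.
cdm_options = {}

df = spark.read.option("header", True).csv("/path/to/invoices.csv")

# Write the entity as parquet through the CDM connector -- this succeeds.
df.write.format("com.microsoft.cdm").options(**cdm_options).save()

# Read the same entity back and count -- this is where
# java.lang.ArithmeticException: long overflow is raised.
read_back = spark.read.format("com.microsoft.cdm").options(**cdm_options).load()
print(read_back.count())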
Py4JJavaError: An error occurred while calling o949.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 128.0 failed 4 times, most recent failure: Lost task 0.3 in stage 128.0 (TID 2603, 10.139.64.4, executor 0): java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at java.time.Instant.nanosUntil(Instant.java:1164)
at java.time.Instant.until(Instant.java:1149)
at java.time.temporal.ChronoUnit.between(ChronoUnit.java:272)
at com.microsoft.cdm.read.ParquetReaderConnector.parseTime(ParquetReaderConnector.scala:216)
at com.microsoft.cdm.read.ParquetReaderConnector.jsonToData(ParquetReaderConnector.scala:259)
at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:66)
at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:64)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:64)
at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:20)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2380)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:245)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:480)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:330)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:306)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2931)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2930)
at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3492)
at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3487)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3487)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2930)
at sun.reflect.GeneratedMethodAccessor414.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at java.time.Instant.nanosUntil(Instant.java:1164)
at java.time.Instant.until(Instant.java:1149)
at java.time.temporal.ChronoUnit.between(ChronoUnit.java:272)
at com.microsoft.cdm.read.ParquetReaderConnector.parseTime(ParquetReaderConnector.scala:216)
at com.microsoft.cdm.read.ParquetReaderConnector.jsonToData(ParquetReaderConnector.scala:259)
at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:66)
at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:64)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:64)
at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:20)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Below is the CSV data (with header) that causes the issue.
index,invoice_id,invoice_no,reciept_no,created_date,status,total_collection,gross_amount,net_amount,discount,rounding_adjustment,cashback,guest_id,items_payments_ssg,items_payments_tip,items_payments_tax,items_payments_amount,items_payments_detail_type,items_payments_type,items_taxes_item_percentage,items_taxes_tax_percentage,items_taxes_amount,items_taxes_type,items_invoice_item_id,items_therapist_id,items_cashback_redemption,items_discount,items_final_sale_price,items_quantity,items_type,items_code,items_name,items_id
187,dxxxxxx-4xxx-4xxx-9xxxx-c7xxxxxxxx,INVxxxxx,INVRxxxxxxx,0001-01-01T00:00:00,4,31.93,31.93,31.93,0.0,0.0,0.0,708xxxxx-6bxx-40xx-bfxf-214bd8954e0b,0.0,0.0,2.93,31.93,mastercard,CC,100.0,10.1,2.93,County Tax,b0xxxxx-ffxx-4bxx-a3xx-f0874bxxxxxx,e7dxxxxx-9exx-48xx-9dxx-291xxxxxxxx,0.0,0.0,31.93,1,Product,80130,WATERFALL RCO 5oz,8axxxxx-6dxx-4bxx-88xx-7exx593xxxx
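The `created_date` value of `0001-01-01T00:00:00` is almost certainly the trigger: the stack trace shows `parseTime` calling `ChronoUnit.between`, which converts a seconds difference into nanoseconds via `Math.multiplyExact`, and a signed 64-bit long can only hold roughly ±292 years of nanoseconds. A quick back-of-the-envelope check (plain Python, just arithmetic):

# Rough check: nanoseconds between year 0001 and the Unix epoch vs. what a
# signed 64-bit long can hold.
from datetime import datetime

LONG_MAX = 2**63 - 1  # java.lang.Long.MAX_VALUE
seconds = (datetime(1970, 1, 1) - datetime(1, 1, 1)).total_seconds()
nanos = int(seconds) * 1_000_000_000

print(f"nanos needed : {nanos:,d}")     # roughly 6.2e19
print(f"long max     : {LONG_MAX:,d}")  # roughly 9.2e18
print("overflow!" if nanos > LONG_MAX else "fits")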
@mashray try writing to regular storage or Delta without the connector and see if you can reproduce this issue, so you can isolate whether it is related to Spark or to the CDM connector. Also, try Spark 3.x.
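For example, something along these lines (paths are placeholders) to check whether a plain parquet round trip of the same data also overflows:

# Same round trip without the CDM connector, to isolate the failure.
df = spark.read.option("header", True).csv("/path/to/invoices.csv")

df.write.mode("overwrite").parquet("/tmp/invoices_plain_parquet")
print(spark.read.parquet("/tmp/invoices_plain_parquet").count())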
The same process works without issue with native Spark and Delta Lake, so this is an issue with the Spark CDM connector.
Test data is here -- https://drive.google.com/file/d/1cU5Y2ee8AJ6F4gjSy2R8INWDJTSucMep/view?usp=sharing
Thanks @TissonMathew for the test data. This is fixed in 0.19.