anguenot/pyspark-cassandra

Random failures on .saveToCassandra()


Hi all,

Before anything, thanks for your efforts on this library. We've been using it for about two and a half years and it works like a charm.

Recently we started migrating the entire stack to Python 3, Cassandra 3.11, and Spark 2.4. I recompiled the library with the necessary modifications to the build script and it works fine. But sometimes I get an error that I suspect comes from .saveToCassandra(). I say "suspect" because I've checked everywhere and cannot find where the error might be. Sometimes it works, sometimes it doesn't.

Code is as follows:

def process(dateToQuery, sc, field_pos):

    # One (date, bucket) pair per bucket; the source table is
    # apparently partitioned into 64 buckets per date.
    buckets = [(dateToQuery, i) for i in range(64)]
    bucketlist = sc.parallelize(buckets)
    # Join each bucket against the source table, aggregate, and write
    # the flattened results to the destination table. saveToCassandra()
    # is an action and returns nothing, so the result is not assigned.
    bucketlist\
        .joinWithCassandraTable(opts.get('keyspace'), opts.get('srctable'))\
        .select(*select)\
        .map(lambda r: mapper(r[1], field_pos))\
        .reduceByKey(reducer)\
        .map(flattener)\
        .saveToCassandra(opts.get('keyspace'), opts.get('dsttable'))

Nothing particularly interesting there.
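mapper, reducer and flattener are plain Python functions. Their actual implementations aren't shown here; a hypothetical sketch of the shapes this pipeline expects (illustrative names and fields only, not the real code) would be:

def mapper(row, field_pos):
    # Illustrative only: field_pos presumably maps logical field names
    # to positions in the joined Cassandra row. Build the aggregation
    # key and the value tuple from it.
    key = tuple(row[i] for i in field_pos['key'])
    metrics = tuple(row[i] for i in field_pos['metrics'])
    return key, metrics

def reducer(a, b):
    # Illustrative element-wise combination of two value tuples.
    return tuple(x + y for x, y in zip(a, b))

def flattener(kv):
    # Illustrative: turn a (key, values) pair back into a flat dict
    # whose keys match the destination table's columns.
    (date, bucket), metrics = kv
    return {'date': date, 'bucket': bucket, 'total': metrics[0]}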

The error I get sometimes is:

19/07/28 00:30:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/07/28 00:30:02 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).

    doing 2019-07-27 16:00:00

    done 2019-07-27 16:00:00

    doing 2019-07-27 17:00:00
19/07/28 00:30:21 WARN TaskSetManager: Lost task 110.0 in stage 2.0 (TID 114, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$Leading$1.hasNext(Iterator.scala:668)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
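
The exception is raised while Pyrolite unpickles Python-pickled rows on the JVM side and fails to reconstruct datetime.datetime. One mitigation that would sidestep that step (a sketch only, assuming flattener emits dicts; not confirmed as the root cause) is to normalize datetime fields to a plain representation before the write:

from datetime import datetime

def normalize_datetimes(row):
    # Sketch: replace datetime values with ISO-8601 strings so the
    # JVM-side unpickler never has to reconstruct datetime.datetime.
    # Whether the destination columns accept strings depends on the
    # schema; this is an assumption, not a verified fix.
    return {k: (v.isoformat() if isinstance(v, datetime) else v)
            for k, v in row.items()}

# ...then: .map(flattener).map(normalize_datetimes).saveToCassandra(...)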

After a few of those, sometimes the process finishes successfully and sometimes it doesn't. When it doesn't, this is the output:

19/07/28 00:30:21 WARN TaskSetManager: Lost task 110.0 in stage 2.0 (TID 114, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$Leading$1.hasNext(Iterator.scala:668)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

19/07/28 00:30:23 WARN TaskSetManager: Lost task 3.0 in stage 3.0 (TID 135, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/07/28 00:30:23 ERROR TaskSetManager: Task 52 in stage 3.0 failed 4 times; aborting job
19/07/28 00:30:23 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 252, 10.103.97.185, executor 0): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 90.0 in stage 3.0 (TID 222, 10.103.97.227, executor 39): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 14.0 in stage 3.0 (TID 146, 10.103.99.178, executor 34): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 27.0 in stage 3.0 (TID 159, 10.103.97.227, executor 39): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 34.0 in stage 3.0 (TID 166, 10.103.96.104, executor 57): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 99.0 in stage 3.0 (TID 231, 10.103.99.217, executor 42): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 79.0 in stage 3.0 (TID 211, 10.103.101.4, executor 26): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 114.0 in stage 3.0 (TID 246, 10.103.98.157, executor 50): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 60.0 in stage 3.0 (TID 192, 10.103.100.32, executor 52): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 133, 10.103.100.102, executor 62): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 15.0 in stage 3.0 (TID 147, 10.103.100.37, executor 22): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 100.0 in stage 3.0 (TID 232, 10.103.98.231, executor 3): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 47.0 in stage 3.0 (TID 179, 10.103.100.32, executor 51): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 10.0 in stage 3.0 (TID 142, 10.103.100.95, executor 15): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 36.0 in stage 3.0 (TID 168, 10.103.99.217, executor 42): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 78.0 in stage 3.0 (TID 210, 10.103.100.37, executor 22): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 63.0 in stage 3.0 (TID 195, 10.103.96.26, executor 27): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 61.0 in stage 3.0 (TID 193, 10.103.98.157, executor 49): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 3.3 in stage 3.0 (TID 264, 10.103.101.77, executor 9): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 76.0 in stage 3.0 (TID 208, 10.103.98.186, executor 45): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 16.0 in stage 3.0 (TID 148, 10.103.101.4, executor 26): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 13.0 in stage 3.0 (TID 145, 10.103.98.186, executor 45): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 125.0 in stage 3.0 (TID 257, 10.103.96.26, executor 29): TaskKilled (Stage cancelled)
Traceback (most recent call last):
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 281, in <module>
    main(args)
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 261, in main
    process(dateToQuery, sc, field_pos)
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 241, in process
    .saveToCassandra(opts.get('keyspace'), opts.get('dsttable'))
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/pyspark-cassandra-assembly-0.11.0.jar/pyspark_cassandra/rdd.py", line 99, in saveToCassandra
  File "/home/sparkuser/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/sparkuser/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
19/07/28 00:30:23 WARN TaskSetManager: Lost task 80.0 in stage 3.0 (TID 212, 10.103.97.252, executor 37): TaskKilled (Stage cancelled)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.saveToCassandra.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 52 in stage 3.0 failed 4 times, most recent failure: Lost task 52.3 in stage 3.0 (TID 265, 10.103.100.102, executor 60): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
        at pyspark_cassandra.PythonHelper.saveToCassandra(PythonHelper.scala:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

19/07/28 00:30:23 WARN TaskSetManager: Lost task 113.0 in stage 3.0 (TID 245, 10.103.100.95, executor 17): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 124.0 in stage 3.0 (TID 256, 10.103.98.157, executor 49): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 123.0 in stage 3.0 (TID 255, 10.103.100.32, executor 52): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 110.0 in stage 3.0 (TID 242, 10.103.100.32, executor 51): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 89.0 in stage 3.0 (TID 221, 10.103.100.95, executor 16): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 83.0 in stage 3.0 (TID 215, 10.103.97.185, executor 1): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 103.0 in stage 3.0 (TID 235, 10.103.96.124, executor 56): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 73.0 in stage 3.0 (TID 205, 10.103.100.95, executor 15): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 50.0 in stage 3.0 (TID 182, 10.103.100.95, executor 17): TaskKilled (Stage cancelled)
19/07/28 00:30:23 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.

and then some more of the same.

When I use .collect() just to print some debug information or counts, everything works perfectly.
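
That is, a debugging variant like the following (the same pipeline, reconstructed here for illustration, with the save replaced by a local collect) runs without errors:

rows = bucketlist\
    .joinWithCassandraTable(opts.get('keyspace'), opts.get('srctable'))\
    .select(*select)\
    .map(lambda r: mapper(r[1], field_pos))\
    .reduceByKey(reducer)\
    .map(flattener)\
    .collect()  # bring results to the driver instead of writing them
print(len(rows))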

Thanks in advance for any guidance you can provide.

Hi all,

It turns out the issue was caused by an over-provisioned Spark cluster. The amount of data we're currently storing in Cassandra is very small (that will change, of course). I enabled dynamic allocation of executors and the problem went away. We needed far fewer executors, so I guess some of them were reading and writing thin air, and that caused the issue.
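
For reference, enabling dynamic allocation amounts to settings along these lines (a sketch; the executor limits shown are illustrative, not our actual values):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled', 'true')
# The external shuffle service must be running for dynamic allocation
# to work on Spark 2.4 standalone/YARN clusters.
conf.set('spark.shuffle.service.enabled', 'true')
conf.set('spark.dynamicAllocation.minExecutors', '1')    # illustrative
conf.set('spark.dynamicAllocation.maxExecutors', '16')   # illustrative

sc = SparkContext(conf=conf)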

Regards,

Hey @zentraedi, thank you for the detailed report. Glad you found the issue.