Usage with SparkConnect
mohaidoss opened this issue · 5 comments
mohaidoss commented
Following the example with spark-shell, I get the error below when connecting to a remote Spark cluster through Spark Connect:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 12) (10.2.3.29 executor 3): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.r..
Code snippet to reproduce
from pyspark.sql import SparkSession

# Connect to the remote Spark Connect server and query the ClickHouse catalog.
spark: SparkSession = (
    SparkSession.builder
    .appName("spark-ch")
    .remote("sc://spark-connect.ns")
    .getOrCreate()
)
spark.sql("select * from `clickhouse`.db.tablename").show()
Spark version: 3.5.0
pan3793 commented
How did you set up your Connect server? E.g. version, jars, start commands, etc.
mohaidoss commented
@pan3793
Version same as client: 3.5.0
Jars used:
- com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3
- com.clickhouse:clickhouse-jdbc:0.6.0
Properties details:
spark.jars.packages com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3,com.clickhouse:clickhouse-jdbc:0.6.0
spark.hadoop.parquet.block.size 33554432
spark.connect.grpc.binding.port 15002
spark.databricks.delta.merge.repartitionBeforeWrite.enabled true
spark.databricks.delta.optimize.repartition.enabled true
spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols -1
spark.databricks.delta.replaceWhere.constraintCheck.enabled false
spark.databricks.delta.replaceWhere.dataColumns.enabled true
spark.databricks.delta.schema.autoMerge.enabled false
spark.decommission.enabled true
spark.delta.logStore.s3.impl io.delta.storage.S3DynamoDBLogStore
spark.delta.logStore.s3a.impl io.delta.storage.S3DynamoDBLogStore
spark.driver.cores 1
spark.driver.extraClassPath /opt/spark/jars/*
spark.driver.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
spark.driver.extraLibraryPath /opt/hadoop/lib/native
spark.driver.host ${SPARK_DRIVER_HOST}
spark.driver.maxResultSize 4g
spark.driver.memory 2048m
spark.driver.memoryOverhead 384m
spark.dynamicAllocation.cachedExecutorIdleTimeout 600s
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.executorAllocationRatio 1
spark.dynamicAllocation.executorIdleTimeout 30s
spark.dynamicAllocation.maxExecutors 4
spark.dynamicAllocation.minExecutors 0
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.shuffleTracking.timeout 600s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
spark.eventLog.enabled false
spark.executor.cores 8
spark.executor.extraClassPath /opt/spark/jars/*:/home/spark/*
spark.executor.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
spark.executor.extraLibraryPath /opt/hadoop/lib/native:/home/spark
spark.executor.memory 4096m
spark.executor.memoryOverhead 512m
spark.hadoop.aws.region eu-west-1
spark.hadoop.delta.enableFastS3AListFrom true
spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.hadoop.fs.s3a.experimental.input.fadvise random
spark.hadoop.fs.s3a.fast.upload true
spark.hadoop.fs.s3a.fast.upload.default true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hive.imetastoreclient.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
spark.jars.ivy /tmp/.ivy
spark.kubernetes.allocation.driver.readinessTimeout 60s
spark.kubernetes.authenticate.driver.serviceAccountName spark-connect
spark.kubernetes.authenticate.executor.serviceAccountName spark-connect
spark.kubernetes.authenticate.submission.caCertFile /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
spark.kubernetes.authenticate.submission.oauthTokenFile /var/run/secrets/kubernetes.io/serviceaccount/token
spark.kubernetes.container.image.pullPolicy Always
spark.kubernetes.driver.pod.name ${SPARK_DRIVER_POD_NAME}
spark.kubernetes.executor.annotation.eks.amazonaws.com/role-arn ${SPARK_AWS_ROLE}
spark.kubernetes.executor.container.image sebastiandaberdaku/spark-glue-python:spark-v3.5.0-python-v3.10.12
spark.kubernetes.executor.podTemplateFile /opt/spark/executor-pod-template.yaml
spark.kubernetes.executor.request.cores 4000m
spark.kubernetes.local.dirs.tmpfs false
spark.kubernetes.namespace ns
spark.local.dir /tmp
spark.master k8s://https://kubernetes.default.svc.cluster.local:443
spark.network.timeout 300s
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.catalogImplementation hive
spark.sql.execution.arrow.pyspark.enabled true
spark.sql.execution.arrow.pyspark.fallback.enabled true
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.files.maxPartitionBytes 128MB
spark.sql.hive.metastore.jars builtin
spark.sql.parquet.datetimeRebaseModeInWrite CORRECTED
spark.ui.port 4040
spark.ui.dagGraph.retainedRootRDDs 100
spark.ui.retainedJobs 100
spark.ui.retainedStages 100
spark.ui.retainedTasks 100
spark.worker.ui.retainedExecutors 100
spark.worker.ui.retainedDrivers 10
spark.sql.ui.retainedExecutions 100
spark.streaming.ui.retainedBatches 100
spark.ui.retainedDeadExecutors 10
spark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog
spark.sql.catalog.clickhouse.host ${CLICKHOUSE_HOST}
spark.sql.catalog.clickhouse.protocol http
spark.sql.catalog.clickhouse.http_port ${CLICKHOUSE_HTTP_PORT}
spark.sql.catalog.clickhouse.user ${CLICKHOUSE_USER}
spark.sql.catalog.clickhouse.password ${CLICKHOUSE_PASSWORD}
spark.sql.catalog.clickhouse.database default
pan3793 commented
com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3
is only applicable to Spark 3.4. 0.8.0 has not been released yet, so for Spark 3.5 please clone the main branch and build it with the command
./gradlew clean build -x test -Dspark_binary_version=3.5 -Dscala_binary_version=2.12
(JDK 8 is required)
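A quick way to catch this kind of mismatch before submitting a job is to compare the Spark binary version embedded in the connector artifact name with the server's Spark version. The sketch below is only illustrative: it assumes the artifact follows the `clickhouse-spark-runtime-<spark>_<scala>` naming seen in this thread, and the helper names `spark_binary_version` and `connector_matches` are hypothetical, not part of the connector or of PySpark.

```python
import re

# Assumed naming pattern, inferred from the artifact in this thread:
# clickhouse-spark-runtime-<spark binary version>_<scala binary version>
_RUNTIME_RE = re.compile(
    r"clickhouse-spark-runtime-(?P<spark>\d+\.\d+)_(?P<scala>\d+\.\d+)$"
)


def spark_binary_version(spark_version: str) -> str:
    """Reduce a full Spark version such as '3.5.0' to its binary version '3.5'."""
    major, minor = spark_version.split(".")[:2]
    return f"{major}.{minor}"


def connector_matches(coordinate: str, spark_version: str) -> bool:
    """Check whether a 'group:artifact:version' coordinate targets the given Spark version."""
    _group, artifact, _version = coordinate.split(":")
    match = _RUNTIME_RE.match(artifact)
    if match is None:
        raise ValueError(f"unrecognized connector artifact: {artifact}")
    return match.group("spark") == spark_binary_version(spark_version)


if __name__ == "__main__":
    coordinate = "com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3"
    # The setup reported in this issue: a 3.4 connector on a Spark 3.5.0 server.
    print(connector_matches(coordinate, "3.5.0"))
```

In a live session the second argument could come from `spark.version`, so the check reflects what the server actually runs rather than what the client assumes.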
pan3793 commented
BTW, if you are going to access ClickHouse server 23.10+ (or ClickHouse Cloud), please upgrade your ClickHouse Java client too; see #342 for details.
pan3793 commented
Closing, as I assume the issue is resolved.