ClickHouse/spark-clickhouse-connector

Usage with SparkConnect

mohaidoss opened this issue · 5 comments

Following the spark-shell example from the documentation, I get the error below when connecting to a remote Spark cluster through Spark Connect.

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 12) (10.2.3.29 executor 3): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
        at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
        at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.r..

Code snippet to reproduce

from pyspark.sql import SparkSession

# Connect to the remote Spark Connect server
spark: SparkSession = (
    SparkSession.builder
    .appName("spark-ch")
    .remote("sc://spark-connect.ns")
    .getOrCreate()
)

# Query a table through the ClickHouse catalog
spark.sql("select * from `clickhouse`.db.tablename").show()

Spark version: 3.5.0
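
As a quick sanity check (not part of the original report, just a sketch reusing the remote URL above), the client can confirm what the Connect server is actually running: with Spark Connect, spark.version should reflect the server's Spark version, and SHOW CATALOGS (available since Spark 3.4) should list the clickhouse catalog if the plugin jar was loaded on the server.

from pyspark.sql import SparkSession

# Same remote URL as in the repro snippet; adjust if your endpoint differs.
spark = SparkSession.builder.remote("sc://spark-connect.ns").getOrCreate()

# Should print the Connect server's Spark version (expected 3.5.0 here).
print(spark.version)

# A `clickhouse` entry should appear if the catalog plugin resolved on the server.
spark.sql("SHOW CATALOGS").show()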

How do you set up your Connect server? e.g. version, jars, start commands, etc.

@pan3793
Same version as the client: 3.5.0
Jars used:

  • com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3
  • com.clickhouse:clickhouse-jdbc:0.6.0

Property details:

    spark.jars.packages com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3,com.clickhouse:clickhouse-jdbc:0.6.0
    spark.hadoop.parquet.block.size 33554432
    spark.connect.grpc.binding.port 15002
    spark.databricks.delta.merge.repartitionBeforeWrite.enabled true
    spark.databricks.delta.optimize.repartition.enabled true
    spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols -1
    spark.databricks.delta.replaceWhere.constraintCheck.enabled false
    spark.databricks.delta.replaceWhere.dataColumns.enabled true
    spark.databricks.delta.schema.autoMerge.enabled false
    spark.decommission.enabled true
    spark.delta.logStore.s3.impl io.delta.storage.S3DynamoDBLogStore
    spark.delta.logStore.s3a.impl io.delta.storage.S3DynamoDBLogStore
    spark.driver.cores 1
    spark.driver.extraClassPath /opt/spark/jars/*
    spark.driver.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
    spark.driver.extraLibraryPath /opt/hadoop/lib/native
    spark.driver.host ${SPARK_DRIVER_HOST}
    spark.driver.maxResultSize 4g
    spark.driver.memory 2048m
    spark.driver.memoryOverhead 384m
    spark.dynamicAllocation.cachedExecutorIdleTimeout 600s
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.executorAllocationRatio 1
    spark.dynamicAllocation.executorIdleTimeout 30s
    spark.dynamicAllocation.maxExecutors 4
    spark.dynamicAllocation.minExecutors 0
    spark.dynamicAllocation.schedulerBacklogTimeout 1s
    spark.dynamicAllocation.shuffleTracking.enabled true
    spark.dynamicAllocation.shuffleTracking.timeout 600s
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
    spark.eventLog.enabled false
    spark.executor.cores 8
    spark.executor.extraClassPath /opt/spark/jars/*:/home/spark/*
    spark.executor.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
    spark.executor.extraLibraryPath /opt/hadoop/lib/native:/home/spark
    spark.executor.memory 4096m
    spark.executor.memoryOverhead 512m
    spark.hadoop.aws.region eu-west-1
    spark.hadoop.delta.enableFastS3AListFrom true
    spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    spark.hadoop.fs.s3a.experimental.input.fadvise random
    spark.hadoop.fs.s3a.fast.upload true
    spark.hadoop.fs.s3a.fast.upload.default true
    spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hive.imetastoreclient.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
    spark.jars.ivy /tmp/.ivy
    spark.kubernetes.allocation.driver.readinessTimeout 60s
    spark.kubernetes.authenticate.driver.serviceAccountName spark-connect
    spark.kubernetes.authenticate.executor.serviceAccountName spark-connect
    spark.kubernetes.authenticate.submission.caCertFile /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    spark.kubernetes.authenticate.submission.oauthTokenFile /var/run/secrets/kubernetes.io/serviceaccount/token
    spark.kubernetes.container.image.pullPolicy Always
    spark.kubernetes.driver.pod.name ${SPARK_DRIVER_POD_NAME}
    spark.kubernetes.executor.annotation.eks.amazonaws.com/role-arn ${SPARK_AWS_ROLE}
    spark.kubernetes.executor.container.image sebastiandaberdaku/spark-glue-python:spark-v3.5.0-python-v3.10.12
    spark.kubernetes.executor.podTemplateFile /opt/spark/executor-pod-template.yaml
    spark.kubernetes.executor.request.cores 4000m
    spark.kubernetes.local.dirs.tmpfs false
    spark.kubernetes.namespace ns
    spark.local.dir /tmp
    spark.master k8s://https://kubernetes.default.svc.cluster.local:443
    spark.network.timeout 300s
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
    spark.sql.catalogImplementation hive
    spark.sql.execution.arrow.pyspark.enabled true
    spark.sql.execution.arrow.pyspark.fallback.enabled true
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.files.maxPartitionBytes 128MB
    spark.sql.hive.metastore.jars builtin
    spark.sql.parquet.datetimeRebaseModeInWrite CORRECTED
    spark.ui.port 4040
    spark.ui.dagGraph.retainedRootRDDs 100
    spark.ui.retainedJobs 100
    spark.ui.retainedStages 100
    spark.ui.retainedTasks 100
    spark.worker.ui.retainedExecutors 100
    spark.worker.ui.retainedDrivers 10
    spark.sql.ui.retainedExecutions 100
    spark.streaming.ui.retainedBatches 100
    spark.ui.retainedDeadExecutors 10

    spark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog
    spark.sql.catalog.clickhouse.host ${CLICKHOUSE_HOST}
    spark.sql.catalog.clickhouse.protocol http
    spark.sql.catalog.clickhouse.http_port ${CLICKHOUSE_HTTP_PORT}
    spark.sql.catalog.clickhouse.user ${CLICKHOUSE_USER}
    spark.sql.catalog.clickhouse.password ${CLICKHOUSE_PASSWORD}
    spark.sql.catalog.clickhouse.database default
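
For context, a minimal client-side sketch of how the catalog configured above is meant to be exercised over Spark Connect (the names mirror the properties and the repro snippet; `db.tablename` is just the placeholder from above, and `spark` is the session created earlier):

# Switch the current catalog to the one registered as spark.sql.catalog.clickhouse.
spark.sql("USE clickhouse")
spark.sql("SHOW DATABASES").show()

# DataFrame-API equivalent of the SQL query in the repro snippet.
spark.table("clickhouse.db.tablename").show()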

com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3 is only applicable to Spark 3.4, and 0.8.0 has not been released yet. Please clone the main branch and build it with the following command:

./gradlew clean build -x test -Dspark_binary_version=3.5 -Dscala_binary_version=2.12

(JDK 8 is required)

BTW, if you are going to access a ClickHouse server 23.10+ (or ClickHouse Cloud), please upgrade your ClickHouse Java client too; see details at #342

Closing, as I suppose the issue is resolved.