GoogleCloudDataproc/spark-bigquery-connector

Empty array filtering issue

I have a piece of code which reads the same data from GCS or BigQuery. Simplified schema for the data looks like this:

root
 |-- time: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
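
For reference, a minimal local DataFrame with the same shape can be built like this (rows and values are made up for illustration):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Build a tiny frame matching the schema above.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  (Timestamp.valueOf("2024-07-18 10:00:00"), "John", Seq("id-1", "id-2")),
  (Timestamp.valueOf("2024-07-18 11:00:00"), "John", Seq.empty[String])
).toDF("time", "name", "ids")

df.printSchema() // prints the structure shown above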

I need to read only the rows where the ids array is not empty, and this seems to work fine with the GCS connector:

spark.read.format("parquet")
  .load("gs://some-path")
  .filter($"ids" =!= Array.empty[String])
  ... // other filters

However, when I try to do the same with the BigQuery connector, I get an error:

spark.read.format("bigquery")
  .option("viewsEnabled", "true")
  .load("internal-table-name")
  .filter($"ids" =!= Array.empty[String])
  ... // other filters

24/07/23 11:00:30 INFO ReadSessionCreator: |creation a read session for table null, parameters: |selectedFields=[name,ids],|filter=[(NOT (`ids` = ArraySeq())) AND (`ids` IS NOT NULL) AND (`name` = 'John') AND (`name` IS NOT NULL) AND (`time` <= TIMESTAMP '2024-07-18T23:59:59.999999Z') AND (`time` >= TIMESTAMP '2024-07-18T00:00:00Z') AND (`time` IS NOT NULL)]
com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: Query error: Function not found: ArraySeq at [1:189]
        at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
        at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
        at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
        at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
        at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
...

It seems predicate pushdown doesn't handle array literals correctly here: the Scala rendering of the empty array, ArraySeq(), is injected verbatim into the generated WHERE clause, where BigQuery parses it as an unknown function.
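
For what it's worth, ArraySeq() is exactly what the backing Scala collection prints as on Scala 2.13, so my guess (from the log, not from reading the connector code) is that the pushed-down value is stringified with toString:

// Scala 2.13: the empty array literal renders as "ArraySeq()",
// which is exactly what shows up in the generated WHERE clause.
val empty = Array.empty[String].toSeq
println(empty)               // ArraySeq()
println(s"(`ids` = $empty)") // (`ids` = ArraySeq()) -- matches the log above

Note that, as far as I know, BigQuery does not allow = comparisons on ARRAY columns at all, so even a correctly rendered literal would likely be rejected; the predicate needs a rewrite, not just a fixed literal.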
Changing the filter condition to .filter(array_size($"ids") > 0) works, but that condition is most likely not going to be pushed down.
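
As a stopgap, the predicate can be handed to BigQuery directly via the connector's filter read option (a sketch, assuming the option behaves as documented and combines with the other pushed-down filters):

spark.read.format("bigquery")
  .option("viewsEnabled", "true")
  // Pass a BigQuery SQL predicate directly instead of relying on pushdown
  // of the Scala array literal. ARRAY_LENGTH(ids) > 0 is valid GoogleSQL;
  // it evaluates to NULL for NULL arrays, so those rows are dropped too.
  .option("filter", "ARRAY_LENGTH(ids) > 0")
  .load("internal-table-name")
  ... // other filters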

Dependencies used:

"org.apache.spark" %% "spark-sql" % "3.5.0",
"com.google.cloud.bigdataoss" % "gcs-connector" % "3.0.0",
"com.google.cloud.spark" % "[spark-3.5-bigquery / spark-bigquery-with-dependencies]" % "0.39.1"

Any updates on this?