GoogleCloudDataproc/spark-bigquery-connector

Reading a view results in an empty DataFrame


I'm running into a strange situation reading a view from BigQuery using the DIRECT method. The view in question contains a Boolean column that I'm attempting to filter on. Running a count on the DataFrame returns the correct number of rows, but when collecting the results the DataFrame appears empty.

The filter is being pushed down, and streams are returned from the BigQuery Storage API.

INFO  [2024-07-23T09:09:57.690200Z] com.google.cloud.bigquery.connector.common.BigQueryClient: DestinationTable is GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=<redacted>, projectId=<redacted>, tableId=<redacted>}}
INFO  [2024-07-23T09:09:57.701162Z] com.google.cloud.bigquery.connector.common.BigQueryClient: running query SELECT * FROM `<redacted>.hub.txn_sales_order_header` FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(1721692800000)
INFO  [2024-07-23T09:10:05.171100Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table <redacted>.<redacted>.<redacted> created from "SELECT * FROM `<redacted>.hub.txn…", parameters sent from Spark:|requiredColumns=[],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:06.522624Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Used optimized BQ count(*) path. Count: 7906080
INFO  [2024-07-23T09:10:18.972066Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table <redacted>.<redacted>.<redacted> created from "SELECT * FROM `<redacted>.hub.txn…", parameters sent from Spark:|requiredColumns=[<redacted>],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:19.210747Z] com.google.cloud.bigquery.connector.common.BigQueryClientFactory: Channel pool size set to 2
INFO  [2024-07-23T09:10:19.483697Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: |creation a read session for table {}, parameters: |selectedFields=[<redacted>],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:20.732685Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/<redacted>/locations/us/sessions/<redacted>","readSessionCreationStartTime":"2024-07-23T09:10:18.972392754Z","readSessionCreationEndTime":"2024-07-23T09:10:20.719572186Z","readSessionPrepDuration":527,"readSessionCreationDuration":1220,"readSessionDuration":1747}
INFO  [2024-07-23T09:10:20.732949Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: Received 50 partitions from the BigQuery Storage API for session projects/<redacted>/locations/us/sessions/<redacted>. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
INFO  [2024-07-23T09:10:20.735739Z] com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table '<redacted>.<redacted>.<redacted>': projects/<redacted>/locations/us/sessions/<redacted>

As I say, you can see in the logs above that the count returns the correct number of rows. It hits this code path and, as I understand it, should be getting the row count from the TableInfo of the materialized table (since actualTable is passed in) rather than running a count query against the view.
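As a sanity check, here's a minimal sketch (not something from my original run) of reading that metadata row count directly with the google-cloud-bigquery Python client; the table ID is a placeholder for the materialized table named in the DestinationTable log line above.

from google.cloud import bigquery

# The optimized count path reads the row count from table metadata; this
# checks the same value out-of-band. All IDs below are placeholders.
client = bigquery.Client(project="<materialization-project>")
table = client.get_table(
    "<materialization-project>.<materialization-dataset>.<materialized-table>"
)
print(table.num_rows)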

However, I can see the metrics reported for each stream, and they all show 0 rows and 0 bytes.

INFO  [2024-07-23T09:10:23.825977Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<redacted>/locations/us/sessions/<redacted>/streams/GgJpYxoCamYoAg,projects/<redacted>/locations/us/sessions/<redacted>/streams/CAEaAmljGgJqZigC","Started":"2024-07-23T09:10:21.694042175Z","Ended":"2024-07-23T09:10:23.792793440Z","Parse Timings":"Not enough samples.","Time in Spark":"Not enough samples.","Time waiting for service":"Not enough samples.","Bytes/s":0,"Rows/s":0,"Bytes":0,"Rows":0,"I/O time in ms":0} (_requestId: 13cd4c76a4b9334a, mdc.taskName: task 1.0 in stage 4.0 (TID 5))

Any idea how to debug this further? I'll note that the materialization project is different from the project containing the view; I'm unsure whether that would cause issues, though.

Can you please share the connector jar being used and the Spark version?

Also, it would help if you could share the steps to reproduce the issue.

Spark 3.4 and version 0.39.1 of the connector. I'm struggling to find a reliable way of reproducing the error; I'll try to put one together.
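A quick way to confirm both from within the session (a sketch; spark.jars may be empty if the connector was supplied another way, e.g. via --packages or a Dataproc image, and spark_session is assumed to be the active SparkSession):

# Spark version of the running session.
print(spark_session.version)
# Jars explicitly attached to the session, if any; the connector version is
# whatever connector jar ended up on the classpath.
print(spark_session.sparkContext.getConf().get("spark.jars", ""))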

Thanks, @tom-s-powell. What do you mean by reading a view from BigQuery using the DIRECT method?

Ah, apologies, ignore the "DIRECT" method comment. This is reading a BigQuery view by simply using .load("<view-name>"), rather than a custom query.
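To make the distinction concrete, a sketch of the two read styles with placeholder names (the query form is the one not being used here):

# Reading the view by name (what this issue is about):
df = spark.read.format("bigquery").load("<project>.<dataset>.<view>")

# Reading via a custom query instead; the connector supports this form when
# viewsEnabled and a materialization dataset are set:
df = (
    spark.read.format("bigquery")
    .option("viewsEnabled", "true")
    .option("materializationDataset", "<materialization-dataset>")
    .option("query", "SELECT * FROM `<project>.<dataset>.<view>`")
    .load()
)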

Hi @tom-s-powell, a code sample will really help to debug this issue. If you have a small test case that shows this behavior, please add it.

Sorry for the slow reply here. I unfortunately don't have access to the BigQuery account where I'm running into issues, so I can't provide many details on the table/view itself. I've shared a sample of the PySpark code that is failing below. I'll note this is not limited to views; I've seen the behaviour with tables as well.

import pyspark.sql.functions as F

df = (
    spark_session.read
        .format("bigquery")
        .option("parentProject", "<project>")
        # Views are read via a materialized copy, hence the options below.
        .option("viewsEnabled", "true")
        .option("materializationProject", "<materialization-project>")
        .option("materializationDataset", "<materialization-dataset>")
        .load("<project>.<dataset>.<view>")
)
df = df.filter(F.col("<boolean-column>"))
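For reference, the check that exposes the discrepancy on the failing runs (counts are illustrative, taken from the logs above):

# count() succeeds via the optimized count path...
print(df.count())    # e.g. 7906080, matching the earlier log line
# ...but collecting the same DataFrame comes back empty.
rows = df.collect()
print(len(rows))     # 0 on the failing runs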

The strange thing is this seems to be non-deterministic: the first time I ran this it succeeded, whereas the second time it returned 0 results. The query plans looked identical, and they had the same number of Spark tasks.

In the logs I see the same message:

com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Used optimized BQ count(*) path

This returns a non-zero number of rows in both cases, so the materialized table does seem to have rows, given that https://github.com/GoogleCloudDataproc/spark-bigquery-connector/blob/master/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java#L194 just reads the number of rows off the TableInfo.

In the Spark job where no rows were returned, I see the following (the metrics show 0 rows/bytes):

INFO  [2024-08-15T19:14:14.163484Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/GgJqZhoCaWcoAg,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CDAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CDEaAmpmGgJpZygC","Started":"2024-08-15T19:14:11.844322312Z","Ended":"2024-08-15T19:14:14.130096049Z","Parse Timings":"Not enough samples.","Time in Spark":"Not enough samples.","Time waiting for service":"Not enough samples.","Bytes/s":0,"Rows/s":0,"Bytes":0,"Rows":0,"I/O time in ms":0}

In the Spark job where rows were returned, I see the following (the metrics show rows/bytes were consumed):

INFO  [2024-08-15T18:50:06.712734Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/GgJqZhoCaWcoAg,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CDAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CDEaAmpmGgJpZygC","Started":"2024-08-15T18:49:52.258943954Z","Ended":"2024-08-15T18:50:06.678977966Z","Parse Timings":"Average: PT0.003503972S Samples: 278","Time in Spark":"Average: PT0.039637556S","Time waiting for service":"Average: PT0.000120159S Samples: 278","Bytes/s":1506461333,"Rows/s":164397,"Bytes":49713224,"Rows":160123,"I/O time in ms":33}

Hi @tom-s-powell, it's not clear where exactly you see empty rows. Are you seeing empty results when running df.collect()? If so, can you please share the complete sequence of steps (load, filter, collect, I assume)?
If the issue is only that log lines sometimes show 0 rows, then that's a non-issue. These log lines are expected in the initial stages of the job, when not enough samples are available to report metrics. (Metrics are reported at time-based intervals and also on completion.)

Also, it appears that you're using the DSv1 connector, is that right? (gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.39.1.jar)
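For anyone reproducing this, a sketch of attaching that DSv1 jar to a session (the path is the one above; supplying it via spark-submit --jars works just as well):

from pyspark.sql import SparkSession

# spark-bigquery-with-dependencies is the DSv1 artifact; the DSv2 connector
# ships as a separate artifact, so the jar name indicates which API is in use.
spark = (
    SparkSession.builder
    .appName("bigquery-view-read")
    .config(
        "spark.jars",
        "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.39.1.jar",
    )
    .getOrCreate()
)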