The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.
The BigQuery Storage API and this connector are in Beta and are subject to change.
Changes may include, but are not limited to:
- Type conversion
- Partitioning
- Parameters
Breaking changes will be restricted to major and minor versions.
The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.
It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:
It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using the Arrow or Avro wire formats.
The new API allows column and predicate filtering to only read the data you are interested in.
Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.
The Storage API supports arbitrary pushdown of predicate filters. Connector versions 0.8.0-beta and above support pushdown of arbitrary filters to BigQuery.
There is a known issue in Spark that does not allow pushdown of filters on nested fields. For example, filters like address.city = "Sunnyvale" will not be pushed down to BigQuery.
The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.
See Configuring Partitioning for more details.
To enable the BigQuery Storage API, follow these instructions.
If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work by default, e.g.:
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
The latest version of the connector is publicly available at the following links:
version | Link |
---|---|
Scala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.21.1.jar (HTTP link) |
Scala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar (HTTP link) |
The connector is also available from the Maven Central repository. It can be used with the `--packages` option or the `spark.jars.packages` configuration property. Use the following value:
version | Connector Artifact |
---|---|
Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.21.1 |
Scala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.1 |
If you want to keep up with the latest version of the connector the following links can be used. Notice that for production environments where the connector version should be pinned, one of the above links should be used.
version | Link |
---|---|
Scala 2.11 | gs://spark-lib/bigquery/spark-bigquery-latest_2.11.jar (HTTP link) |
Scala 2.12 | gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar (HTTP link) |
You can run a simple PySpark wordcount against the API without compilation by running
Dataproc image 1.5 and above
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
examples/python/shakespeare.py
Dataproc image 1.4 and below
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.11.jar \
examples/python/shakespeare.py
For a worked example, see the codelab at https://codelabs.developers.google.com/codelabs/pyspark-bigquery
The connector uses the cross-language Spark SQL Data Source API:
df = spark.read \
.format("bigquery") \
.load("bigquery-public-data.samples.shakespeare")
or the Scala-only implicit API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
For more information, see additional code samples in Python, Scala and Java.
The connector allows you to run any Standard SQL SELECT query on BigQuery and fetch its results directly into a Spark DataFrame, as shown in the following code sample:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Which yields the result
+----------+-------+
|       tag|      c|
+----------+-------+
|javascript|1643617|
|    python|1352904|
|      java|1218220|
|   android| 913638|
|       php| 911806|
|        c#| 905331|
|      html| 769499|
|    jquery| 608071|
|       css| 510343|
|       c++| 458938|
+----------+-------+
A second option is to use the `query` option like this:
df = spark.read.format("bigquery").option("query", sql).load()
Notice that the execution should be faster, as only the result is transmitted over the wire. In a similar fashion, queries can run JOINs more efficiently than running the joins in Spark, or use other BigQuery features such as subqueries, BigQuery user-defined functions, wildcard tables, BigQuery ML and more.
In order to use this feature the following configurations MUST be set:
- `viewsEnabled` must be set to `true`.
- `materializationDataset` must be set to a dataset where the GCP user has table creation permission. `materializationProject` is optional.
Important: This feature is implemented by running the query on BigQuery and saving the result into a temporary table, from which Spark then reads the results. This may add additional costs on your BigQuery account.
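The required settings can be gathered in one place before issuing a query read. The following is a minimal sketch rather than part of the connector: `query_read_conf` is a hypothetical helper name, and only the three configuration keys are taken from the text above.

```python
def query_read_conf(materialization_dataset, materialization_project=None):
    """Return the Spark conf entries needed before reading a BigQuery query.

    Hypothetical helper (not a connector API); only the keys come from
    the documentation above.
    """
    if not materialization_dataset:
        raise ValueError("materializationDataset must be set to read from a query")
    conf = {
        "viewsEnabled": "true",
        "materializationDataset": materialization_dataset,
    }
    if materialization_project:  # optional setting
        conf["materializationProject"] = materialization_project
    return conf


# Usage, assuming an existing SparkSession named `spark` and a query string `sql`:
#   for key, value in query_read_conf("my_dataset").items():
#       spark.conf.set(key, value)
#   df = spark.read.format("bigquery").option("query", sql).load()
```

Failing early on a missing dataset is a design choice of this sketch; it avoids discovering the misconfiguration only when the read is attempted.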
Writing a DataFrame to BigQuery is done in a similar manner. Notice that the process first writes the data to GCS and then loads it into BigQuery, so a GCS bucket must be configured to indicate the temporary data location.
df.write \
.format("bigquery") \
.option("temporaryGcsBucket","some-bucket") \
.save("dataset.table")
The data is temporarily stored using the Apache Parquet format. An alternative format is Apache ORC.
The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write \
.format("bigquery") \
.save("dataset.table")
When streaming a DataFrame to BigQuery, each batch is written in the same manner as a non-streaming DataFrame.
Note that an HDFS-compatible checkpoint location (e.g. `path/to/HDFS/dir` or `gs://checkpoint-bucket/checkpointDir`) must be specified.
df.writeStream \
.format("bigquery") \
.option("temporaryGcsBucket","some-bucket") \
.option("checkpointLocation", "some-location") \
.option("table", "dataset.table") \
.start()
Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if one exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.
The API supports a number of options to configure reads and writes:
Property | Meaning | Usage |
---|---|---|
table | The BigQuery table in the format `[[project:]dataset.]table`. It is recommended to use the `path` parameter of `load()`/`save()` instead. This option has been deprecated and will be removed in a future version. (Deprecated) | Read/Write |
dataset | The dataset containing the table. (Optional unless omitted in `table`) | Read/Write |
project | The Google Cloud Project ID of the table. (Optional. Defaults to the project of the Service Account being used) | Read/Write |
parentProject | The Google Cloud Project ID of the table to bill for the export. (Optional. Defaults to the project of the Service Account being used) | Read/Write |
maxParallelism | The maximal number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty. Important: The old parameter (`parallelism`) is still supported but deprecated. It will be removed in version 1.0 of the connector. (Optional. Defaults to one partition per 400MB. See Configuring Partitioning.) | Read |
viewsEnabled | Enables the connector to read from views and not only tables. Please read the relevant section before activating this option. (Optional. Defaults to `false`) | Read |
materializationProject | The project ID where the materialized view is going to be created. (Optional. Defaults to the view's project ID) | Read |
materializationDataset | The dataset where the materialized view is going to be created. (Optional. Defaults to the view's dataset) | Read |
materializationExpirationTimeInMinutes | The expiration time of the temporary table holding the materialized data of a view or a query, in minutes. Notice that the connector may re-use the temporary table due to the use of a local cache and in order to reduce BigQuery computation, so very low values may cause errors. The value must be a positive integer. (Optional. Defaults to 1440, or 24 hours) | Read |
readDataFormat | Data format for reading from BigQuery. Options: `ARROW`, `AVRO`. Unsupported Arrow filters are not pushed down and results are filtered later by Spark. (Currently Arrow does not support disjunction across columns.) (Optional. Defaults to `ARROW`) | Read |
optimizedEmptyProjection | The connector uses an optimized empty projection (select without any columns) logic, used for `count()` execution. This logic takes the data directly from the table metadata or performs a much more efficient `SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the use of this logic by setting this option to `false`. (Optional, defaults to `true`) | Read |
pushAllFilters | If set to `true`, the connector pushes all the filters Spark can delegate to the BigQuery Storage API. This reduces the amount of data that needs to be sent from BigQuery Storage API servers to Spark clients. (Optional, defaults to `true`) | Read |
createDisposition | Specifies whether the job is allowed to create new tables. The permitted values are `CREATE_IF_NEEDED` and `CREATE_NEVER`. (Optional. Defaults to `CREATE_IF_NEEDED`) | Write |
temporaryGcsBucket | The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (`spark.conf.set(...)`). | Write |
persistentGcsBucket | The GCS bucket that holds the data before it is loaded to BigQuery. If set, the data won't be deleted after it is written to BigQuery. | Write |
persistentGcsPath | The GCS path that holds the data before it is loaded to BigQuery. Used only with `persistentGcsBucket`. | Write |
intermediateFormat | The format of the data before it is loaded to BigQuery. Values can be either "parquet", "orc" or "avro". In order to use the Avro format, the spark-avro package must be added at runtime. (Optional. Defaults to `parquet`). On write only. | Write |
datePartition | The date partition the data is going to be written to. Should be given in the format `YYYYMMDD`. Can be used to overwrite the data of a single partition. (Optional). On write only. | Write |
partitionField | If this field is specified together with `partitionType`, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED. If the option is not set for a partitioned table, then the table will be partitioned by a pseudo column, referenced via either `'_PARTITIONTIME'` as TIMESTAMP type, or `'_PARTITIONDATE'` as DATE type. (Optional). | Write |
partitionExpirationMs | Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value. (Optional). | Write |
partitionType | The only type supported is DAY, which will generate one partition per day. This option is mandatory for a target table to be partitioned. (Optional. Defaults to DAY if `partitionField` is specified). | Write |
clusteredFields | Comma-separated list of non-repeated, top-level columns. Clustering is only supported for partitioned tables. (Optional). | Write |
allowFieldAddition | Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are `true` and `false`. (Optional. Defaults to `false`). | Write |
allowFieldRelaxation | Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are `true` and `false`. (Optional. Defaults to `false`). | Write |
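As an illustration of the `datePartition` option from the table above, the sketch below formats a Python date as `YYYYMMDD` and overwrites a single partition. `partition_id` and `overwrite_partition` are hypothetical helper names, `df` is assumed to be an existing Spark DataFrame, and the bucket and table names are placeholders.

```python
from datetime import date


def partition_id(day):
    """Format a date as the YYYYMMDD string expected by datePartition."""
    return day.strftime("%Y%m%d")


def overwrite_partition(df, table, day, bucket):
    """Overwrite one date partition of `table` with the rows in `df`.

    `df` is assumed to be a Spark DataFrame; `table` and `bucket` are
    placeholders supplied by the caller.
    """
    (df.write
       .format("bigquery")
       .option("temporaryGcsBucket", bucket)
       .option("datePartition", partition_id(day))
       .mode("overwrite")
       .save(table))


print(partition_id(date(2020, 3, 31)))  # 20200331
```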
Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or the `--properties` parameter of `gcloud dataproc jobs submit spark`. In order to use this, prepend the prefix `spark.datasource.bigquery.` to any of the options. For example, `spark.conf.set("temporaryGcsBucket", "some-bucket")` can also be set as `--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket`.
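The prefixing rule can be sketched as a small helper that turns connector options into `--conf` arguments. This is an illustration only: `to_conf_flags` is a hypothetical name, and the option values are placeholders.

```python
PREFIX = "spark.datasource.bigquery."


def to_conf_flags(options):
    """Turn connector options into `--conf` arguments for spark-submit."""
    flags = []
    for name, value in sorted(options.items()):
        flags += ["--conf", f"{PREFIX}{name}={value}"]
    return flags


flags = to_conf_flags({"temporaryGcsBucket": "some-bucket", "intermediateFormat": "orc"})
print(" ".join(flags))
# --conf spark.datasource.bigquery.intermediateFormat=orc --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
```

Sorting the option names keeps the generated command line stable between runs, which makes it easier to compare job submissions.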
With the exception of `DATETIME` and `TIME`, all BigQuery data types map directly into the corresponding Spark SQL data type. Here are all of the mappings:
BigQuery Standard SQL Data Type | Spark SQL Data Type | Notes |
---|---|---|
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | This preserves NUMERIC's full 38 digits of precision and 9 digits of scale. |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType | Spark has no DATETIME type. Casting to TIMESTAMP uses a configured TimeZone, which defaults to the local timezone (UTC in GCE / Dataproc). We are considering adding an optional TimeZone property to allow automatically converting to Timestamp. This would be consistent with Spark's handling of CSV/JSON (except they always try to convert when inferring schema, and default to the local timezone). |
TIME | LongType | Spark has no TIME type. The generated longs, which indicate microseconds since midnight, can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and users can cast them if they like. When casting to Timestamp, TIME values have the same TimeZone issues as DATETIME. |
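Because TIME values arrive as longs counting microseconds since midnight, they can be turned into a time of day without involving a date or a time zone. The following is a minimal sketch in plain Python; `micros_to_time` is a hypothetical helper name and is not provided by the connector.

```python
from datetime import time

MICROS_PER_SECOND = 1_000_000


def micros_to_time(micros):
    """Convert microseconds since midnight into a datetime.time."""
    if not 0 <= micros < 24 * 60 * 60 * MICROS_PER_SECOND:
        raise ValueError("value is outside a single day")
    seconds, microsecond = divmod(micros, MICROS_PER_SECOND)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return time(hour, minute, second, microsecond)


print(micros_to_time(45_296_000_123))  # 12:34:56.000123
```

This operates on plain Python integers, for example on values collected to the driver.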
The Spark ML Vector and Matrix types are supported, including their dense and sparse versions. The data is saved as a BigQuery RECORD. Notice that a suffix is added to the field's description which includes the Spark type of the field.
In order to write those types to BigQuery, use the ORC or Avro intermediate format, and have them as a column of the Row (i.e. not a field in a struct).
The connector automatically computes column and pushdown filters from the DataFrame's `SELECT` statement, e.g.
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
This filters to the column `word` and pushes down the predicate filter `word = 'Hamlet' or word = 'Claudius'`.
If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering e.g.:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
You can also manually specify the `filter` option, which will override automatic pushdown; Spark will then do the rest of the filtering in the client.
The pseudo columns _PARTITIONDATE and _PARTITIONTIME are not part of the table schema. Therefore, in order to query by the partitions of partitioned tables, do not use the where() method shown above. Instead, add a filter option in the following manner:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API.
This can be configured explicitly with the `maxParallelism` property (the older `parallelism` name is deprecated). BigQuery may limit the number of partitions based on server constraints.
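To get a feel for the default, the one-partition-per-400MB rule can be turned into a back-of-the-envelope estimate. The sketch below is not the connector's code: `estimated_partitions` is a hypothetical name, reading "400MB" as decimal megabytes is an assumption, and so is rounding up. BigQuery may still return fewer partitions than requested.

```python
BYTES_PER_PARTITION = 400 * 1000 * 1000  # assumption: "400MB" read as decimal megabytes


def estimated_partitions(table_bytes, max_parallelism=None):
    """Rough estimate of the number of partitions requested for a table.

    An explicit parallelism setting, when given, replaces the size-based default.
    """
    if max_parallelism is not None:
        return max_parallelism
    # Integer ceiling division, with at least one partition (both are assumptions).
    return max(1, (table_bytes + BYTES_PER_PARTITION - 1) // BYTES_PER_PARTITION)


print(estimated_partitions(10 * 1000**3))  # 25
```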
The connector has preliminary support for reading from BigQuery views. Please note there are a few caveats:
- BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. This process affects the read performance, even before running any `collect()` or `count()` action.
- The materialization process can also incur additional costs to your BigQuery bill.
- By default, the materialized views are created in the same project and dataset. Those can be configured by the optional `materializationProject` and `materializationDataset` options, respectively. These options can also be globally set by calling `spark.conf.set(...)` before reading the views.
- Reading from views is disabled by default. In order to enable it, either set the viewsEnabled option when reading the specific view (`.option("viewsEnabled", "true")`) or set it globally by calling `spark.conf.set("viewsEnabled", "true")`.
The connector can be used in Jupyter notebooks even if it is not installed on the Spark cluster. It can be added as an external jar using the following code:
Python:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.21.1")\
.getOrCreate()
df = spark.read.format("bigquery")\
.load("dataset.table")
Scala:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.21.1")
.getOrCreate()
val df = spark.read.format("bigquery")
.load("dataset.table")
If the Spark cluster is using Scala 2.12 (optional for Spark 2.4.x, mandatory for 3.0.x), then the relevant package is com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.1. To find out which Scala version is used, run the following code:
Python:
spark.sparkContext._jvm.scala.util.Properties.versionString()
Scala:
scala.util.Properties.versionString
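The version string can then be mapped to the matching package coordinates. The following is a minimal sketch under stated assumptions: `connector_package` is a hypothetical helper, the version string is assumed to look like `version 2.12.10`, and only the two Scala versions listed in this document are accepted.

```python
import re

CONNECTOR_VERSION = "0.21.1"  # the pinned version used throughout this document


def connector_package(scala_version_string, connector_version=CONNECTOR_VERSION):
    """Map a Scala version string such as 'version 2.12.10' to Maven coordinates."""
    match = re.search(r"(\d+)\.(\d+)", scala_version_string)
    if match is None:
        raise ValueError(f"cannot parse Scala version from {scala_version_string!r}")
    binary_version = f"{match.group(1)}.{match.group(2)}"
    if binary_version not in ("2.11", "2.12"):
        raise ValueError(f"no connector artifact listed for Scala {binary_version}")
    return (
        "com.google.cloud.spark:"
        f"spark-bigquery-with-dependencies_{binary_version}:{connector_version}"
    )


print(connector_package("version 2.12.10"))
# com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.1
```

The returned string is the value that would be passed to `spark.jars.packages` when building the session.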
Unless you wish to use the implicit Scala API `spark.read.bigquery("TABLE_ID")`, there is no need to compile against the connector.
To include the connector in your project:
<dependency>
<groupId>com.google.cloud.spark</groupId>
<artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
<version>0.21.1</version>
</dependency>
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.21.1"
See the BigQuery pricing documentation.
You can manually set the number of partitions with the `maxParallelism` property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.
You can also always repartition after reading in Spark.
Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described here.
Credentials can also be provided explicitly, either as a parameter or from the Spark runtime configuration. They can be passed in as a base64-encoded string directly, or as a path to a file that contains the credentials (but not both).
Example:
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
or
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
Alternatively, specify the credentials file name.
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
or
spark.conf.set("credentialsFile", "</path/to/key/file>")
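For the base64 variant, the encoded string has to be produced from the key file's contents first. The following is a minimal sketch, assuming the connector accepts standard base64 of the UTF-8 JSON text; `encode_credentials` is a hypothetical helper name and the key content shown is a placeholder.

```python
import base64
import json


def encode_credentials(key_json_text):
    """Base64-encode the text of a service account JSON key."""
    json.loads(key_json_text)  # fail early if the text is not valid JSON
    return base64.b64encode(key_json_text.encode("utf-8")).decode("ascii")


# Placeholder content; a real key file has more fields.
encoded = encode_credentials('{"type": "service_account"}')

# Usage, assuming an existing SparkSession named `spark`:
#   spark.conf.set("credentials", encoded)
```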
Another alternative to passing the credentials is to pass the access token used for authenticating the API calls to the Google Cloud Platform APIs. You can get the access token by running `gcloud auth application-default print-access-token`.
spark.read.format("bigquery").option("gcpAccessToken", "<access-token>")
or
spark.conf.set("gcpAccessToken", "<access-token>")