Redshift data source for Spark

Primary language: Scala. License: Apache License 2.0

Redshift Data Source for Apache Spark

A library to load data into Spark SQL DataFrames from Amazon Redshift, and write them back to Redshift tables. Amazon S3 is used to efficiently transfer data in and out of Redshift, and JDBC is used to automatically trigger the appropriate COPY and UNLOAD commands on Redshift.

This library is more suited to ETL than interactive queries, since large amounts of data could be extracted to S3 for each query execution. If you plan to perform many queries against the same Redshift tables then we recommend saving the extracted data in a format such as Parquet.
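
A minimal sketch of that pattern in Python, assuming an existing SQLContext and a Parquet location chosen by the caller. The helper names and `parquet_path` are illustrative and not part of this library:

```python
REDSHIFT_FORMAT = "com.databricks.spark.redshift"


def snapshot_to_parquet(sql_context, redshift_options, parquet_path):
    """Unload a Redshift table or query once and save the result as Parquet.

    redshift_options is a dict of the reader options described in this
    README (url, tempdir, and dbtable or query). parquet_path is a
    hypothetical destination chosen by the caller.
    """
    df = (sql_context.read
          .format(REDSHIFT_FORMAT)
          .options(**redshift_options)
          .load())
    # Overwrite any previous snapshot so that reruns refresh the copy.
    df.write.mode("overwrite").parquet(parquet_path)


def read_snapshot(sql_context, parquet_path):
    """Read the Parquet copy; this does not trigger another Redshift UNLOAD."""
    return sql_context.read.parquet(parquet_path)
```

Repeated queries can then call `read_snapshot` instead of going back to Redshift, at the cost of working on data that is only as fresh as the last snapshot.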

Installation

This library requires Apache Spark 2.0+ and Amazon Redshift 1.0.963+.

For a version that works with Spark 1.x, see the 1.x branch.

You may use this library in your applications with the following dependency information:

Scala 2.10

groupId: com.databricks
artifactId: spark-redshift_2.10
version: 2.0.1

Scala 2.11

groupId: com.databricks
artifactId: spark-redshift_2.11
version: 2.0.1

You will also need to provide a JDBC driver that is compatible with Redshift. Amazon recommends that you use their driver, which is distributed as a JAR hosted on Amazon's website. This library has also been successfully tested using the Postgres JDBC driver.

Note on Hadoop versions: This library depends on spark-avro, which should be downloaded automatically because it is declared as a dependency. You may also need to provide the avro-mapred dependency that matches your Hadoop distribution, although in most deployments it is provided by your cluster's Spark assemblies and no additional action is required.

Note on Amazon SDK dependency: This library declares a provided dependency on components of the AWS Java SDK. In most cases, these libraries will be provided by your deployment environment. However, if you get ClassNotFoundExceptions for Amazon SDK classes then you will need to add explicit dependencies on com.amazonaws.aws-java-sdk-core and com.amazonaws.aws-java-sdk-s3 as part of your build / runtime configuration. See the comments in project/SparkRedshiftBuild.scala for more details.

Usage

Data Sources API

Once you have configured your AWS credentials, you can use this library via the Data Sources API in Scala, Python or SQL, as follows:

Scala

import org.apache.spark.SparkContext
import org.apache.spark.sql._

val sc: SparkContext = ??? // replace with your existing SparkContext
val sqlContext = new SQLContext(sc)

// Get some data from a Redshift table
val df: DataFrame = sqlContext.read
    .format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
    .option("dbtable", "my_table")
    .option("tempdir", "s3n://path/for/temp/data")
    .load()

// Can also load data from a Redshift query
// (named queryDf so that it does not clash with df above)
val queryDf: DataFrame = sqlContext.read
    .format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
    .option("query", "select x, count(*) from my_table group by x")
    .option("tempdir", "s3n://path/for/temp/data")
    .load()

// Apply some transformations to the data as per normal, then you can use the
// Data Source API to write the data back to another table

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3n://path/for/temp/data")
  .mode("error")
  .save()

// Using IAM Role based authentication
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .option("tempdir", "s3n://path/for/temp/data")
  .mode("error")
  .save()

Python

from pyspark.sql import SQLContext

sc = ...  # replace with your existing SparkContext
sql_context = SQLContext(sc)

# Read data from a table
df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
    .option("dbtable", "my_table") \
    .option("tempdir", "s3n://path/for/temp/data") \
    .load()

# Read data from a query
df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
    .option("query", "select x, count(*) from my_table group by x") \
    .option("tempdir", "s3n://path/for/temp/data") \
    .load()

# Write back to a table
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .mode("error") \
  .save()

# Using IAM Role based authentication
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
  .mode("error") \
  .save()

SQL

Reading data using SQL:

CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable 'my_table',
  tempdir 's3n://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
);

Writing data using SQL:

-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable 'my_table',
  tempdir 's3n://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;

Note that the SQL API only supports the creation of new tables and not overwriting or appending; this corresponds to the default save mode of the other language APIs.

Hadoop InputFormat

The library contains a Hadoop input format for Redshift tables unloaded with the ESCAPE option, which you may make direct use of as follows:

import com.databricks.spark.redshift.RedshiftInputFormat

val records = sc.newAPIHadoopFile(
  path,
  classOf[RedshiftInputFormat],
  classOf[java.lang.Long],
  classOf[Array[String]])

Configuration

AWS Credentials

This library reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires AWS credentials with read and write access to an S3 bucket (specified using the tempdir configuration parameter). Assuming that Spark has been configured to access S3, it should automatically discover the proper credentials to pass to Redshift.

There are four ways of configuring AWS credentials for use by this library:

  1. Set keys in Hadoop conf (best option for most users): You can specify AWS keys via Hadoop configuration properties. If your tempdir configuration points to an s3n:// filesystem, you can set the fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey properties in a Hadoop XML configuration file or call sc.hadoopConfiguration.set() to mutate Spark's global Hadoop configuration.

For example, if you are using the s3n filesystem then add

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

and for the s3a filesystem add

sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")

Python users will have to use a slightly different method to modify the hadoopConfiguration, since this field is not exposed in all versions of PySpark. Although the following command relies on some Spark internals, it should work with all PySpark versions and is unlikely to break or change in the future:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

  2. Encode keys in tempdir URI: For example, the URI s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir encodes the key pair (ACCESSKEY, SECRETKEY). Due to Hadoop limitations, this approach will not work for secret keys which contain forward slash (/) characters.
  3. Set the aws_iam_role parameter: If set, this takes precedence over any other authentication option. The IAM role must be attached to the Redshift cluster and must allow read/write access to your tempdir bucket. See the Amazon Redshift documentation on authorizing COPY and UNLOAD operations using IAM roles for more information.
  4. IAM instance profiles: If you are running on EC2 and authenticate to S3 using IAM and instance profiles, then you must configure the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration properties to point to temporary keys created via the AWS Security Token Service. These temporary keys will then be passed to Redshift via COPY and UNLOAD commands.
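
The instance-profile option can be sketched in Python as follows. The sketch assumes temporary credentials have already been obtained from the AWS Security Token Service and are available as a mapping with `AccessKeyId`, `SecretAccessKey`, and `SessionToken` entries, which is the shape of the `Credentials` element in STS responses. The helper name is illustrative and not part of this library:

```python
def temporary_credential_options(credentials):
    """Map an STS-style credentials mapping onto this library's options.

    `credentials` is assumed to look like the "Credentials" element of an
    STS response: AccessKeyId, SecretAccessKey and SessionToken.
    """
    required = ("AccessKeyId", "SecretAccessKey", "SessionToken")
    missing = [key for key in required if not credentials.get(key)]
    if missing:
        raise ValueError(
            "missing temporary credential fields: " + ", ".join(missing))
    # All three options are set together, as described above.
    return {
        "temporary_aws_access_key_id": credentials["AccessKeyId"],
        "temporary_aws_secret_access_key": credentials["SecretAccessKey"],
        "temporary_aws_session_token": credentials["SessionToken"],
    }
```

The resulting dictionary can be passed to a reader or writer with `.options(**temporary_credential_options(creds))` alongside url, tempdir, and dbtable. Temporary keys expire, so long-running jobs may need to fetch fresh ones before each read or write.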

⚠️ Note: This library does not clean up the temporary files that it creates in S3. As a result, we recommend that you use a dedicated temporary S3 bucket with an object lifecycle configuration to ensure that temporary files are automatically deleted after a specified expiration period.
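
As one way to set this up, the sketch below builds an S3 lifecycle configuration that expires objects under a temporary prefix. The prefix, rule ID, bucket name, and expiration period are placeholder assumptions. The dictionary follows the shape accepted by boto3's `put_bucket_lifecycle_configuration`; applying it requires boto3 and suitable AWS permissions, so that call is shown only as a comment:

```python
def tempdir_lifecycle_configuration(prefix, expire_after_days):
    """Build a lifecycle configuration that expires objects under `prefix`."""
    if expire_after_days < 1:
        raise ValueError("expire_after_days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-spark-redshift-tempdir",  # placeholder rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


# Expire temporary files seven days after they are written.
config = tempdir_lifecycle_configuration("path/for/temp/data/", 7)

# Applying the rule (bucket name is a placeholder):
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-temp-bucket", LifecycleConfiguration=config)
```

Choose an expiration period comfortably longer than your longest-running job, so that files are not removed while a read or write is still in progress.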

Parameters

The parameter map or OPTIONS provided in Spark SQL supports the following settings.

Parameter Required Default Notes
dbtable Yes, unless query is specified No default The table to create or read from in Redshift. This parameter is required when saving data back to Redshift.
query Yes, unless dbtable is specified No default The query to read from in Redshift
user No No default The Redshift username. Must be used in tandem with password option. May only be used if the user and password are not passed in the URL; passing both will result in an error.
password No No default The Redshift password. Must be used in tandem with user option. May only be used if the user and password are not passed in the URL; passing both will result in an error.
url Yes No default

A JDBC URL of the format jdbc:subprotocol://host:port/database?user=username&password=password

  • subprotocol can be postgresql or redshift, depending on which JDBC driver you have loaded. Note however that one Redshift-compatible driver must be on the classpath and match this URL.
  • host and port should point to the Redshift master node, so security groups and/or VPC will need to be configured to allow access from your driver application.
  • database identifies a Redshift database name
  • user and password are credentials to access the database. They must be embedded in this URL unless they are supplied through the separate user and password options, and your user account should have the necessary privileges for the table being referenced.
aws_iam_role Only if using IAM roles to authorize Redshift COPY/UNLOAD operations No default Fully specified ARN of the IAM Role attached to the Redshift cluster, ex: arn:aws:iam::123456789000:role/redshift_iam_role
temporary_aws_access_key_id No, unless using EC2 instance profile authentication No default AWS access key, must have write permissions to the S3 bucket.
temporary_aws_secret_access_key No, unless using EC2 instance profile authentication No default AWS secret access key corresponding to provided access key.
temporary_aws_session_token No, unless using EC2 instance profile authentication No default AWS session token corresponding to provided access key.
tempdir Yes No default A writeable location in Amazon S3, to be used for unloaded data when reading and Avro data to be loaded into Redshift when writing. If you're using Redshift data source for Spark as part of a regular ETL pipeline, it can be useful to set a Lifecycle Policy on a bucket and use that as a temp location for this data.
jdbcdriver No Determined by the JDBC URL's subprotocol The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL's subprotocol.
diststyle No EVEN The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distkey option.
distkey No, unless using DISTSTYLE KEY No default The name of a column in the table to use as the distribution key when creating a table.
sortkeyspec No No default

A full Redshift Sort Key definition.

Examples include:

  • SORTKEY(my_sort_column)
  • COMPOUND SORTKEY(sort_col_1, sort_col_2)
  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (Deprecated) No true

Setting this deprecated option to false will cause an overwrite operation's destination table to be dropped immediately at the beginning of the write, making the overwrite operation non-atomic and reducing the availability of the destination table. This may reduce the temporary disk space requirements for overwrites.

Since setting usestagingtable=false risks data loss / unavailability, we have chosen to deprecate it in favor of requiring users to manually drop the destination table themselves.

description No No default

A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns.

preactions No No default

This can be a ; separated list of SQL commands to be executed before the COPY command runs. It may be useful to have some DELETE commands or similar run here before loading new data. If a command contains %s, the table name will be formatted in before execution (in case you're using a staging table).

Be warned that if these commands fail, it is treated as an error and you'll get an exception. If using a staging table, the changes will be reverted and the backup table restored if pre actions fail.

postactions No No default

This can be a ; separated list of SQL commands to be executed after a successful COPY when loading data. It may be useful to have some GRANT commands or similar run here when loading new data. If a command contains %s, the table name will be formatted in before execution (in case you're using a staging table).

Be warned that if these commands fail, it is treated as an error and you'll get an exception. If using a staging table, the changes will be reverted and the backup table restored if post actions fail.

extracopyoptions No No default

A list of extra options to append to the Redshift COPY command when loading data, e.g. TRUNCATECOLUMNS or MAXERROR n (see the Redshift docs for other options).

Note that since these options are appended to the end of the COPY command, only options that make sense at the end of the command can be used, but that should cover most possible use cases.
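
To show how several of these parameters fit together on a write, here is a small Python sketch that assembles an options dictionary and checks two of the rules stated in the table: user and password must be set together, and diststyle KEY requires distkey. The helper `redshift_write_options` is hypothetical and not part of this library, and the column names, group name, and SQL statements are made-up examples:

```python
def redshift_write_options(url, tempdir, dbtable, user=None, password=None,
                           diststyle=None, distkey=None, sortkeyspec=None,
                           preactions=(), postactions=(),
                           extracopyoptions=None):
    """Assemble writer options, leaving out any that were not supplied."""
    if (user is None) != (password is None):
        raise ValueError("user and password must be set together")
    if diststyle is not None:
        diststyle = diststyle.upper()
        if diststyle not in ("EVEN", "KEY", "ALL"):
            raise ValueError("diststyle must be EVEN, KEY or ALL")
        if diststyle == "KEY" and not distkey:
            raise ValueError("diststyle KEY requires distkey")

    options = {"url": url, "tempdir": tempdir, "dbtable": dbtable}
    optional = {
        "user": user,
        "password": password,
        "diststyle": diststyle,
        "distkey": distkey,
        "sortkeyspec": sortkeyspec,
        "extracopyoptions": extracopyoptions,
        # Multiple SQL commands are passed as one ;-separated string.
        "preactions": ";".join(preactions) or None,
        "postactions": ";".join(postactions) or None,
    }
    options.update({k: v for k, v in optional.items() if v is not None})
    return options


options = redshift_write_options(
    url="jdbc:redshift://redshifthost:5439/database",  # no credentials in the URL
    tempdir="s3n://path/for/temp/data",
    dbtable="my_table_copy",
    user="username",
    password="pass",
    diststyle="KEY",
    distkey="country_code",
    sortkeyspec="COMPOUND SORTKEY(country_code, language_code)",
    # %s is left as-is here; the library formats the table name in.
    preactions=["DELETE FROM %s WHERE country_code = 'XX'"],
    postactions=["GRANT SELECT ON %s TO GROUP analysts"],
    extracopyoptions="TRUNCATECOLUMNS",
)
```

With a DataFrame `df` in hand, the dictionary can be applied with `df.write.format("com.databricks.spark.redshift").options(**options).mode("append").save()`. The checks only mirror the rules in the table above; the library performs its own validation when the write runs.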

Additional configuration options

Configuring the maximum size of string columns

When creating Redshift tables, this library's default behavior is to create TEXT columns for string columns. Redshift stores TEXT columns as VARCHAR(256), so these columns have a maximum size of 256 characters (source).

To support larger columns, you can use the maxlength column metadata field to specify the maximum length of individual string columns. This can also be used as a space-saving optimization, by declaring columns with a smaller maximum length than the default.

⚠️ Note: Due to limitations in Spark, metadata modification is unsupported in the Python, SQL, and R language APIs.

Here is an example of updating multiple columns' metadata fields using Spark's Scala API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df: DataFrame = ??? // replace with the DataFrame you want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
  .save()

Setting a custom column type

If you need to manually set a column type, you can use the redshift_type column metadata field. For example, to override the default mapping from Spark SQL types to Redshift types and assign a user-defined column type, you can do the following:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom Redshift type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df: DataFrame = ??? // replace with the DataFrame you want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configuring column encoding

When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the encoding column metadata field to specify a compression encoding for each column (see Amazon docs for available encodings).

Setting descriptions on columns

Redshift allows columns to have descriptions attached that should show up in most query tools (using the COMMENT command). You can set the description column metadata field to specify a description for individual columns.

Transactional Guarantees

This section describes the transactional guarantees of the Redshift data source for Spark.

General background on Redshift and S3's properties

For general information on Redshift's transactional guarantees, see the Managing Concurrent Write Operations chapter in the Redshift documentation. In a nutshell, Redshift provides serializable isolation (according to the documentation for Redshift's BEGIN command, "[although] you can use any of the four transaction isolation levels, Amazon Redshift processes all isolation levels as serializable"). According to its documentation, "Amazon Redshift supports a default automatic commit behavior in which each separately-executed SQL command commits individually." Thus, individual commands like COPY and UNLOAD are atomic and transactional, while explicit BEGIN and END should only be necessary to enforce the atomicity of multiple commands / queries.

When reading from / writing to Redshift, this library reads and writes data in S3. Both Spark and Redshift produce partitioned output which is stored in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually consistent, so this library must go to special lengths to avoid missing or incomplete data due to this source of eventual consistency.

Guarantees of the Redshift data source for Spark

Appending to an existing table: When inserting rows into Redshift, this library uses the COPY command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, spark-redshift appends to existing tables have the same atomic and transactional properties as regular Redshift COPY commands.

Creating a new table (SaveMode.CreateIfNotExists): Creating a new table is a two-step process, consisting of a CREATE TABLE command followed by a COPY command to append the initial set of rows. Both of these operations are performed in a single transaction.

Overwriting an existing table: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it.

If the deprecated usestagingtable setting is set to false then this library will commit the DROP TABLE command before appending rows to the new table, sacrificing the atomicity of the overwrite operation but reducing the amount of staging space that Redshift needs during the overwrite.

Querying Redshift tables: Queries use Redshift's UNLOAD command to execute a query and save its results to S3 and use manifests to guard against certain eventually-consistent S3 operations. As a result, queries from Redshift data source for Spark should have the same consistency properties as regular Redshift queries.
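
To illustrate why manifests help, the sketch below builds a COPY manifest in the JSON shape documented by Redshift: an `entries` list of explicit object URLs, each optionally marked `mandatory` so that the load fails if a file is missing. Because the load names every file up front, it does not depend on an S3 bucket listing. This illustrates the format only and is not this library's internal code; the function name and file names are made up:

```python
import json


def build_copy_manifest(file_urls):
    """Return a Redshift COPY manifest (JSON text) listing files explicitly."""
    if not file_urls:
        raise ValueError("a manifest needs at least one file")
    entries = [{"url": url, "mandatory": True} for url in file_urls]
    return json.dumps({"entries": entries}, indent=2)


manifest = build_copy_manifest([
    "s3://bucket/path/for/temp/data/part-00000.avro",
    "s3://bucket/path/for/temp/data/part-00001.avro",
])
```

A writer that knows exactly which part files it produced can list them this way, so a COPY either loads all of them or fails, and never silently loads a partial listing.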

Migration Guide

  • Version 2.0 removed a number of deprecated APIs; for details, see databricks#239