🛈 Please note that this is an open source project which is officially supported by Exasol. For any questions, you can contact our support team or open a GitHub issue.
This is a connector library that supports an integration between Exasol and Apache Spark. Using this connector, users can create Spark dataframes from Exasol queries and save Spark dataframes as Exasol tables.
The implementation is based on the Spark DataSource API and Exasol sub connections.
- Deployed and running Spark cluster
- Deployed and running Exasol cluster
- Make sure the Spark cluster has enough resources to start at least as many executors as there are Exasol data nodes (see the sketch after this list)
- Make sure that the Spark cluster can connect to the Exasol nodes using their private IP addresses, e.g., `10.0.0.11`
- Make sure that the Exasol nodes are reachable from the Spark cluster on port `8563` and on the port range `20000-21000`
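As an illustration of the executor-sizing prerequisite, resources could be requested explicitly when configuring the Spark application. This is only a minimal sketch, assuming a hypothetical Exasol cluster with three data nodes and a cluster manager (such as YARN) that honors `spark.executor.instances`; all values are placeholders:

```scala
import org.apache.spark.SparkConf

// Hypothetical resource settings for a three-node Exasol cluster: request at
// least as many executor slots as there are Exasol data nodes so that Spark
// can run the parallel read tasks concurrently.
val sparkConf = new SparkConf()
  .setAppName("spark-exasol-example")
  .set("spark.executor.instances", "3") // >= number of Exasol data nodes
  .set("spark.executor.cores", "1")
```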
Here are short code snippets showing how to use the connector in your Spark / Scala applications.
Reading data from Exasol as a Spark dataframe:
// An Exasol sql syntax query string
val exasolQueryString =
"""
SELECT SALES_DATE, MARKET_ID, PRICE
FROM RETAIL.SALES
WHERE MARKET_ID IN (661, 534, 667)
"""
val df = sparkSession
.read
.format("exasol")
.option("host", "10.0.0.11")
.option("port", "8563")
.option("username", "sys")
.option("password", "exaPass")
.option("query", exasolQueryString)
.load()
Saving a Spark dataframe as an Exasol table:
df
.write
.mode("append")
.option("host", "10.0.0.11")
.option("port", "8563")
.option("username", "sys")
.option("password", "exaPass")
.option("table", "RETAIL.ADJUSTED_SALES")
.format("exasol")
.save()
Additionally, you can set the parameters on `SparkConf`:
// Configure spark session
val sparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.exasol.host", "localhost")
.set("spark.exasol.port", "8563")
.set("spark.exasol.username", "sys")
.set("spark.exasol.password", "exasol")
.set("spark.exasol.max_nodes", "200")
val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val queryStr = "SELECT * FROM MY_SCHEMA.MY_TABLE"
val df = sparkSession
.read
.format("exasol")
.option("query", queryStr)
.load()
Please note that parameter values set on the Spark configuration have higher priority than the same options set on the dataframe reader or writer.
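For instance, if the same parameter is provided both on `SparkConf` and as a dataframe option, the value from the Spark configuration wins. A minimal sketch with placeholder addresses and query:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// 'spark.exasol.host' on the Spark configuration takes precedence over the
// 'host' option on the reader below (both addresses are placeholders).
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .set("spark.exasol.host", "10.0.0.11") // this value is used

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

val df = sparkSession.read
  .format("exasol")
  .option("host", "10.0.0.21") // overridden by spark.exasol.host above
  .option("query", "SELECT * FROM MY_SCHEMA.MY_TABLE")
  .load()
```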
For an example walkthrough, please check docs/example-walkthrough.
The latest release version is compiled against Scala 2.11 and Spark 2.1+.
In order to use the connector in your Java or Scala applications, you can include it as a dependency by adding the artifact information to your `build.sbt` or `pom.xml` file.
resolvers ++= Seq("Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases")
libraryDependencies += "com.exasol" % "spark-connector_2.11" % "$LATEST_VERSION"
<repository>
<id>maven.exasol.com</id>
<url>https://maven.exasol.com/artifactory/exasol-releases</url>
</repository>
<dependency>
<groupId>com.exasol</groupId>
<artifactId>spark-connector_2.11</artifactId>
<version>$LATEST_VERSION</version>
</dependency>
As an alternative, you can provide the `--repositories` and `--packages` artifact coordinates to the `spark-submit`, `spark-shell` or `pyspark` commands. For example:
spark-shell \
--repositories https://maven.exasol.com/artifactory/exasol-releases \
--packages com.exasol:spark-connector_2.11:$LATEST_VERSION
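Once the shell starts with the connector on its classpath, the built-in `spark` session can be used directly. A quick check along these lines, where all option values and the query are placeholders:

```scala
// Run inside spark-shell: read an Exasol query through the connector and
// inspect the inferred schema. All option values below are placeholders.
val df = spark.read
  .format("exasol")
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaPass")
  .option("query", "SELECT * FROM MY_SCHEMA.MY_TABLE")
  .load()

df.printSchema()
```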
Similarly, you can submit a packaged application to the Spark cluster using `spark-submit`:
spark-submit \
--master spark://spark-master-url:7077 \
--repositories https://maven.exasol.com/artifactory/exasol-releases \
--packages com.exasol:spark-connector_2.11:$LATEST_VERSION \
--class com.myorg.SparkExasolConnectorApp \
--conf spark.exasol.password=exaTru3P@ss \
path/to/project/folder/target/scala-2.11/sparkexasolconnectorapp_2.11-5.3.1.jar
This deployment example also shows that you can configure the Exasol parameters at startup using the `--conf spark.exasol.keyName=value` syntax.
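A minimal sketch of what such an application class might look like; it assumes the connection parameters arrive via `--conf spark.exasol.*` flags at submit time, and the package, object, and query names are the placeholders from the example above:

```scala
package com.myorg

import org.apache.spark.sql.SparkSession

// Hypothetical skeleton for the application submitted above. Connection
// parameters (host, port, username, password) are expected to be provided
// as --conf spark.exasol.* flags, so only the query is set in code.
object SparkExasolConnectorApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkExasolConnectorApp").getOrCreate()

    val df = spark.read
      .format("exasol")
      .option("query", "SELECT * FROM MY_SCHEMA.MY_TABLE") // placeholder query
      .load()

    df.show(10)

    spark.stop()
  }
}
```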
Please update `$LATEST_VERSION` accordingly with the latest artifact version number.
The following configuration parameters can be provided, mainly to facilitate a connection to the Exasol cluster.
| Spark Configuration | Configuration | Default | Description |
|---|---|---|---|
| | `query` | | A query string to send to Exasol |
| | `table` | | A table name (with schema, e.g. `my_schema.my_table`) to save the dataframe into |
| `spark.exasol.host` | `host` | `localhost` | A host IP address of the first Exasol node (e.g. `10.0.0.11`) |
| `spark.exasol.port` | `port` | `8888` | A port number to connect to Exasol nodes (e.g. `8563`) |
| `spark.exasol.username` | `username` | `sys` | An Exasol username for logging in |
| `spark.exasol.password` | `password` | `exasol` | An Exasol password for logging in |
| `spark.exasol.max_nodes` | `max_nodes` | `200` | The number of data nodes in the Exasol cluster |
| `spark.exasol.batch_size` | `batch_size` | `1000` | The number of records batched before running an execute statement when saving a dataframe |
| `spark.exasol.create_table` | `create_table` | `false` | A permission to create a table if it does not exist in Exasol when saving a dataframe |
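For example, the save-related parameters can be combined as options on the writer. This is only a sketch with placeholder connection values and table name, assuming `df` is the dataframe to be saved:

```scala
// Save a dataframe, creating the target table if it does not exist and
// batching 5000 records per execute statement. All values are placeholders.
df.write
  .mode("append")
  .format("exasol")
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaPass")
  .option("table", "RETAIL.ADJUSTED_SALES")
  .option("create_table", "true") // see spark.exasol.create_table above
  .option("batch_size", "5000")   // see spark.exasol.batch_size above
  .save()
```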
Clone the repository,
git clone https://github.com/exasol/spark-exasol-connector
cd spark-exasol-connector/
Compile,
./sbtx compile
Run unit tests,
./sbtx test
To run integration tests, a separate docker network should be created first,
docker network create -d bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 dockernet
then run,
./sbtx it:test
The integration tests require docker, exasol/docker-db, testcontainers and spark-testing-base.
In order to create a bundled jar,
./sbtx assembly
This creates a jar file under the `target/` folder. The jar file can be used with the `spark-submit`, `spark-shell` or `pyspark` commands. For example,
spark-shell --jars /path/to/spark-exasol-connector-assembly-*.jar
- Getting a `com.exasol.jdbc.ConnectFailed: Connection refused` exception

  This usually occurs when the Spark connector cannot reach the Exasol data nodes. Please make sure that the Exasol data nodes are reachable on port `8563` and on the port range `20000-21000`. Additionally, please make sure that the `host` parameter value is set to the first Exasol data node address, for example, `10.0.0.11`.
- Getting a `Connection was lost and could not be reestablished` error

  For example:

      [error] Caused by: com.exasol.jdbc.ConnectFailed: Connection was lost and could not be reestablished. (SessionID: 1615669509094853970)
      [error]   at com.exasol.jdbc.AbstractEXAConnection.reconnect(AbstractEXAConnection.java:3505)
      [error]   at com.exasol.jdbc.ServerCommunication.handle(ServerCommunication.java:98)
      [error]   at com.exasol.jdbc.AbstractEXAConnection.communication(AbstractEXAConnection.java:2537)
      [error]   at com.exasol.jdbc.AbstractEXAConnection.communication_resultset(AbstractEXAConnection.java:2257)
      [error]   at com.exasol.jdbc.AbstractEXAStatement.execute(AbstractEXAStatement.java:456)
      [error]   at com.exasol.jdbc.EXAStatement.execute(EXAStatement.java:278)
      [error]   at com.exasol.jdbc.AbstractEXAStatement.executeQuery(AbstractEXAStatement.java:601)
      [error]   at com.exasol.spark.rdd.ExasolRDD.compute(ExasolRDD.scala:125)
      [error]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      [error]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

  This is one of the known issues. It happens when the number of parallel tasks scheduled by Spark is smaller than the number of Exasol sub connections. It can be mitigated by submitting the Spark application with enough resources so that it can start at least as many parallel tasks as there are parallel Exasol connections.
  Additionally, you can limit the number of parallel Exasol connections using the `max_nodes` parameter (see the sketch after this list). However, it is not advised to limit this value in a production environment.
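For instance, the limit could be lowered on the Spark configuration while troubleshooting; the host and value below are illustrative only:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: cap the number of parallel Exasol sub connections via
// 'max_nodes'. Not recommended for production environments.
val sparkConf = new SparkConf()
  .set("spark.exasol.host", "10.0.0.11")
  .set("spark.exasol.max_nodes", "4") // limit parallel connections
```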