The spark-etl-framework is a pipeline-based data transformation framework built on Spark-SQL. Each end-to-end process that transforms and moves data is defined as a pipeline. At the start of a pipeline, read-actors (readers) load data from the source(s); in the middle of the pipeline, data is normally transformed by Spark-SQL based transformers; and at the end of the pipeline, write-actors (writers) write the outputs to the target location. A pipeline is not limited to the read-transform-write order: any of these three actions can appear at any stage of a pipeline, except that a write cannot be the first action. Custom actors can also be developed and plugged in.
Technically, a pipeline consists of one or more jobs, and each job contains one or more actions, each represented by an Actor. Jobs can run under the same Spark Session or under separate Spark Sub-Sessions, and dataframes (as views) can be shared across jobs. Most actors require input view(s) and produce an output view.
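To make the pipeline > job > action hierarchy concrete, the following is a minimal sketch in plain Scala. The case classes (`Pipeline`, `Job`, `Action`) and the `validate` helper are hypothetical and are not the framework's own types; the sketch only encodes the one ordering rule stated above, that a write cannot be the first action of a pipeline.

```scala
// Hypothetical model of the pipeline -> job -> action hierarchy; not the framework's own classes.
object PipelineModel {
  sealed trait Kind
  case object Read extends Kind
  case object Transform extends Kind
  case object Write extends Kind

  final case class Action(name: String, kind: Kind, inputViews: Seq[String], outputView: Option[String])
  final case class Job(name: String, actions: Seq[Action])
  final case class Pipeline(name: String, jobs: Seq[Job])

  // Enforces the documented rule: the first action of a pipeline cannot be a write.
  // Any other ordering of reads, transforms and writes is accepted.
  def validate(p: Pipeline): Either[String, Pipeline] =
    p.jobs.flatMap(_.actions).headOption match {
      case Some(a) if a.kind == Write => Left(s"first action '${a.name}' cannot be a write")
      case None                       => Left("pipeline has no actions")
      case _                          => Right(p)
    }

  def main(args: Array[String]): Unit = {
    val job = Job("load users", Seq(
      Action("load users", Read, Nil, Some("users")),
      Action("transform users", Transform, Seq("users"), Some("active_users")),
      Action("write users", Write, Seq("active_users"), None)
    ))
    println(validate(Pipeline("demo", Seq(job))).isRight) // true
  }
}
```

Each action names its input views and its (optional) output view, which is how views flow between actors and, when global, across jobs.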
To build the project, run
mvn clean install -DskipTests
The following explains the definition of each section in a pipeline. The Settings section supports the following flags:
- singleSparkSession - when a pipeline consists of multiple jobs, each job can be executed under a separate Spark Sub-Session, which provides resource isolation across jobs. When this flag is set to true, all jobs are executed under the same global Spark Session. The default value is false.
- globalViewAsLocal - global dataframes (views) are shared across jobs, even when the jobs run under separate Spark Sub-Sessions. Global views are referenced as global_temp.${table}. Set this flag to true to reference global views by their plain names, as if they were local views. The default value is true.
Please note that variables cannot be referenced in this section.
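As an illustration of the two flags and their documented defaults, here is a small sketch. The `Settings` case class and the `fromMap` helper are hypothetical, not the framework's API; they only show how missing keys fall back to singleSparkSession = false and globalViewAsLocal = true.

```scala
// Illustrative only: a hypothetical representation of the Settings section, not the framework's API.
object PipelineSettings {
  // Documented defaults: singleSparkSession = false, globalViewAsLocal = true.
  final case class Settings(singleSparkSession: Boolean = false, globalViewAsLocal: Boolean = true)

  // Keys missing from the section fall back to the defaults.
  // "true"/"false" are matched case-insensitively; any other value throws IllegalArgumentException.
  def fromMap(raw: Map[String, String]): Settings = {
    val defaults = Settings()
    def flag(key: String, default: Boolean): Boolean =
      raw.get(key).map(_.trim.toBoolean).getOrElse(default)
    Settings(
      singleSparkSession = flag("singleSparkSession", defaults.singleSparkSession),
      globalViewAsLocal = flag("globalViewAsLocal", defaults.globalViewAsLocal)
    )
  }

  // With singleSparkSession = false (the default), each job runs in its own Spark Sub-Session.
  def sessionPerJob(s: Settings): Boolean = !s.singleSparkSession

  def main(args: Array[String]): Unit = {
    val s = fromMap(Map("singleSparkSession" -> "true"))
    println(s)                // Settings(true,true)
    println(sessionPerJob(s)) // false
  }
}
```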
The aliases section defines shorthand names for referencing actors. Each alias must be globally unique. Aliases are optional; if an alias is not defined, the fully-qualified class name of the actor must be provided when defining an action for a job.
aliases:
  - name: file-reader
    type: com.qwshen.source.FileReader
  - name: sql
    type: com.qwshen.transform.SqlTransformer
  - name: hbase-writer
    type: com.qwshen.sink.HBaseWriter
To reference an alias in an action definition:
actions:
  - name: load users
    actor:
      type: file-reader # file-reader is an alias for com.qwshen.source.FileReader
      properties:
        # ...
    output-view:
      name: users
      global: true
Please note that variables from the application configuration and the job-submit arguments can be referenced in this section. However, variables defined inside the current pipeline (see the Variables section below) cannot be referenced.
For centralized and mixed Aliases definition, please check here
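The lookup rule for an actor's type can be sketched as follows, assuming that a type matching an alias resolves to that alias's class and that any other type is treated as a fully-qualified class name. The helper names (`resolve`, `duplicateNames`) are hypothetical; only the alias and class names come from the example above.

```scala
// Hypothetical sketch of alias resolution; helper names are illustrative, not the framework's API.
object AliasResolution {
  final case class Alias(name: String, className: String)

  // Aliases must be globally unique: report every alias name defined more than once.
  def duplicateNames(aliases: Seq[Alias]): Set[String] =
    aliases.groupBy(_.name).collect { case (name, defs) if defs.size > 1 => name }.toSet

  // A type that matches an alias resolves to the alias's class; otherwise it is
  // assumed to already be a fully-qualified class name and is returned unchanged.
  def resolve(actorType: String, aliases: Seq[Alias]): String =
    aliases.find(_.name == actorType).map(_.className).getOrElse(actorType)

  val aliases: Seq[Alias] = Seq(
    Alias("file-reader", "com.qwshen.source.FileReader"),
    Alias("sql", "com.qwshen.transform.SqlTransformer"),
    Alias("hbase-writer", "com.qwshen.sink.HBaseWriter")
  )

  def main(args: Array[String]): Unit = {
    println(resolve("file-reader", aliases))                 // com.qwshen.source.FileReader
    println(resolve("com.qwshen.sink.HBaseWriter", aliases)) // com.qwshen.sink.HBaseWriter
    println(duplicateNames(aliases))                         // Set()
  }
}
```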
If custom UDFs are required in a pipeline for transforming data, a custom UDF register needs to be provided to register the related UDFs.
A UDF register must extend com.qwshen.etl.common.UdfRegister, and implement the following method:
import org.apache.spark.sql.SparkSession

// the prefix is a string for prefixing the names of UDFs in case two UDFs have the same name
def register(prefix: String)(implicit session: SparkSession): Unit
Once the UDF register is implemented, it can be configured at pipeline level as follows:
{
  "udf-registration": [
    {
      "prefix": "event_",
      "type": "com.qwshen.etl.EventUdfRegister"
    },
    {
      "prefix": "user_",
      "type": "com.qwshen.etl.UserUdfRegister"
    }
  ]
}
Please note that variables from the application configuration and the job-submit arguments can be referenced in this section. However, variables defined inside the current pipeline (see the Variables section below) cannot be referenced.
For centralized and mixed UDF-registration, please check here
For UDF example, please check here
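The purpose of the prefix can be illustrated without Spark. In the sketch below, plain Scala functions stand in for Spark UDFs and a `Map` stands in for the session's function registry. It assumes, as the "event_" and "user_" prefixes suggest, that the registered name is the prefix concatenated with the UDF name; the `register` helper here is hypothetical and is not the `UdfRegister` method shown above.

```scala
// Hypothetical illustration of UDF name prefixing; no SparkSession is involved.
object UdfPrefixing {
  type Udf = String => String

  // Two registers that both define a UDF called "normalize".
  val eventUdfs: Map[String, Udf] = Map("normalize" -> ((s: String) => s.trim.toLowerCase))
  val userUdfs: Map[String, Udf] = Map("normalize" -> ((s: String) => s.trim.toUpperCase))

  // Assumption: the registered name is the prefix concatenated with the UDF name.
  // A name collision after prefixing is rejected instead of silently overwriting a UDF.
  def register(registry: Map[String, Udf], prefix: String, udfs: Map[String, Udf]): Map[String, Udf] =
    udfs.foldLeft(registry) { case (acc, (name, f)) =>
      val qualified = prefix + name
      require(!acc.contains(qualified), s"UDF '$qualified' is already registered")
      acc + (qualified -> f)
    }

  def main(args: Array[String]): Unit = {
    val registry = register(register(Map.empty, "event_", eventUdfs), "user_", userUdfs)
    println(registry.keySet.toList.sorted)       // List(event_normalize, user_normalize)
    println(registry("user_normalize")(" bob ")) // BOB
  }
}
```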
The variables defined in this section can be referenced in the definition of the current pipeline, including SQL statements, but not in the Settings, Aliases and Udf-Registration sections.
- A variable must be given a name and a value, and is referenced in the format ${variable-name}.
- The value can reference any variables defined in the application configuration, as well as those from the job-submit arguments. In the following example, process_date comes from events.process_date, which is defined in the application configuration:
<variables>
  <variable name="process_date" value="${events.process_date}" />
  <variable name="staging_uri" value="concat('/tmp/staging/events', current_date())" />
  <variable name="execution_time" value="now()" />
</variables>
Please note: a value can also be any valid SQL expression, which may reference previously defined variables. For example, the value of staging_uri is calculated on the fly when the job starts to run. However, this only applies to variables defined inside a pipeline; SQL expressions in variables from the application configuration or the job-submit arguments are not evaluated.
- When a variable is defined more than once, its value from the job-submit arguments overrides the value from the application configuration, and the value from the Variables section of a pipeline has the highest precedence.
- When a variable contains sensitive data, such as a password, its value can be protected with a custom key. The following shows how to encrypt the value and how to configure the variable:
java -cp spark-etl-framework-xxx.jar com.qwshen.Encryptor \
  --key-string ${key-value} --data ${password}
If the key is stored in a file:
java -cp spark-etl-framework-xxx.jar com.qwshen.Encryptor \
  --key-file ${file-name} --data ${password}
The above command will print the encrypted value.
variables:
  - name: db.password
    value: ${events.db.password}
    decryptionKeyString: ${application.security.decryption.key}
or
variables:
  - name: db.password
    value: ${events.db.password}
    decryptionKeyFile: ${application.security.decryption.keyFile}
Here, ${events.db.password} holds the encrypted value printed by the encryption step.
For variables defined and used inside sql-statements, please check here
For setting up variables with VariableSetter actor, please check here
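The precedence and substitution rules above can be sketched in a few lines. This is a minimal sketch, not the framework's implementation: the helpers `merge`, `resolvePipelineVars` and `substitute` are hypothetical, unknown references are deliberately left unchanged, and SQL-expression values such as now() are not evaluated here (the framework does that itself for pipeline variables).

```scala
import scala.util.matching.Regex

// Minimal sketch of variable precedence and ${name} substitution; not the framework's implementation.
object VariableResolution {
  private val Ref: Regex = """\$\{([^}]+)\}""".r

  // Precedence: application configuration < job-submit arguments < pipeline Variables section.
  // With ++, the right-hand map wins for duplicate keys.
  def merge(appConf: Map[String, String], submitArgs: Map[String, String], pipelineVars: Map[String, String]): Map[String, String] =
    appConf ++ submitArgs ++ pipelineVars

  // Pipeline variables may reference application-configuration and job-submit variables.
  def resolvePipelineVars(pipelineVars: Map[String, String], base: Map[String, String]): Map[String, String] =
    pipelineVars.map { case (k, v) => k -> substitute(v, base) }

  // Unknown references are kept as-is; quoteReplacement stops "$" being read as a group reference.
  def substitute(text: String, vars: Map[String, String]): String =
    Ref.replaceAllIn(text, (m: Regex.Match) => Regex.quoteReplacement(vars.getOrElse(m.group(1), m.matched)))

  def main(args: Array[String]): Unit = {
    val appConf = Map("events.process_date" -> "20200901", "environment" -> "prod")
    val submitArgs = Map("events.process_date" -> "20200921", "environment" -> "dev")
    val pipelineVars = resolvePipelineVars(
      Map("process_date" -> "${events.process_date}", "environment" -> "qa"),
      appConf ++ submitArgs
    )
    val vars = merge(appConf, submitArgs, pipelineVars)
    println(substitute("select * from events where process_date = '${process_date}'", vars))
    // select * from events where process_date = '20200921'
    println(substitute("${environment}/${unknown}", vars)) // qa/${unknown}
  }
}
```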
A pipeline may contain multiple jobs while each job may have multiple actions. A job provides a container for resource isolation (when singleSparkSession = false). The output from an action of a job may be shared across actions and across jobs. Each action in a job is represented by an Actor, which is defined by its type, and may have properties for controlling its behavior.
{
  "name": "load features",
  "actor": {
    "type": "delta-reader",
    "properties": {
      "options": {
        "versionAsOf": "0"
      },
      "sourcePath": "${delta_dir}"
    }
  },
  "output-view": {
    "name": "features",
    "global": "false"
  }
}
In the above example:
- The delta-reader is an alias defined in the aliases section that points to com.qwshen.etl.source.DeltaReader. If an alias is not used, the fully-qualified class name must be specified:
"type": "com.qwshen.etl.source.DeltaReader"
- The global flag in the output-view determines whether the output of the actor (data-view) can be shared across jobs. The default is false.
Properties of an actor may reference any variables from application configuration, job-submit arguments and/or defined in the Variables section of the current pipeline. Values of the properties can also be any valid sql-expressions which may reference any pre-defined variables.
Each Actor has at most one output (as a view), which can be referenced by any downstream actor. The view can also be referenced as a table in a SQL statement:
select * from features
If the view is global (shared across jobs) and globalViewAsLocal = true, the global view can be referenced as a local view, as in the above query. Otherwise:
select * from global_temp.features
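The naming rule can be summarized by a small hypothetical helper. It only builds the name a SQL statement would use and does not interact with Spark; global_temp is the database Spark uses for global temporary views.

```scala
// Hypothetical helper mirroring the view-reference rule above; it does not touch Spark.
object ViewReferences {
  // Local views, and global views when globalViewAsLocal = true, are referenced by their plain name;
  // otherwise a global view must be qualified with the global_temp database.
  def sqlName(view: String, global: Boolean, globalViewAsLocal: Boolean): String =
    if (global && !globalViewAsLocal) s"global_temp.$view" else view

  def main(args: Array[String]): Unit = {
    println(s"select * from ${sqlName("features", global = true, globalViewAsLocal = true)}")
    // select * from features
    println(s"select * from ${sqlName("features", global = true, globalViewAsLocal = false)}")
    // select * from global_temp.features
  }
}
```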
The definition of a job does not have to be embedded in the definition of a pipeline, which is useful when the job is reused across multiple pipelines. Instead, the job may be defined in a separate file and included as follows:
jobs:
  - include: jobs/job.yaml
Metrics collection and logging can be enabled per action based on the following configuration in pipeline definition:
metrics-logging:
  enabled: "${metrics_logging_enabled}"
  uri: "${metrics_logging_uri}"
  actions:
    - load-events
    - transform user-train
In the above setting, the metrics of the two actions (load-events, transform user-train) will be collected and written to ${metrics_logging_uri}. The default metrics include the schema (in DDL format), row count, estimated size and execution time of the views. Custom metrics can be added by following this guide. Please note that collecting metrics may impact overall performance.
When the results of one or more actions need to be staged for troubleshooting or data verification, add the following section to the definition of a pipeline:
debug-staging:
  enabled: "${staging_enabled}"
  uri: "${staging_uri}"
  actions:
    - load-events
    - transform-user-train
In the above setting, the output of the two actions (load-events, transform-user-train) will be staged at ${staging_uri}. Please note that the more actions are staged, the greater the impact on performance, so staging should be used primarily in dev environments.
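How the enabled flag and the action list of a debug-staging (or metrics-logging) section select actions can be sketched as follows. The `Selection` class and `parseActions` helper are hypothetical; the sketch assumes the flag is compared case-insensitively with "true" after variable substitution and that action names must match exactly, which is also why names in these lists should be copied verbatim from the pipeline.

```scala
// Hypothetical sketch of action selection for debug-staging or metrics-logging.
object ActionSelection {
  // "enabled" is kept as a string because it is typically filled from a variable such as ${staging_enabled}.
  final case class Selection(enabled: String, uri: String, actions: Seq[String]) {
    def isOn: Boolean = enabled.trim.equalsIgnoreCase("true")
    // Assumption: names are matched exactly, so "transform user-train" != "transform-user-train".
    def covers(action: String): Boolean = isOn && actions.contains(action)
  }

  // Comma-separated form, as passed to --staging-actions and --metrics-logging-actions.
  def parseActions(csv: String): Seq[String] = csv.split(",").map(_.trim).filter(_.nonEmpty).toList

  def main(args: Array[String]): Unit = {
    val staging = Selection("true", "/tmp/staging", parseActions("load-events,transform-user-train"))
    println(staging.covers("load-events"))                         // true
    println(staging.covers("transform user-train"))                // false
    println(staging.copy(enabled = "false").covers("load-events")) // false
  }
}
```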
The following template pipelines illustrate each supported definition format:
- template_pipeline.yaml with included job.yaml
- template_pipeline.json with included job.json
- template_pipeline.xml with included job.xml
One or more application configuration files can be provided when submitting a Spark job. The following is one example:
source.events {
  users_input = "data/users"
}
kafka {
  bootstrap.servers = "localhost:9092"
  schema.registry.url = "http://localhost:8081"
}
application {
  security.decryption.key = "my_secret_123"
  scripts_uri = "./scripts"
}
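There are three approaches to providing the runtime configuration: in the application configuration under application.runtime, with a spark-conf-actor action inside a pipeline, and with --conf options of spark-submit. The following is an example of the first approach: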
application.runtime {
  spark {
    driver.memory = 16g
    executor.memory = 16g
    serializer = org.apache.spark.serializer.KryoSerializer
    sql {
      extensions = io.delta.sql.DeltaSparkSessionExtension
      catalog.spark_catalog = org.apache.spark.sql.delta.catalog.DeltaCatalog
    }
  }
  hadoopConfiguration {
    # skip writing _SUCCESS
    mapreduce.fileoutputcommitter.marksuccessfuljobs = false
  }
  # skip writing crc files
  filesystem.skip.write.checksum = true
  # support hive integration
  hiveSupport = true
  # please check how validation run works in the "Submitting a Spark Job" section.
  validationRun: 3
}
Note: Spark configs from the application configuration have the highest precedence.
The runtime configuration can also be set inside a pipeline with a spark-conf-actor action:
- name: set up configuration for s3-access
  actor:
    type: spark-conf-actor
    properties:
      configs:
        spark.sql.shuffle.partitions: 160
      hadoopConfigs:
        fs.s3a.path.style.access: "true"
        fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
        fs.s3a.connection.ssl.enabled: "true"
        fs.s3a.endpoint: "s3a.qwshen.com:9000"
        fs.s3a.access.key: "sa_7891"
        fs.s3a.secret.key: 's.UjEjksEnEidFehe\KdenG'
The following is one example of how to submit a Spark job. Note that it also demonstrates how to provide the runtime configs, as well as pass variables.
spark-submit --master yarn|local --deploy-mode client|cluster \
--name test \
--conf spark.executor.memory=24g --conf spark.driver.memory=16g \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--jars ./mysql-connector-jar.jar \
--class com.qwshen.Launcher spark-etl-framework-0.1-SNAPSHOT.jar \
--pipeline-def "./test.yaml#load users;load train" --application-conf ./common.conf,./environment.conf,./application.conf \
--var application.runtime.validationRun=true \
--var process_date=20200921 --var environment=dev \
--vars encryption_key=/tmp/app.key,password_key=/tmp/pwd.key \
--staging-uri hdfs://tmp/staging --staging-actions load-events,combine-users-events \
--metrics-logging-uri hdfs://tmp/metrics-logging --metrics-logging-actions load-events,combine-users-events
- To run particular jobs from a pipeline, append their names after the pipeline file, following a # and separated by commas or semicolons, such as "test.yaml#load users;load train". If no job name is specified, all jobs are run.
- When multiple config files are provided, configs from each later file override configs from the previous files. In the above example, environment.conf overrides common.conf, and application.conf overrides environment.conf.
- If application.runtime.validationRun is specified, a validation run is performed across all jobs and actions of the pipeline, for example to check that all columns referenced in a SQL actor are valid. The possible values of application.runtime.validationRun are:
  - true - the validation run is conducted; input data is truncated, and no data is written;
  - false - the validation run is disabled. This is the default value.
  - a positive number - the validation run is conducted, but for each action only that number of records is selected from its output and passed on to the downstream actions.
  - a negative number - equivalent to false; the validation run is turned off.
- When --staging-uri and --staging-actions are specified, they override the debug-staging configuration defined in the pipeline.
- When --metrics-logging-uri and --metrics-logging-actions are specified, they override the metrics-logging configuration defined in the pipeline.
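Three of the rules above (the file#jobs form of --pipeline-def, later config files overriding earlier ones, and the meaning of validationRun values) are sketched below. The helper names are hypothetical and this is not the framework's Launcher code. The documentation does not cover 0 or non-numeric values; this sketch treats 0 as disabled and rejects anything that is neither a boolean nor an integer.

```scala
import scala.util.Try

// Illustrative sketch of the spark-submit rules above; hypothetical helpers, not the Launcher's code.
object SubmitRules {
  // "./test.yaml#load users;load train" -> ("./test.yaml", List("load users", "load train")).
  // An empty job list means all jobs are run.
  def parsePipelineDef(arg: String): (String, List[String]) = {
    val parts = arg.split("#", 2)
    val jobs =
      if (parts.length == 2) parts(1).split("[,;]").map(_.trim).filter(_.nonEmpty).toList
      else Nil
    (parts(0), jobs)
  }

  // Later configuration files override earlier ones, e.g. common.conf < environment.conf < application.conf.
  def mergeConfigs(files: Seq[Map[String, String]]): Map[String, String] =
    files.foldLeft(Map.empty[String, String])(_ ++ _)

  sealed trait ValidationRun
  case object Disabled extends ValidationRun
  case object NoOutput extends ValidationRun                  // "true": input truncated, nothing written
  final case class Limited(rows: Int) extends ValidationRun   // positive number: rows kept per action

  def validationRun(value: Option[String]): ValidationRun = value.map(_.trim.toLowerCase) match {
    case None | Some("false") => Disabled
    case Some("true")         => NoOutput
    case Some(n) =>
      Try(n.toInt).toOption match {
        case Some(rows) if rows > 0 => Limited(rows)
        case Some(_)                => Disabled // negative (and, by assumption, zero)
        case None                   => throw new IllegalArgumentException(s"invalid validationRun value: $n")
      }
  }

  def main(args: Array[String]): Unit = {
    println(parsePipelineDef("./test.yaml#load users;load train")) // (./test.yaml,List(load users, load train))
    println(mergeConfigs(Seq(Map("env" -> "common"), Map("env" -> "dev")))("env")) // dev
    println(validationRun(Some("3")))  // Limited(3)
    println(validationRun(Some("-1"))) // Disabled
  }
}
```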
Run a live example, and more tutorials
- DeltaReader
- DeltaStreamReader
- FileReader
- FileStreamReader
- FlatReader
- FlatStreamReader
- FlightReader
- HBaseReader
- IcebergReader
- IcebergStreamReader
- JdbcReader
- KafkaReader
- KafkaStreamReader
- MongoReader
- RedisReader
- RedisStreamReader
- SqlReader
- SqlTableReader
- DeltaWriter
- DeltaStreamWriter
- FileWriter
- FileStreamWriter
- FlightWriter
- HBaseWriter
- HBaseStreamWriter
- IcebergWriter
- IcebergStreamWriter
- JdbcWriter
- JdbcStreamWriter
- KafkaWriter
- KafkaStreamWriter
- MongoWriter
- MongoStreamWriter
- RedisWriter
- RedisStreamWriter
- SqlWriter
- SqlTableWriter
In cases where the logic of transforming data is very complicated, or a new reader and/or writer would be preferred, a custom Actor can be created by following this guide.
Some tips for better performance:
- Limit the data size by selecting only the required columns, adding filters, etc.;
- Do not use the "in" keyword in a where clause to compare with another dataset (prefer a join), such as
select * from orders where user_id in (select id from users)
- Cache tables when needed. The framework counts how many times a table is referenced and caches it automatically if it is referenced more than once;
- Always keep an eye on data skew; it is the #1 troublemaker;
- Avoid UDFs where possible, since they are black boxes that Spark cannot optimize;
- Don't write custom lookup UDFs; use a broadcast join instead.
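The automatic-caching rule from the tips can be illustrated by counting view references. This hypothetical sketch counts how many steps read each view and flags those read more than once; the framework performs its own check, and the `Step` type here is illustrative only.

```scala
// Hypothetical sketch of the "cache when referenced more than once" rule; names are illustrative.
object CacheCandidates {
  final case class Step(name: String, inputViews: Seq[String])

  // A view read by more than one step is a cache candidate.
  def candidates(steps: Seq[Step]): Set[String] =
    steps.flatMap(_.inputViews)
      .groupBy(identity)
      .collect { case (view, refs) if refs.size > 1 => view }
      .toSet

  def main(args: Array[String]): Unit = {
    val steps = Seq(
      Step("transform user-train", Seq("users", "train")),
      Step("combine users events", Seq("users", "events")),
      Step("write train", Seq("train_features"))
    )
    println(candidates(steps)) // Set(users)
  }
}
```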