Change data capture implementation using Spark and Sqoop
An HDFS table needs to capture changes from the source RDBMS table. For example, if the source table has 1 million records and half of them have changed, we need to synchronize only the changed records with our HDFS copy.
- We're going to use Sqoop in incremental mode to import only the records that were modified
- The modified records will be stored in a Parquet file
- Using Spark SQL, create two tables: one for the extracted data and another for the updated version of the original table
- Using Spark SQL, insert the unchanged data merged with the modified data using a LEFT JOIN
Prerequisites:
- Spark and Sqoop installed
- JDBC driver for Sqoop
- A running Hadoop cluster
- A running spark-shell
Standard template for a Sqoop import as a Parquet file:
sqoop import --connect <JDBC connection string> --username <username> --password <password> --table <table name> --target-dir <path to HDFS directory> --as-parquetfile
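For example, with a hypothetical MySQL database named shop (the connection string, credentials, and target path below are illustrative placeholders, not from the original):
sqoop import --connect jdbc:mysql://dbhost:3306/shop --username sqoop_user --password sqoop_pass --table customers --target-dir /user/hive/warehouse/customers_extract --as-parquetfile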
Sqoop incremental import can capture both new and modified records. To import only new records, add the following parameters:
--check-column <name of column> --incremental append --last-value <maximum value of the check column from the previous import>
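For instance, assuming cust_no is an auto-incrementing key and 54321 is the highest value imported so far (both hypothetical):
--check-column cust_no --incremental append --last-value 54321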
To capture updated records as well, use lastmodified mode:
--check-column <name of timestamp column> --incremental lastmodified --last-value <timestamp of the previous import>
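For example, assuming changes are tracked in a modified_date timestamp column (the values here are hypothetical):
--check-column modified_date --incremental lastmodified --last-value "2016-05-12 01:00:00"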
We can save the incremental import command as a Sqoop job for repeated use, without specifying the --last-value attribute each time. Example:
sqoop job --create MY_JOB_NAME -- import --connect
etc.
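A full job definition might look like the sketch below (the job name and connection details are hypothetical; note the mandatory space between -- and import):
sqoop job --create customers_sync -- import --connect jdbc:mysql://dbhost:3306/shop --username sqoop_user --password sqoop_pass --table customers --target-dir /user/hive/warehouse/customers_extract --as-parquetfile --check-column modified_date --incremental lastmodified
Depending on the Sqoop version, re-running a lastmodified import into an existing target directory may also require --append or --merge-key.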
Sqoop identifies the last-value attribute on the first run and updates it whenever a larger value appears, so we don't need to specify it manually. To execute the job, run
sqoop job --exec JOB_NAME
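Saved jobs can be listed with sqoop job --list, and a job's stored parameters, including the saved last-value, can be inspected with:
sqoop job --show JOB_NAME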
Let our original table be called customers. After the Sqoop import there will be a Parquet file with the changed data. To build an updated variant of the original table, follow these steps.
- Create a temporary table called customers_extract
// Load the Parquet file produced by the Sqoop incremental import
val parquetFile = sqlContext.read.parquet("path/to/your/parquet")
// Register it as a temporary table for Spark SQL
parquetFile.registerTempTable("customers_extract")
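Optionally, sanity-check the extract before merging (an extra verification step, not required by the flow):
// Print the schema inferred from the Parquet file
parquetFile.printSchema()
// Count the changed records captured by the incremental import
sqlContext.sql("SELECT COUNT(*) FROM customers_extract").show()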
- Create a table called customers_updated, where we will merge the updates with the original table (note that HiveQL DDL such as STORED AS PARQUET requires a sqlContext with Hive support)
sql = "DROP TABLE if exists customers_updated"
sqlContext.sql(sql)
sql = "CREATE TABLE customers_updated ( cust_no int ,birth_date date ,first_name string ,last_name string ,gender string ,join_date date ,created_date timestamp ,modified_date timestamp ) STORED AS PARQUET"
sqlContext.sql(sql)
- Insert data into customers_updated: unchanged rows come from the original table via the LEFT OUTER JOIN (rows with no match in customers_extract), and changed rows come from customers_extract itself
sql = "INSERT INTO TABLE customer_update_spark SELECT a.cust_no, a.birth_date, a.first_name, a.last_name, a.gender, a.join_date, a.created_date, a.modified_date FROM customer a LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no WHERE b.cust_no IS NULL"
sqlContext.sql(sql)
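Finally, if the merged result should replace the original table, a minimal sketch using HiveQL renames (this assumes dropping the old customers table is acceptable in your setup):
// Swap the merged table in place of the original
sqlContext.sql("DROP TABLE customers")
sqlContext.sql("ALTER TABLE customers_updated RENAME TO customers")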