/spark-binlog

A library for querying MySQL Binlog with Apache Spark structure streaming, for Spark SQL , DataFrames and [MLSQL](https://www.mlsql.tech).

Primary LanguageScala

Spark MySQL Binlog Library

A library for querying MySQL Binlog with Apache Spark structure streaming, for Spark SQL , DataFrames and MLSQL.

  1. jianshu: How spark-binlog works
  2. medium: How spark-binlog works

Requirements

This library requires Spark 2.4+ (tested). Some older versions of Spark may work too but they are not officially supported.

Liking

You can link against this library in your program at the following coordinates:

Scala 2.11

groupId: tech.mlsql
artifactId: spark-binlog_2.11
version: 0.2.2-SNAPSHOT

Limitation

  1. MySQL BinlogFormat should be set as "Row"
  2. Version 0.1.1 only support insert/update/delete events. The other events will ignore.

Usage

DataFrame:

val spark: SparkSession = ???

val df = spark.readStream.
format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
option("host","127.0.0.1").
option("port","3306").
option("userName","xxxxx").
option("password","xxxxx").
option("databaseNamePattern","mlsql_console").
option("tableNamePattern","script_file").
optioin("binlogIndex","4").
optioin("binlogFileOffset","4").
load()


df.writeStream.
format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").  
option("__path__","/tmp/sync/tables").
option("mode","Append").
option("idCols","id").
option("duration","5").
option("syncType","binlog").
checkpointLocation("/tmp/cpl-binlog2")
.mode(OutputMode.Append).save("{db}/{table}")

MLSQL:

set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxxx"
and password="xxxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

MLSQL Example

How to get the initial offset

If you are the first time to start spark-binlog, use command like following to get the offset you want:

mysqlbinlog \ 
--start-datetime="2019-06-19 01:00:00" \ 
--stop-datetime="2019-06-20 23:00:00" \ 
--base64-output=decode-rows \
-vv  master-bin.000004

Questions

People may meet some log like following:

Trying to restore lost connectioin to .....
Connected to ....

Please check the server_id is configured in my.cnf of your MySQL Server.