/gae-das

监听mysql binlog实时生成GAE使用的增量索引

Primary LanguageJava

GAE-DAS

监听mysql binlog并生成GAE使用的增量索引, 输出至本地文件或kafka,支持主从热备高可用方案。

GAE-DAS使用mysql-binlog-connector库进行binlog监听。程序启动连接成功后会查询information_schema库的columns表来读取指定数据库所有表的元数据(列名,位置,数据类型等),并使用该数据解析TABLE_MAP事件。因此,当数据表结构发生变化时,最好重启一下DAS。(除非新增列、位于所有列的最后且该列不涉及索引生成)

项目还处于开发状态。

架构示意

das

  • BinlogClient

mysql-binlog-connector提供,与Mysql通讯。

  • TemplateHolder

解析用户配置的模板,创建并持有DasTemplate 对象

  • AggregationListener

TABLE_MAPUPDATE/INSERT/DELETE EVENT封装成MysqlRowData并传递给下游业务监听器

  • BizListener

用户实现的业务监听器,需先向AggregationListener进行注册,注册时声明自己对哪个库、哪张表感兴趣;同一个BizListener可以注册多次。AggregationListener在收到相关表的事件后会触发onEvent()方法。

双机热备

GAE-DAS支持双机热备的高可用部署方案,同时部署两个实例,当一台挂掉后另一台可自动接替:

HA

  • 使用主从模式时,两个实例部署后会自动"商讨"谁主谁从
  • 确定主从关系后, master定时向slave发送心跳,心跳包中包含当前master的binlog位置信息
  • 当slave超过一定时间未收到心跳时,会自动从上次心跳包里的binlog位置开始监听binlog,变为master
  • 原master修复上线后,会自动变为slave

模板

通过配置模板template.json来指定索引如何生成(配置对哪些表中的哪些字段感兴趣):

{
    "database": "gae-das", # 库名
    "tableList": [
        {
            "tableName": "new_table", # 表名
            "level": 1, # 该表所在层级

			# 对 insert 操作感兴趣
            "insert": [
            	# 输出索引记录的字段顺序
            	# 此例为
            	# id\tname\tage
                {"column": "id"},
                {"column": "name"},
                {"column": "age"}
            ],
            "update": [
                {"column": "id"},
                {"column": "name"},
                {"column": "age"}
            ],
            "delete": [
                {"column": "id"},
                {"column": "name"},
                {"column": "age"}
            ]
        },
        {
            "tableName": "acc",
            "level": 2,

            "insert": [
                {"column": "id"},
                {"column": "name"}
            ],
            "update": [
                {"column": "id"},
                {"column": "name"}
            ],
            "delete": [
                {"column": "id"},
                {"column": "name"}
            ]
        }

    ]
}

输出的索引文件为词表(\t分隔):

# 层级	# 操作类型(0:insert, 1:update, 2: delete)		# 数据项(模板中的column)
1	0	6	apple
1	1	6	apples
2	2	5	aaa
2	0	7	pear

mysql连接信息和binlog同步设置见application.yml文件

构建运行

增量索引写到文件时, 除mysql连接信息和template.json外不需要额外配置:

mvn clean package -Dmaven.test.skip=true
java -jar gae-das.jar

增量索引写入kafka时, 需要配置kafka相关信息(见application.ymldas.store.kafka相关配置)