/canal-go

Alibaba mysql database binlog incremental subscription & consumer components Canal's golang client[阿里巴巴mysql数据库binlog的增量订阅&消费组件 Canal 的 go 客户端 ] https://github.com/alibaba/canal

Primary LanguageGoApache License 2.0Apache-2.0

canal-go

forked from withlin/canal-go

compile pb with protoc libprotoc 3.20.1

example

// samples/main.go

package main

import (
	"fmt"
	"time"

	pbe "github.com/Q1mi/canal-go/protocol/entry"

	"github.com/Q1mi/canal-go/client"
	"google.golang.org/protobuf/proto"
)

// canal-go client demo

func main() {
	// 连接canal-server
	// 请修改为你的 canal-server 配置
	connector := client.NewSimpleCanalConnector(
		"127.0.0.1", 11111, "", "", "example", 60000, 60*60*1000)
	err := connector.Connect()
	if err != nil {
		panic(err)
	}

	// mysql 数据解析关注的表,Perl正则表达式.
	err = connector.Subscribe(".*\\..*")
	if err != nil {
		fmt.Printf("connector.Subscribe failed, err:%v\n", err)
		panic(err)
	}

	// 消费消息
	for {
		message, err := connector.Get(100, nil, nil)
		if err != nil {
			fmt.Printf("connector.Get failed, err:%v\n", err)
			continue
		}
		batchId := message.Id
		if batchId == -1 || len(message.Entries) <= 0 {
			time.Sleep(time.Second)
			fmt.Println("===暂无数据===")
			continue
		}
		printEntry(message.Entries)
	}
}

func printEntry(entries []*pbe.Entry) {
	for _, entry := range entries {
		// 忽略事务开启和事务关闭类型
		if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN ||
			entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
			continue
		}
		// RowChange对象,包含了一行数据变化的所有特征
		rowChange := new(pbe.RowChange)
		// protobuf解析
		err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
		if err != nil {
			fmt.Printf("proto.Unmarshal failed, err:%v\n", err)
		}
		if rowChange == nil {
			continue
		}
		// 获取并打印Header信息
		header := entry.GetHeader()
		fmt.Printf("binlog[%s : %d], name[%s,%s], eventType: %s\n",
			header.GetLogfileName(),
			header.GetLogfileOffset(),
			header.GetSchemaName(),
			header.GetTableName(),
			header.GetEventType(),
		)
		//判断是否为DDL语句
		if rowChange.GetIsDdl() {
			fmt.Printf("isDdl:true, sql:%v\n", rowChange.GetSql())
		}

		// 获取操作类型:insert/update/delete等
		eventType := rowChange.GetEventType()
		for _, rowData := range rowChange.GetRowDatas() {
			if eventType == pbe.EventType_DELETE {
				printColumn(rowData.GetBeforeColumns())
			} else if eventType == pbe.EventType_INSERT || eventType == pbe.EventType_UPDATE {
				printColumn(rowData.GetAfterColumns())
			} else {
				fmt.Println("---before---")
				printColumn(rowData.GetBeforeColumns())
				fmt.Println("---after---")
				printColumn(rowData.GetAfterColumns())
			}
		}
	}
}

func printColumn(columns []*pbe.Column) {
	for _, col := range columns {
		fmt.Printf("%s:%s  updated=%v\n", col.GetName(), col.GetValue(), col.GetUpdated())
	}
}