First, I hard-coded a list into the code to represent the tables whose binlogs need to be processed:
const TABLES: &[&str] = &["paymentA", "paymentB", "paymentC"];
As described in the requirements, the events for each table need to be sent to a dedicated Kafka topic. So, before starting to replicate binlogs to Kafka, we create a topic for each table. The LogEventClient contains all the information needed to replicate the binlogs of one table: the topic name, the partition client, and the Kafka offset.
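The struct itself is small. A minimal sketch of what it could look like, assuming the rskafka PartitionClient used in the earlier parts (the field names follow the code below):
use rskafka::client::partition::PartitionClient;

// One client per table (a sketch; field names match the code below)
struct LogEventClient {
    topic: String,           // Kafka topic the table's events are written to
    client: PartitionClient, // rskafka client for partition 0 of that topic
    offset: i64,             // next offset to consume from
}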
// create topics for each table
let mut clients: HashMap<String, LogEventClient> = HashMap::new();
for table in TABLES {
let topic = format!("{mysql_database}_{table}");
kafka_producer.create_topic(&topic).await;
let partition_client = kafka_producer.get_partition_client(0).await.unwrap();
let offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
clients.insert(
table.to_string(),
LogEventClient {
topic,
client: partition_client,
offset,
},
);
}
Like before, we receive binlog events from the BinlogClient and convert each event to JSON.
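For reference, the client might be set up like this (a sketch assuming the mysql_cdc crate used in the earlier parts; the credentials and binlog options are illustrative):
use mysql_cdc::binlog_client::BinlogClient;
use mysql_cdc::binlog_options::BinlogOptions;
use mysql_cdc::replica_options::ReplicaOptions;
use mysql_cdc::ssl_mode::SslMode;

let options = ReplicaOptions {
    username: String::from("root"),
    password: String::from("password"),
    blocking: true,                    // keep waiting for new events
    ssl_mode: SslMode::Disabled,
    binlog: BinlogOptions::from_end(), // start from the current end of the binlog
    ..Default::default()
};
let mut binlog_client = BinlogClient::new(options);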
for result in binlog_client.replicate()? {
let (header, event) = result?;
let json_event = serde_json::to_string(&event).expect("Couldn't convert sql event to json");
let json_header =
serde_json::to_string(&header).expect("Couldn't convert sql header to json");
// process events...
binlog_client.commit(&header, &event);
}
The BinlogEvent doesn't contain table information, so we must determine which table(s) an event belongs to ourselves. We can do this by parsing the inner SQL statement with the sqlparser library.
For demonstration purposes, I only handle a limited set of cases: only statements of the form CREATE TABLE xxx, SELECT xxx FROM xxx, and INSERT INTO xxx will be recognized correctly.
let tables = get_event_table(&event);
fn get_event_table(event: &BinlogEvent) -> HashSet<String> {
match event {
BinlogEvent::QueryEvent(e) => extract_tables_from_sql(&e.sql_statement),
_ => HashSet::new(),
}
}
fn extract_tables_from_sql(sql: &str) -> HashSet<String> {
let ast = Parser::parse_sql(&MySqlDialect {}, sql).expect("failed to parse sql");
let mut tables = HashSet::new();
for st in ast {
match st {
Statement::Insert { table_name, .. } => {
for table in table_name.0 {
tables.insert(table.value);
}
}
Statement::CreateTable { name, .. } => {
for table in name.0 {
tables.insert(table.value);
}
}
Statement::Query(q) => {
if let sqlparser::ast::SetExpr::Select(s) = *q.body {
for f in s.from {
if let sqlparser::ast::TableFactor::Table { name, .. } = f.relation {
for table in name.0 {
tables.insert(table.value);
}
}
}
}
}
_ => {}
}
}
tables
}
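For example, on the statement shapes listed above the helper behaves like this (illustrative inputs):
// A quick check of extract_tables_from_sql on the supported statement shapes
let t = extract_tables_from_sql("INSERT INTO paymentA (id, amount) VALUES (1, 42)");
assert!(t.contains("paymentA"));

let t = extract_tables_from_sql("SELECT amount FROM paymentB");
assert!(t.contains("paymentB"));

let t = extract_tables_from_sql("CREATE TABLE paymentC (id INT, amount INT)");
assert!(t.contains("paymentC"));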
Now that we know which tables the event belongs to, we can write the record to the corresponding Kafka topics. Then we consume these log records and print them out.
for table in tables {
// Events for tables we are not tracking are skipped
let Some(client) = clients.get_mut(&table) else {
continue;
};
client
.produce_log_record(json_header.clone(), json_event.clone())
.await;
let records = client.consume_log_records().await;
// print record
for r in records {
println!(
"===================Event from kafka topic {}===================",
client.topic
);
// ...
}
}
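The // print record part just unpacks the rskafka Record; a sketch of what it could do, given the value and header layout written by produce_log_record below:
// r is an rskafka Record: the value holds the JSON event,
// the "mysql_binlog_headers" header holds the JSON binlog header.
if let Some(value) = &r.value {
    println!("event: {}", String::from_utf8_lossy(value));
}
if let Some(header) = r.headers.get("mysql_binlog_headers") {
    println!("header: {}", String::from_utf8_lossy(header));
}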
The code for producing and consuming log records (much like before):
impl LogEventClient {
async fn produce_log_record(&mut self, header: String, event: String) {
let record = Record {
key: None,
value: Some(event.into_bytes()),
headers: BTreeMap::from([("mysql_binlog_headers".to_owned(), header.into_bytes())]),
timestamp: Utc.timestamp_millis(42),
};
self.client
.produce(vec![record], Compression::default())
.await
.expect("failed to produce");
}
async fn consume_log_records(&mut self) -> Vec<Record> {
let (records, high_watermark) = self
.client
.fetch_records(
self.offset, // offset
1..100_000, // min..max bytes
1_000, // max wait time
)
.await
.unwrap();
self.offset = high_watermark;
records.into_iter().map(|ro| ro.record).collect()
}
}
The test code is similar to the original insert.wasm, except that we now insert payment records into multiple tables rather than one. The code is in mysql-binlog-kafka/src/sql-commands-test-rust/insert_tables.rs:
const TABLES: &[&str] = &["paymentA", "paymentB", "paymentC"];
for table in TABLES {
// create table and insert records...
}
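The loop body might look roughly like this (a sketch; conn is assumed to be the mysql_async connection from the test setup, and the table schema and values are illustrative):
for table in TABLES {
    // create the table if it doesn't exist yet
    conn.query_drop(format!(
        "CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY AUTO_INCREMENT, amount FLOAT)"
    ))
    .await?;
    // insert a payment record so a binlog event is generated for this table
    conn.query_drop(format!("INSERT INTO {table} (amount) VALUES (42.0)"))
        .await?;
}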
To verify that my implementation does the job, the GitHub CI file is changed to:
- name: Run tests
  run: |
    cd mysql-binlog-kafka
    wasmedge --env "DATABASE_URL=mysql://root:password@127.0.0.1:3306/mysql" sql-commands-test-wasm/insert_tables.wasm
    sleep 10
    cat out.log | grep "Event from kafka topic mysql_paymentA" && echo "Query executed successfully" || exit 1
    cat out.log | grep "Event from kafka topic mysql_paymentB" && echo "Query executed successfully" || exit 1
    cat out.log | grep "Event from kafka topic mysql_paymentC" && echo "Query executed successfully" || exit 1
The CI passes only if each event is sent to its corresponding Kafka topic.
And it does!