/mysqlslave

A library for catching MySQL binlog updates online in your application.

Primary Language: C++

You can add replication to any daemon of yours with the class CLogParser (the daemon has to use a MySQL database, version 5.1 or later, and row-based replication has to be configured). Your daemon class has to derive from CLogParser and has to define three virtual functions:

	int on_insert(const mysql::CTable& tbl, const mysql::CTable::TRows& rows);
	int on_update(const mysql::CTable& tbl, const mysql::CTable::TRows& rows, const mysql::CTable::TRows& old_rows);
	int on_delete(const mysql::CTable& tbl, const mysql::CTable::TRows& rows);

	tbl -- changed table
	rows -- new rows of the table (in on_delete "rows" -- deleted rows)
	old_rows -- old rows, which were changed by UPDATE-query

Class mysql::CTable::TRows is std::list<mysql::CRow>. These operators are defined for class CRow:

	const CValue& operator[](const std::string& name) const;
	const CValue& operator[](size_t i) const;

To get the value of a CValue in the appropriate format, these types and functions are defined:

	// Year value, stored as an unsigned 16-bit integer.
	typedef uint16_t TYear;
	// Calendar date. Fields are presumably year, month and day
	// (inferred from the names y/m/d -- confirm against the library headers).
	typedef struct
	{
		TYear y;
		uint8_t m;
		uint8_t d;
	} TDate;
	// Time of day. Fields are presumably hours, minutes and seconds
	// (inferred from the names h/m/s -- confirm against the library headers).
	typedef struct
	{
		uint8_t h;
		uint8_t m;
		uint8_t s;
	} TTime;
	// A TDate and a TTime combined into one value.
	typedef struct
	{
		TDate date;
		TTime time;
	} TDateTime;

	int8_t as_int8() const;
	int16_t as_int16() const;
	int32_t as_int32() const;
	int64_t as_int64() const;
	uint8_t as_uint8() const;
	uint16_t as_uint16() const;
	uint32_t as_uint32() const;
	uint64_t as_uint64() const; 

	double as_double() const;
	float as_float() const;

	bool as_boolean() const;

	time_t as_timestamp() const;
	TDateTime as_datetime() const;
	TDate as_date() const;
	TTime as_time() const;
	TYear as_year() const;

	std::string as_string() const;
	const char* as_string(size_t* length) const;
	void as_string(std::string& dst) const;

	uint64_t as_enum() const;
	uint64_t as_set() const;

To select the tables whose changes you want to catch, use the function:

	void watch(const std::string& db_name, const std::string& table_name);

You need a dedicated thread for replication. It has to run the following loop:

	// Keep looping for as long as dispatch() returns true.
	while (dispatch())
	{
		try
		{
			dispatch_events();
		}
		catch (const std::exception& e)
		{
			// The exception is not reported here; the loop just waits one
			// second and then re-checks the loop condition.
			sleep(1);
		}
	}

To interrupt the replication loop, call stop_event_loop().

Code example:

// Example daemon class: it derives from mysql::CLogParser, registers the tables
// to watch, runs the replication loop and implements the three row callbacks.
// The "..." lines stand for application code that is omitted from the example.
class my_class : public mysql::CLogParser
{
	...

	// Thread handle used by the destructor; presumably the thread that runs
	// replication_thread_proc() (its creation is part of the omitted code).
	pthread_t th_repl;

	...

	~my_class()
	{
		...

		// Ask the replication loop to stop, then signal the replication thread.
		// NOTE(review): the default action of SIGTERM terminates the whole
		// process, so this presumably relies on a SIGTERM handler installed
		// elsewhere -- confirm before copying this into real code.
		stop_event_loop();
		::pthread_kill(th_repl, SIGTERM);

		...
	}

	...

	// Body of the replication thread: keeps dispatching events for as long as
	// dispatch() returns true.
	void replication_thread_proc()
	{
		while (dispatch())
		{
			try
			{
				dispatch_events();
			}
			catch (const std::exception& e)
			{
				// Report the error, wait one second, then re-check the loop condition.
				fprintf(stderr, "catch exception: %s\n", e.what());
				sleep(1);
			}
		}
	}

	// Stores the connection parameters, registers the tables to watch and
	// calls prepare().
	// Note that set_connection_params() takes slave_id as its second argument.
	void connect_mysql_repl(const char* host, const char* user, const char* passwd, int port, int slave_id)
	{
		set_connection_params(host, slave_id, user, passwd, port);
		watch("db1", "db1_table1");
		watch("db1", "db1_table2");
		watch("db2", "db2_table1");
		prepare();
	}

	// Callback for inserted rows: prints the "id" column of every new row of
	// the table "db1_table1" (table name compared case-insensitively).
	// Always returns 0.
	int on_insert(const mysql::CTable& tbl, const mysql::CTable::TRows& rows)
	{
		if (strcasecmp(tbl.get_table_name(), "db1_table1") == 0)
		{
			for (mysql::CTable::TRows::const_iterator it = rows.begin(); it != rows.end(); ++it)
			{
				// Dereference the iterator first: postfix [] binds tighter
				// than unary *, so the parentheses are required.
				fprintf(stdout, "%u\n", (*it)["id"].as_uint32());
			}
		}
		return 0;
	}
	// Callback for updated rows; this example ignores them and returns 0.
	int on_update(const mysql::CTable& tbl, const mysql::CTable::TRows& rows, const mysql::CTable::TRows& old_rows)
	{
		return 0;
	}
	// Callback for deleted rows; this example ignores them and returns 0.
	int on_delete(const mysql::CTable& tbl, const mysql::CTable::TRows& rows)
	{
		return 0;
	}

	...
};