julien-duponchelle/python-mysql-replication

About event extensions

Opened this issue · 1 comment

Hello,
I hope you don't mind the interruption; I'm opening this as a discussion issue. Is there a way to extend event processing freely, or at least conveniently, for example by adding new event types without breaking the library's original structure?

I ran into an interesting case. I am working with TDSQL, a distributed database from China whose lower layer is based on the MySQL binlog event structure but adds new event types. To use this project with TDSQL, you have to subclass the existing classes and adjust them. However, __event_map is a private, name-mangled class attribute, so a subclass cannot override it cleanly. There are ways to reach it anyway, thanks to how Python's name mangling works, but doing so breaks encapsulation and is not graceful. So the only real option is to copy the code and maintain the additions as a forked version.
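
For reference, here is a minimal self-contained illustration of the name-mangling behaviour described above (generic class names, not the library's actual ones):

class Base:
    __event_map = {1: "QueryEvent"}  # compiled as _Base__event_map

    def lookup(self, event_type):
        # References _Base__event_map, so subclass overrides are invisible.
        return self.__event_map.get(event_type)


class Child(Base):
    __event_map = {2: "TDSQLGtidEvent"}  # becomes _Child__event_map instead


c = Child()
print(c.lookup(2))  # None: the parent still uses its own map
# The only way in is the mangled name, which breaks encapsulation:
Base._Base__event_map[2] = "TDSQLGtidEvent"
print(c.lookup(2))  # "TDSQLGtidEvent"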

├── binlogstream_expand.py
├── constants_expand
│   ├── BINLOG.py
│   └── __init__.py
├── event_expand
│   ├── event.py
│   └── __init__.py
├── main.py
├── packet_expand
│   ├── packet.py
│   └── __init__.py
├── requirements.txt
└── venv
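
For context, constants_expand/BINLOG.py from the tree above would only need to re-export the stock type codes and add the TDSQL one. A minimal sketch; the numeric value below is a placeholder, since the real TDSQL type code is not given in this issue:

# constants_expand/BINLOG.py (sketch)
from pymysqlreplication.constants.BINLOG import *  # stock event type codes

# Placeholder value: TDSQL's real event type code is not shown in this issue.
TDSQL_GLOBAL_GTID_EVENT = 0x88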

main.py

def main():
    # The extended reader takes a map_events argument so that the TDSQL
    # GTID event type is decoded by the custom TDSQLGtidEvent class.
    stream = BinlogGtid(
        connection_settings=MYSQL_SETTINGS,
        server_id=623365281,
        start_file="binlog.000004",
        # only_events=[TDSQLGtidEvent],
        map_events={TDSQL_GLOBAL_GTID_EVENT: TDSQLGtidEvent},
    )
    stream.get_gtid_events()


if __name__ == "__main__":
    main()
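
For context, event_expand/event.py would hold the TDSQLGtidEvent class used above. A minimal sketch, assuming it subclasses the library's BinLogEvent; the TDSQL payload layout is not public here, so the parsing is left as a placeholder:

# event_expand/event.py (sketch)
from pymysqlreplication.event import BinLogEvent


class TDSQLGtidEvent(BinLogEvent):
    """Decodes TDSQL's global GTID event."""

    def __init__(self, from_packet, event_size, table_map,
                 ctl_connection, **kwargs):
        super().__init__(from_packet, event_size, table_map,
                         ctl_connection, **kwargs)
        # Read the TDSQL-specific fields from self.packet here;
        # the actual layout is TDSQL-specific and omitted.

    def _dump(self):
        super()._dump()
        # Print the decoded TDSQL GTID fields here.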

binlogstream_expand.py

__init__ gains a map_events parameter, used like:
map_events = {TDSQL_GLOBAL_GTID_EVENT: TDSQLGtidEvent}

class BinLogStreamReaderV2(object):
    """Connect to replication stream and read event
    """
    report_slave = None

    def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False,
                 blocking=False, only_events=None, log_file=None, log_pos=None,
                 filter_non_implemented_events=True,
                 ignored_events=None, auto_position=None,
                 only_tables=None, ignored_tables=None,
                 only_schemas=None, ignored_schemas=None,
                 freeze_schema=False, skip_to_timestamp=None,
                 report_slave=None, slave_uuid=None,
                 pymysql_wrapper=None,
                 fail_on_table_metadata_unavailable=False,
                 slave_heartbeat=None, map_events=None):
        # ... body identical to the upstream BinLogStreamReader.__init__ ...
        self.map_events = map_events

    def fetchone(self):
        while True:
            if not self.__connected_stream:
                self.__connect_to_stream()

            if not self.__connected_ctl:
                self.__connect_to_ctl()

            try:
                if pymysql.__version__ < "0.6":
                    pkt = self._stream_connection.read_packet()
                else:
                    pkt = self._stream_connection._read_packet()
            except pymysql.OperationalError as error:
                code, message = error.args
                if code in MYSQL_EXPECTED_ERROR_CODES:
                    self._stream_connection.close()
                    self.__connected_stream = False
                    continue
                # Unexpected errors must propagate; otherwise `pkt` below
                # would be undefined.
                raise

            if pkt.is_eof_packet():
                self.close()
                return None

            if not pkt.is_ok_packet():
                continue

            binlog_event = BinLogPacketWrapperV2(pkt, self.table_map,
                                                 self._ctl_connection,
                                                 self.__use_checksum,
                                                 self.__allowed_events_in_packet,
                                                 self.__only_tables,
                                                 self.__ignored_tables,
                                                 self.__only_schemas,
                                                 self.__ignored_schemas,
                                                 self.__freeze_schema,
                                                 self.__fail_on_table_metadata_unavailable,
                                                 self.map_events)

            if binlog_event.event_type == ROTATE_EVENT:
                self.log_pos = binlog_event.event.position
                self.log_file = binlog_event.event.next_binlog
                # Table ID in binlog are NOT persistent in MySQL - they are in-memory identifiers
                # that means that when MySQL master restarts, it will reuse same table id for different tables
                # which will cause errors for us since our in-memory map will try to decode row data with
                # wrong table schema.
                # The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
                # restarts. That means every rotation we see *could* be a sign of restart and so potentially
                # invalidates all our cached table id to schema mappings. This means we have to load them all
                # again for each logfile which is potentially wasted effort, but we can't really do much better
                # without being broken in restart case
                self.table_map = {}
            elif binlog_event.log_pos:
                self.log_pos = binlog_event.log_pos

            # This check must not occur before clearing the ``table_map`` as a
            # result of a RotateEvent.
            #
            # The first RotateEvent in a binlog file has a timestamp of
            # zero.  If the server has moved to a new log and not written a
            # timestamped RotateEvent at the end of the previous log, the
            # RotateEvent at the beginning of the new log will be ignored
            # if the caller provided a positive ``skip_to_timestamp``
            # value.  This will result in the ``table_map`` becoming
            # corrupt.
            #
            # https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
            # From the MySQL Internals Manual:
            #
            #   ROTATE_EVENT is generated locally and written to the binary
            #   log on the master. It is written to the relay log on the
            #   slave when FLUSH LOGS occurs, and when receiving a
            #   ROTATE_EVENT from the master. In the latter case, there
            #   will be two rotate events in total originating on different
            #   servers.
            #
            #   There are conditions under which the terminating
            #   log-rotation event does not occur. For example, the server
            #   might crash.
            if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
                continue

            if binlog_event.event_type == TABLE_MAP_EVENT and \
                    binlog_event.event is not None:
                self.table_map[binlog_event.event.table_id] = \
                    binlog_event.event.get_table()

            # event is none if we have filtered it on packet level
            # we filter also not allowed events
            if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
                continue

            return binlog_event.event
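
main.py above calls stream.get_gtid_events(), which is not shown. A plausible sketch, assuming the V2 reader keeps upstream's behaviour of iterating fetchone() until it returns None (TDSQLGtidEvent as in the snippets above):

    def get_gtid_events(self):
        # Drain the stream and dump only the TDSQL GTID events.
        for binlog_event in iter(self.fetchone, None):
            if isinstance(binlog_event, TDSQLGtidEvent):
                binlog_event.dump()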

packet.py

class BinLogPacketWrapperV2(object):
    """
    Bin Log Packet Wrapper. It uses an existing packet object, and wraps
    around it, exposing useful variables while still providing access
    to the original packet object's variables and methods.
    """

    __event_map = {
        # event
        constants.QUERY_EVENT: event.QueryEvent,
        constants.ROTATE_EVENT: event.RotateEvent,
        constants.FORMAT_DESCRIPTION_EVENT: event.FormatDescriptionEvent,
        constants.XID_EVENT: event.XidEvent,
        constants.INTVAR_EVENT: event.IntvarEvent,
        constants.GTID_LOG_EVENT: event.GtidEvent,
        constants.STOP_EVENT: event.StopEvent,
        constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
        constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
        constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
        # row_event
        constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
        constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
        constants.DELETE_ROWS_EVENT_V1: row_event.DeleteRowsEvent,
        constants.UPDATE_ROWS_EVENT_V2: row_event.UpdateRowsEvent,
        constants.WRITE_ROWS_EVENT_V2: row_event.WriteRowsEvent,
        constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent,
        constants.TABLE_MAP_EVENT: row_event.TableMapEvent,
        # 5.6 GTID enabled replication events
        constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
        constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,

        # ADD TDSQL GTID EVENTS
        TDSQL_GLOBAL_GTID_EVENT: event_exp.TDSQLGtidEvent
    }

    def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
                 allowed_events,
                 only_tables,
                 ignored_tables,
                 only_schemas,
                 ignored_schemas,
                 freeze_schema,
                 fail_on_table_metadata_unavailable,
                 map_events):
        if map_events:
            # Rebind a merged copy onto the instance rather than calling
            # update() on the class-level dict, which would leak the
            # custom events into every other wrapper instance.
            self.__event_map = {**self.__event_map, **map_events}
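
Note that __event_map is a class attribute, so calling update() on it through self would mutate the map shared by every wrapper instance; rebinding a merged copy onto the instance, as above, keeps each stream's overrides isolated.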

Sorry for the late response. It's indeed an interesting problem. I wonder if we could allow users to inherit from the base classes and extend them in their own code, with a setting to replace each base class with the new one.
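
One possible shape for that suggestion, sketched purely as an illustration (nothing below exists in the library today: it assumes __event_map were exposed as a public EVENT_MAP class attribute and that the reader accepted a packet_wrapper_class setting; MYSQL_SETTINGS, TDSQL_GLOBAL_GTID_EVENT and TDSQLGtidEvent are as in the snippets above):

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.packet import BinLogPacketWrapper


class TDSQLPacketWrapper(BinLogPacketWrapper):
    # A subclass could then extend the (assumed public) event map cleanly.
    EVENT_MAP = {**BinLogPacketWrapper.EVENT_MAP,
                 TDSQL_GLOBAL_GTID_EVENT: TDSQLGtidEvent}


stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=623365281,
    packet_wrapper_class=TDSQLPacketWrapper,  # hypothetical setting
)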