meltano/sdk

Add support for ACTIVATE_VERSION message types


Migrated from GitLab: https://gitlab.com/meltano/sdk/-/issues/18

Originally created by @aaronsteers on 2021-01-06 23:04:29


From the singer-python library:

ACTIVATE_VERSION message (EXPERIMENTAL).

The ACTIVATE_VERSION message has these fields:
  * stream - The name of the stream.
  * version - The version number to activate.

This is a signal to the Target that it should delete all previously
seen data and replace it with all the RECORDs it has seen where the
record's version matches this version number.

Note that this feature is experimental. Most Taps and Targets should
not need to use the "version" field of "RECORD" messages or the
"ACTIVATE_VERSION" message at all.

msg = singer.ActivateVersionMessage(
    stream='users',
    version=2)
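
For readers who have not seen the message on the wire, here is a rough sketch of its serialized form, assuming the usual Singer convention of one JSON object per line with a `type` key. The helper name `activate_version_line` is hypothetical and uses only the standard library, not singer-python itself.

```python
import json


def activate_version_line(stream: str, version: int) -> str:
    """Serialize an ACTIVATE_VERSION message as a single line of JSON.

    Hypothetical helper for illustration; not part of singer-python or the SDK.
    """
    message = {"type": "ACTIVATE_VERSION", "stream": stream, "version": version}
    return json.dumps(message)


# Mirrors the singer.ActivateVersionMessage(stream='users', version=2) example.
print(activate_version_line("users", 2))
```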

Implementing for taps:

I think we can safely implement for taps and send the message by default. For cases where targets cannot tolerate the unknown message types, we should support a disable_activate_version_messages=True option.

When FULL_TABLE replication is selected in the tap:

  1. Initialize a version number (likely an epoch-based integer): https://github.com/transferwise/pipelinewise-tap-snowflake/blob/aa89f2e4235999dbeafc7406a7f8b382542d8d5b/tap_snowflake/sync_strategies/common.py#L33
  2. Include version as a property within emitted RECORD messages: https://github.com/transferwise/pipelinewise-tap-snowflake/blob/aa89f2e4235999dbeafc7406a7f8b382542d8d5b/tap_snowflake/sync_strategies/common.py#L200
  3. Emit ACTIVATE_VERSION at the beginning of the first FULL_TABLE sync operation: https://github.com/transferwise/pipelinewise-tap-snowflake/blob/aa89f2e4235999dbeafc7406a7f8b382542d8d5b/tap_snowflake/sync_strategies/full_table.py#L87-L95
  4. Emit ACTIVATE_VERSION after a successful FULL_TABLE sync: https://github.com/transferwise/pipelinewise-tap-snowflake/blob/aa89f2e4235999dbeafc7406a7f8b382542d8d5b/tap_snowflake/sync_strategies/full_table.py#L114
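
The four steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not the SDK's implementation: `full_table_messages` and its parameters are hypothetical names, the version is assumed to be epoch milliseconds, and `emit_activate_version=False` stands in for the proposed `disable_activate_version_messages=True` option.

```python
import time
from typing import Iterable, Iterator, Optional


def full_table_messages(
    stream: str,
    rows: Iterable[dict],
    first_sync: bool,
    emit_activate_version: bool = True,
    version: Optional[int] = None,
) -> Iterator[dict]:
    """Yield Singer-style message dicts for one FULL_TABLE sync (hypothetical helper)."""
    # Step 1: initialize an epoch-based version (milliseconds) unless one is supplied.
    if version is None:
        version = int(time.time() * 1000)
    activate = {"type": "ACTIVATE_VERSION", "stream": stream, "version": version}

    # Step 3: on the first FULL_TABLE sync only, activate the version up front.
    if emit_activate_version and first_sync:
        yield dict(activate)

    # Step 2: every RECORD carries the version it belongs to.
    for row in rows:
        yield {"type": "RECORD", "stream": stream, "record": row, "version": version}

    # Step 4: after a successful sync, tell the target to switch to this version.
    if emit_activate_version:
        yield dict(activate)


for message in full_table_messages("users", [{"id": 1}], first_sync=True):
    print(message["type"])
```

Because the closing ACTIVATE_VERSION is only yielded after the row loop completes, an exception raised while reading rows leaves the previous version active in the target.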

On revisiting this issue, I found we already have handling available for targets and sinks in the SDK:

def _process_activate_version_message(self, message_dict: dict) -> None:
    """Handle the optional ACTIVATE_VERSION message extension.

    Args:
        message_dict: TODO
    """
    stream_name = message_dict["stream"]
    sink = self.get_sink(stream_name)
    sink.activate_version(message_dict["version"])

  • To implement ACTIVATE_VERSION in targets, the developer simply overrides Sink.activate_version(version_num: int).
  • SQLSink classes automatically get some handling.
    • This implementation seems to have some bugs, such as hard_delete being searched for but never declared or advertised in docs. We should add testing and/or a reference implementation in target-postgres or target-snowflake.
  • Whenever run in FULL_TABLE mode, Stream should be modified with the following:
    • At the beginning of the stream, calculate a version integer (or perhaps full_table_version for clarity) as the epoch form of the stream's signpost timestamp.
    • At the end of the stream, send an ACTIVATE_VERSION message.
  • The docs site needs a page for the ACTIVATE_VERSION message in our "Implementation Reference" section.
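
On the target side, the semantics a developer would need to provide when overriding `activate_version` might look like the sketch below. `InMemorySink` is a hypothetical stand-in that does not import or subclass the SDK's `Sink`; it only mirrors the method name from the bullet above, and storing the version in a `_version` key is an assumption made for illustration.

```python
from typing import Dict, List


class InMemorySink:
    """Hypothetical stand-in for a sink; not the SDK's Sink class."""

    def __init__(self) -> None:
        self.rows: List[Dict] = []

    def process_record(self, record: dict, version: int) -> None:
        # Tag each stored row with the version it arrived under.
        self.rows.append({**record, "_version": version})

    def activate_version(self, version_num: int) -> None:
        # Keep only rows whose version matches the activated one;
        # everything seen under other versions is discarded.
        self.rows = [row for row in self.rows if row["_version"] == version_num]


sink = InMemorySink()
sink.process_record({"id": 1}, version=1)  # left over from an earlier full sync
sink.process_record({"id": 1}, version=2)
sink.process_record({"id": 2}, version=2)
sink.activate_version(2)
print(sink.rows)
```

A SQL-backed sink would presumably express the same idea as a delete (or soft delete) of rows whose stored version differs from the activated one.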

I just closed #607 as stale. I did not have bandwidth at the time to get everything polished/tested. But future developers could definitely use this as a starting point.

What is the current state of the ACTIVATE_VERSION message type for taps/targets?