meltano/sdk

fix: Initialize `max_replication_key_value` via `SELECT max(<replication_key>) ...` before starting a native `BATCH` sync

Opened this issue · 20 comments

UPDATE (from @aaronsteers) per #976 (comment):

Since this is high urgency, I took an initial pass at the fix over on tap-snowflake:

That work can be migrated here but I wanted to start with a 'real' implementation for testing and to prove the design approach.

Details

Singer SDK Version

0.10.0

Python Version

3.8

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

macOS

Description

When passing previous state to a tap with batch mode enabled, that state is not made available via the documented Stream.get_starting_timestamp() and Stream.get_starting_replication_key_value() methods.

I believe this is because the starting_replication_value is never initialized, and the methods above depend on it to retrieve state. This is easiest to see when comparing the Stream._sync_records() and Stream._sync_batches() methods on singer_sdk.streams.core.Stream (snippets below). I think the critical missing call is self._write_starting_replication_value(current_context).

https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034

Code

def _sync_records(
        self,
        context: dict | None = None,
        write_messages: bool = True,
    ) -> Generator[dict, Any, Any]:
        """Sync records, emitting RECORD and STATE messages.

        Args:
            context: Stream partition or context dictionary.
            write_messages: Whether to write Singer messages to stdout.

        Raises:
            InvalidStreamSortException: TODO

        Yields:
            Each record from the source.
        """
        record_count = 0
        current_context: dict | None
        context_list: list[dict] | None
        context_list = [context] if context is not None else self.partitions
        selected = self.selected

        for current_context in context_list or [{}]:
            partition_record_count = 0
            current_context = current_context or None
            state = self.get_context_state(current_context)
            state_partition_context = self._get_state_partition_context(current_context)
            self._write_starting_replication_value(current_context)
            child_context: dict | None = (
                None if current_context is None else copy.copy(current_context)
            )

            for record_result in self.get_records(current_context):
              ...
    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages.

        Args:
            batch_config: The batch configuration.
            context: Stream partition or context dictionary.
        """
        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()

@edgarrmondragon I guess incremental replication with BATCH is still an open question, but it seems to me that this is worth adding for cases where users i) first enable BATCH having previously run incrementally, or ii) self-manage a state.json (as I have done for testing). It is so close to 'just working', at least on the select side 😅

Yeah, so the implementation of Stream._sync_batches is missing the following:

  • Initial state via Stream._write_starting_replication_value
  • State update via State._increment_stream_state(record, context=...)

They should be called at some point, but the latter relies on record-by-record sync, so it isn't really useful here...
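A minimal sketch of where the first of those calls would slot into `_sync_batches`, using toy stand-in classes (the method names mirror the snippets above, but this is an illustration of the ordering, not the SDK's actual implementation):

```python
class DemoStream:
    """Toy stand-in for singer_sdk's Stream, showing where the missing
    call to _write_starting_replication_value would go in _sync_batches."""

    def __init__(self, state):
        self.state = state
        self.calls = []  # records the order of emitted actions

    def _write_starting_replication_value(self, context):
        # In the SDK this copies the bookmark from incoming state so that
        # get_starting_replication_key_value() can see it.
        self.calls.append(("starting_value", self.state.get("bookmark")))

    def get_batches(self, batch_config, context=None):
        yield ("jsonl", {"files": ["batch-0001.jsonl.gz"]})

    def _write_batch_message(self, encoding, manifest):
        self.calls.append(("batch", encoding))

    def _write_state_message(self):
        self.calls.append(("state", self.state.get("bookmark")))

    def _sync_batches(self, batch_config, context=None):
        # The missing piece: initialize the starting replication value
        # before emitting any BATCH messages.
        self._write_starting_replication_value(context)
        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()


stream = DemoStream(state={"bookmark": "2022-01-01"})
stream._sync_batches(batch_config=None)
print(stream.calls[0])  # the starting value is written first
```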

Makes sense 🙂 In terms of the mechanism to retrieve the bookmark (because it cannot be automatically inferred from the batch itself without decompressing and reading the last line of the last file 😱), what do you think of allowing the user to return it? Something like:

for encoding, manifest, state in self.get_batches(batch_config, context):
    ...

where state is just the max replication key value for that given manifest (defaulting to None)? For tap-snowflake I would then start a transaction and retrieve the max value before executing the copy command 🚀
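A sketch of what that modified three-tuple contract could look like (purely illustrative; the manifest contents and the transaction step are stand-ins, not tap-snowflake's actual code):

```python
def get_batches(batch_config, context=None):
    """Hypothetical variant of get_batches yielding (encoding, manifest,
    state), where state is the max replication key value observed for
    that manifest, or None if unknown."""
    # Pretend we staged one manifest and, inside the same transaction,
    # ran SELECT max(replication_key) before executing the COPY command.
    manifest = {"files": ["batch-0001.jsonl.gz"]}
    max_replication_key_value = "2022-06-30T23:59:59"
    yield ("jsonl", manifest, max_replication_key_value)


bookmark = None
for encoding, manifest, state in get_batches(batch_config=None):
    if state is not None:
        bookmark = state  # would feed the next STATE message

print(bookmark)
```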

Note to self: @aaronsteers - reply here after some thoughts. ✅

@kgpayne - thanks for raising this! @edgarrmondragon and I spoke about it today in our regular 1:1.

What about this approach?

  1. Run a "min/max query" before starting the batch.
    • Before extracting a batch, we use generic SQLAlchemy methods to collect the starting_replication_key_value and ending_replication_key_value.
    • Non-SQL implementations would need their own way of initializing these values, but presumably all SQL-based implementations could use something built into the SDK, such as:
      SELECT 
          min({self.replication_key}) AS starting_replication_key_value,
          max({self.replication_key}) AS ending_replication_key_value
      FROM {self.table_name} AS source_table
      WHERE {self.replication_key} >= {self.get_starting_replication_key_value()}
      
    • The reason for running the query before the batch executes is that running it afterwards could, in some cases, leave a gap between the max value at unload time and the max value at post-query time. That gap could and would cause data loss, which we want to avoid.
  2. Use the min/max to limit the batch.
    • When extracting the batch, developers can optionally limit their output to be within the bounds of min/max (inclusive).
    • If records cannot be limited to the min/max range, then some records may arrive more than once - which is acceptable according to spec.
      • This should affect only a small number of sources: those with native BATCH support but no support for range filtering. It's not ideal, but we may want to declare that STATE isn't supported for those streams in BATCH mode - meaning BATCH on streams with no filtering capability would require FULL_TABLE replication instead of INCREMENTAL.
  3. Send STATE messages as usual, but with output from the "min/max query".
    • The value of ending_replication_key_value would determine the STATE message content after the batch is transmitted.
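The "min/max query" in step 1 can be illustrated end to end. This sketch uses the stdlib sqlite3 driver purely for demonstration; a real SQLStream implementation would build the same statement with generic SQLAlchemy constructs, and the table/column names here are made up:

```python
import sqlite3

# Toy source table standing in for the stream's backing table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?)",
    [(1, "2022-01-01"), (2, "2022-02-01"), (3, "2022-03-01")],
)

replication_key = "updated_at"
starting_value = "2022-02-01"  # e.g. from get_starting_replication_key_value()

# The proposed min/max query, run before the batch is extracted.
row = conn.execute(
    f"""
    SELECT
        min({replication_key}) AS starting_replication_key_value,
        max({replication_key}) AS ending_replication_key_value
    FROM source_table
    WHERE {replication_key} >= ?
    """,
    (starting_value,),
).fetchone()

print(row)  # ('2022-02-01', '2022-03-01')
```

The second tuple element is what would drive the STATE message content after the batch is transmitted.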

Other notes:

@aaronsteers approach sounds good 👍 What are the next steps? Would you like me to work on this, or will we ship tap-snowflake without incremental support in batch mode for now?

@kgpayne - Let's ship batch messaging as soon as it's available on tap- and target-snowflake. We need the example implementation as soon as possible for community members to reference.

If we can put in placeholders, as discussed, that would light up once the upstream SDK capability is implemented, that would be ideal.

Code comments and a note in the readme, with cross-links to the logged issue(s), would be much appreciated.

@aaronsteers based on your comment above, how does:

Other notes:

square with #1011? In my mind, they are the same feature - i.e. min, max, and limit values are passed in the get_records/get_batches signatures, and in the incremental case the value of max is derived from a SQLAlchemy MAX(replication_key) 🤔 The tap would then i) only get records <= the max and ii) emit the max as its state value.

It would be good to get to a complete spec for tap-snowflake, as it sounded from our 1:1 like having INCREMENTAL support sooner rather than later is preferable - not least to set the overall pattern for anyone wanting to build their own BigQuery/Redshift etc. with incremental support 😅

@edgarrmondragon FYI

@kgpayne - Following from our past conversations, I logged a new proposal that suggests a StreamFilter object that can be passed to get_records(), get_batches(), get_url_params(), etc.

This would eliminate the need for the developer to directly call get_starting_replication_key_value(), get_replication_key_signpost(), or get_starting_timestamp(). The developer can then also completely ignore self.replication_method and self.replication_key, since all constraints are precalculated and baked into the filter object that is sent to the method.

This also bakes in the max_record_limit constraint discussed elsewhere.
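The StreamFilter proposal isn't spec'd in this thread, but one possible shape might look like the following. Every field and method name here is an assumption for illustration, not the SDK's actual API:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class StreamFilter:
    """Hypothetical precalculated constraints object passed to
    get_records() / get_batches(); all names are illustrative."""

    replication_key: Optional[str] = None
    start_value: Any = None            # precalculated starting bookmark
    end_value: Any = None              # precalculated signpost / max bound
    max_record_limit: Optional[int] = None

    def allows(self, record: dict) -> bool:
        """Check a record against the precalculated range constraints."""
        if self.replication_key is None:
            return True
        value = record[self.replication_key]
        if self.start_value is not None and value < self.start_value:
            return False
        if self.end_value is not None and value > self.end_value:
            return False
        return True


f = StreamFilter(replication_key="updated_at", start_value="2022-01-01")
print(f.allows({"updated_at": "2022-03-01"}))  # True
```

With a shape like this, a developer implementing get_records() would consult only the filter object, never replication_method or replication_key directly.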

cc @edgarrmondragon

@kgpayne and @edgarrmondragon - The above discussion regarding how to communicate constraints to get_records() and get_batches() is important, but I don't think it necessarily should be a blocker here.

The critical path item here, I think, is to do something like the following to capture the max_replication_key_value even while not touching any records ourselves:

  1. Before the SDK calls SQLStream.get_batches(), we use SQLAlchemy to run the equivalent of SELECT MIN(replication_key), MAX(replication_key) FROM source_table.
    • The min value has no function as of now, so we can omit it if we want from the query.
  2. We set the internal signpost value to the max replication key value.
    • This has no critical function, and is purely cosmetic for our purposes. It does not filter records - unlike what we were previously thinking - because it is unsafe for us to do so without ensuring we then run another 'catch-up sync' afterwards.
  3. After the batch messages are sent, the same signpost value (MAX(replication_key) from our original query) should be set as the max_replication_key_value - and then would be sent automatically in the subsequent STATE message for that stream.
  4. On subsequent executions: any implementation of SQLStream.get_batches() that consults either get_starting_replication_key_value() or the new/proposed StreamFilter argument will immediately get incremental support.
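The four steps above can be sketched as a small orchestration function. All names here are illustrative placeholders, not SDK API:

```python
def sync_batches_with_signpost(query_min_max, send_batches, state):
    """Toy orchestration of the proposed flow; not the SDK's real code.

    query_min_max: runs the pre-batch SELECT MIN/MAX query.
    send_batches:  emits the BATCH messages without touching records.
    state:         mutable stream state dict.
    """
    # Step 1: min/max query before extracting the batch.
    # (The min value has no function as of now, per the note above.)
    _min_value, max_value = query_min_max()

    # Step 2: record the signpost; informational only, no record filtering.
    signpost = max_value

    # Step 3: after batch messages are sent, the signpost becomes the
    # bookmark emitted in the subsequent STATE message for the stream.
    send_batches()
    state["replication_key_value"] = signpost
    return state


state = sync_batches_with_signpost(
    query_min_max=lambda: ("2022-01-01", "2022-03-01"),
    send_batches=lambda: None,
    state={},
)
print(state)  # {'replication_key_value': '2022-03-01'}
```

Step 4 then follows for free: on the next run, any get_batches() implementation that consults the starting replication key value picks up from this bookmark.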

Does this sound right?

@kgpayne - If this sounds reasonable, feel free to pick up as you have availability. Delivery sooner will mean less rework for those who build on the tap-snowflake example over the coming weeks.

@kgpayne - Do I remember correctly that you had this fixed, either in SDK or in the Snowflake implementation?

@aaronsteers I think you might be thinking of this PR 🙂:

@kgpayne Thanks. I didn't realize until now that you'd added the commits into the existing PR. We should definitely try to merge that and get it into the SDK. I'll see if I can get it merged while you are out next week, and maybe Derek can assist with adapting it to the SDK.

As the fix (linked above) has now merged into tap-snowflake, next step is to port to the SDK.

What's the progress of this?

ping @kgpayne since this is assigned to you. I've added it to the SDK v1 issue as well.

@tayloramurphy @luisvicenteatprima this is implemented in tap-snowflake but not yet ported to the SDK. It's marked as v1, so it should hopefully be picked up in this cadence 👍

@edgarrmondragon FYI this was one of my next-up items, and it would be good to hand over 🐣

This is on my TODO list for the week.

Update: PR in #1894

Any progress on this? There haven't been any updates in the PR for almost 2 months.

Bumping to see if there's any progress on this. Thank you!