MeltanoLabs/tap-snowflake

Limit Discoverable Tables

Closed this issue · 10 comments

The pipelinewise variant requires a tables setting to define the tables to discover from. We should consider something along the same lines for this implementation to make it easier to select a subset of tables.

I had two use cases that I wanted to highlight at a conceptual level since we can probably come up with a better solution than the current pipelinewise configs:

  1. Schema/table filters

I have multiple environments that are isolated using different databases and schemas, which I think is a pretty common pattern. I'd like to set my database and schema in my Meltano environment configuration, then set the stream selection criteria in my base non-environment configuration. This lets me configure streams once, share them across all environments, and only toggle my schema/database in the environment configs. Today I'm doing this with the tables setting in Squared, then I use my shared selection criteria with wildcards for the database/schema stream prefix.

This breaks down when I use prefixes in the same schema for isolation, which I do in CI. For example, CICD_PROD.{branch_name}_schema_x.table_y. In this case my hack of using wildcard selects breaks down and all variations of this table get synced. The schema config today doesn't seem to exclude other schemas.
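The over-matching described above can be demonstrated with a quick sketch. The stream names and pattern below are hypothetical examples, not the actual Squared configuration:

```python
# Illustration of why wildcard selection over-matches in CI: every
# branch-prefixed schema satisfies the same pattern, so tables from
# all branches get synced instead of only the current branch's.
import fnmatch

streams = [
    "cicd_prod-feat_a_schema_x-table_y",
    "cicd_prod-feat_b_schema_x-table_y",
]
pattern = "cicd_prod-*_schema_x-table_y"

matched = [s for s in streams if fnmatch.fnmatch(s, pattern)]
print(matched)  # both branch variants match, not only the intended one
```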

  2. Limiting discovery time

My database isn't too big, so it's not a huge deal for me right now, but I did notice a decent slowdown in discovery once I switched to this variant since it looks like it's discovering all tables. Maybe SQLAlchemy + the information schema keeps this from growing out of control somehow, but some users might still want limits.

The pipelinewise implementation is primarily used to avoid long-running queries during discovery, although it's not the best implementation because:

  • it's a comma-separated string
  • it accepts tables using dot notation, which tripped me up because select criteria need to be converted to stream underscore + dash syntax, so keeping the two straight was tricky.
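A rough sketch of the dot-notation-to-stream-id conversion that made the two formats hard to keep straight. The exact mapping rules here are an assumption for illustration, not the tap's actual implementation:

```python
def table_to_stream_id(qualified_name: str) -> str:
    """One plausible mapping from pipelinewise-style dot notation
    (e.g. "DB.SCHEMA.TABLE", as accepted by the `tables` setting) to
    the dash-separated stream id that selection criteria use
    (e.g. "db-schema-table"). Case folding and separator choice vary
    by tap variant; this is illustrative only.
    """
    return "-".join(part.lower() for part in qualified_name.split("."))

print(table_to_stream_id("CICD_PROD.SCHEMA_X.TABLE_Y"))
# cicd_prod-schema_x-table_y
```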

@kgpayne what do you think? I wanted to get my thoughts down after working on meltano/squared#425 and running into a few things. I'm happy to move this to the SDK repo if this is generically useful.

Thanks @pnadolny13, this is great context! 100% agree this is a problem, especially in the Snowflake case, as unnecessary metadata queries consume credits.

My preference would be to use the existing selection syntax and mechanisms to source the table filter list, with a flag to enable/disable the behaviour (i.e. discover all). I think 'limit discovery to selected streams' is a helpful default, but it should also be overridable. This would effectively rely on the catalog to supply the filter, rather than config.json, but means we aren't duplicating/polluting config.json with stream-level config. We should also interpret no provided catalog as a case to discover all.
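A minimal sketch of the behaviour proposed above, assuming a plain Singer catalog dict. The function names and the `discover_all` flag are hypothetical, not SDK API:

```python
from __future__ import annotations


def selected_stream_ids(catalog: dict | None) -> set[str] | None:
    """Collect tap_stream_ids whose top-level metadata marks them
    selected. Returns None when no catalog was supplied, which the
    caller treats as 'discover everything'."""
    if catalog is None:
        return None
    selected = set()
    for entry in catalog.get("streams", []):
        for meta in entry.get("metadata", []):
            # The empty breadcrumb is the stream-level metadata entry.
            if meta.get("breadcrumb") == [] and meta.get("metadata", {}).get("selected"):
                selected.add(entry["tap_stream_id"])
    return selected


def filter_discovery(
    all_streams: list[str],
    catalog: dict | None,
    discover_all: bool = False,
) -> list[str]:
    """Limit discovery to selected streams, unless overridden by the
    discover_all flag or by the absence of an input catalog."""
    wanted = selected_stream_ids(catalog)
    if discover_all or wanted is None:
        return all_streams
    return [s for s in all_streams if s in wanted]
```

With this shape, discovery only inspects streams the catalog marks `"selected": true`, and a missing catalog or an explicit `discover_all=True` falls back to full discovery.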

For use-case number 1, I think this issue to add support for env var interpolation in select blocks would complement the above approach. This would allow you to inject the database/schema names into your selection patterns at catalog generation time (i.e. in Meltano) and allow the SDK to just read the catalog's "selected": true property verbatim to filter discovery 🚀
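The interpolation step could be as simple as expanding environment variables in each select pattern before matching. A sketch, where the pattern mirrors the USERDEV naming mentioned later in the thread but is otherwise hypothetical:

```python
# Sketch of env-var interpolation in select patterns: expand ${VAR}
# references before the patterns are matched against stream ids.
import os

os.environ["USER_PREFIX"] = "pnadolny"
pattern = os.path.expandvars("userdev_prod-${USER_PREFIX}_meltano_hub-*")
print(pattern)  # userdev_prod-pnadolny_meltano_hub-*
```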

I have created this issue in the SDK to track the idea and work 🙂

@kgpayne thanks for the response! I put my thoughts over in the SDK issue.

@kgpayne @aaronsteers can you all help me understand the timing on all of this? I'm curious when I'll be able to use tap-snowflake in production for Squared. I see this issue spun out a few SDK issues to solve it generically, which is probably a smart idea:

Do we have a timeline for when those will be prioritized?

cc @tayloramurphy

@pnadolny13 - There are many issues ahead of these in priority. Can you help me understand why these would be blockers?

@aaronsteers I described my use cases above in this issue, hoping to keep them generic enough for you all to find the best solution vs. receiving this as a feature spec. I explained in meltano/squared#425 (comment), but basically I tried to swap this tap in for my current variant in Squared and was stuck because I have no way of injecting my "isolated" CI environment branch_name prefixes (e.g. CICD_PROD.{branch_name}_schema_x.table_y) into selection criteria. So my CI tests with this variant have no way to pull data only from a schema with the matching branch_name prefix; they pull from all schemas in the CICD_PROD database, causing my tests to fail when many tables are synced vs the 1 expected. Does that make sense?

If I follow this chain of issues, it looks like this one gets linked eventually. If that's the recommended path forward for this use case, then I'd say that's the issue that's blocking me. I can move my prioritization question over there. What do you think?

@pnadolny13 - Thank you for the additional context above and in meltano/squared#425 (comment).

Can you clarify a few points?

  1. In the source code (below), I don't see any reference to the tables config of the pipelinewise tap-snowflake extractor. Can you confirm that the tables config feature is being used, and if so, how/where is it implemented?
    • The tables implementation is non-standard and I don't know yet if there's sufficient rationale to bring that into the SDK.
  2. It looks like all staging and prod use cases share the same DB name, so is it correct that this would only block 'reverse-ETL' use cases, and only in CI/CD?
    • If the issue is only affecting the CI/CD environment, then additional options would be:
      1. Insert a command-line step in the CICD engine to apply custom rules - with interpolation handled by the bash shell.
      2. Just skip the publish step in CI/CD.
  3. Looking at the source code, it looks like each instance of the reverse ETL only pulls from a single source schema. If we add support for filtering by a schema_name (singular) or schema_names (plural), could this also solve the issue? (The benefit of the simpler solve is that it has smaller scope and no downsides or new technical challenges to tackle. It's also a common-sense addition to the SDK for SQL taps.)

https://github.com/meltano/squared/blob/b44f7fbe3f4a4d7f9e81026c8d797e5a4f350b46/data/extract/extractors.meltano.yml#L139-L186
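A self-contained sketch of what a `schema_names` filter might look like. The config key and method shape loosely mimic an SDK SQLConnector override, but all names here are assumptions, not the actual tap-snowflake implementation:

```python
class FilteredConnector:
    """Stand-in for a SQLConnector subclass; only the schema-filtering
    logic is sketched. `schema_names` is a hypothetical config key."""

    def __init__(self, config: dict):
        self.config = config

    def get_schema_names(self, inspected) -> list[str]:
        # When the user configures explicit schemas, trust that list and
        # skip enumerating the whole database -- which both speeds up
        # discovery and avoids permission errors on schemas the
        # connecting role cannot see.
        configured = self.config.get("schema_names")
        if configured:
            return list(configured)
        return inspected.get_schema_names()
```

This would also address the permission-limited discovery failure reported below, since discovery would never touch schemas outside the configured list.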

For context, none of these are yet decided as things we will or should implement. They all have downsides, tradeoffs, and/or come with the cost of significant additional/new surface area:

Related issue list

Given limited timelines and the need to switch priorities back to Cloud, SDK 1.0, and Meltano core, we need to evaluate all the available workarounds or 'standard solutions' for these problems before we can make the call on implementing net-new features.

This is a blocker for us as well for two reasons:

  1. We currently have over 1,000 schemas in Snowflake, making discovery too slow for comfort
  2. We limit the permissions of the users connecting to Snowflake, but this method requires a user that has access to all schemas and tables for discovery, which is not possible for us. If the user does not have enough permissions, the following exception is triggered and discovery fails:
Traceback (most recent call last):
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/data_platform/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap_snowflake.py", line 23, in <module>
    __main__()
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/data_platform/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap_snowflake.py", line 19, in __main__
    TapSnowflake.cli()
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_click/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_click/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_click/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_click/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 492, in cli
    tap = cls(  # type: ignore  # Ignore 'type not callable'
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 543, in __init__
    super().__init__(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 97, in __init__
    self.mapper.register_raw_streams_from_catalog(self.catalog)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 159, in catalog
    self._catalog = self.input_catalog or self._singer_catalog
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 251, in _singer_catalog
    for stream in self.streams.values()
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 122, in streams
    for stream in self.load_streams():
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 283, in load_streams
    for stream in self.discover_streams():
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 579, in discover_streams
    for catalog_entry in self.catalog_dict["streams"]:
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/tap_base.py", line 567, in catalog_dict
    result["streams"].extend(connector.discover_catalog_entries())
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_meltanolabs_tap_snowflake/site-packages/tap_snowflake/client.py", line 77, in discover_catalog_entries
    for table_name, is_view in self.get_object_names(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_singer_sdk/site-packages/singer_sdk/connectors/sql.py", line 317, in get_object_names
    table_names = inspected.get_table_names(schema=schema_name)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/reflection.py", line 266, in get_table_names
    return self.dialect.get_table_names(
  File "<string>", line 2, in get_table_names
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/reflection.py", line 55, in cache
    ret = fn(self, con, *args, **kw)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_snowflake_sqlalchemy/site-packages/snowflake/sqlalchemy/snowdialect.py", line 682, in get_table_names
    cursor = connection.execute(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_sqlalchemy/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_snowflake_connector_python/site-packages/snowflake/connector/cursor.py", line 827, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_snowflake_connector_python/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_snowflake_connector_python/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/home/devcontainer/bazel/output_base/execroot/data_platform/bazel-out/aarch64-fastbuild/bin/meltano/plugins/extractors/tap_snowflake_meltano/tap_snowflake_meltano_tap-snowflake.runfiles/tap_snowflake_meltano_snowflake_connector_python/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 002043 (02000): 01aa4e60-0405-b3de-0000-5fa13ae46e42: SQL compilation error:
Object does not exist, or operation cannot be performed.

Actually a bit surprised that the .get_schema_names method from SQLAlchemy returns ALL schemas even though the connection URI has a specified database and schema name, but that's beyond the scope of this issue I guess 😅.

Thank you for sharing that context @mvgijssel!

@pnadolny13 looks like AJ had a few questions sent your way - can you answer those when you get a chance?

@aaronsteers thanks for looking into my use cases for this!

In the source code (below), I don't see any reference to the tables config of the pipelinewise tap-snowflake extractor. Can you confirm that the tables config feature is being used, and if so, how/where is it implemented?

I use stream selection in the base config, then I use the tables config in the environment-specific configs because the name changes between environments.

It looks like all staging and prod use cases share the same DB name, so is it correct that this would only block 'reverse-ETL' use cases, and only in CI/CD?

They don't use the same DB name; for example, prod is PROD.MELTANO_HUB.FACT_HUB_METRICS and userdev is USERDEV_PROD.${USER_PREFIX}_MELTANO_HUB.FACT_HUB_METRICS. But yes, this only affects 'reverse-ETL' use cases for me, and I would guess that many of the tap-snowflake use cases are 'reverse-ETL'.

Looking at the source code, it looks like each instance of the reverse ETL only pulls from a single source schema. If we add support filtering for a schema_name (singular) or schema_names (plural), could this also solve the issue? (Benefit of the simpler solve is that it has smaller scope and no downsides or new technical challenges to tackle. It's also a common-sense addition to the SDK for SQL Taps.)

Yeah, I think if I could filter for a database and a schema that would support my use cases. The downside is that we'd need a distinct inherited tap for each reverse-ETL implementation, but I have to do that already today, so it's not a worse experience than the status quo.

Closed by #15

FYI @pnadolny13 @mvgijssel