airbytehq/PyAirbyte

Add an option not to create schema for SnowflakeCache

nakamichiworks opened this issue · 4 comments

When I use SnowflakeCache, PyAirbyte always tries to run "create schema if not exists ..." in SqlProcessorBase._ensure_schema_exists().

This behavior is, however, not desirable if the target schema already exists and I don't want the Snowflake user to have privileges to create or update the schema. (In such situations, _ensure_schema_exists() throws an error like "Insufficient privileges to operate on database 'your_database'" even if the schema already exists.)

Could you add an option to avoid this error?

Currently, my workaround is to define a wrapper class like this:

from airbyte._processors.sql.snowflake import SnowflakeSqlProcessor
from airbyte.caches import SnowflakeCache
from pydantic import PrivateAttr

class PatchedSnowflakeSqlProcessor(SnowflakeSqlProcessor):
    def _ensure_schema_exists(self) -> None:
        return None


class PatchedSnowflakeCache(SnowflakeCache):
    _sql_processor_class = PrivateAttr(default=PatchedSnowflakeSqlProcessor)

@nakamichiworks - Thanks for reporting this! Can you clarify for me if the "Insufficient privileges" failure is fatal (failing execution)? (I think "yes", but just want to confirm.)

If so, I'd suggest adding this specific error message to the set of ignored errors, similar to how we ignore the "already exists" error in that code path today.

Another option is to always ignore this error, since presumably a subsequent failure will occur when table creation is attempted.

A third option would be to use the native Snowflake library to check for schema existence, which would be faster than the SQLAlchemy-based implementation. In the Snowflake SQL processor class, there's a method called "get_vendor_client()" which returns a native client for the Snowflake Python library.

Very open to ideas here and we'd be happy to accept a PR if you have time to contribute.

Can you clarify for me if the "Insufficient privileges" failure is fatal (failing execution)?

Yes, "create schema if not exists ..." fails to execute for a Snowflake user with insufficient schema privileges, even if the schema already exists.

Among the proposed options, the second (the always-ignore-error strategy) is simple and acceptable for my use case.

The first option (ignoring specific error messages) seems fragile. I think we would end up adding more error messages whenever we add a new type of cache.

On the third option (a vendor-specific schema existence check), I don't think we can check schema existence much faster even if we use the native Snowflake library. AFAIK, snowflake-sqlalchemy already uses "show schemas" to get the list of existing schemas, which I think is the only reliable way to check schema existence (https://github.com/snowflakedb/snowflake-sqlalchemy/blob/58fb1bd556cddbd2b7cf2b8ccebf0ead3363e113/src/snowflake/sqlalchemy/snowdialect.py#L824).
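To make the second option concrete, here is a minimal sketch of an always-ignore-error strategy. It is not PyAirbyte's actual code: the function name ensure_schema_exists_best_effort and the execute_sql callable are hypothetical stand-ins for the processor's own method and SQL runner.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def ensure_schema_exists_best_effort(
    execute_sql: Callable[[str], None],  # hypothetical stand-in for the processor's SQL runner
    schema_name: str,
) -> bool:
    """Try to create the schema; log and return False instead of raising on failure."""
    sql = f"CREATE SCHEMA IF NOT EXISTS {schema_name}"
    try:
        execute_sql(sql)
    except Exception as ex:  # deliberately broad: any failure here is treated as non-fatal
        logger.warning("Could not create schema %r, continuing anyway: %s", schema_name, ex)
        return False
    return True
```

If the schema really is missing, the failure is only deferred: the first attempt to create a table in it would still raise, which is the trade-off this option accepts.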

@nakamichiworks - Second option sounds good to me! I've added the "accepting pull requests" label. Would welcome a contribution PR if you have time. Thanks! 🙏

While creating a PR, I found that the root cause of my error was not schema privileges.

_ensure_schema_exists() failed in my code because the method handles the case sensitivity of schema names inconsistently.

# My application code
import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    # Existing schema name that is case-insensitive. Both "MY_SCHEMA" and "my_schema" should work
    schema_name="MY_SCHEMA",
    ... # Other database parameters
)
source = ab.get_source(...)
result = source.read(cache)

# airbyte/_future_cdk/sql_processor.py
class SqlProcessorBase(RecordProcessorBase):
    ...

    def _ensure_schema_exists(self) -> None:
        # schema_name = "MY_SCHEMA"
        schema_name = self.sql_config.schema_name
        # SQLAlchemy returns normalized (i.e. lowercase) names, so self._get_schemas_list() == ["my_schema", ...]
        # and the membership test below incorrectly evaluates to False.
        if schema_name in self._get_schemas_list():
            return

        # try to create MY_SCHEMA, which led to "Insufficient privileges ..." error
        sql = f"CREATE SCHEMA IF NOT EXISTS {schema_name}"
        try:
            self._execute_sql(sql)

        ...

I will submit a PR to fix this.
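For illustration only, a case-insensitive membership test along the following lines would avoid the mismatch described above. The helper name schema_in_list is hypothetical and this is not necessarily what the PR will do. It also assumes the schema name is an unquoted identifier; quoted identifiers in Snowflake are case-sensitive and would need separate handling.

```python
from typing import Iterable


def schema_in_list(schema_name: str, existing_schemas: Iterable[str]) -> bool:
    """Case-insensitive membership test, assuming unquoted (case-insensitive) schema names."""
    wanted = schema_name.lower()
    return any(wanted == existing.lower() for existing in existing_schemas)


# "MY_SCHEMA" now matches the normalized name "my_schema" reported by SQLAlchemy,
# so no CREATE SCHEMA statement would need to be issued.
found = schema_in_list("MY_SCHEMA", ["my_schema", "public"])
```

Lowercasing both sides keeps the check independent of whichever normalization the SQLAlchemy dialect applies to the names it returns.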