gabledata/recap

Implement SnowflakeReader

Closed this issue · 4 comments

Discussion on #257 has led me to try to implement a SnowflakeReader using Snowflake's dbapi instead of SQLAlchemy.

Here's what I have so far:

import re

from snowflake.connector import connect

from recap.types import BoolType, BytesType, FloatType, IntType, StringType, StructType


class SnowflakeReader:
    def __init__(self, **kwargs):
        self.con_params = kwargs

    def struct(self, table: str) -> StructType:
        """
        Return a StructType for the given table.
        """
        fields = []

        with connect(**self.con_params) as con, con.cursor() as cur:
            # DESCRIBE TABLE returns one row per column; row[0] is the column
            # name and row[1] is the type string, e.g. "NUMBER(38,0)".
            cur.execute(f"DESCRIBE TABLE {table}")
            for row in cur:
                col_type = row[1]
                params = self._parse_parameters(col_type)
                col_type = col_type.split("(")[0]

                if col_type in {
                    "NUMBER",
                    "DECIMAL",
                    "NUMERIC",
                    "INT",
                    "INTEGER",
                    "BIGINT",
                    "SMALLINT",
                    "TINYINT",
                    "BYTEINT",
                }:
                    col_type = BytesType(
                        logical="build.recap.Decimal",
                        bytes_=32,
                        variable=False,
                        precision=int(params[0]) if params else 38,
                        scale=int(params[1]) if params and len(params) > 1 else 0,
                    )
                elif col_type in {
                    "FLOAT",
                    "FLOAT4",
                    "FLOAT8",
                    "DOUBLE",
                    "DOUBLE PRECISION",
                    "REAL",
                }:
                    col_type = FloatType(bits=64)
                elif col_type in {
                    "VARCHAR",
                    "STRING",
                    "TEXT",
                    "NVARCHAR",
                    "NVARCHAR2",
                    "CHAR VARYING",
                    "NCHAR VARYING",
                }:
                    col_type = StringType(bytes_=int(params[0]) if params else 16_777_216)
                elif col_type in {"CHAR", "CHARACTER", "NCHAR"}:
                    col_type = StringType(bytes_=int(params[0]) if params else 1)
                elif col_type in {"BINARY", "VARBINARY"}:
                    col_type = BytesType(bytes_=int(params[0]) if params else 8_388_608)
                elif col_type == "BOOLEAN":
                    col_type = BoolType()
                elif col_type == "DATE":
                    col_type = IntType(bits=32, logical="build.recap.Date", unit="day")
                elif col_type == "TIME":
                    unit = self._get_time_unit(params)
                    col_type = IntType(bits=32, logical="build.recap.Time", unit=unit)
                elif col_type in {
                    "DATETIME",
                    "TIMESTAMP",
                    "TIMESTAMP_NTZ",
                    "TIMESTAMPNTZ",
                    "TIMESTAMP WITHOUT TIME ZONE",
                }:
                    unit = self._get_time_unit(params)
                    col_type = IntType(
                        bits=64,
                        logical="build.recap.Timestamp",
                        unit=unit,
                    )
                else:
                    # No mapping yet (e.g. VARIANT, OBJECT, ARRAY); see the
                    # discussion below.
                    raise ValueError(f"Unsupported Snowflake type: {col_type}")

                fields.append(col_type)

        return StructType(fields)

    def _parse_parameters(self, col_type: str) -> list[str] | None:
        """
        Parse types that have parameters.
        """
        match = re.search(r"\((.*?)\)", col_type)
        if match:
            return [p.strip() for p in match.group(1).split(",")]
        return None

    def _get_time_unit(self, params: list[str] | None) -> str:
        # DESCRIBE TABLE reports precision as a single parameter, e.g.
        # "TIMESTAMP_NTZ(9)", so match lists with one or more elements.
        match params:
            case [unit, *_] if int(unit) == 0:
                return "second"
            case [unit, *_] if int(unit) <= 3:
                return "millisecond"
            case [unit, *_] if int(unit) <= 6:
                return "microsecond"
            case _:
                return "nanosecond"

It supports nearly all of Snowflake's types. I opted not to deal with TIMESTAMP_TZ and TIMESTAMP_LTZ at the moment.
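
For context, here's roughly how I'd expect it to be used. The connection parameters below are just placeholders (they're passed straight through to snowflake.connector.connect, so use whatever your account needs):

reader = SnowflakeReader(
    account="my_account",        # placeholder connection parameters
    user="my_user",
    password="my_password",
    warehouse="my_warehouse",
    database="my_database",
    schema="my_schema",
)

# struct() returns a StructType with one Recap type per column.
struct = reader.struct("my_table")
print(struct)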

I've come across another issue, though. Snowflake has a VARIANT type, which is basically "Any" in Python, "Object" in Java, etc. Recap doesn't currently support this. The VARIANT type is important for Snowflake because both the OBJECT and ARRAY types build on it (OBJECTs map VARCHAR keys to VARIANT values to emulate JSON objects, and ARRAY items are all VARIANTs).

So, the question is how to deal with this. Three potential ideas come to mind:

  1. Add an any type to Recap's type system.
  2. Change list's (and map's) values attribute from required to optional. An unset values would be treated as a variant type.
  3. Do nothing. This would prevent OBJECT and ARRAY support in SnowflakeReader.

I'm partial to (2). It solves the immediate problem without adding the baggage of an any type, and I really don't want to overcomplicate Recap's type system. I feel like an any type would have a lot of unintended consequences that I haven't fully thought through yet.
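
To make (2) a bit more concrete, here's a rough, hypothetical sketch of how OBJECT and ARRAY might map if values were optional. None of this works today (values is required, and I'm guessing at ListType/MapType keyword arguments), so treat it as illustration only:

from recap.types import ListType, MapType, StringType

# HYPOTHETICAL: assumes an unset `values` means "any value" (i.e. VARIANT).
# ARRAY: every item is a VARIANT, so no values type is given.
array_type = ListType()

# OBJECT: VARCHAR keys mapping to VARIANT values, so only keys is set.
object_type = MapType(keys=StringType(bytes_=16_777_216))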

@cpard @adrianisk Looking for your thoughts/feedback.

Sorry I missed this somehow... I agree that adding an any type seems like overkill, and could cause problems down the line. Option 2 seems like the best idea, as it also fits the Postgres JSON types which I believe can only store arrays/objects, not primitives.

The potential problem I see with option 2 for Snowflake is that VARIANT really is an any type: there's nothing preventing someone from creating a VARIANT column and only storing strings in it, or even a mix of types. Going from Recap array/map -> VARIANT makes sense, but going from Snowflake VARIANT -> Recap still won't have a type that 100% makes sense. That said, in practice I've really only seen VARIANT columns used to store objects & arrays, so that's probably fine.

Actually, I was wrong. I took a closer look at the documentation I linked, and you can store primitives in Postgres's json type:

-- Simple scalar/primitive value
-- Primitive values can be numbers, quoted strings, true, false, or null
SELECT '5'::json;
cpard commented

Snowflake's VARIANT type is an interesting one.

I've primarily seen it used as part of ingestion pipelines during the "staging" of the data. You can see an example of how this works by checking the Airbyte connector for Snowflake. All data is inserted into a table with a column of type VARIANT; after that, a dbt model is automatically generated based on the schema information that Airbyte has and applied to the VARIANT column, generating new tables and columns that the user can actually use.

The main benefit of having a VARIANT type is that your pipelines will not break because of schema conflicts. Obviously you are just pushing the problem further down the pipeline, because at some point you will have to figure out how to deal with the schema changes. BUT the ingestion process does not break, and in cases like the ones Airbyte deals with (moderate-to-low volume but expensive extraction), it does make sense to ensure that data gets delivered to the warehouse and let the data engineer deal with the issue later on.

Having said that, VARIANT makes sense for Snowflake but I don't believe that it makes sense for a system like Recap. Being expressive is good but becoming too expressive turns you into something that nobody understands. My recommendation would be to keep the VARIANT type out of Recap's type system for now.

Cool, so we're agreed on not supporting VARIANT in Recap.

I'm inclined to punt on dealing with Snowflake VARIANT for now. This will cause VARIANT columns to fail in SnowflakeReader, which I think is OK for the time being.

Another thought occurred to me: just treat VARIANT as a string no matter what. That will work reasonably well for mixed primitives in a VARIANT column (e.g. "foo", 1, true, 3.0) but not for complex types. Again, I think that's fine.
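
If I ever do that, it's roughly one extra branch in struct()'s type chain, something like this (just a sketch, not implemented; 16 MB is VARIANT's size cap):

                elif col_type == "VARIANT":
                    # Sketch only: treat any VARIANT value as text.
                    col_type = StringType(bytes_=16_777_216)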

I'm still going to punt on this for now, but leaving it as a note. I'll open a separate GH issue for VARIANT.