astronomer/astro-sdk

Include Snowflake staged file metadata in LoadFileOperator

Andrew-Wichmann opened this issue · 4 comments

Please describe the feature you'd like to see
I would like to be able to include Snowflake's staged-file metadata columns in the table that I'm loading data into with the LoadFileOperator.

Describe the solution you'd like

    new_table = aql.load_file(
        input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
        output_table=Table(
            conn_id="snowflake_conn",
        ),
        # here, I would like an option to include the METADATA$XYZ columns in the output table
        load_options=[
            SnowflakeMetadataOptions(
                include=["METADATA$FILENAME", "METADATA$FILE_ROW_NUMBER", ...]
            )
        ],
    )

Are there any alternatives to this feature?
I could (and will, if instructed to by senior management) use a SnowflakeOperator to run the COPY INTO command exactly the way I want, instead of this nice shiny astro-sdk.
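
For reference, a rough sketch of what that alternative could look like, assuming Airflow's SnowflakeOperator; the stage, table, and column references are hypothetical placeholders:

    # Hypothetical SnowflakeOperator alternative; my_table, my_stage, and the
    # selected columns are placeholders, not part of this issue.
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

    copy_with_metadata = SnowflakeOperator(
        task_id="copy_into_with_metadata",
        snowflake_conn_id="snowflake_conn",
        sql="""
            COPY INTO my_table
            FROM (
                SELECT
                    METADATA$FILENAME,
                    METADATA$FILE_ROW_NUMBER,
                    t.$1,
                    t.$2
                FROM @my_stage t
            )
            FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
        """,
    )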

Additional context
Nope. I'm really enjoying the toolchain so far, and I hope the ease of use continues. Great work.

Acceptance Criteria

  • All checks and tests in the CI should pass
  • Unit tests (90% code coverage or more, once available)
  • Integration tests (if the feature relates to a new database or external service)
  • Example DAG
  • Docstrings in reStructuredText for each method, class, function, and module-level attribute (including an example DAG showing how it should be used)
  • Exception handling in case of errors
  • Logging (are we exposing useful information to the user? e.g. source and destination)
  • Improve the documentation (README, Sphinx, and any other relevant docs)
  • A how-to guide for the feature (with an example)

@pankajkoti adding it to your backlog

Okay @phanikumv 👍🏾

Hi @Andrew-Wichmann,

We have just released Astro SDK 1.7.0 (https://pypi.org/project/astro-sdk-python/1.7.0/), which includes PR #2023 implementing this feature.

Would you like to try it out? Here is an example of the operator usage:

    aql.load_file(
        task_id="s3_to_snowflake_native_with_metadata_columns",
        input_file=File(
            "s3://astro-sdk/python_sdk/example_dags/data/sample.csv",
            conn_id=AWS_CONN_ID,
        ),
        output_table=Table(
            conn_id=SNOWFLAKE_CONN_ID,
        ),
        load_options=[
            SnowflakeLoadOptions(
                file_options={"SKIP_HEADER": 1, "SKIP_BLANK_LINES": True},
                copy_options={"ON_ERROR": "CONTINUE"},
                metadata_columns=[
                    "METADATA$FILENAME",
                    "METADATA$FILE_ROW_NUMBER",
                    "METADATA$FILE_CONTENT_KEY",
                    "METADATA$FILE_LAST_MODIFIED",
                    "METADATA$START_SCAN_TIME",
                ],
            )
        ],
    )
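
With this option, the METADATA$ columns land in the output table alongside the file's data columns. As a follow-up, here is a minimal, hypothetical sketch of reading them back with the SDK's run_raw_sql decorator; the loaded_table assignment, the task wiring, and the column quoting are illustrative assumptions, not from the release:

    # Hypothetical follow-up: assumes the load above was assigned to a variable,
    # e.g. loaded_table = aql.load_file(...), and that the metadata columns were
    # created with these exact quoted names (adjust to match the created schema).
    @aql.run_raw_sql(conn_id=SNOWFLAKE_CONN_ID, handler=lambda cursor: cursor.fetchall())
    def inspect_metadata(table: Table):
        return 'SELECT "METADATA$FILENAME", "METADATA$FILE_ROW_NUMBER" FROM {{table}} LIMIT 5'

    rows = inspect_metadata(table=loaded_table)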

Nice, thank you!