snowflakedb/snowpark-python

SNOW-1529368: JOIN followed by SELECT doesn't work as expected

sfc-gh-pkommini opened this issue · 6 comments

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

3.10

  2. What are the Snowpark Python and pandas versions in the environment?

pandas==2.2.2
snowflake-snowpark-python==1.19.0
  3. What did you do?

Script with local_testing session:

from snowflake.snowpark import Session
from snowflake.snowpark.types import (
    StringType,
    StructField,
    StructType,
    IntegerType,
)


local_session = Session.builder.config("local_testing", True).create()
qualified_table_name_1 = [
    "MY_DB",
    "MY_SCHEMA",
    "MY_TABLE_1",
]

qualified_table_name_2 = [
    "MY_DB",
    "MY_SCHEMA",
    "MY_TABLE_1",
]

a_schema = StructType(
    [
        StructField("a_common", IntegerType()),
        StructField("b_common", StringType()),
        StructField("a_specific", StringType(), nullable=True),
    ]
)

b_schema = StructType(
    [
        StructField("a_common", IntegerType()),
        StructField("b_common", StringType()),
        StructField("b_specific", StringType(), nullable=True),
    ]
)

mock_df_1 = local_session.create_dataframe(
    data=[[1, "a", "a_specific_1"], [2, "b", "a_specific_2"]],
    schema=a_schema,
)
mock_df_1.write.save_as_table(
    table_name=qualified_table_name_1,
    mode="overwrite",
    table_type="temporary",
)

mock_df_2 = local_session.create_dataframe(
    data=[[1, "a", "b_specific_1"], [2, "b", "b_specific_2"]],
    schema=b_schema,
)
mock_df_2.write.save_as_table(
    table_name=qualified_table_name_2,
    mode="overwrite",
    table_type="temporary",
)


df_table_1 = local_session.table(qualified_table_name_1)
df_table_2 = local_session.table(qualified_table_name_2)

df_join = df_table_1.join(
    df_table_2,
    on=(
        (df_table_1.col("a_common") == df_table_2.col("a_common"))
        & (df_table_1.col("b_common") == df_table_2.col("b_common"))
    ),
)
print(df_join.columns)
df_selected = df_join.select(["b_specific"])

# print(df_join.columns)
df_selected.show()

Script with live connection:

from snowflake.snowpark import Session
from snowflake.snowpark.types import (
    StringType,
    StructField,
    StructType,
    IntegerType,
)

connection_params = {
    "account": "my_account",
    "user": "my_user",
    "password": "my_password",
    "role": "my_role",
    "database": "my_db",
    "schema": "my_schema",
    "warehouse": "my_warehouse",
}
session = Session.builder.configs(connection_params).create()
qualified_table_name_1 = [
    "SNOWALERT",
    "DATA",
    "MY_TABLE_1",
]

qualified_table_name_2 = [
    "SNOWALERT",
    "DATA",
    "MY_TABLE_2",
]

a_schema = StructType(
    [
        StructField("a_common", IntegerType()),
        StructField("b_common", StringType()),
        StructField("a_specific", StringType(), nullable=True),
    ]
)

b_schema = StructType(
    [
        StructField("a_common", IntegerType()),
        StructField("b_common", StringType()),
        StructField("b_specific", StringType(), nullable=True),
    ]
)

mock_df_1 = session.create_dataframe(
    data=[[1, "a", "a_specific_1"], [2, "b", "a_specific_2"]],
    schema=a_schema,
)
mock_df_1.write.save_as_table(
    table_name=qualified_table_name_1,
    mode="overwrite",
)

mock_df_2 = session.create_dataframe(
    data=[[1, "a", "b_specific_1"], [2, "b", "b_specific_2"]],
    schema=b_schema,
)
mock_df_2.write.save_as_table(
    table_name=qualified_table_name_2,
    mode="overwrite",
)


df_table_1 = session.table(qualified_table_name_1)
df_table_2 = session.table(qualified_table_name_2)

df_join = df_table_1.join(
    df_table_2,
    on=(
        (df_table_1.col("a_common") == df_table_2.col("a_common"))
        & (df_table_1.col("b_common") == df_table_2.col("b_common"))
    ),
)
print(df_join.columns)
df_selected = df_join.select(df_table_2["b_specific"])

# print(df_join.columns)
df_selected.show()
  4. What did you expect to see?

I expected to see a table with the selected column; instead I get an error. When I print df_join.columns, this is what I get, which is not what we expect. Let me know if I'm missing something here.
['"l_wtxx_A_COMMON"', '"l_wtxx_B_COMMON"', '"l_wtxx_B_SPECIFIC"', '"r_hgyd_A_COMMON"', '"r_hgyd_B_COMMON"', '"r_hgyd_B_SPECIFIC"']

Meanwhile, live connection's output is:

['"l_j0xq_A_COMMON"', '"l_j0xq_B_COMMON"', 'A_SPECIFIC', '"r_6gvx_A_COMMON"', '"r_6gvx_B_COMMON"', 'B_SPECIFIC']

Diagnosis:

Table-specific columns don't need a random prefix, as they don't cause name conflicts.

I played around with this a little and found that it only occurs when at least one side of the join is a Table instead of a DataFrame.

Found that this specific issue is due to both tables having the same name. Additionally, it also occurs when there is a column name conflict and we're missing an lsuffix or rsuffix.

I think there's still a behavior difference with the live connection, in that the live connection doesn't error out due to the lack of an lsuffix or rsuffix.

The behavior difference is that the live connection calls out the error more clearly:

snowflake.snowpark.exceptions.SnowparkSQLAmbiguousJoinException: (1303): The reference to the column 'A_COMMON' is ambiguous. The column is present in both DataFrames used in the join. To identify the DataFrame that you want to use in the reference, use the syntax <df>["A_COMMON"] in join conditions and in select() calls on the result of the join. Alternatively, you can rename the column in either DataFrame for disambiguation. See the API documentation of the DataFrame.join() method for more details.

The emulator only says this:

snowflake.snowpark.mock.exceptions.SnowparkLocalTestingException: invalid identifier "A_COMMON"

Thanks @sfc-gh-pkommini for the updates. Closing the issue as this is not a bug; we will work on improving the error message.