
Is it possible to get a max value out of delta live table's table to be used in another table?


I want to join two tables on a date column (DATE type), as follows:

def table_A():
    # partitioned on date
    A_df = spark.readStream.format("cloudFiles").option("cloudFiles.format", "parquet").load("location_A")
    return A_df

def table_B():
    # partitioned on date
    B_df = spark.read.format("delta").load("location_B")
    return B_df

def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B")

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

However, table_B contains many dates that are irrelevant to the join, so I want to take the maximum value of table_A's date column (max_date_from_table_A) and use it to filter table_B before joining, i.e.

def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B")
    B_df = B_df.filter(B_df.date >= max_date_from_table_A)

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

How can I do so?

I have tried doing

def joined_table():
    A_df = dlt.read_stream("table_A")

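    # F is pyspark.sql.functions (from pyspark.sql import functions as F)
    max_date_from_table_A = A_df.select(F.max('date')).first()['max(date)']
    B_df = dlt.read("table_B")
    B_df = B_df.filter(B_df.date >= max_date_from_table_A)

    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]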
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table 

but it fails with the following error:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Or does filtering table_B down to a smaller table before the join NOT matter?

Thank you

The filter before the join could matter, but the query optimizer may also be able to do this automatically using Dynamic Partition Pruning/Dynamic File Pruning.

DLT doesn't support this type of manual optimization, and we will be improving the optimizer to cover these cases. Thanks!