Is it possible to get a max value out of a Delta Live Tables table to be used in another table?
surfii3z commented
Hi
I want to join two tables on a date column (DATE), doing the following:
import dlt
from pyspark.sql import functions as F

@dlt.table
def table_A():
    # partitioned on date
    A_df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("location_A")
    )
    return A_df

@dlt.table
def table_B():
    # partitioned on date
    B_df = spark.read.format("delta").load("location_B")
    return B_df

@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B")
    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table
However, table_B contains many dates that don't matter to the join, so I want to use the maximum value of the date column in table_A (call it max_date_from_table_A) to filter table_B before joining, i.e.:
@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    B_df = dlt.read("table_B").filter(F.col("date") >= max_date_from_table_A)
    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table
How can I do so?
I have tried the following:
@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    max_date_from_table_A = A_df.select(F.max("date")).first()["max(date)"]
    B_df = dlt.read("table_B").filter(F.col("date") >= max_date_from_table_A)
    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    joined_table = A_df.join(B_df, conds, "inner")
    return joined_table
but it fails with the following error:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
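I believe the error occurs because .first() is an action, and actions can't be executed on a streaming DataFrame like the one returned by dlt.read_stream. A lazy variant that avoids the action might look like the sketch below (assuming the same table names as above, and that a batch snapshot of table_A is acceptable for deriving the cutoff; the max is kept as a one-row DataFrame and cross-joined in rather than collected to the driver):

@dlt.table
def joined_table():
    A_df = dlt.read_stream("table_A")
    # Batch read of table_A's current snapshot; the max is computed lazily
    # as a one-row DataFrame instead of being collected with an action.
    max_df = dlt.read("table_A").agg(F.max("date").alias("max_date_from_table_A"))
    B_df = (
        dlt.read("table_B")
        .crossJoin(max_df)
        .filter(F.col("date") >= F.col("max_date_from_table_A"))
        .drop("max_date_from_table_A")
    )
    conds = [A_df.date >= B_df.date, A_df.some_key == B_df.some_key]
    return A_df.join(B_df, conds, "inner")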
Or does filtering table_B down to a smaller table before the join NOT matter?
Thank you
chrishfish commented
The filter before the join could matter, but the query optimizer may also be able to do this automatically using Dynamic Partition Pruning/Dynamic File Pruning.
DLT doesn't support this type of manual optimization and will be improving the optimizer to cover these cases. Thanks!
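If it helps, both pruning features mentioned above are controlled by standard Spark/Databricks configuration keys (not DLT-specific), so you can check whether they are enabled on your cluster:

# Spark's Dynamic Partition Pruning (on by default in Spark 3.x)
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")

# Databricks' Dynamic File Pruning (on by default in Databricks runtimes)
spark.conf.get("spark.databricks.optimizer.dynamicFilePruning")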