twosigma/flint

Join not only by time but also by a column

geoHeil opened this issue · 3 comments

How can I join not only by time but also by a column?

Currently the join fails with `Found duplicate columns`, but I would like to perform the time-series join per group.

```scala
import java.util.concurrent.TimeUnit.MILLISECONDS
import com.twosigma.flint.timeseries.TimeSeriesRDD
import spark.implicits._ // toDF on a Seq; assumes a SparkSession named `spark`

val left    = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("group", "time", "valueA")
val right   = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("group", "time", "valueB")
val leftTs  = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
```

This fails because both inputs contain a `group` column.

After renaming the group columns so they no longer clash:

```scala
val left    = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("groupA", "time", "valueA")
val right   = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("groupB", "time", "valueB")
val leftTs  = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
mergedPerGroup.toDF.printSchema
mergedPerGroup.toDF.show
```
```
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     1|   0.1|     3|    13|
|1000000|     3|   0.3|     3|    13|
|2000000|     1|   0.2|     3|    14|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+
```

The groups are ignored: each left row is paired with a right row from whichever group happens to be the most recent match (here always group 3), so the result has to be reduced manually:

```scala
import org.apache.spark.sql.functions.col

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
```

```
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+
```
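To see why every row ends up paired with group 3, here is a minimal in-memory sketch of an as-of left join that ignores the group. It assumes (this is not confirmed in the thread) that each left row is matched with the last right row at or before its timestamp, and that ties on equal timestamps keep the later row in sorted order. `Row` and `asOfJoinNoKey` are hypothetical names; this is plain Scala collections, not Flint's implementation.

```scala
// Hypothetical in-memory row: (group, time in ms, value).
final case class Row(group: Int, time: Long, value: Double)

// As-of left join WITHOUT a key: for each left row, take the last right row
// (in time-sorted order) whose time is <= the left row's time.
// Assumption: sortBy is stable, so for equal times the later input row wins.
def asOfJoinNoKey(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] = {
  val sortedRight = right.sortBy(_.time)
  left.sortBy(_.time).map { l =>
    l -> sortedRight.filter(_.time <= l.time).lastOption
  }
}

val left  = Seq(Row(1, 1L, 0.1), Row(1, 2L, 0.2), Row(3, 1L, 0.3), Row(3, 2L, 0.4))
val right = Seq(Row(1, 1L, 11.0), Row(1, 2L, 12.0), Row(3, 1L, 13.0), Row(3, 2L, 14.0))

// Every left row, whatever its group, is matched with a group-3 row.
val joined = asOfJoinNoKey(left, right)
```

Filtering such a result on `groupA === groupB` afterwards can only keep rows whose group happened to win the tie, which is why group 1 disappears from the filtered output.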

Is there built-in functionality to perform this type of join more efficiently?

`leftJoin` takes a `key` argument that lets you specify secondary join keys (equality matches only).
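Conceptually, an equality key restricts the as-of lookup to right rows from the same group. The following is a minimal in-memory sketch of that idea, under the same tie-breaking assumption as a plain as-of join. `Row` and `asOfJoinByKey` are hypothetical names, and the sketch says nothing about how Flint partitions or distributes the work.

```scala
// Hypothetical in-memory row: (group, time in ms, value).
final case class Row(group: Int, time: Long, value: Double)

// As-of left join WITH an equality key: only right rows with the same group
// are candidates; among them, the last one at or before the left time wins.
def asOfJoinByKey(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] = {
  // Index the right side by key, each group's rows sorted by time.
  val rightByGroup: Map[Int, Seq[Row]] =
    right.groupBy(_.group).map { case (g, rows) => g -> rows.sortBy(_.time) }
  left.sortBy(_.time).map { l =>
    val candidates = rightByGroup.getOrElse(l.group, Seq.empty)
    l -> candidates.filter(_.time <= l.time).lastOption
  }
}

val left  = Seq(Row(1, 1L, 0.1), Row(1, 2L, 0.2), Row(3, 1L, 0.3), Row(3, 2L, 0.4))
val right = Seq(Row(1, 1L, 11.0), Row(1, 2L, 12.0), Row(3, 1L, 13.0), Row(3, 2L, 14.0))

val joined = asOfJoinByKey(left, right)
```

Grouping first means no post-join filter is needed, and no rows from other groups are ever compared.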

Thanks.

One further question: when using a rather large tolerance (i.e. multiple values from the right fall within the window of a left row), the join attaches only one record from the right. This means the `distinct` I used previously is not required.

```scala
leftTs.leftJoin(rightTS, tolerance = "1s", key = Seq("group")).toDF.show
```

```
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+
```

This looks better, and increasing the tolerance gives the same result:

```scala
leftTs.leftJoin(rightTS, tolerance = "1hour", key = Seq("group")).toDF.show
```

```
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+
```

I believe this is also stated in the documentation:

> `leftJoin`: A function that performs a temporal left join to the right `TimeSeriesRDD`, i.e. a left join using inexact timestamp matches. For each row in the left, append the most recent row from the right at or before the same time. An example to join two TimeSeriesRDDs is as follows.
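The documented rule explains why `1s` and `1hour` produce the same table: the tolerance only bounds how far back a match may lie, and at most one right row (the most recent) is ever attached. Below is a minimal in-memory sketch of that rule with a key and a tolerance. It assumes a right row qualifies when `leftTime - tolerance <= rightTime <= leftTime`; whether Flint's bound is inclusive is an assumption here. `Row` and `asOfJoin` are hypothetical names.

```scala
// Hypothetical in-memory row: (group, time in ms, value).
final case class Row(group: Int, time: Long, value: Double)

// As-of left join by key with a tolerance (all times in milliseconds).
// Assumption: a right row qualifies if leftTime - tolerance <= rightTime <= leftTime.
// Only the most recent qualifying row is attached, never several.
def asOfJoin(left: Seq[Row], right: Seq[Row], toleranceMs: Long): Seq[(Row, Option[Row])] = {
  val rightByGroup: Map[Int, Seq[Row]] =
    right.groupBy(_.group).map { case (g, rows) => g -> rows.sortBy(_.time) }
  left.sortBy(_.time).map { l =>
    val candidates = rightByGroup.getOrElse(l.group, Seq.empty)
    l -> candidates
      .filter(r => r.time <= l.time && l.time - r.time <= toleranceMs)
      .lastOption
  }
}

val left  = Seq(Row(1, 1L, 0.1), Row(1, 2L, 0.2), Row(3, 1L, 0.3), Row(3, 2L, 0.4))
val right = Seq(Row(1, 1L, 11.0), Row(1, 2L, 12.0), Row(3, 1L, 13.0), Row(3, 2L, 14.0))

val oneSecond = asOfJoin(left, right, toleranceMs = 1000L)
val oneHour   = asOfJoin(left, right, toleranceMs = 3600L * 1000L)
// A left row 5 s after the last group-1 right row finds no match within 1 s.
val tooFar    = asOfJoin(Seq(Row(1, 5000L, 0.5)), right, toleranceMs = 1000L)
```

Widening the tolerance can turn a missing match into a present one, but it never adds extra rows, so a `distinct` afterwards is redundant.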