Columns of same type mixed up during grouping, select and as
mfelsche opened this issue · 4 comments
I took a TypedDataset of case class A and grouped it, mixing the order of two columns of the same type, which resulted in a tupled dataset. I had to do it this way, don't ask. To get things right again, I selected the columns in the right order and finally used .as[A] to get back a nice TypedDataset of my type A.
Expected Behaviour: Everything just as it has been, right columns ending up in the right place.
Actual behaviour: The mixed up columns weren't put in the right order by the select I issued at the end.
I suspect the quirk is somewhere within .as[A], but I cannot pinpoint it tbh.
Here is a small reproducer:
import org.apache.spark.sql.SparkSession
import frameless._
import frameless.syntax._
import frameless.functions.aggregate.{first, min}

// create spark session ...
implicit val sparkSession: SparkSession = session

case class ConfusingColumns(name: String, company: String, created: Long)

val data = ConfusingColumns("Joe", "snakeoil Inc.", 123L) ::
  ConfusingColumns("Barb", "ACME", 42L) ::
  ConfusingColumns("Joe", "snakeoil Inc.", 0L) :: Nil
val ds: TypedDataset[ConfusingColumns] = TypedDataset.create(data)

// Group by company; the result is a tuple of (company, name, created).
val grouped = ds
  .groupBy(
    ds('company)
  )
  .agg(
    first(ds('name)),
    min(ds('created))
  )

// Swap _1 and _2 to restore the (name, company, created) order, then cast back.
val confused = grouped
  .select(
    grouped('_2),
    grouped('_1),
    grouped('_3)
  )
  .as[ConfusingColumns]
confused.dataset.show()
Output (compare with the case classes in data above):
+-------------+-------+-------+
|         name|company|created|
+-------------+-------+-------+
|snakeoil Inc.|    Joe|      0|
|         ACME|   Barb|     42|
+-------------+-------+-------+
Fiddling around further with the example, it seems the select doesn't have any effect. When collecting the results before or after the select, the result remains the same:
grouped.collect()
// WrappedArray(("snakeoil Inc.", "Joe", 0L), ("ACME", "Barb", 42L))
grouped.select(
  grouped('_2),
  grouped('_1),
  grouped('_3)
).collect()
// WrappedArray(("snakeoil Inc.", "Joe", 0L), ("ACME", "Barb", 42L))
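One way to picture the behaviour described above is a plain-Scala model (no Spark or frameless involved) in which a row is bound to the case class purely by position. This is only a sketch of the suspected failure mode, not frameless's actual implementation; `PositionalBinding`, `reorder` and `bind` are hypothetical names introduced for illustration.

```scala
// Plain-Scala model of positional binding; NOT frameless code.
final case class ConfusingColumns(name: String, company: String, created: Long)

object PositionalBinding {
  // The grouped result from the reproducer has the shape (company, name, created).
  type GroupedRow = (String, String, Long)

  // What the select is meant to do: swap _1 and _2 so that name comes first.
  def reorder(row: GroupedRow): GroupedRow = (row._2, row._1, row._3)

  // Positional binding: the i-th tuple element feeds the i-th constructor parameter.
  def bind(row: GroupedRow): ConfusingColumns =
    ConfusingColumns(row._1, row._2, row._3)
}
```

With the row ("snakeoil Inc.", "Joe", 0L), `bind(reorder(row))` puts "Joe" into name, whereas `bind(row)` puts "snakeoil Inc." into name. The second case matches the table shown above, which is what one would expect if the reordering were dropped before the cast.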
Hi. I can reproduce a similar bug with an even simpler case.
Just load partitioned data where the partitioning column is the first case class field. The column is appended at the end of the schema, and collecting the data results in inverted fields.
master...AlexisBRENON:case_class_support#diff-dd83f3b1d1a249804b5620473177ce6034efbc5f36b45a9b1ef01283cafd50f9R540
Do you think this could be related?
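For the partitioned case, a defensive step is to line the columns up by name before anything binds them by position. The following is a minimal plain-Scala sketch of that idea, assuming a row is available as a schema plus a vector of values; `AlignByName` and `align` are hypothetical helpers, not part of Spark or frameless, and the column names `day` and `id` in the usage note are made up for illustration.

```scala
// Plain-Scala sketch; the names below are hypothetical, not a Spark or frameless API.
object AlignByName {
  // Reorders `values` (laid out according to `schema`) into `fieldOrder`.
  // Returns None if any requested field is missing from the schema.
  def align(
      schema: Vector[String],
      values: Vector[Any],
      fieldOrder: Vector[String]
  ): Option[Vector[Any]] = {
    val byName = schema.zip(values).toMap
    val picked = fieldOrder.flatMap(field => byName.get(field))
    if (picked.length == fieldOrder.length) Some(picked) else None
  }
}
```

If the partition column `day` was appended at the end, so that the schema is ("id", "day") while the case class declares (day, id), then `align(Vector("id", "day"), Vector(1L, "2020-01-01"), Vector("day", "id"))` yields the values in declaration order, and a missing column is reported as None rather than silently shifting fields.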
I've been seeing this issue as well when using scalapb-sparksql and a flow that uses encoders to create Datasets of both protobuf-derived types and normal scala case classes.
When I define the following case class:
case class CustomerClientServiceDailyCounts(
  customer: String,
  client: String,
  service: String,
  count: Long,
  service_count: Long,
  date: String
)
And then cast a dataframe with identical column names and types, and map over it using a function which takes the above type as input:
dailyTransactionsTbl
  .as[CustomerClientServiceDailyCounts]
  .map(customerClientServiceFunction)
It then fails inside customerClientServiceFunction with the following error:
A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
I now realize that this happens because one of the Long columns is being shuffled with one of the String columns, and there's an attempt to cast it to a String when it's used in the function.
I believe this is related; it just seems that sometimes even columns with different types can be shuffled too, which leads to runtime errors.
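A cheap guard against this kind of runtime failure is to compare the column layout with the case class layout before casting. Below is a minimal plain-Scala sketch of such a check, assuming each column can be described as a (name, type name) pair; `SchemaOrderCheck` and `mismatchedPositions` are hypothetical names, and plain strings stand in for a real Spark schema.

```scala
// Plain-Scala sketch; hypothetical helper, not a Spark or frameless API.
object SchemaOrderCheck {
  // A column is described by (name, type name).
  type Column = (String, String)

  // Positions at which the actual column layout differs from the expected one.
  def mismatchedPositions(expected: Seq[Column], actual: Seq[Column]): Seq[Int] = {
    // Placeholder used to pad the shorter side when the lengths differ.
    val missing: Column = ("<missing>", "<missing>")
    expected.zipAll(actual, missing, missing).zipWithIndex.collect {
      case ((e, a), i) if e != a => i
    }
  }
}
```

The report above does not say which columns were swapped. As a purely hypothetical illustration, if `service` (String) and `count` (Long) had traded places, the check would return positions 2 and 3, flagging the problem before the mapped function ever runs.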