databricks/spark-avro

Spark 2.2 + spark-avro_2.11-4.0.0.jar is very slow for certain jobs compared with Spark 1.6.3 + spark-avro_2.10-2.0.1.jar


We have some jobs whose input source is Avro files. They run very slowly in Spark 2 but work fine in Spark 1. In Spark 2 the Avro read appears to almost hang: according to the Spark UI, 100 executors read only about 2 GB of input data in 1 hour. The slowness affects only one particular set of Avro files; Avro files with other schemas are read very quickly. And it happens only in Spark 2.

While investigating this issue in production, I also found that a simple test case using data with this particular Avro schema performs poorly in Spark 2 but works fine in Spark 1.

Here is my test case. The input is about 1.3 GB of Avro data with the following schema, and we use 12 executors with 4g of executor memory for both Spark 2 and Spark 1:
{
  "namespace" : "com.xxx",
  "type" : "record",
  "name" : "Lists",
  "fields" : [
    {"name" : "account_id", "type" : "long"},
    {"name" : "list_id", "type" : "string"},
    {"name" : "sequence_id", "type" : ["null", "int"]},
    {"name" : "name", "type" : ["null", "string"]},
    {"name" : "state", "type" : ["null", "string"]},
    {"name" : "description", "type" : ["null", "string"]},
    {"name" : "dynamic_filtered_list", "type" : ["null", "int"]},
    {"name" : "filter_criteria", "type" : ["null", "string"]},
    {"name" : "created_at", "type" : ["null", "long"]},
    {"name" : "updated_at", "type" : ["null", "long"]},
    {"name" : "deleted_at", "type" : ["null", "long"]},
    {"name" : "favorite", "type" : ["null", "int"]},
    {"name" : "delta", "type" : ["null", "boolean"]},
    {
      "name" : "list_memberships", "type" : {
        "type" : "array", "items" : {
          "name" : "ListMembership", "type" : "record",
          "fields" : [
            {"name" : "channel_id", "type" : "string"},
            {"name" : "created_at", "type" : ["null", "long"]},
            {"name" : "created_source", "type" : ["null", "string"]},
            {"name" : "deleted_at", "type" : ["null", "long"]},
            {"name" : "sequence_id", "type" : ["null", "int"]}
          ]
        }
      }
    }
  ]
}
On Spark 2 (I tested Spark 2.2.0 with both spark-avro_2.11-4.0.0.jar and spark-avro_2.11-3.2.0.jar, and both show the issue), the following simple case is slow:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import com.databricks.spark.avro._
val rawlists = spark.read.avro("/data/lists")
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")

It took 4.5 hours to finish on Spark 2, and the executors showed no status updates for hours (only the input size grew, very slowly).
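
For reference, the partitioner column above is the last two characters of account_id rendered as a string, so for non-negative IDs with at least two digits the write targets at most 100 partition directories. A minimal plain-Scala sketch of the same rule, with no Spark involved (the object and the helper name partitionerOf are hypothetical, for illustration only):

```scala
object PartitionerSketch {
  // Hypothetical helper: mirrors $"account_id".cast(StringType).substr(-2, 2)
  // for non-negative IDs that have at least two digits.
  def partitionerOf(accountId: Long): String = accountId.toString.takeRight(2)

  def main(args: Array[String]): Unit = {
    // Made-up sample IDs, not taken from the real dataset.
    val sampleIds = Seq(42L, 12345L, 9876543210L, 700L)
    sampleIds.foreach(id => println(s"$id -> ${partitionerOf(id)}"))
  }
}
```

Single-digit and negative IDs are left out of the sketch on purpose, since the Spark expression may treat them differently.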

The following code runs in about 15 minutes on Spark 1.6.3 + spark-avro_2.10-2.0.1.jar:
val rawlists = sqlContext.read.avro("/data/lists")
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")

I am not sure where the problem is, but in Spark 2 performance is fine for other Avro files (different dataset/schema). I think the problem is in how the spark-avro library reads this particular kind of Avro file, since it is the scan/read stage that is very slow. Any ideas?

Thanks

Yes, this was explained in the issue I opened for the same reason:

#267

@arthurdk Thanks for the information.

Our case is a little different. We have 2 datasets, both in Avro, and both with an array of nested structs in the schema.

Querying the datasets themselves, for example with df.count, looks fine for us. But the partitionBy example above is very slow for one dataset and fine for the other. Also, in a more complex query plan in Spark 2, we need to explode the array of structs in both datasets; the stage scanning one dataset runs fine, but the stage scanning the other almost hangs forever.
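
One way to narrow down where the time goes is to time the scan separately from the write. The following is only a sketch, assuming a Spark 2.x spark-shell (so that spark is already defined) with spark-avro on the classpath; "/tmp/lists_flat" is a hypothetical scratch path, and no timings are claimed here:

```scala
import org.apache.spark.sql.SaveMode
import com.databricks.spark.avro._

val rawlists = spark.read.avro("/data/lists")

// 1. Scan only: convert every record to a Row, including the nested
//    list_memberships array, with no shuffle and no Parquet write.
spark.time { rawlists.rdd.foreach(_ => ()) }

// 2. Scan plus an unpartitioned Parquet write.
//    "/tmp/lists_flat" is a hypothetical scratch location.
spark.time {
  rawlists.write.mode(SaveMode.Overwrite).parquet("/tmp/lists_flat")
}
```

If step 1 alone is already slow for one dataset and fast for the other, that would point at the Avro read path; if only the partitioned write is slow, the write side would be the more likely place to look.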

Could some particular kind of array of structs cause this issue? The performance difference between these 2 datasets reaches 100x. Could the issue in #267 cause such a huge difference?