audienceproject/spark-dynamodb

Filter Pushdown

Closed this issue · 6 comments

I can't get filter pushdown to work.
The table SSEmployeeCountUnion has about 20 million rows. CompanyId is the partition key and RecordHash is the sort key. When I run

  val unionDataset = spark.read.dynamodbAs[CountRecordEx]("SSEmployeeCountUnion")
  unionDataset.filter($"CompanyId" === 209963).foreach(x => println(x))

I expect the result, which is 19 records, to be returned immediately, but it looks like a full scan is being performed (it runs for a few minutes until I abort the job).

I also tried setting filterPushdown explicitly, but that doesn't work either:

  val unionDataset = spark.read.option("filterPushdown", "true").dynamodbAs[CountRecordEx]("SSEmployeeCountUnion")
  unionDataset.filter($"CompanyId" === 209963).foreach(x => println(x))

Could you help?

@jacobfi , can you please provide some insights into this?
If I remember correctly, we do a scan no matter what. The "predicate pushdown" functionality means that filtered-out data is not brought into Spark; the filtering is handled by the underlying data store, in this case DynamoDB.

That is correct.
"Intelligent" filter pushdown is currently not a feature.
The data source always performs a full scan; in this case, however, a query would have been much more efficient. I hope to get around to implementing this in the future.

Thanks for the quick reply, guys. Do you mean that there is currently no way to rewrite this piece of code so that a full scan is avoided?

Unfortunately, yes.
If it is a big table I suggest you do a direct query outside of Spark.
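For reference, a direct query could look something like the sketch below, using the AWS SDK for Java (v1) from Scala. This is only a sketch, not part of this library: the table and key names come from this thread, and the client assumes credentials and region are configured in the environment. Because CompanyId is the partition key, a DynamoDB Query reads only the matching items instead of scanning all ~20 million rows.

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest}
import scala.collection.JavaConverters._

// Client picks up credentials/region from the default provider chain.
val client = AmazonDynamoDBClientBuilder.defaultClient()

// Query on the partition key: DynamoDB touches only the items where
// CompanyId = 209963, rather than scanning the whole table.
val request = new QueryRequest()
  .withTableName("SSEmployeeCountUnion")
  .withKeyConditionExpression("CompanyId = :cid")
  .withExpressionAttributeValues(
    Map(":cid" -> new AttributeValue().withN("209963")).asJava)

val items = client.query(request).getItems.asScala
items.foreach(println)
```

Note that a Query is paginated (via LastEvaluatedKey) like a Scan, but for 19 items a single page is enough.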

Got it, Jacob. Thanks for the quick reply.

This issue will have the same solution as #62