audienceproject/spark-dynamodb

Very bad read performance for a pretty small DynamoDB table

Opened this issue · 9 comments

I want to read a very small DynamoDB table (about 6,500 items, 200 KB in total) into my Spark Structured Streaming job on every micro-batch. I use PySpark with Spark 2.4.4 and spark-dynamodb 1.0.4. The DynamoDB table has a provisioned read capacity of 5.

My code looks as follows:

spark.read.format("dynamodb") \
  .option("region", "eu-central-1") \
  .option("tableName", my_table) \
  .load() \
  .filter(<some-easy-filtering-on-one-boolean-column>) \
  .select(<some-column-selects>)

I am seeing very slow read performance: it takes several seconds, sometimes up to a few minutes, to read those few items from DynamoDB:
[screenshot: micro-batch read durations]

I also noticed that only a small portion of the provisioned read capacity is consumed on each read:
[screenshot: consumed read capacity units]

How much read capacity is consumed appears to be random; sometimes even less is used. In any case, even with a read capacity of 1 or so, reading ~6,500 items from such a small DynamoDB table should be much faster.

I also tried some configurations like:

  • .option("filterPushdown", False)
  • .option("readPartitions", <1, 2, 4, 8, ...>)
  • .option("targetCapacity", 1)

with no effect at all (the sketch below shows how the options were passed). With readPartitions set to 8, for example, it is a little faster (about 20 seconds), but still not fast enough in my view. Such a small number of items should be readable with a single partition in a reasonable amount of time.
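A minimal sketch of how those options were combined on the reader (values here are just examples, and my_table is the same placeholder as above):

df = (
    spark.read
    .format("dynamodb")
    .option("region", "eu-central-1")
    .option("tableName", my_table)
    .option("filterPushdown", False)  # also tried without this
    .option("readPartitions", 8)      # tried 1, 2, 4, 8, ...
    .option("targetCapacity", 1)
    .load()
)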

Any ideas what I'm doing wrong? Any advice? Thanks in advance!

Hi
Can you try to do a

.load() \
.cache() \
.count()

on the Dynamo table's dataframe and see if that makes a difference?

Also, I think reading 200 KB with 5 read capacity units should take around 5 seconds.
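For context, a rough back-of-envelope calculation, assuming the scan uses eventually consistent reads (1 RCU covers two reads of up to 4 KB per second):

# Back-of-envelope estimate, assuming eventually consistent reads:
# 1 RCU ~ 2 x 4 KB per second, i.e. ~8 KB/s per RCU.
table_size_kb = 200
provisioned_rcu = 5
throughput_kb_per_s = provisioned_rcu * 2 * 4    # ~40 KB/s
print(table_size_kb / throughput_kb_per_s)       # ~5 seconds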

Please note that this is a Structured Streaming job, so I need to refresh the Dynamo DataFrame on every micro-batch. I need the latest data from the table, which makes caching infeasible for my use case. In any case, it would at best only mitigate the original problem.

Yes, sorry for not explaining; I know it's a workaround at best, not a fix.
I was mostly curious because I suspect the library may have an erroneous interaction with the Spark planner, leading to symptoms such as this.
Given what you have shown, it seems there is a problem with the throughput/partition calculation and executor utilization. I'm trying to figure out if it's in the Dynamo connector layer or the Spark planning layer.
If a cache().count() does cause the library to access the Dynamo table as expected (about 5 seconds at a throughput of 5, on however many partitions), then the problem occurs in the planning layer.

Ok, I got it!

I tried the following code snippet:

    import datetime

    # Count without caching
    print(datetime.datetime.now())
    df_count_without_cache = (
        spark.read
        .format("dynamodb")
        .option("region", "eu-central-1")
        .option("tableName", <my_table_name>)
        .load()
        .count()
    )
    print(datetime.datetime.now())

    # Count with caching
    print(datetime.datetime.now())
    df_count_with_cache = (
        spark.read
        .format("dynamodb")
        .option("region", "eu-central-1")
        .option("tableName", <my_table_name>)
        .load()
        .cache()
        .count()
    )
    print(datetime.datetime.now())

Result was:

2020-05-25 13:00:28.602334
2020-05-25 13:00:42.288196
2020-05-25 13:00:42.288232
2020-05-25 13:00:57.814243

As you can see, both queries took nearly the same amount of time (about 15 seconds). What can we conclude from that? That the problem is not in the Spark planning layer? Any other ideas on how to proceed?

Hi
I ran some tests yesterday and something interesting happened!
Yesterday I had the same problem as you, but this morning the library could read the whole table in less than a second.
I think the issue is that the DynamoDB table description did not correctly reflect the contents of the table yesterday, when I had only just populated it with data.
The library heavily relies on this description to calculate the optimal parameters for the scan operation. If the numbers are off, the scan will be suboptimal.

If this is indeed the case for you, then a workaround for now would be to set the option bytesPerRCU to something very high, such as 400000 or 4000000.
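For example, something along these lines (a sketch only; region and table name are the placeholders from the snippets above):

df = (
    spark.read
    .format("dynamodb")
    .option("region", "eu-central-1")
    .option("tableName", my_table)
    # Workaround: claim a very high bytes-per-RCU so the connector's
    # throughput calculation does not throttle the scan of a tiny table.
    .option("bytesPerRCU", 400000)
    .load()
)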

Let me know if this works. It might also just work today if the table description is up-to-date.

Yes, it seems you are right! I was also seeing rather random read performance. Now that I have hard-coded the bytesPerRCU option to 4000000, it's fine.

I will play around a little bit with different values here and use that option for now.

Thank you for providing this workaround!

sfcoy commented

Hi there,
I too have encountered this problem. In our case we are loading 6 or so DynamoDB tables and performing a big union on them together with conventional HDFS tables (hosted in S3). The DynamoDB tables vary in size between 12 and 200 or so rows (i.e. tiny). Everything seemed to be grinding to a halt in the RateLimiter (as revealed by a thread dump).

Changing bytesPerRCU to 400_000 seemed to resolve this problem.

We also had a second problem where we had to set readPartitions to 1 on a couple of the tables, because the connector seemed to lose track of the table schema when running on a five-node cluster. I will raise a separate issue for this.

cozos commented

@jacobfi When you say "DynamoDB table description", what are you referring to? Is this referring to the output of the DynamoDB DescribeTable API call?

Is the problem that DynamoDB's DescribeTable API call is returning bad/outdated information (I'm guessing because it's eventually consistent), which messes up the calculation of optimal scan parameters?

Yes, the library uses the DescribeTable API, and it is possible this is returning misleading information. But it is also possible that there is simply something wrong with the throughput calculation in the library.
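If anyone wants to check whether the description is stale for their table, here is a quick boto3 sketch (region and table name are placeholders). Note that DynamoDB refreshes ItemCount and TableSizeBytes only approximately every six hours, so they can lag behind for recently populated tables:

import boto3

# Inspect the same table metadata the connector relies on for scan planning.
client = boto3.client("dynamodb", region_name="eu-central-1")
desc = client.describe_table(TableName="my_table")["Table"]

print("ItemCount:      ", desc["ItemCount"])
print("TableSizeBytes: ", desc["TableSizeBytes"])
print("ReadCapacity:   ", desc["ProvisionedThroughput"]["ReadCapacityUnits"])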