RedisLabs/spark-redis

Conditional update

Opened this issue · 9 comments

dggc commented

Hi, how are you doing?

I'd like to know if there is a recommended way to perform a conditional update on redis from spark.

Basically, what I want to do is:

  • Given a spark dataframe with columns key, hash_field, value, value_version
  • For each row in the spark dataframe, I'd like to check whether the corresponding key exists in Redis. If not, write both the value and its version: hset key hash_field value hash_field_version value_version
  • If it does exist, I'd like to update the value and value_version only if the version in my spark dataframe is higher than the version in the Redis hash field.

I've searched for this in this GitHub repo and on the web in general, and the most similar question I found was this -> https://stackoverflow.com/questions/22906778/conditional-redis-set-only-update-with-newest-version

However, this involves writing and running a Lua script. While I'm fine with that, I wonder what the best course of action would be. Is there a recommended way to do this conditional upsert? If Lua is indeed the way to go, what's the best way to do so from Spark? Is it possible using spark-redis, or should I go down a level and use Jedis directly for each row?
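
For concreteness, here's roughly what I imagine the Lua route would look like with Jedis, along the lines of the Stack Overflow answer above. It assumes the version lives in a companion field named <hash_field>_version; the field layout, host and port are just illustrations:

import redis.clients.jedis.Jedis

// KEYS[1] = hash key; ARGV[1] = field name, ARGV[2] = new value, ARGV[3] = new version.
// Writes the value and its version field only if the incoming version is newer,
// or if the version field does not exist yet.
val conditionalHset =
  """
    |local versionField = ARGV[1] .. '_version'
    |local current = redis.call('HGET', KEYS[1], versionField)
    |if current == false or tonumber(ARGV[3]) > tonumber(current) then
    |  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
    |  redis.call('HSET', KEYS[1], versionField, ARGV[3])
    |  return 1
    |end
    |return 0
  """.stripMargin

val jedis = new Jedis("localhost", 6379)     // placeholder connection details
val sha = jedis.scriptLoad(conditionalHset)  // load once, then reuse via EVALSHA
jedis.evalsha(sha, 1, "some:key", "some_field", "some value", "3")
jedis.close()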

Thanks in advance, and sorry if this wasn't the right place for this kind of question :)

fe2s commented

Hi @dggc ,
Could you perform the update logic in Spark, i.e. read the dataframe from Redis into Spark, find the rows that need to be updated, and then write them back with SaveMode.Append?
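
Roughly something like this; the "person" table name, the value/value_version columns and the sample micro-batch are placeholders for your schema, and the hash is modeled with fixed columns rather than the dynamic hash_field from your example:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Incoming micro-batch (placeholder data; "key" holds the id part of the Redis key)
val updates = Seq(("1", "Alice", 42L)).toDF("key", "value", "value_version")

// Current state in Redis
val existing = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .load()
  .select(col("key"), col("value_version").cast("long").as("existing_version"))

// Keep only the rows that are new or carry a higher version
val toWrite = updates
  .join(existing, Seq("key"), "left")
  .filter(col("existing_version").isNull || col("value_version") > col("existing_version"))
  .drop("existing_version")

// Write them back without dropping the rest of the table
toWrite.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .mode(SaveMode.Append)
  .save()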

dggc commented

Hi @fe2s , thanks for your feedback!
Yes, that would be possible, and in our case there wouldn't even be a race condition scenario, because we have a single process writing to Redis (that is, this spark job).
However, wouldn't that add a lot of overhead and latency to the process?
But it does seem like a simpler solution :)

dggc commented

Would it be possible to read from Redis only the keys that are in the spark dataframe? If we have to read the whole table, it would be too expensive, but if we are able to retrieve only the keys in the DF, it would indeed be a viable solution.

fe2s commented

Yes, it's possible. You can specify only the id in the dataframe schema. https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md#reading-redis-hashes
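
For example, something along these lines (assuming a SparkSession named spark, and with "person" and "value_version" as placeholder names):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Load only the key and the version field instead of the full hashes
val versionsOnly = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .schema(StructType(Seq(
    StructField("key", StringType),
    StructField("value_version", LongType)
  )))
  .load()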

dggc commented

But suppose I have a DF with, say, 5 rows, each with a unique ID that has a counterpart in Redis, while our Redis database has millions of rows. I want to retrieve only those 5 rows, so I can make the necessary comparisons and write only those 5 rows back.

I believe the read option you mentioned would retrieve every row, not just the 5 I want. Is that correct?

fe2s commented

With the read operation you can specify a key pattern, e.g. .option("keys.pattern", "person:*"), so only the matching records are loaded into Spark.
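
For instance (again with person:* as a placeholder pattern):

// Only keys matching the pattern are scanned, the rest of the database is not loaded
val persons = spark.read
  .format("org.apache.spark.sql.redis")
  .option("keys.pattern", "person:*")
  .option("infer.schema", true)
  .load()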

dggc commented

Hm, unfortunately, I believe this wouldn't work in our case. Since we are using Spark Streaming to update the Redis database, with foreachBatch and micro-batch dataframes, we are working with only a small fraction of the Redis database at a time.

While loading the whole database would work, it would be very slow. Unless the Redis connector has some built-in logic to do Redis-side filtering, instead of loading all the keys into Spark and then filtering in Spark?

We could use a regex such as "id1|id2|id3|..." generated by collecting the micro-batch IDs, but that doesn't seem like it would scale well.

I believe we'll probably have to use mapPartitions to open a separate Redis connection for each partition of the micro-batch, and then go over each row/key individually.
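
Something roughly like this is what I have in mind, using foreachPartition (the action counterpart of mapPartitions) and reusing the conditional-update Lua script (conditionalHset) from my first message; the host/port and the updates dataframe are placeholders:

import redis.clients.jedis.Jedis

// One connection per partition; every row runs the conditional-update script
updates.rdd.foreachPartition { rows =>
  val jedis = new Jedis("localhost", 6379)
  val sha = jedis.scriptLoad(conditionalHset)
  rows.foreach { row =>
    jedis.evalsha(sha, 1,
      row.getAs[String]("key"),                   // KEYS[1]
      row.getAs[String]("hash_field"),            // ARGV[1]
      row.getAs[String]("value"),                 // ARGV[2]
      row.getAs[Long]("value_version").toString)  // ARGV[3]
  }
  jedis.close()
}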

The fromRedisHash also accepts an Array[String] of keys, so you could do this:

import com.redislabs.provider.redis._

// fromRedisHash returns (field, value) pairs for the requested hash keys
val fromRedisDf = sc.fromRedisHash(Array("key1", "key2")).toDF("field", "value")

This does all the filtering on the Redis server, and is more efficient than a key pattern scan.

dggc commented