dask/dask-expr

Best practice for handing off persisted collection partitions

rjzamora opened this issue · 5 comments

While working on rapidsai/dask-cuda#1311, I noticed that a common practice used in down-stream libraries no longer works (cleanly) with the move to dask-expr.

The common practice:

  1. Persist a collection (df = df.persist())
  2. Find the worker-to-partition mapping for the persisted collection using mapping = client.who_has() and df.__dask_keys__()

The problem with dask-expr:

In dask-expr, calling df.persist() will change the "name" (and therefore the keys) of the collection. The name change is a result of both expression optimization, and the creation of a new FromGraph expression. Therefore, you cannot call df = df.persist(), and then search for the keys of df in the cluster.

The question: What is the new "best practice" for patterns like this?

For reference, here is something that seems to work for now:

    df = df.persist()
    try:
        # Only works for FromGraph-backed collection
        persisted_keys = df.keys
    except AttributeError:
        # Only works for a legacy collection
        persisted_keys = df.__dask_keys__()

Okay, thanks - I suppose this approach is backward compatible:

df = df.persist()
persisted_keys = [f.key for f in c.client.futures_of(df)]

Could you provide a little more context for what you're doing? This feels to me like an abstraction leak that bites us whenever we touch this API. I am touching this API with the scheduler integration again and this shortcoming could be fixed but it would be helpful to know a little about the application

I recommend just using the dask.distributed.futures_of function. It's been around for a while and genearally how this probem gets solved.

This feels to me like an abstraction leak that bites us whenever we touch this API.

By "this" API, are you referring to futures_of or who_has? I'm happy to use whatever you all recommend moving forward.

it would be helpful to know a little about the application

I've seen this used in a few down-stream libraries. The specific application I am looking at right now is just a custom shuffling algorithm that I am very comfortable experimenting with. However, other down-stream libraries (e.g. cugraph, nemo) also use who_has to temporarily hand-off execution and communication to something other than dask. For example, cugraph will persist the collection, figure out where all the data is, and then execute a collective operation in C++/NCCL land. This is a very common pattern in rapids.

I recommend just using the dask.distributed.futures_of function. It's been around for a while and genearally how this probem gets solved.

Great. I'm not familiar with this API, but happy to use it and recommend it if it works.