cylondata/cylon

Can't do distributed sorting when a dataframe in another process has no rows


azisak commented

Hello team, I'm new to PyCylon and have an issue related to distributed sorting.

It seems that an empty dataframe on one process triggers an exception when I call sort_values in distributed mode.
For context, I'm using pycylon version 0.6.0 in an MPI environment.

To reproduce the issue, I wrote the following code snippet.
It creates a two-row dataframe on each process, then performs a distributed groupby on the id column. The result is filtered to keep only id=0, so every process ends up with no rows except the one that holds the id=0 row. A second distributed groupby on this data succeeds. The distributed sort_values that follows raises an exception: ValueError: Operation failed: : empty vector passed onto merge.

Here is the code for the scenario above.

import pycylon as pc

env = pc.CylonEnv(config=pc.MPIConfig())
df = pc.DataFrame(
    [[3, 4], [0, 1]],
    columns=['weight', 'id'],
)

df2 = df.groupby(by=["id"], env=env).sum()
df3 = df2[df2["id"] == 0]
print(f"[Rank-{env.rank}] [After filter] df3 shape {df3.shape}\n{df3}")


df4 = df3.groupby(by=["id"], env=env).sum()
print(f"[Rank-{env.rank}] [After groupby] df4 shape {df4.shape}")

df5 = df4.sort_values(by="id", ascending=False, env=env)
print(f"[Rank-{env.rank}] [After sort] df5 shape {df5.shape}")

print(env.rank, df5)

Based on the code and behavior above, the distributed groupby works when one process has rows and the others have none, but the distributed sort_values does not handle the same case.
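The expected result can be sketched in plain Python. This is only a single-process simulation under stated assumptions, not PyCylon code and not how Cylon implements its sort: the function name `simulated_distributed_sort`, the list-of-dicts row layout, and the weight value are all hypothetical and chosen for illustration. The point is that partitions with no rows should simply contribute nothing to the sorted output.

```python
def simulated_distributed_sort(partitions, key, ascending=True):
    """Sort rows across partitions (hypothetical helper, not a PyCylon API).

    Each partition stands in for the rows held by one process.
    Empty partitions are allowed and contribute no rows.
    """
    # Gather the rows from every partition into one list.
    all_rows = [row for part in partitions for row in part]
    all_rows.sort(key=lambda row: row[key], reverse=not ascending)

    # Hand contiguous chunks back to the same number of partitions.
    n = len(partitions)
    chunk = -(-len(all_rows) // n) if all_rows else 0  # ceiling division
    return [all_rows[i * chunk:(i + 1) * chunk] for i in range(n)]


# One partition holds the single id=0 row, the other is empty,
# mirroring the state after the filter (weight value is illustrative).
parts = [[{"id": 0, "weight": 6}], []]
result = simulated_distributed_sort(parts, key="id", ascending=False)
```

In this sketch the sort returns the single row on one partition and an empty partition on the other, which is the outcome I would expect from the distributed sort_values instead of an exception.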

Thank you.

Hi @azisak. Thank you for reporting this error. We will look into this ASAP.