shuffle datasets
simplew2011 opened this issue · 3 comments
- Is it okay not to shuffle the data?
```python
def _exact_dup_ids(self, df: dd.DataFrame):
    """
    Get the id's for text/documents that are exact duplicates
    Parameters
    ----------
    df: dask.dataframe.core.DataFrame
      A dataframe with the following requirements:
      * A column where each row is the text from one document
      * A unique ID column for each document
    """
    hash_df = self._compute_hashes(df)
    shuffle_context = (
        config.set({"dataframe.shuffle.method": "tasks"})
        if DASK_P2P_ERROR
        else nullcontext()
    )
    with shuffle_context:
        dup_ids = hash_df.shuffle(
            on=["_hashes"],
            ignore_index=True,
            npartitions=max(1, (hash_df.npartitions // 3)),
        ).map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])
    return dup_ids
```
- Could it be modified to the following, skipping the shuffle?
```python
def _exact_dup_ids(self, df: dd.DataFrame):
    """
    Get the id's for text/documents that are exact duplicates
    Parameters
    ----------
    df: dask.dataframe.core.DataFrame
      A dataframe with the following requirements:
      * A column where each row is the text from one document
      * A unique ID column for each document
    """
    hash_df = self._compute_hashes(df)
    dup_ids = hash_df.map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])
    return dup_ids
```
Algorithmically, the shuffle is needed to ensure all duplicates are found. I'll try to illustrate with an example:
```
# Assume the input data is divided into 2 partitions
Partition 0        Partition 1
-----------        -----------
ID  hash           ID  hash
0   xyz            3   xyz
1   abc            4   def
2   xyz            5   abc
```
Skipping the shuffle and using only map_partitions will identify just {0, 2} as a duplicate group. It will fail to place 3 in that same group, and it will miss the {1, 5} group entirely, since those documents sit in different partitions.
Shuffling by hash ensures that documents with the same hash are in the same partition.
```
# After shuffle
Partition 0        Partition 1
-----------        -----------
ID  hash           ID  hash
0   xyz            1   abc
3   xyz            4   def
2   xyz            5   abc
```
Now calling the lambda on each partition finds the duplicates correctly.
If your data fits in a single partition, shuffling is not needed; for anything more than one partition, shuffling is required for correctness.
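To make this concrete, here is a minimal, self-contained sketch (not NeMo Curator code; the `id`/`_hashes` column names and the tiny two-partition dataset simply mirror the example above) showing that map_partitions alone only catches duplicates that happen to share a partition, while shuffling on the hash first recovers the full groups:

```python
# Minimal sketch (not library code): demonstrates why the shuffle matters.
import pandas as pd
import dask.dataframe as dd

part0 = pd.DataFrame({"id": [0, 1, 2], "_hashes": ["xyz", "abc", "xyz"]})
part1 = pd.DataFrame({"id": [3, 4, 5], "_hashes": ["xyz", "def", "abc"]})
hash_df = dd.from_pandas(pd.concat([part0, part1], ignore_index=True), npartitions=2)

def dups_in_partition(part):
    # Keep only rows whose hash appears more than once within this partition
    return part[part["_hashes"].duplicated(keep=False)]

# Without a shuffle: only duplicates that happen to share a partition are found
no_shuffle = hash_df.map_partitions(dups_in_partition).compute()
print(sorted(no_shuffle["id"]))  # [0, 2] -- misses 3, and the {1, 5} group entirely

# With a shuffle on the hash: every member of a duplicate group is co-located first
with_shuffle = (
    hash_df.shuffle(on=["_hashes"], ignore_index=True)
    .map_partitions(dups_in_partition)
    .compute()
)
print(sorted(with_shuffle["id"]))  # [0, 1, 2, 3, 5] -- the full duplicate groups
```

Which partition each group lands in is up to the hash partitioner, but all members of a group are guaranteed to land together, which is exactly what the per-partition duplicated() check needs.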
@simplew2011 Are you running into issues with shuffle (either errors or performance)?
- I can't understand this line: npartitions=max(1, (hash_df.npartitions // 3))
- Why (hash_df.npartitions // 3)?
- Is max(1, hash_df.npartitions) okay?
max(1, hash_df.npartitions) is theoretically okay. hash_df.npartitions // 3 is just an optimization for better performance. In our experience, the input document dataset is much larger than hash_df, which only contains IDs and hashes. If the input df fits in 100 partitions, the smaller hash_df can potentially fit within 30-35 partitions, and calling shuffle with fewer output partitions often leads to better performance. So, based on some testing, we picked num_partitions // 3 as a good tradeoff between performance and avoiding OOM errors from going too low.
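As a rough illustration of that tradeoff (the dataset, row counts, and column names here are made up for the sketch, not taken from the library), shrinking the output partition count at shuffle time simply produces fewer, larger partitions of the small ID/hash frame:

```python
# Rough sketch of the partition-count tradeoff; numbers are illustrative only.
import pandas as pd
import dask.dataframe as dd

# Pretend the hashes frame inherited 100 partitions from the full document dataframe
hash_df = dd.from_pandas(
    pd.DataFrame({"id": range(1000), "_hashes": [f"h{i % 50}" for i in range(1000)]}),
    npartitions=100,
)

# Shuffle into roughly a third as many partitions, never dropping below 1
out = hash_df.shuffle(
    on=["_hashes"],
    ignore_index=True,
    npartitions=max(1, hash_df.npartitions // 3),
)
print(hash_df.npartitions, "->", out.npartitions)  # 100 -> 33 (fewer, larger partitions)
```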