ChenghaoMou/text-dedup

Deduplication results vary depending on whether Spark is used or not.

cjmp1 opened this issue · 6 comments

Hi ChenghaoMou
I have recently been using your program to deduplicate a large amount of data. However, in certain situations, the results differ significantly between deduplication with Spark and without it.
I deduplicated 29GB of Python code data. With minhash_spark.py, 95% of the data remained, so almost nothing was removed, but with minhash.py only 5% remained.
To analyze the problem, I printed a randomly chosen deduplicated cluster, and minhash.py did not produce incorrect results.
So the problem seems to be in minhash_spark.py, but since it worked well even on data exceeding 1TB, the cause appears to be something specific rather than the data size.
Algorithmically, I also see no problem in either approach: the Spark version obtains the min index through map-reduce in connected components, and minhash.py obtains it through union-find.
What is the difference? Are there any issues you are aware of?
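
For reference, the two clustering strategies mentioned above can be compared on a toy graph. The sketch below is a plain-Python stand-in, not the code from minhash.py or minhash_spark.py: `components_union_find` and `components_label_propagation` are hypothetical helper names, and the label propagation loop only imitates the idea of repeatedly taking the minimum index across edges. Under these assumptions, both approaches assign every node the smallest index in its connected component.

```python
def components_union_find(edges):
    """Map each node to the smallest index in its component via union-find."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in edges:
        ra, rb = find(a), find(b)
        if ra != rb:
            # Link the two ROOTS, keeping the smaller index as the new root.
            parent[max(ra, rb)] = min(ra, rb)
    return {node: find(node) for node in list(parent)}


def components_label_propagation(edges):
    """Map each node to the smallest index in its component by iterating
    min-label updates over the edges until nothing changes."""
    label = {}
    for a, b in edges:
        label.setdefault(a, a)
        label.setdefault(b, b)

    changed = True
    while changed:
        changed = False
        for a, b in edges:
            low = min(label[a], label[b])
            if label[a] != low:
                label[a] = low
                changed = True
            if label[b] != low:
                label[b] = low
                changed = True
    return label


edges = [(1, 3), (2, 3), (4, 5)]
print(components_union_find(edges) == components_label_propagation(edges))
```

If the two real scripts behaved like this sketch, they would agree on the clusters for the same set of edges, which suggests looking for the discrepancy elsewhere (for example in the edges themselves or in the version of the code being run).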

@cjmp1 Thank you so much for opening this issue.

It does seem off based on your description. However, to effectively debug this discrepancy, could you provide 1) the commands you have used, and 2) some data so I can reproduce the issue?

The clustering method is most likely not the issue. As you can see in the benchmark section of the README file, the two scripts show only minor differences, which generally come from different hash signatures (they are not deterministic) and therefore different outputs as well.

Thank you for your reply. Unfortunately, the data cannot be shared because of its license.
It is ordinary Python code, nothing special.
For now, I am looking for open datasets that produce similar results.
In addition, below is the bash command file I used for deduplication.

    spark-submit \
      --master spark://${master_ip}:7077 \
      --conf spark.driver.host=${master_ip} \
      --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' \
      --conf 'spark.hadoop.fs.s3a.endpoint={our s3 endpoint}' \
      --conf 'spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider' \
      --conf 'spark.hadoop.fs.s3a.connection.maximum=5000' \
      --conf 'spark.driver.extraClassPath=/spark/jars/' \
      --conf 'spark.executor.extraClassPath=/spark/jars/' \
      --driver-memory 70g --executor-memory 16g --executor-cores 3 --total-executor-cores 2200 \
      minhash_spark.py --input $dataset --output /dedup/test/$dataset/2 --column content

@ChenghaoMou Hi

I think I have found a problem in the minhash.py algorithm.
The uf.union(x, idx) method is executed on the index values in each cluster.
In this case, one connected component may be split into components with two different minimum parents if the union operation is not executed again between the two components.
In minhash_spark.py, the same case is classified as a single component with one parent.
Of course, this does not explain why more deduplication occurs in minhash.py.
In fact, as the number of components increases, shouldn't the number of min indices that remain after deduplication increase as well?
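
The kind of split described above can be reproduced with a deliberately naive union. The sketch below is hypothetical and is not taken from minhash.py or from any earlier version of it: `naive_components` links the two elements of an edge directly instead of linking their roots, so a later edge can overwrite an earlier link.

```python
def naive_components(edges):
    """Hypothetical naive union: point the larger index at the smaller one
    WITHOUT looking up the roots first. A later edge touching the same node
    overwrites the earlier link, which can split one component in two."""
    parent = {}
    for a, b in edges:
        parent.setdefault(a, a)
        parent.setdefault(b, b)
        parent[max(a, b)] = min(a, b)  # overwrites any earlier link

    def find(x):
        # Links always point to a smaller index, so this loop terminates.
        while parent[x] != x:
            x = parent[x]
        return x

    return {node: find(node) for node in parent}


# Nodes 1, 2 and 3 form a single connected component: 1-3 and 2-3.
roots = naive_components([(1, 3), (2, 3)])
print(roots)
```

Here node 3 is first attached to 1 and then re-attached to 2, so node 1 ends up alone with its own root even though all three nodes are connected. Note that a split like this leaves more roots, so it would keep more records rather than fewer.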

Thanks for providing the info.

If you check out the union code, you can see that we connect the parent (root) indices, not the elements themselves, when calling union:

    def union(self, x, y):
        px = self.find(x)
        py = self.find(y)

        # If both elements are already in the same set, do nothing
        # The line in original UnionFind `self.parent[px] = self.parent[py] = min(px, py)` is redundant when px == py
        if px == py:
            return

        if self.rank[px] == self.rank[py]:
            # If ranks are equal, choose one as the new root and increment its rank
            # with few duplicates this is likely to be the most common case
            self.parent[py] = px
            self.rank[px] += 1
        # otherwise, assume that leftside is more likely to be higher rank
        # Attach the smaller rank tree under the root of the larger rank tree
        elif self.rank[px] > self.rank[py]:
            self.parent[py] = px
        else:
            self.parent[px] = py

So regardless of the order, two parts will be connected as long as there are any edges that link them together.
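
The order independence can be checked on a small example. The sketch below wraps the `union` method shown above in a minimal class; the `__init__` and `find` methods and the `groups` helper are assumptions written for illustration (the thread does not show them), so treat this as a sketch rather than the library's actual implementation.

```python
import itertools


class UnionFind:
    def __init__(self):
        self.parent = {}
        self.rank = {}

    def find(self, x):
        # Assumed implementation: register unseen elements, then compress paths.
        if x not in self.parent:
            self.parent[x] = x
            self.rank[x] = 0
            return x
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        while self.parent[x] != root:
            nxt = self.parent[x]
            self.parent[x] = root
            x = nxt
        return root

    def union(self, x, y):
        # Same logic as the union method quoted in the thread.
        px = self.find(x)
        py = self.find(y)
        if px == py:
            return
        if self.rank[px] == self.rank[py]:
            self.parent[py] = px
            self.rank[px] += 1
        elif self.rank[px] > self.rank[py]:
            self.parent[py] = px
        else:
            self.parent[px] = py


def groups(edges):
    """Hypothetical helper: return the components as a set of frozensets."""
    uf = UnionFind()
    for a, b in edges:
        uf.union(a, b)
    by_root = {}
    for node in list(uf.parent):
        by_root.setdefault(uf.find(node), set()).add(node)
    return frozenset(frozenset(members) for members in by_root.values())


edges = [(1, 3), (2, 3), (4, 5)]
# Try every processing order of the edges and collect the distinct partitions.
partitions = {groups(order) for order in itertools.permutations(edges)}
print(len(partitions))
```

With union by rank, the root of a group is not necessarily its smallest index, so the sketch compares the groups themselves rather than the root labels.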

If you are seeing issues with minhash.py's results, can I ask if you made any changes to the script?

Another thing you can try is to run this notebook on some of your data with Spark installed in standalone mode: notebooks/quick_debug.ipynb (I added it in the latest commit). It should give you almost the same results.

Oh no, it seems that we used a previous version of union_find when running minhash.py. It appears to have been mixed up with files from the old bigcode-dataset repository. I will rerun it after fixing this and let you know. Thank you.
