Lyonk71/pandas-dedupe

Multiprocessing Runtime Error

Closed this issue · 7 comments

I get through the training step, but during the clustering stage I ran into this error. When I disconnect the debugger (using VS Code), additional errors pop up saying the file is already in use. This may be an issue with the dedupe library itself; I'm not a programmer, but I can read code well and write a little.

File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\site-packages\pandas_dedupe\dedupe_dataframe.py", line 245, in dedupe_dataframe
clustered_df = _cluster(deduper, data_d, threshold, canonicalize)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\site-packages\pandas_dedupe\dedupe_dataframe.py", line 143, in _cluster
clustered_dupes = deduper.partition(data, threshold)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\site-packages\dedupe\api.py", line 170, in partition
pair_scores = self.score(pairs)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\site-packages\dedupe\api.py", line 108, in score
raise RuntimeError('''
RuntimeError:
You need to either turn off multiprocessing or protect
the calls to the Dedupe methods with a
if __name__ == '__main__' in your main module, see
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
Traceback (most recent call last):
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 613, in _rmtree_unsafe
os.unlink(fullname)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\Users\xxx\AppData\Local\Temp\tmp6vi500li\blocks.db'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\tempfile.py", line 803, in onerror
_os.unlink(path)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\Users\xxx\AppData\Local\Temp\tmp6vi500li\blocks.db'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\weakref.py", line 642, in _exitfunc
f()
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\weakref.py", line 566, in call
return info.func(*info.args, **(info.kwargs or {}))
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\tempfile.py", line 818, in _cleanup
cls._rmtree(name)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\tempfile.py", line 814, in _rmtree
_shutil.rmtree(name, onerror=onerror)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 737, in rmtree
return _rmtree_unsafe(path, onerror)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 615, in _rmtree_unsafe
onerror(os.unlink, fullname, sys.exc_info())
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\tempfile.py", line 806, in onerror
cls._rmtree(path)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\tempfile.py", line 814, in _rmtree
_shutil.rmtree(name, onerror=onerror)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 737, in rmtree
return _rmtree_unsafe(path, onerror)
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 596, in _rmtree_unsafe
onerror(os.scandir, path, sys.exc_info())
File "C:\Users\xxx\AppData\Local\Programs\Python\Python38\lib\shutil.py", line 593, in _rmtree_unsafe
with os.scandir(path) as scandir_it:
NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\Users\xxx\AppData\Local\Temp\tmp6vi500li\blocks.db'

My script is very simple:

import pandas as pd
import pandas_dedupe as pdd

cmfile = pd.read_csv(r'C:\Users\xxx\Python\customer_master\Customer_List_20200901.csv')

df_final = pdd.dedupe_dataframe(cmfile,['Name 1', 'Street', 'City'])

df_final.to_csv(r'C:\Users\xxx\Python\customer_master\deduped_custlist.csv')

My dataset is large (330k records), which is why I think it's creating 5 different sub-processes.

Hi,

by default num_cores is set to None in pandas-dedupe.
When it is None, the dedupe library spawns a number of processes equal to your CPU count.

We might work on making the num_cores argument more flexible, i.e. letting the user specify the number of cores.
The idea would be to turn off multiprocessing. It might get a bit slower, but it should work.
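
For reference, a minimal sketch of that workaround at the dedupe API level, using the num_cores argument that the dedupe constructors accept; the field definitions here just mirror the columns from the script above:

import dedupe

# Field definitions mirroring the columns used in the script above.
fields = [
    {'field': 'Name 1', 'type': 'String'},
    {'field': 'Street', 'type': 'String'},
    {'field': 'City', 'type': 'String'},
]

# num_cores=0 disables multiprocessing, so scoring runs entirely in
# the parent process and no worker processes are spawned.
deduper = dedupe.Dedupe(fields, num_cores=0)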

Ok, thanks. The error says to protect the calls to the Dedupe methods with an if __name__ == '__main__': statement. I did this in the main dedupe_dataframe.py, but unfortunately that did not resolve it.

if __name__ == '__main__':
    dedupe_dataframe(df, field_properties, canonicalize=False,
                     config_name="dedupe_dataframe", update_model=False, threshold=0.4,
                     sample_size=0.3)
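
For context: on Windows, multiprocessing uses the spawn start method, which re-imports the main module in every worker process. The guard is therefore meant for the script you actually run (your main module), not for the imported library, where __name__ is never '__main__'. A minimal sketch of the guarded version of the script from earlier in the thread:

import pandas as pd
import pandas_dedupe as pdd

def main():
    # All work happens only in the parent process; workers that
    # re-import this module under spawn will skip the guarded call.
    cmfile = pd.read_csv(r'C:\Users\xxx\Python\customer_master\Customer_List_20200901.csv')
    df_final = pdd.dedupe_dataframe(cmfile, ['Name 1', 'Street', 'City'])
    df_final.to_csv(r'C:\Users\xxx\Python\customer_master\deduped_custlist.csv')

if __name__ == '__main__':
    main()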

What if you converted the main function to a class and used that functionality? Would that resolve it? I don't know anything about multiprocessing. For now, I'll edit my local copy of the library and set the CPU parameter to 1 to disable multiprocessing for this request.

I opened up dedupe_dataframe.py and set num_cores=0 per the dedupe docs. That worked.

def _train(settings_file, training_file, data, field_properties, sample_size, update_model):
    """Internal method that trains the deduper model from scratch or update
        an existing dedupe model.
        Parameters
        ----------
        settings_file : str
            A path to a settings file that will be loaded if it exists.
        training_file : str
            A path to a training file that will be loaded to continue
            training from.
        data : dict
            The dictionary form of the dataframe that dedupe requires.
        field_properties : dict
            The mapping of fields to their respective data types. Please
            see the dedupe documentation for further details.
        sample_size : float, default 0.3
            Specify the sample size used for training as a float from 0 to 1.
            By default it is 30% (0.3) of our data.
        update_model : bool, default False
            If True, it allows the user to update an existing model by
            uploading a training file.
        Returns
        -------
        dedupe.Dedupe
            A dedupe model instance.
    """
    # Define the fields dedupe will pay attention to
    fields = []
    select_fields(fields, field_properties)
    
    if not update_model:
        
        # If a settings file already exists, we'll just load that and skip training
        if os.path.exists(settings_file):
            print('Reading from', settings_file)
            with open(settings_file, 'rb') as f:
                deduper = dedupe.StaticDedupe(f, num_cores=0)
        
        # Create a new deduper object and pass our data model to it.
        else:
            # Initialise dedupe
            deduper = dedupe.Dedupe(fields, num_cores=0)
            
            # Launch active learning
            deduper = _active_learning(data, sample_size, deduper, training_file, settings_file)
            
    else:
        # ## Training
        # Initialise dedupe
        deduper = dedupe.Dedupe(fields, num_cores=0)
        
        # Import existing model
        print('Reading labeled examples from ', training_file)
        with open(training_file, 'rb') as f:
            deduper.prepare_training(data, training_file=f)
        
        # Launch active learning
        deduper = _active_learning(data, sample_size, deduper, training_file, settings_file)

    return deduper

Exactly.
I will make the num_cores parameter flexible so that users can override the default.

Thank you for testing. This is really helpful.
I will create a PR in the next few days.
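
A hypothetical sketch of what that could look like (the actual PR may differ): let callers pass num_cores through the public function instead of editing the library, forwarding it to the dedupe constructors.

import dedupe

def make_deduper(fields, num_cores=None):
    """Hypothetical helper: build a Dedupe instance with caller-controlled
    parallelism instead of a hard-coded value.

    num_cores=None keeps dedupe's default (one worker per CPU);
    num_cores=0 disables multiprocessing, per the dedupe docs.
    """
    return dedupe.Dedupe(fields, num_cores=num_cores)

# The public call could then look like this (hypothetical signature):
# pdd.dedupe_dataframe(cmfile, ['Name 1', 'Street', 'City'], num_cores=0)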

Awesome, glad I could help. Honestly, clustering didn't take that long even with no multiprocessing. The step that took the longest was the initial run to create the training file... that took 3 hours. Now that I have the training file, it's done in less than 5 minutes.