d6t/d6tflow

Support for Tasks that output different types, and other extensions...

JaimeArboleda opened this issue · 3 comments

I started using d6tflow for a data science project but, after several weeks, I got somewhat frustrated with one limitation: I needed Tasks that can generate outputs of different types (for example, CSVs and pickles).

Finally, we decided to write our own "version" of d6tflow by extending the luigi library.

I am sharing how we implemented it, in case anyone is interested in making it public (I am a newbie on GitHub).

Some names are in Spanish, but I think the logic is easy to follow.

Apart from this improvement (a dict-like "persist" attribute that lets you state the type of each target), there are other differences that made life easier for us:

  • Integration with the cookiecutter template for data science projects.
  • A single folder per Task, with the TaskId used only in the file names.
  • Auto-generation of a txt file with the parameters, so you can see how each file was generated.
  • A common_params function that returns the parameters shared by a dict of parameters and a Task class.
  • Support for docx targets (we use them to generate documentation of the process).
  • MidasTask requires a get_path method that states where the files will be located.
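
As a rough illustration of the folder and naming scheme described in the list above, here is a simplified, library-free sketch. The helpers `target_file` and `params_file`, the task name `TrainModel` and the hash `abc123` are made up for this example; they are not part of luigi or d6tflow.

```python
from pathlib import Path


def target_file(base_dir, task_family, task_hash, name, extension):
    # One folder per task family; the task hash only prefixes the file name.
    return Path(base_dir) / task_family / "{}_{}.{}".format(task_hash, name, extension)


def params_file(base_dir, task_family, task_hash):
    # The parameters txt lives in the same folder and shares the hash prefix.
    return Path(base_dir) / task_family / "{}_parameters.txt".format(task_hash)


# Hypothetical task "TrainModel" whose task id ends in the hash "abc123".
model_path = target_file("models", "TrainModel", "abc123", "modelo", "pkl")
data_path = target_file("data/processed", "TrainModel", "abc123", "dataset_salida", "csv.gz")
txt_path = params_file("models", "TrainModel", "abc123")

print(model_path)
print(data_path)
print(txt_path)
```

Every run of the same Task with different parameters then writes into the same folder, and the hash prefix tells the files (and their parameters txt) apart.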

I know this is "too much" for an issue, but just in case someone finds something interesting and worth integrating in d6tflow, I share it with the community.

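import os
import pickle
from pathlib import Path

import luigi
import pandas as pd
import sweetviz as sv
from docx import Document  # python-docx
from pandas_profiling import ProfileReport

# In-memory store used by MidasCacheTarget (assumed to be a module-level dict)
cached_targets = {}


class MidasLocalTarget(luigi.LocalTarget):
    '''
    path: full path of the final folder where the object is stored; a pathlib.Path object
    task_id: identifier of the task that owns the target
    name: name of the object (key in the persist dictionary)
    extension: file extension
    '''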
    def __init__(self, path=None, name=None, task_id=None, extension = None):
        self.path = path
        self.task_id = task_id
        self.name = name
        self.extension = extension
        self.path_completo = self.path / '{}_{}.{}'.format(self.task_id, self.name, self.extension)
        super().__init__(self.path_completo)
        # Restore path (the folder, as a pathlib.Path) in case the parent constructor modified it:
        self.path = path
       
    def exists(self):
        return self.path_completo.exists()

    def invalidate(self):
        if self.exists():
            self.path_completo.unlink()
        return not self.exists()
   
    def write_params_file(self, path, parameters):
        # Write the task parameters next to the output, so each file documents how it was produced
        path_file = path / (self.task_id + '_' + 'parameters.txt')
        with open(path_file, "w") as file:
            file.write(parameters)
   
class MidasPickleTarget(MidasLocalTarget):  
   
       
    def load(self, **kwargs):
       
        if self.exists():
            with open(self.path_completo,"rb" ) as fhandle:
                data = pickle.load(fhandle)
            return data
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
       
    def save(self, obj, parameters, **kwargs):
       
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        with open(self.path_completo, "wb") as fhandle:
            pickle.dump(obj, fhandle, **kwargs)
        return self.path_completo

class MidasPandasTarget(MidasLocalTarget):
    def generate_profiling(self, reports_path, parameters):
        (reports_path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(reports_path, parameters)
        pd_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'pandas_profiling.html')
        df = self.load()
        profile = ProfileReport(df, title="Pandas Profiling Report", minimal=True)
        profile.to_file(pd_profile_name)
        if 'target' in df.columns:
            sv_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'sweetviz.html')
            my_report = sv.analyze(df, target_feat='target', pairwise_analysis = 'off')
            my_report.show_html(sv_profile_name, open_browser=False)

class MidasCSVTarget(MidasPandasTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            opts = {**{'sep':';','decimal':','},**kwargs}
            df = pd.read_csv(self.path_completo, **opts)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
   
   
    def save(self, df, parameters, save_index=True,**kwargs):
       
        (self.path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'sep':';','decimal':',', 'compression':'gzip', 'index': save_index},**kwargs}
        df.to_csv(self.path_completo,**opts)
        return self.path_completo


class MidasPqTarget(MidasPandasTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            # Forward kwargs to pandas, as the CSV target does
            df = pd.read_parquet(self.path_completo, **kwargs)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
       
   
    def save(self, df, parameters, save_index=True,**kwargs):
       
        (self.path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'compression':'gzip', 'index': save_index, 'engine': 'pyarrow'},**kwargs}
        df.to_parquet(self.path_completo,**opts)
        return self.path_completo

class MidasDocxTarget(MidasLocalTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            docx = Document(self.path_completo)
            return docx
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
   
   
    def save(self, docx, parameters, **kwargs):
        (self.path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        docx.save(self.path_completo)
        # Return the path, like the other targets do
        return self.path_completo
         
         
         
class MidasCacheTarget(luigi.LocalTarget):
   
    '''
    task_id: identifier of the task that owns the target
    name: name of the object (key in the persist dictionary)
    '''
    def __init__(self, name=None, task_id=None):
        super().__init__(Path(os.path.abspath(os.getcwd())).parent)
        self.task_id = task_id
        self.name = name
        self.clave = task_id + name
   
    def exists(self):
        return self.clave in cached_targets

    def invalidate(self):
        if self.clave in cached_targets:
            cached_targets.pop(self.clave)

    def load(self):
       
        if self.exists():
            return cached_targets.get(self.clave)
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, o):
        """
        Save object to in-memory cache

        Args:
            o (obj): object to cache, e.g. a pandas dataframe

        Returns: cache key

        """
        cached_targets[self.clave] = o
        return self.clave
       
class MidasTask(luigi.Task):
    """
    Custom class that adds functionality on top of the generic Luigi Task class. Features:

        - It has a save method, which must be called at the end of run(). This method saves all the objects
        defined in the persist attribute. Classes that extend MidasTask do not need to implement it.

        - It has a str_parameters method, which returns a string with all the (significant) parameters of the class.
        It is used internally to save a txt file alongside each object stored on disk, so that the parameters
        used to create each object are always at hand. Classes that extend MidasTask do not need to
        implement it.

        - A get_path method must be implemented; given the location type, it returns the Path where the object will be stored.
        Note: one more level with the task name is appended to that Path, but this is done automatically.

        - It has a method to generate reports with PandasProfiling and SweetViz for all pandas targets (csv and pq).
        The targets must exist for it to work.


    The persist dictionary must be defined in each concrete class that extends MidasTask. It must contain one item per
    object to be saved, and each value is a two-element list: the format and the location
    type. Examples:
    persist = {
        'dataset_salida': ['csv','data_processed']
        }
    persist = {
        'modelo': ['pkl','modelos']
        }
    Five formats are supported:
        csv (saved compressed to save space)
        docx
        pkl
        pq (for parquet)
        cache (not saved to disk)
    For locations, the following values are supported:
        'data_interim'
        'data_processed'
        'docs'
        'modelos'
        'reports'
        'resultados'

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_id_hash = self.task_id.split('_')[-1]

       
    def save(self, data, save_index=True,**kwargs):

        targets = self.output()
        if set(data.keys()) != set(targets.keys()):
            print(data.keys())
            print(targets.keys())
            raise ValueError('The saved dictionary must be consistent with the persist object')
        for k, v in data.items():
            if isinstance(targets[k], (MidasCSVTarget, MidasPqTarget)):
                # Pandas targets also accept the save_index flag
                targets[k].save(v, self.str_parameters(), save_index=save_index, **kwargs)
            elif isinstance(targets[k], MidasCacheTarget):
                # Cache targets keep the object in memory; their save() takes no extra arguments
                targets[k].save(v)
            else:
                targets[k].save(v, self.str_parameters(), **kwargs)

    def output(self):

        output = {}
        for k,v in self.persist.items():
            if v[0] == 'csv':
                extension = 'csv.gz'
            elif v[0] == 'pq':
                extension = 'pq.gz'
            else:
                extension = v[0]
           
            save_path = self.get_path(v[1]) / self.get_task_family()      
           
            if v[0] == 'csv':
                output[k] = MidasCSVTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'pq':
                output[k] = MidasPqTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'pkl':
                output[k] = MidasPickleTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'docx':
                output[k] = MidasDocxTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'cache':
                output[k] = MidasCacheTarget(name = k, task_id=self.task_id_hash)
            else:
                raise ValueError('Object format not implemented: ' + v[0])
               
        return output
   
    def generate_profiling(self):
        reports_path = self.get_path('reports')
        for k,v in self.output().items():
            if isinstance(v,MidasPandasTarget):
                v.generate_profiling(reports_path,self.str_parameters())
   
    def get_path(self,tipo_ubicacion):
        """
        This method must be implemented in each concrete class that extends MidasTask. It must define where each type of object is saved.
        Do not append the task name (that is done automatically afterwards).
        It must return a Path object.
        Example implementation:
            project_root = Path(os.path.abspath(os.getcwd())).parent
            if tipo_ubicacion == 'data_processed':
                return project_root / 'data' / 'processed'
            ...
        """
        raise NotImplementedError("get_path() not implemented")
   
    def str_parameters(self):
        # One line per task parameter; this text ends up in the parameters txt file
        params_text = ""
        for k, v in self.param_kwargs.items():
            params_text += '{key} has the value {value}\n'.format(key=k, value=v)
        return params_text
   
   
   
def common_params(dict_params, task_cls):
    """
    Grab all the values in dict_params that are found in task_cls.
    Similar to luigi.util.common_params, but between a dictionary and a class rather than between an instance and a class.
    """
   
    dict_param_names = dict_params.keys()
    task_cls_params_dict = dict(task_cls.get_params())
    task_cls_param_names = task_cls_params_dict.keys()
    common_param_names = set(dict_param_names).intersection(set(task_cls_param_names))
    common_param_vals = {key : dict_params[key] for key in common_param_names}
    return common_param_vals
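
To show how common_params is meant to be used, here is a small sketch that runs without luigi. `FakeTask` is a hypothetical stand-in for a Task class: the only thing assumed about it is a `get_params()` classmethod returning (name, parameter) pairs, as luigi Task classes provide. The parameter names are invented for the example.

```python
class FakeTask:
    """Hypothetical stand-in for a luigi.Task subclass; only get_params() matters here."""

    @classmethod
    def get_params(cls):
        # luigi returns (name, Parameter) pairs; the Parameter objects are not needed for this demo
        return [("fecha", None), ("n_estimators", None)]


def common_params(dict_params, task_cls):
    # Same logic as the function above, written as a single dict comprehension
    task_param_names = dict(task_cls.get_params()).keys()
    return {key: value for key, value in dict_params.items() if key in task_param_names}


# A project-wide configuration with more entries than this task declares
config = {"fecha": "2020-01-01", "n_estimators": 100, "learning_rate": 0.1}

selected = common_params(config, FakeTask)
print(selected)
```

With a real Task class the result can be unpacked directly into the constructor, e.g. `SomeTask(**common_params(config, SomeTask))`, so one shared parameter dict can feed tasks that declare different subsets of it.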

If anyone is interested in having this implemented (whether in d6tflow or in a new library), I would be delighted to collaborate. Just let me know.

Hi! d6tflow does have targets for both pickle and csv.

d6tflow.targets.PickleTarget

d6tflow.targets.CSVPandasTarget

Here are some more targets available to use.
https://d6tflow.readthedocs.io/en/latest/targets.html#core-task-targets-pandas

Hello,
the problem is not the lack of pre-defined targets. The problem is that the design of d6tflow makes it impossible to define different types of outputs for the same Task. For example, if you want to define a Task that persists both a pandas dataframe and a pickled model, you cannot do it in d6tflow.
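
To make the request concrete, this is roughly the behaviour I mean, reduced to a sketch that uses only the standard library. It is not d6tflow or luigi code: `SAVERS`, `save_outputs` and the task name `TrainModel` are invented for the example, and a list of rows stands in for the dataframe.

```python
import csv
import pickle
import tempfile
from pathlib import Path


def save_pickle(obj, path):
    with open(path, "wb") as fhandle:
        pickle.dump(obj, fhandle)


def save_csv(rows, path):
    with open(path, "w", newline="") as fhandle:
        csv.writer(fhandle, delimiter=";").writerows(rows)


# One saver per format name; each output picks its own format
SAVERS = {"pkl": save_pickle, "csv": save_csv}


def save_outputs(persist, data, folder):
    # persist maps each output name to its format, e.g. {"model": "pkl"}
    if set(data) != set(persist):
        raise ValueError("data must have exactly the keys declared in persist")
    folder.mkdir(parents=True, exist_ok=True)
    written = {}
    for name, fmt in persist.items():
        path = folder / "{}.{}".format(name, fmt)
        SAVERS[fmt](data[name], path)
        written[name] = path
    return written


# A single task with two outputs of different types
persist = {"dataset": "csv", "model": "pkl"}
data = {"dataset": [["x", "y"], [1, 2]], "model": {"coef": [0.5]}}

with tempfile.TemporaryDirectory() as tmp:
    written = save_outputs(persist, data, Path(tmp) / "TrainModel")
    suffixes = sorted(p.suffix for p in written.values())
    header = written["dataset"].read_text().splitlines()[0]
    model_back = pickle.loads(written["model"].read_bytes())

print(suffixes, header, model_back)
```

The point is only that the format is chosen per output rather than per Task, which is what the dict-like persist in the code above provides.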