Support for Tasks that output different types and other extensions...
JaimeArboleda opened this issue · 3 comments
I started using d6tflow for a data science project but, after several weeks, I got somewhat frustrated with this limitation: I needed Tasks able to generate outputs of different types (like, for example, csvs and pickles).
Finally, we decided to write our own "version" of d6tflow by extending the luigi library.
I share how we implemented this, just in case anyone is interested in making this public (I am a newbie on GitHub).
Comments and names are sometimes in Spanish, but I think the logic is easy to understand.
Apart from this improvement (a dict-like "persist" attribute that lets you state the type of each target), there are more differences that made life easier for us:
- Integration with the cookiecutter template for data science projects.
- Generation of a single folder for each Task, keeping the TaskId for the names of the files.
- Auto-generation of a txt with the parameters, to see for each file how it was generated.
- A common_params function, that returns the common parameters between a dict of parameters and a Task class.
- Support for docx targets (we use them to generate documentation of the process).
- MidasTask requires a get_path method, to state where the files will be located.
I know this is "too much" for an issue, but just in case someone finds something interesting and worth integrating in d6tflow, I share it with the community.
class MidasLocalTarget(luigi.LocalTarget):
    """Local file target whose file name is derived from the task id and object name.

    The target file is ``<path>/<task_id>_<name>.<extension>``.

    Args:
        path: full path of the final folder where the object is stored
            (a pathlib ``Path`` object).
        name: name of the object (key of the ``persist`` dictionary).
        task_id: identifier of the task that owns the target.
        extension: file extension, without the leading dot.
    """

    def __init__(self, path=None, name=None, task_id=None, extension=None):
        self.path = path
        self.task_id = task_id
        self.name = name
        self.extension = extension
        self.path_completo = self.path / (self.task_id + '_' + self.name + '.{}'.format(self.extension))
        super().__init__(self.path_completo)
        # Restore the folder path in case the parent constructor modified it.
        self.path = path

    def exists(self):
        """Return True when the target file is present on disk."""
        return self.path_completo.exists()

    def invalidate(self):
        """Delete the target file if it exists.

        Returns:
            True when the file is absent after the call.
        """
        if self.exists():
            self.path_completo.unlink()
        return not self.exists()

    def write_params_file(self, path, parameters):
        """Write ``parameters`` to ``<path>/<task_id>_parameters.txt``.

        Args:
            path: folder (``Path``) that receives the text file; it must
                already exist.
            parameters: text to write.
        """
        path_file = path / (self.task_id + '_' + 'parameters.txt')
        # The context manager closes the handle even if the write fails.
        with open(path_file, "w") as file:
            file.write(parameters)
class MidasPickleTarget(MidasLocalTarget):
    """File target that stores an object with ``pickle``."""

    def load(self, **kwargs):
        """Unpickle and return the stored object.

        ``kwargs`` is accepted but not used.

        Raises:
            RuntimeError: if the target file does not exist.
        """
        if not self.exists():
            raise RuntimeError('Target does not exist, make sure task is complete')
        with open(self.path_completo, "rb") as stream:
            return pickle.load(stream)

    def save(self, obj, parameters, **kwargs):
        """Pickle ``obj`` to the target file and write the parameters text file.

        Args:
            obj: object to pickle.
            parameters: text written next to the target by ``write_params_file``.
            **kwargs: forwarded to ``pickle.dump``.

        Returns:
            The path of the written file.
        """
        folder = self.path
        folder.mkdir(parents=True, exist_ok=True)
        self.write_params_file(folder, parameters)
        with open(self.path_completo, "wb") as stream:
            pickle.dump(obj, stream, **kwargs)
        return self.path_completo
class MidasPandasTarget(MidasLocalTarget):
    """Base class for targets that hold a pandas DataFrame; adds HTML profiling."""

    def generate_profiling(self, reports_path, parameters):
        """Write profiling reports for the stored DataFrame into ``reports_path``.

        A pandas-profiling report is always produced. A sweetviz report is
        added only when the DataFrame has a column named ``'target'``.

        Args:
            reports_path: folder (``Path``) for the reports; created if missing.
            parameters: text written to the parameters file in that folder.
        """
        reports_path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(reports_path, parameters)

        # Both report files share the "<task_id>_<name>_" prefix.
        prefix = self.task_id + '_' + self.name + '_'
        frame = self.load()

        pandas_report = ProfileReport(frame, title="Pandas Profiling Report", minimal=True)
        pandas_report.to_file(reports_path / (prefix + 'pandas_profiling.html'))

        if 'target' not in frame.columns:
            return
        sweetviz_report = sv.analyze(frame, target_feat='target', pairwise_analysis = 'off')
        sweetviz_report.show_html(reports_path / (prefix + 'sweetviz.html'), open_browser=False)
class MidasCSVTarget(MidasPandasTarget):
    """DataFrame target stored as a gzip-compressed CSV (``;`` separator, ``,`` decimal)."""

    def load(self, **kwargs):
        """Read the CSV into a DataFrame.

        Args:
            **kwargs: forwarded to ``pd.read_csv``; they override the
                default ``sep=';'`` and ``decimal=','``.

        Raises:
            RuntimeError: if the target file does not exist.
        """
        if not self.exists():
            raise RuntimeError('Target does not exist, make sure task is complete')
        read_options = {'sep': ';', 'decimal': ','}
        read_options.update(kwargs)
        return pd.read_csv(self.path_completo, **read_options)

    def save(self, df, parameters, save_index=True,**kwargs):
        """Write ``df`` to the target file and write the parameters text file.

        Args:
            df: DataFrame to store.
            parameters: text written next to the target by ``write_params_file``.
            save_index: whether the index is written to the file.
            **kwargs: forwarded to ``DataFrame.to_csv``; they override the defaults.

        Returns:
            The path of the written file.
        """
        folder = self.path
        folder.mkdir(parents=True, exist_ok=True)
        self.write_params_file(folder, parameters)
        write_options = {'sep': ';', 'decimal': ',', 'compression': 'gzip', 'index': save_index}
        write_options.update(kwargs)
        df.to_csv(self.path_completo, **write_options)
        return self.path_completo
class MidasPqTarget(MidasPandasTarget):
    """DataFrame target stored as a parquet file."""

    def load(self, **kwargs):
        """Read the parquet file into a DataFrame.

        Args:
            **kwargs: forwarded to ``pd.read_parquet`` (for example
                ``columns=[...]``), mirroring how the CSV target forwards
                its options.

        Raises:
            RuntimeError: if the target file does not exist.
        """
        if self.exists():
            return pd.read_parquet(self.path_completo, **kwargs)
        raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, df, parameters, save_index=True, **kwargs):
        """Write ``df`` to the target file and write the parameters text file.

        Args:
            df: DataFrame to store.
            parameters: text written next to the target by ``write_params_file``.
            save_index: whether the index is written to the file.
            **kwargs: forwarded to ``DataFrame.to_parquet``; they override the
                defaults (gzip compression, pyarrow engine).

        Returns:
            The path of the written file.
        """
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'compression': 'gzip', 'index': save_index, 'engine': 'pyarrow'}, **kwargs}
        df.to_parquet(self.path_completo, **opts)
        return self.path_completo
class MidasDocxTarget(MidasLocalTarget):
    """File target that stores a docx document."""

    def load(self, **kwargs):
        """Open the stored document with ``Document`` and return it.

        ``kwargs`` is accepted but not used.

        Raises:
            RuntimeError: if the target file does not exist.
        """
        if self.exists():
            return Document(self.path_completo)
        raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, docx, parameters, **kwargs):
        """Save ``docx`` to the target file and write the parameters text file.

        Args:
            docx: document object exposing a ``save(path)`` method.
            parameters: text written next to the target by ``write_params_file``.
            **kwargs: accepted but not used.

        Returns:
            The path of the written file, like the other file targets.
        """
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        docx.save(self.path_completo)
        return self.path_completo
class MidasCacheTarget(luigi.LocalTarget):
    """Target kept in the module-level ``cached_targets`` mapping instead of on disk.

    Args:
        name: name of the object (key of the ``persist`` dictionary).
        task_id: identifier of the task that owns the target.
    """

    def __init__(self, name=None, task_id=None):
        # The parent class needs a path; the parent of the current working
        # directory is passed. Nothing in this class writes to it.
        parent_of_cwd = Path(os.path.abspath(os.getcwd())).parent
        super().__init__(parent_of_cwd)
        self.task_id = task_id
        self.name = name
        # Cache key: task id immediately followed by the object name.
        self.clave = task_id + name

    def exists(self):
        """Return True when the cache holds an entry for this target."""
        return self.clave in cached_targets

    def invalidate(self):
        """Remove this target's entry from the cache, if there is one."""
        cached_targets.pop(self.clave, None)

    def load(self):
        """Return the cached object.

        Raises:
            RuntimeError: if the cache holds no entry for this target.
        """
        if not self.exists():
            raise RuntimeError('Target does not exist, make sure task is complete')
        return cached_targets[self.clave]

    def save(self, o):
        """Store ``o`` in the in-memory cache.

        Args:
            o: object to cache.

        Returns:
            The cache key under which the object was stored.
        """
        key = self.clave
        cached_targets[key] = o
        return key
class MidasTask(luigi.Task):
    """Luigi task whose outputs can each be persisted in a different format.

    Features added on top of the generic luigi class:

    - A ``save`` method, to be called at the end of ``run()``. It saves every
      object declared in the ``persist`` attribute. Subclasses do not need to
      implement it.
    - A ``str_parameters`` method that returns a string with all the
      (significant) parameters of the task. It is used to write a txt file
      next to every object saved to disk, so the parameters that produced
      each object are at hand. Subclasses do not need to implement it.
    - A ``get_path`` method that subclasses MUST implement. Given a location
      type, it returns the ``Path`` where the object is stored. One more
      level named after the task family is appended automatically.
    - A ``generate_profiling`` method that builds pandas-profiling and
      sweetviz reports for every DataFrame target (csv and parquet). The
      targets must already exist.

    The ``persist`` dictionary must be defined by every subclass. It holds
    one item per object to persist; each value is a two-element list with
    the format and the location type. Examples::

        persist = {
            'dataset_salida': ['csv', 'data_processed']
        }
        persist = {
            'modelo': ['pkl', 'modelos']
        }

    Supported formats:
        csv (stored compressed to save space)
        docx
        pkl
        pq (parquet)
        cache (kept in memory, not written to disk)

    Location types (resolved by the subclass's ``get_path``):
        'data_interim'
        'data_processed'
        'docs'
        'modelos'
        'reports'
        'resultados'
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Last '_'-separated piece of the task id (presumably the parameter
        # hash luigi appends); used as the file-name prefix of every target.
        self.task_id_hash = self.task_id.split('_')[-1]

    def save(self, data, save_index=True, **kwargs):
        """Persist every object in ``data`` to its matching target.

        Args:
            data: dict mapping each ``persist`` key to the object to store.
            save_index: forwarded to csv/parquet targets only.
            **kwargs: forwarded to the ``save`` of file targets. Cache
                targets take only the object, so they receive no kwargs.

        Raises:
            ValueError: if the keys of ``data`` differ from those of ``persist``.
        """
        targets = self.output()
        if set(data.keys()) != set(targets.keys()):
            print(data.keys())
            print(targets.keys())
            raise ValueError('El diccionario guardado ha de ser consistente con el objeto persist')
        for k, v in data.items():
            target = targets[k]
            if isinstance(target, (MidasCSVTarget, MidasPqTarget)):
                target.save(v, self.str_parameters(), save_index=save_index, **kwargs)
            elif isinstance(target, MidasCacheTarget):
                # MidasCacheTarget.save(o) accepts no extra arguments;
                # forwarding kwargs would raise TypeError.
                target.save(v)
            else:
                target.save(v, self.str_parameters(), **kwargs)

    def output(self):
        """Build one target per ``persist`` entry.

        Returns:
            dict mapping each ``persist`` key to its target instance.

        Raises:
            ValueError: if an entry declares an unsupported format.
        """
        # format -> (target class, file extension)
        file_targets = {
            'csv': (MidasCSVTarget, 'csv.gz'),
            'pq': (MidasPqTarget, 'pq.gz'),
            'pkl': (MidasPickleTarget, 'pkl'),
            'docx': (MidasDocxTarget, 'docx'),
        }
        output = {}
        for k, v in self.persist.items():
            formato = v[0]
            # Each task gets its own folder, named after the task family.
            save_path = self.get_path(v[1]) / self.get_task_family()
            if formato in file_targets:
                target_cls, extension = file_targets[formato]
                output[k] = target_cls(path=save_path, name=k, task_id=self.task_id_hash, extension=extension)
            elif formato == 'cache':
                output[k] = MidasCacheTarget(name=k, task_id=self.task_id_hash)
            else:
                raise ValueError('Formato de objeto no implmentado: ' + formato)
        return output

    def generate_profiling(self):
        """Generate profiling reports for every DataFrame target of the task.

        Reports are written to the folder returned by ``get_path('reports')``.
        """
        reports_path = self.get_path('reports')
        for target in self.output().values():
            if isinstance(target, MidasPandasTarget):
                target.generate_profiling(reports_path, self.str_parameters())

    def get_path(self, tipo_ubicacion):
        """Return the ``Path`` where objects of a location type are stored.

        Every concrete subclass must implement this method. The task name
        must not be appended (that is done automatically afterwards).

        Example implementation::

            project_root = Path(os.path.abspath(os.getcwd())).parent
            if tipo_ubicacion == 'data_processed':
                return project_root / 'data' / 'processed'
            ...

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("get_path() not implemented")

    def str_parameters(self):
        """Return one line per task parameter, in the form '<key> tiene el valor <value>'."""
        return "".join(
            '{key} tiene el valor {value}\n'.format(key=k, value=v)
            for k, v in self.param_kwargs.items()
        )
def common_params(dict_params, task_cls):
    """Grab all the values in dict_params whose keys are parameters of task_cls.

    Similar to luigi.utils.common_params, but it works between a dictionary
    and a class instead of between an instance and a class.

    Args:
        dict_params: dict mapping parameter names to values.
        task_cls: class exposing ``get_params()``, which returns
            ``(name, parameter)`` pairs.

    Returns:
        A new dict with the items of ``dict_params`` whose key is a parameter
        name of ``task_cls``. Keys keep the order they have in
        ``dict_params``, so the result is the same on every run.
    """
    task_cls_param_names = dict(task_cls.get_params()).keys()
    return {
        key: value
        for key, value in dict_params.items()
        if key in task_cls_param_names
    }
If anyone is interested in having this implemented (whether in d6tflow or in a new library), I would be delighted to collaborate. Just let me know.
Hi! d6tflow does have targets for both pickle and csv.
d6tflow.targets.PickleTarget
d6tflow.targets.CSVPandasTarget
Here are some more targets available to use.
https://d6tflow.readthedocs.io/en/latest/targets.html#core-task-targets-pandas
Hello,
the problem is not the lack of pre-defined targets. The problem is that the design of d6tflow makes it impossible to define different types of outputs for the same Task. For example, if you want to define a Task that persists a pandas dataframe and a pickled model, you cannot do it in d6tflow.