NVIDIA-Merlin/NVTabular

[BUG] workflow_fit_transform func not saving workflow output under the specified output_path

Tselmeg-C opened this issue · 11 comments

workflow_fit_transform func not saving workflow output under the specified output_path
I am using Merlin-Training Image: nvcr.io/nvidia/merlin/merlin-tensorflow-training:22.05
Is this bug associated with this image only? When I was running merlin-ts:22.12, the workflow was stored under the specified location.
Another thing: when I set save_workflow = True, a 'Category' dir with my categorified feature cols and a 'Workflow' dir are created under the parent of the parent folder of the output_path I specified, and inside the 'Workflow' dir the categories are stored a second time in another 'Category' dir.

Expected behavior
How do I only write 'Workflow' out to my desired output_path without writing 'Category'?

Code

# specify data paths
import os
DATA_FOLDER = 'path'
train_path = os.path.join(DATA_FOLDER, "train", "train.parquet")
valid_path = os.path.join(DATA_FOLDER, "valid", "valid.parquet")
output_path = os.path.join(DATA_FOLDER, "processed/")
category_temp_directory = os.path.join(DATA_FOLDER)

import nvtabular as nvt
from merlin.schema.tags import Tags
from nvtabular.ops import Rename, Filter, Dropna, LambdaOp, Categorify, \
    TagAsUserFeatures, TagAsUserID, TagAsItemFeatures, TagAsItemID, AddMetadata

# specify dtypes of IDs and tag accordingly
user_id = ["sku"] >> LambdaOp(lambda col: col.astype("int32")) >> TagAsUserID()
item_id = ["branchcustomernbr"] >> LambdaOp(lambda col: col.astype("int32")) >> TagAsItemID()

# tokenize item features and tag them
# (user_cat_feats / item_cat_feats are lists of categorical column names defined elsewhere)
item_features = (
    user_cat_feats >> Categorify(dtype="int32") >> TagAsItemFeatures()   #, out_path=category_temp_directory
)

# tokenize user features and tag them
user_features = (
    item_cat_feats >> Categorify(dtype="int32") >> TagAsUserFeatures()  #,out_path=category_temp_directory
)

# tag label
targets = ["buy"] >> LambdaOp(lambda col: col.astype("int64")) >> AddMetadata(tags=[Tags.BINARY_CLASSIFICATION, "target"])

# assemble all cols
outputs = user_id + item_id + item_features + user_features + targets

from merlin.io.dataset import Dataset
train = Dataset(train_path, part_size="1000MB")
valid = Dataset(valid_path, part_size="1000MB")

from merlin.models.utils.example_utils import workflow_fit_transform
workflow_fit_transform(outputs, train_path, valid_path, output_path, save_workflow=True)

Environment details (please complete the following information):

  • Environment location: [Docker]
  • Method of NVTabular install: [Docker]
    • docker pull & docker run commands used: (screenshot)

Additional context
The reason I am using this image instead of the other is that I am going to build a KFP pipeline on Vertex AI, and merlin-ts:22.12 was just too big.

rnyak commented

@Tselmeg-C please use our merlin-tensorflow:23.02 or merlin-tensorflow:23.04 images; they are smaller. We have changed the NVT code since the merlin-tensorflow-training:22.05 image.

rnyak commented

How do I only write 'Workflow' out to my desired output_path without writing 'Category'?

do you mind clarifying your question?

  • do you want to save workflow to a desired path after fit_transform step?
  • do you not want to remove Categorify op?
  • do you want to save unique parquet files to a specific path?

@rnyak Thanks again! Pulling 23.02 right now. My goal is to build a KFP pipeline on GCP that broadly includes these steps:

  1. data preprocessing using nvt.workflow
  2. train a dlrm model using Merlin models
  3. do batch prediction using model.predict

do you want to save workflow to a desired path after fit_transform step?
Yes, because I am building a KFP pipeline, and one component there should be able to define an NVT workflow, fit-transform the given data, and write the workflow to a desired path.
do you not want to remove Categorify op?
Nope, I want to keep the Categorify op, but I don't need to write the unique..parquet files out, just to keep the pipeline more efficient. Unless those parquet files are needed for training a DLRM model? I am not clear about that.
do you want to save unique parquet files to a specific path?
Nope, as long as they are not needed for the model.fit and model.predict steps; I would rather not write them to any location at all.

I found an example repo building a Vertex AI pipeline based on the merlin-training image. Is there any recent example that uses a newer merlin-tensorflow image instead of HugeCTR?

rnyak commented

@Tselmeg-C

I want to keep Categorify op, but I don't need to write the unique..parquet files out,

These parquet files are written out to a default path (or to a path you define, if you want). These parquet files are important for mapping back to the original item and user ids, in case that is required in the inference step. So keep them as they are in the folder where they are exported; you don't need to remove them.
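
For example, if you would rather have those unique parquet files written under a directory you control instead of the default ./categories in the working directory, Categorify accepts an out_path argument. A minimal sketch (paths and column names here are placeholders based on the snippet above):

import os
import nvtabular as nvt

DATA_FOLDER = 'path'  # placeholder
category_dir = os.path.join(DATA_FOLDER, "categorify_output")

# Categorify writes its unique.<col>.parquet mapping files under out_path
# (typically inside a "categories" subdirectory of that path)
cat_features = ["sku", "branchcustomernbr"] >> nvt.ops.Categorify(
    dtype="int32", out_path=category_dir
)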

If you want to save the NVT workflow to a path, just do that after the nvt_workflow.fit_transform() step:

workflow.save(os.path.join(DATA_FOLDER, "workflow"))
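
And in a later pipeline component you can load the saved workflow back and reuse it to transform new data; a rough sketch, assuming the same DATA_FOLDER and a placeholder path for the new parquet file:

import os
import nvtabular as nvt
from merlin.io.dataset import Dataset

# load the workflow that was saved above
workflow = nvt.Workflow.load(os.path.join(DATA_FOLDER, "workflow"))

# apply the already-fitted workflow to new data and write it out
new_data = Dataset(os.path.join(DATA_FOLDER, "new", "new.parquet"))  # placeholder path
workflow.transform(new_data).to_parquet(os.path.join(DATA_FOLDER, "processed_new"))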

Is there any recent example using a newer image using merlin-ts instead of HugeCTR?

Do you want to install Merlin on Vertex AI Workbench? Can you please clarify? Thanks.

@rnyak Thanks!
I find the Categorify op on IDs quite confusing (it seems to do more than just tokenize IDs into continuous integers), so I used a custom function to tokenize the IDs before I do workflow.fit_transform, and wrote the tokens out to a dict to use in the batch prediction step to map them back to the originals. That's why in the workflow definition I did not Categorify my user and item IDs (they are already continuous integers).
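
For clarity, what I did is roughly the following (a simplified illustration, not the exact code; pandas factorize stands in for my custom tokenizer):

import pandas as pd

# factorize the raw ids into contiguous integers before the NVT workflow runs
raw = pd.read_parquet(train_path)
codes, uniques = pd.factorize(raw["sku"])
raw["sku"] = codes.astype("int32")

# keep a reverse lookup (token -> original id) for the batch prediction step
sku_lookup = dict(enumerate(uniques))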

Currently, I am not using Triton for inference; I'm simply training a DLRM model, using model.predict to batch predict, and writing the results to a DB. But I need to build a Vertex AI pipeline to automate the process.

I'm trying to build a KFP pipeline using Vertex AI Pipelines; for that:

  1. I need a smaller Merlin image to use as the base image to run the Merlin code.
  2. I want to improve pipeline efficiency by reducing unnecessary reads and writes of lineage data; that's why I'm trying to skip writing the unique.feature.parquet files, since I currently don't need them to map IDs back.

But I am open to hearing some better ideas on how to do things more efficiently.

rnyak commented

@Tselmeg-C did you test the merlin-tensorflow:23.02 or merlin-tensorflow:23.04 images? They are smaller than the previous images.

rnyak commented

I find the Categorify op on IDs quite confusing (it seems to do more than just tokenize IDs into continuous integers), so I used a custom function to tokenize the IDs before I do workflow.fit_transform, and wrote the tokens out to a dict to use in the batch prediction step to map them back to the originals. That's why in the workflow definition I did not Categorify my user and item IDs (they are already continuous integers).

I don't think you need a custom function to encode categorical features; Categorify is designed to encode categorical features. Can you tell us why you think Categorify is not just tokenizing IDs into continuous integers in your dataset? Are you getting any errors?

Is your ["sku"] a very long string in your original dataset? can you send us only like 5-10 lines of data with only your sku and ["branchcustomernbr"] columns, so that we can check it out?

Can you please tell us what you get when you only do the following with your dataset?

import os
DATA_FOLDER = 'path'
train_path = os.path.join(DATA_FOLDER, "train", "train.parquet")
valid_path = os.path.join(DATA_FOLDER, "valid", "valid.parquet")
output_path = os.path.join(DATA_FOLDER, "processed/")
category_temp_directory = os.path.join(DATA_FOLDER)

import nvtabular as nvt
from merlin.schema.tags import Tags
from nvtabular.ops import Rename, Filter, Dropna, LambdaOp, Categorify, \
    TagAsUserFeatures, TagAsUserID, TagAsItemFeatures, TagAsItemID, AddMetadata

# Categorify the ID columns only
user_id = ["sku"] >> Categorify()
item_id = ["branchcustomernbr"] >> Categorify()

# assemble all cols
outputs = user_id + item_id 

train = nvt.Dataset(train_path)
workflow = nvt.Workflow(outputs)
workflow.fit_transform(train).to_parquet(output_path)

This should create a categories folder in your pwd, and you should see the unique parquet files that hold the mappings. And when you go to the output path folder, you will see your processed parquet files there.
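
If you want to sanity-check the mapping, you can peek at one of those unique parquet files; a small sketch, assuming the default ./categories location and the sku column from your workflow:

import pandas as pd

# each row index is the encoded id; the sku column holds the original raw value
sku_map = pd.read_parquet("./categories/unique.sku.parquet")
print(sku_map.head(10))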

@Tselmeg-C did you test the merlin-tensorflow:23.02 or merlin-tensorflow:23.04 images? They are smaller than the previous images.

I have tested merlin-tensorflow:23.02; it is working so far. Thanks!

@rnyak If I recall correctly, I was trying to follow the multi-stage workflow at the beginning, and at the inference stage I had problems mapping those categorified IDs back to my originals (I got some duplicates or so). That's why I thought Categorify is not only mapping strings to ints 1:1 and did it outside by myself, but I will double-check and let you know what happens. Thanks!

@rnyak I have looked closer at the Categorify IDs. I think I mistook the col_size column in the unique.col.parquet file for the tokens; that's why I got the duplicates issue when I used it to map back. Actually, the row indexes were the right tokens to look at:
(screenshot of the unique.col.parquet contents)

Do you already have an example for mapping IDs back to the originals in a recommended way? I think there was an API to call for that, or?

rnyak commented

@Tselmeg-C yes, the row indexes are the encoded ids. Please note that https://github.com/NVIDIA-Merlin/Merlin/blob/main/examples/Building-and-deploying-multi-stage-RecSys/02-Deploying-multi-stage-RecSys-with-Merlin-Systems.ipynb already returns the raw (original) item ids.

We don't have an API to call for the reverse mapping, but you can easily write a simple script like the one below to do it.

import nvtabular as nvt
import cudf
df = cudf.DataFrame({'cust_nbr': ["User_A","User_E","User_B","User_C","User_A","User_B","User_B","User_C","User_B"]})
dataset = nvt.Dataset(df)
cats = ['cust_nbr'] >> nvt.ops.Categorify()
workflow = nvt.Workflow(cats)
new_gdf = workflow.fit_transform(dataset).compute()
# print transformed dataset
print(new_gdf)

# do reverse category mapping

import pandas as pd
cust_id_map = pd.read_parquet('./categories/unique.cust_nbr.parquet')
id_map = dict(zip(cust_id_map.index, cust_id_map.cust_nbr))
new_gdf.cust_nbr = new_gdf.cust_nbr.map(id_map)

You can modify that for your custom case.
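
If the mapping table is large and you want to stay on the GPU instead of going through a pandas dict, an alternative sketch (same paths and column names as above) is to join against the unique parquet file directly:

# read the mapping with cudf and turn the row index into an explicit column
mapping = cudf.read_parquet('./categories/unique.cust_nbr.parquet')
mapping = mapping.reset_index().rename(
    columns={'index': 'encoded_id', 'cust_nbr': 'raw_cust_nbr'}
)

# join the transformed (encoded) ids back to their raw values
decoded = new_gdf.merge(mapping, left_on='cust_nbr', right_on='encoded_id', how='left')
print(decoded[['cust_nbr', 'raw_cust_nbr']])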