NVIDIA-Merlin/NVTabular

[BUG] When the dataset increases, the NVTabular workflow causes a dtype discrepancy

ldane opened this issue · 10 comments

ldane commented

When I increase the dataset duration from 28 days to 91 days, I'm getting a dtype discrepancy error.
[screenshot]

The same code works with 28 days:
[screenshot]

What would be the best way to troubleshoot this issue? Should I share the pkl files for both workflows?

rnyak commented

@ldane hello. Do you mind clarifying what you mean by "When I increase the dataset duration from 28 days to 91 days, I'm getting a dtype discrepancy"? Is that coming from your NVT workflow? Could you share the entire error stack, please? We would also need your workflow pipeline code to check.

Is the original dtype of the feed_date column in your raw parquet files datetime everywhere, or do you by any chance have string values in your feed_date column? Could you check that?

If you can share the workflow script that gives you this error, together with the parquet file for the day that triggers it, we can take a look. You can send it via email. Thanks.
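Something along these lines (a rough sketch; the glob pattern is just a placeholder for wherever your daily parquet files live) would show whether any file has a string/object feed_date or nulls:

    # Sketch: check the feed_date dtype and null count in each raw parquet file.
    # The path pattern below is a placeholder, not your actual layout.
    import glob
    import pandas as pd

    for path in sorted(glob.glob("data/bq-export/*.parquet")):
        col = pd.read_parquet(path, columns=["feed_date"])["feed_date"]
        print(path, col.dtype, "nulls:", col.isna().sum())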

ldane commented

@rnyak, this is coming from the NVT workflow. My training completed successfully (NVTabular + T4Rec). After training, I'm getting this error while doing inference.

I'm loading the workflow and the dataset and doing a transform operation:
[screenshot]
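Roughly, the code looks like this (a sketch reconstructed from the traceback; test_df stands for the in-memory pandas DataFrame and the workflow path is illustrative):

    # Sketch of the inference-time transform; names are assumptions, not the exact notebook code.
    import nvtabular as nvt

    workflow = nvt.Workflow.load("workflow")             # the workflow saved after ETL/training
    dataset = nvt.Dataset(test_df)                       # wrap the in-memory pandas DataFrame
    user_profile = workflow.transform(dataset).to_ddf()  # lazy dask dataframe
    user_profile.head(2)                                 # computing this raises the TypeError below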

I'm using a pandas DataFrame. Is there any way to figure out which row causes this exception? Could I somehow increase the verbosity of the logs generated by NVTabular?

I'm attaching some files for your consideration.

ETL-code.ipynb.zip
workflow.pkl.zip

Full Stack:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 user_profile.head(2)

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1256, in _Frame.head(self, n, npartitions, compute)
   1254 # No need to warn if we're already looking at all partitions
   1255 safe = npartitions != self.npartitions
-> 1256 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1290, in _Frame._head(self, n, npartitions, compute, safe)
   1285 result = new_dd_object(
   1286     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1287 )
   1289 if compute:
-> 1290     result = result.compute()
   1291 return result

File /usr/local/lib/python3.8/dist-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File /usr/local/lib/python3.8/dist-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    595     keys.append(x.__dask_keys__())
    596     postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
    599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/local/lib/python3.8/dist-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
    552 """A naive synchronous version of get_async
    553 
    554 Can be useful for debugging.
    555 """
    556 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 557 return get_async(
    558     synchronous_executor.submit,
    559     synchronous_executor._max_workers,
    560     dsk,
    561     keys,
    562     **kwargs,
    563 )

File /usr/local/lib/python3.8/dist-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    498 while state["waiting"] or state["ready"] or state["running"]:
    499     fire_tasks(chunksize)
--> 500     for key, res_info, failed in queue_get(queue).result():
    501         if failed:
    502             exc, tb = loads(res_info)

File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /usr/local/lib/python3.8/dist-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    540 fut = Future()
    541 try:
--> 542     fut.set_result(fn(*args, **kwargs))
    543 except BaseException as e:
    544     fut.set_exception(e)

File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in batch_execute_tasks(it)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in <listcomp>(.0)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    227     failed = False
    228 except BaseException as e:
--> 229     result = pack_exception(e, dumps)
    230     failed = True
    231 return key, result, failed

File /usr/local/lib/python3.8/dist-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.8/dist-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/local/lib/python3.8/dist-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.8/dist-packages/dask/utils.py:41, in apply(func, args, kwargs)
     39 def apply(func, args, kwargs=None):
     40     if kwargs:
---> 41         return func(*args, **kwargs)
     42     else:
     43         return func(*args)

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
     66 output_data = None
     68 for node in nodes:
---> 69     input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
     71     if node.op:
     72         transformed_data = self._transform_data(
     73             node, input_data, capture_dtypes=capture_dtypes
     74         )

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
    114 for parent in node.parents_with_dependencies:
    115     parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116     parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
    117     if input_data is None or not len(input_data):
    118         input_data = parent_data[parent_output_cols]

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
     66 output_data = None
     68 for node in nodes:
---> 69     input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
     71     if node.op:
     72         transformed_data = self._transform_data(
     73             node, input_data, capture_dtypes=capture_dtypes
     74         )

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
    114 for parent in node.parents_with_dependencies:
    115     parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116     parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
    117     if input_data is None or not len(input_data):
    118         input_data = parent_data[parent_output_cols]

    [... skipping similar frames: LocalExecutor._build_input_data at line 116 (1 times), LocalExecutor.transform at line 69 (1 times)]

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
     66 output_data = None
     68 for node in nodes:
---> 69     input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
     71     if node.op:
     72         transformed_data = self._transform_data(
     73             node, input_data, capture_dtypes=capture_dtypes
     74         )

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
    114 for parent in node.parents_with_dependencies:
    115     parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116     parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
    117     if input_data is None or not len(input_data):
    118         input_data = parent_data[parent_output_cols]

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:72, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
     69 input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
     71 if node.op:
---> 72     transformed_data = self._transform_data(
     73         node, input_data, capture_dtypes=capture_dtypes
     74     )
     75 else:
     76     transformed_data = input_data

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:191, in LocalExecutor._transform_data(self, node, input_data, capture_dtypes)
    189         elif len(output_data):
    190             if output_col_schema.dtype != output_data_schema.dtype:
--> 191                 raise TypeError(
    192                     f"Dtype discrepancy detected for column {col_name}: "
    193                     f"operator {node.op.label} reported dtype "
    194                     f"`{output_col_schema.dtype}` but returned dtype "
    195                     f"`{output_data_schema.dtype}`."
    196                 )
    197 except Exception:
    198     LOG.exception("Failed to transform operator %s", node.op)

TypeError: Dtype discrepancy detected for column feed_date: operator SelectionOp reported dtype `object` but returned dtype `datetime64[s]`.

rnyak commented

@ldane From your workflow code I see that you are using the feed_date column as is, and it has a datetime dtype (please correct me if I misread this). We recommend converting this feed_date datetime column to an integer timestamp up front, as we do in this example (see cell 6). That way you will also avoid potential issues if you want to serve your model with Triton; you might get an error from Triton due to the datetime64[s] dtype as well.

I see you have event_time_ts. Is that the same as the feed_date column? If yes, you can just use the event_time_ts column. If not, I'd recommend converting feed_date to an integer timestamp column and using that instead. Hope that helps.
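For reference, the conversion usually looks roughly like this (a sketch; df stands for your raw dataframe, and it assumes feed_date is parseable as a datetime that pandas stores as datetime64[ns]):

    # Sketch: convert feed_date to an integer timestamp (in seconds) before the NVT workflow.
    import pandas as pd

    df["feed_date"] = pd.to_datetime(df["feed_date"])              # handles string or datetime input
    df["feed_date_ts"] = df["feed_date"].astype("int64") // 10**9  # ns since epoch -> seconds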

ldane commented

@rnyak I shouldn't be using the feed_date column at all. It is there to keep track of the data partitions. If I were applying any operations to it, I could understand how it might cause issues.

Are you saying that even though I don't apply any modifications, it is not advised to use the SelectionOp with unsupported data types?

Is there any documentation that shows which data types are supported by NVTabular and Triton?

rnyak commented

There are two points to consider here:

  • Your NVT workflow should work as is: you can feed a datetime64[s] column to the NVT workflow and return it, and you are already doing that with your train set. The issue here is that one of the files in your test dataset might have a corrupted dtype, i.e. object (string) instead of datetime. It is hard to tell which row, because we cannot detect that from the error stack, and we don't have an arg or flag to increase the verbosity of the logs generated by NVTabular. So, as a workaround, converting the feed_date column to an int timestamp and feeding that column to the NVT workflow might help with the error you are getting. Otherwise you need to find the corrupted file(s) and the offending row(s) in them (see the sketch after this list).
  • For Triton-supported dtypes, I can share this doc for now, but I can gather more info and get back to you about the datetime dtype.
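For the first point, something like this (a rough sketch; the path pattern and the per-file feed_date column are assumptions based on this thread) could help narrow down a problematic file or row:

    # Sketch: look for parquet files where feed_date is not a clean datetime column.
    import glob
    import pandas as pd

    for path in sorted(glob.glob("test-data/*.parquet")):
        col = pd.read_parquet(path, columns=["feed_date"])["feed_date"]
        if col.dtype == "object":
            # object dtype usually means strings or mixed types; show a few offending rows
            bad = col[~col.map(lambda v: isinstance(v, pd.Timestamp))]
            print(path, col.dtype, "non-timestamp rows:", len(bad), bad.head().tolist())
        else:
            print(path, col.dtype)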

hope that helps.

@karlhigley would you like to add something else?

ldane commented

@rnyak I'll explain it better. In these two cases (28 days vs 91 days), the test set is exactly the same. There aren't multiple files; I'm using only one day's worth of data to verify that inference works. The data is retrieved and kept in memory as a pandas DataFrame. I verified that all rows of feed_date have the exact same value, so feed_date isn't corrupted in the way you described.
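Roughly the kind of check I ran (a sketch; test_df stands for the in-memory DataFrame):

    # Sketch: sanity checks on the single-day test DataFrame (names illustrative).
    print(test_df["feed_date"].dtype)                     # column dtype
    print(test_df["feed_date"].nunique())                 # 1 -> every row has the same value
    print(test_df["feed_date"].map(type).value_counts())  # confirms no mixed Python types
    print(test_df["feed_date"].isna().sum())              # no nulls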

Let's see if we can find a way to figure out what is going on before trying to solve this with workarounds.

rnyak commented

@ldane thanks for the extra info. Can you please confirm my understanding:

  • You use 28 days for the train set, you fit the workflow (it works), and then you transform the test set (a single file), and transforming the test set works too.
  • You use 91 days for the train set, you can fit the workflow (it works), but you cannot transform the test set; you get the dtype error you mentioned above.

Your ETL pipeline shows that you read the data (parquet files) from a path like train_data = Dataset(f"{PIPELINE_ROOT}/2023-03-22/data/bq-export/*.parquet"); that's fine.

Can you please print and share the train data dtypes and the sessions_gdf dtypes here?

I am asking these questions so I can build an example pipeline with a synthetic dataset to reproduce your error.
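For example (a sketch; it assumes train_data is an nvt.Dataset and sessions_gdf is a dataframe, as in the notebook):

    # Sketch: print the dtypes we need to see.
    print(train_data.to_ddf().dtypes)  # dtypes of the raw Dataset
    print(sessions_gdf.dtypes)         # dtypes after the ETL/groupby step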

ldane commented

28d train_data:
[screenshot]
28d sessions_gdf:
[screenshot]

91d train_data:
[screenshot]
91d sessions_gdf:
[screenshot]

rnyak commented

@ldane thanks. Your raw train data's feed_date column is string, not datetime; I thought it was a datetime object. It is strange that you are getting the "SelectionOp reported dtype `object` but returned dtype `datetime64[s]`" error.

  • Is that what you are generating from BigQuery, i.e., a string dtype for the feed_date column in the raw parquet files?
  • Does your test set have feed_date as a string dtype as well?
  • Do you know if your dataset has any nulls, in particular whether your feed_date column might contain nulls/Nones/NaNs?

I am not a SQL person, but does that syntax mean you are generating the feed_date column in your test set with a date dtype?
If that's the case, your feed_date column dtype does not match between the train and test sets, meaning one is a string and the other is a date dtype.

    feed_date AS event_feed_date,
    CAST('{date}' AS DATE) feed_date

As a debugging step, do you mind removing feed_date from your workflow pipeline, fitting the workflow on the 91 days of data, and transforming your test set again? Let's see whether the issue is really with the feed_date column or not.

(Note: in your ETL notebook you are not filtering the sequences based on min_seq_length. Please note that a Transformer-based model requires at least two items per session for the training and eval steps. I assume you don't have sessions with fewer than 2 items, and that's why you don't add the filtering step to the NVT workflow, but I wanted to mention it just in case :) ).
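In case it is useful, that filtering step typically looks roughly like this (a sketch; groupby_features stands in for the output of your GroupBy node, and the item_id-count column assumes GroupBy's default output naming):

    # Sketch: keep only sessions with at least two interactions.
    import nvtabular as nvt

    MINIMUM_SESSION_LENGTH = 2
    filtered_sessions = groupby_features >> nvt.ops.Filter(
        f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH
    )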

ldane commented

@rnyak Yes, the feed_date column in the train dataset is a string; I'm generating that field as a string from BigQuery. On my test dataset, I see that I'm using the DATE data type, and in the data frame feed_date has dtype dbdate.

I've just tried casting feed_date to string, and 91 days works as well. Now my question is: how does 28 days work with the dbdate dtype without causing the discrepancy, and how does the 91-day workflow verify the dtypes and fail?
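The cast itself is a one-liner (a sketch; I could equally do it in the BigQuery query with CAST(... AS STRING)):

    # Sketch: make the test set's feed_date a string so it matches the train set's dtype.
    test_df["feed_date"] = test_df["feed_date"].astype(str)
    print(test_df["feed_date"].dtype)  # object, same as in the train data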

These are the dtypes for the test data:
[screenshot]