[BUG] When the dataset increases, the NVTabular workflow causes a dtype discrepancy.
ldane opened this issue · 10 comments
@ldane hello. Do you mind clarifying "When I increase the dataset duration from 28 days to 91 days, I'm getting a dtype discrepancy"? Is that coming from your NVT workflow? Do you mind sharing the entire error stack, please? We would need your workflow pipeline code to check.
Are the `feed_date` columns in your raw parquet files all datetime dtype? Do you by any chance have any string dtype in your `feed_date` column? Is there any possibility you can check that?
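(One way to check; a minimal sketch, assuming the raw data lives in per-day parquet files, with an illustrative path:)

```python
import glob
import pyarrow.parquet as pq

# Print the stored dtype of feed_date for every raw parquet file,
# so any file where it is a string instead of a date stands out.
for path in sorted(glob.glob("data/*.parquet")):  # hypothetical path
    schema = pq.read_schema(path)
    print(path, schema.field("feed_date").type)
```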
If you can share the workflow script that gives you this error, together with the parquet file of the day that triggers it, we can take a look. You can send it via email. Thanks.
@rnyak, this is coming from the NVT workflow. My training completes successfully (NVTabular + T4Rec). After training, I'm getting this error while doing inference.
I'm loading the workflow and the dataset and doing a transform operation:
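Roughly like this (a minimal sketch with illustrative names; the exact code is in the attached notebook, and `user_profile_df` stands in for the in-memory pandas DataFrame):

```python
import nvtabular as nvt
from merlin.io import Dataset

# Load the workflow fitted during training and apply it at inference time.
workflow = nvt.Workflow.load("workflow")  # hypothetical path
user_profile = workflow.transform(Dataset(user_profile_df)).to_ddf()
user_profile.head(2)  # this is the call that raises the error below
```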
I'm using a pandas DataFrame. Is there any way to figure out which row causes this exception? Maybe there's a way to increase the verbosity of the logs generated by NVTabular?
I'm attaching some files for your consideration.
ETL-code.ipynb.zip
workflow.pkl.zip
Full Stack:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[8], line 1
----> 1 user_profile.head(2)
File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1256, in _Frame.head(self, n, npartitions, compute)
1254 # No need to warn if we're already looking at all partitions
1255 safe = npartitions != self.npartitions
-> 1256 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1290, in _Frame._head(self, n, npartitions, compute, safe)
1285 result = new_dd_object(
1286 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1287 )
1289 if compute:
-> 1290 result = result.compute()
1291 return result
File /usr/local/lib/python3.8/dist-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292 """Compute this dask collection
293
294 This turns a lazy Dask collection into its in-memory equivalent.
(...)
313 dask.base.compute
314 """
--> 315 (result,) = compute(self, traverse=False, **kwargs)
316 return result
File /usr/local/lib/python3.8/dist-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
595 keys.append(x.__dask_keys__())
596 postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/local/lib/python3.8/dist-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
552 """A naive synchronous version of get_async
553
554 Can be useful for debugging.
555 """
556 kwargs.pop("num_workers", None) # if num_workers present, remove it
--> 557 return get_async(
558 synchronous_executor.submit,
559 synchronous_executor._max_workers,
560 dsk,
561 keys,
562 **kwargs,
563 )
File /usr/local/lib/python3.8/dist-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
498 while state["waiting"] or state["ready"] or state["running"]:
499 fire_tasks(chunksize)
--> 500 for key, res_info, failed in queue_get(queue).result():
501 if failed:
502 exc, tb = loads(res_info)
File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
435 raise CancelledError()
436 elif self._state == FINISHED:
--> 437 return self.__get_result()
439 self._condition.wait(timeout)
441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
392 self = None
File /usr/local/lib/python3.8/dist-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
540 fut = Future()
541 try:
--> 542 fut.set_result(fn(*args, **kwargs))
543 except BaseException as e:
544 fut.set_exception(e)
File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in batch_execute_tasks(it)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with `execute_task`
237 """
--> 238 return [execute_task(*a) for a in it]
File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in <listcomp>(.0)
234 def batch_execute_tasks(it):
235 """
236 Batch computing of multiple tasks with `execute_task`
237 """
--> 238 return [execute_task(*a) for a in it]
File /usr/local/lib/python3.8/dist-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
227 failed = False
228 except BaseException as e:
--> 229 result = pack_exception(e, dumps)
230 failed = True
231 return key, result, failed
File /usr/local/lib/python3.8/dist-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.8/dist-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/local/lib/python3.8/dist-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.8/dist-packages/dask/utils.py:41, in apply(func, args, kwargs)
39 def apply(func, args, kwargs=None):
40 if kwargs:
---> 41 return func(*args, **kwargs)
42 else:
43 return func(*args)
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
66 output_data = None
68 for node in nodes:
---> 69 input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
71 if node.op:
72 transformed_data = self._transform_data(
73 node, input_data, capture_dtypes=capture_dtypes
74 )
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
114 for parent in node.parents_with_dependencies:
115 parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116 parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
117 if input_data is None or not len(input_data):
118 input_data = parent_data[parent_output_cols]
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
66 output_data = None
68 for node in nodes:
---> 69 input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
71 if node.op:
72 transformed_data = self._transform_data(
73 node, input_data, capture_dtypes=capture_dtypes
74 )
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
114 for parent in node.parents_with_dependencies:
115 parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116 parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
117 if input_data is None or not len(input_data):
118 input_data = parent_data[parent_output_cols]
[... skipping similar frames: LocalExecutor._build_input_data at line 116 (1 times), LocalExecutor.transform at line 69 (1 times)]
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:69, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
66 output_data = None
68 for node in nodes:
---> 69 input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
71 if node.op:
72 transformed_data = self._transform_data(
73 node, input_data, capture_dtypes=capture_dtypes
74 )
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:116, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes)
114 for parent in node.parents_with_dependencies:
115 parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 116 parent_data = self.transform(transformable, [parent], capture_dtypes=capture_dtypes)
117 if input_data is None or not len(input_data):
118 input_data = parent_data[parent_output_cols]
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:72, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes)
69 input_data = self._build_input_data(node, transformable, capture_dtypes=capture_dtypes)
71 if node.op:
---> 72 transformed_data = self._transform_data(
73 node, input_data, capture_dtypes=capture_dtypes
74 )
75 else:
76 transformed_data = input_data
File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:191, in LocalExecutor._transform_data(self, node, input_data, capture_dtypes)
189 elif len(output_data):
190 if output_col_schema.dtype != output_data_schema.dtype:
--> 191 raise TypeError(
192 f"Dtype discrepancy detected for column {col_name}: "
193 f"operator {node.op.label} reported dtype "
194 f"`{output_col_schema.dtype}` but returned dtype "
195 f"`{output_data_schema.dtype}`."
196 )
197 except Exception:
198 LOG.exception("Failed to transform operator %s", node.op)
TypeError: Dtype discrepancy detected for column feed_date: operator SelectionOp reported dtype `object` but returned dtype `datetime64[s]`.
@ldane From your workflow code I see that you are using the `feed_date` column as is, and it has a datetime dtype (please correct me if I misread this). We recommend converting the `feed_date` datetime dtype to an integer timestamp up front, as we do in this example (please see cell 6). This way you will also avoid potential issues if you want to serve your model with Triton; you might get an error from Triton due to the `datetime64[s]` dtype as well.
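(A minimal sketch of that conversion, assuming pandas; the column name is from this thread:)

```python
import pandas as pd

# Convert the datetime column to an integer Unix timestamp (seconds)
# before feeding it into the NVTabular workflow.
df["feed_date"] = pd.to_datetime(df["feed_date"]).astype("int64") // 10**9
```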
I see you have `event_time_ts`. Is that the same as the `feed_date` column? If yes, you can use the `event_time_ts` column. If not, I'd recommend converting `feed_date` to an integer timestamp column and using it that way. Hope that helps.
@rnyak I shouldn't be using the `feed_date` column at all; it is only there to keep track of the data partitions. If I were applying any operations to it, I could understand it causing issues.
Are you saying that even though I don't do any modifications, it is not advised to use the `SelectionOp` with unsupported data types?
Is there any documentation that shows which data types are supported by NVTabular and Triton?
There are two points to consider here:
- Your NVT workflow should work as is: you can feed a `datetime64[s]` dtype column through the NVT workflow and return it, and you are already doing that with your train set. But the issue here is that one of the files in your test dataset might have a corrupted dtype: not datetime, but object (string). It is hard to tell which row, because we cannot detect that from the error stack, and we don't have an arg or flag to increase the verbosity of the logs generated by NVTabular. So as a workaround, if you could convert the `feed_date` column to an `int` timestamp and feed that column to the NVT workflow, this might help with the error you are getting. Otherwise you need to find the corrupted file(s) and the row(s) in them; see the sketch after this list.
- For Triton-supported dtypes, I can share this doc for now, but I can gather more info and get back to you about the `datetime` dtype.

Hope that helps.
@karlhigley would you like to add something else?
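(A minimal sketch of that search, assuming pandas and per-file parquet input; the path is illustrative:)

```python
import glob
import pandas as pd

# Hunt for files/rows where feed_date cannot be parsed as a datetime.
for path in sorted(glob.glob("test_data/*.parquet")):  # hypothetical path
    df = pd.read_parquet(path, columns=["feed_date"])
    parsed = pd.to_datetime(df["feed_date"], errors="coerce")
    bad = df[parsed.isna() & df["feed_date"].notna()]
    if len(bad):
        print(f"{path}: {len(bad)} unparseable feed_date rows")
        print(bad.head())
```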
@rnyak I'll explain it better. In these two cases (28 days vs. 91 days), the test set is exactly the same. There aren't multiple files; I'm using only one day's worth of data to verify that inference is working. The data is retrieved and kept in memory as a pandas DataFrame. I verified that every row of `feed_date` has the exact same value, so `feed_date` isn't corrupted as you described.
Let's see if we can find a way to figure out what is going on before trying to solve this with workarounds.
@ldane thanks for the extra info. Can you please confirm my understanding:
- You use 28 days for the train set, you fit the workflow (it works), and then you transform the test set (a single file), and transforming the test set works too.
- You use 91 days for the train set, you can fit the workflow (it works), but you cannot transform the test set; you get the dtype error you mentioned above.

Your ETL pipeline shows you read data (parquet files) from a path like `train_data = Dataset(f"{PIPELINE_ROOT}/2023-03-22/data/bq-export/*.parquet")`; that's fine.
Can you please print and share the train data dtypes and the `sessions_gdf` dtypes here? I am asking these questions to be able to generate an example pipeline with a synthetic dataset to repro your error.
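(For example, a quick way to capture those; `train_df` is a stand-in for the raw train DataFrame, while `sessions_gdf` is the name from the notebook:)

```python
print(train_df.dtypes)      # dtypes of the raw train data
print(sessions_gdf.dtypes)  # dtypes of the grouped sessions frame
```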
@ldane thanks. Your raw train data `feed_date` column is `string`; it is not datetime. I thought it was a datetime object. It is strange that you are getting the `SelectionOp reported dtype object but returned dtype datetime64[s]` error.
- Is that what you are generating from BigQuery, i.e., `string` dtype for the `feed_date` col in the raw parquet files?
- Does your test set have `feed_date` as string dtype as well?
- Do you know if your dataset has any nulls? In particular, might your `feed_date` column have nulls/Nones/NaNs? (A quick check is sketched below.)
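(For the null check, a minimal pandas sketch; `df` stands in for your raw DataFrame:)

```python
# Count missing values and inspect the mix of Python types in feed_date;
# a mix of str and date objects would explain an `object` dtype.
print(df["feed_date"].isna().sum())
print(df["feed_date"].map(type).value_counts())
```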
I am not a SQL person, but given this syntax from your queries:

feed_date AS event_feed_date,
CAST('{date}' AS DATE) feed_date

does that mean you are generating your `feed_date` col in the test set with a `date` dtype? If that's the case, your `feed_date` column dtype does not match between the train and test sets: one is `string`, one is `date`.
As a debugging step, do you mind removing `feed_date` from your workflow pipeline, fitting your workflow with the 91 files, and transforming your test set again? Let's see whether the issue is really with the `feed_date` column or not.
(Note: in your ETL notebook you are not filtering the sequences based on `min_seq_length`. A Transformer-based model requires at least two items per session for the training and eval steps. I assume you don't have sessions with fewer than 2 items, and that's why you don't add the filtering step in the NVT workflow, but I just wanted to mention it in any case :) A sketch of such a filtering step follows.)
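(A minimal sketch of that filtering step, assuming a session-grouping workflow where a `Groupby` op has produced an `item_id-count` column; the names are illustrative:)

```python
import nvtabular as nvt

MINIMUM_SESSION_LENGTH = 2

# Keep only sessions with at least two interactions; `groupby_features`
# is assumed to be the output of the Groupby op in the workflow.
filtered_sessions = groupby_features >> nvt.ops.Filter(
    f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH
)
```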
@rnyak Yes, the `feed_date` column in the train dataset is string; I'm generating that field as a string from BigQuery. I see that in my test dataset I'm using the `DATE` data type, and in the DataFrame `feed_date` has a dtype of `dbdate`.
I've just tried casting `feed_date` to string, and the 91-day run is also working now. My question is: how does the 28-day run work with the `dbdate` dtype without causing the discrepancy, and how does the 91-day workflow verify the dtypes and fail?
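(For reference, the cast that made the 91-day run work; a one-line pandas sketch, where `user_profile_df` stands in for the in-memory DataFrame:)

```python
# Align the inference DataFrame with the workflow's expected `object` dtype
# by casting the BigQuery dbdate column to string before the transform.
user_profile_df["feed_date"] = user_profile_df["feed_date"].astype(str)
```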