Dataframe has less than 2 non-NaN rows
karanalang opened this issue · 1 comment
Hello,
We are using FB Prophet for forecasting, and it is working fine. Forecasting runs every 5 days.
I'm trying to enable backfill, i.e. if a processing issue results in gaps in the forecasts, I want to be able to generate the forecast for past days.
Here is the code for this:
# imports used by this UDF
import datetime

import numpy as np
from prophet import Prophet
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_sent_recv(self, history_pd):
    print(" in forecast_sent_recv ")
    print(" history_pd ", history_pd[['applianceName', 'timeslot', 'cktName', 'tenantName']])
    history_pd[['applianceName', 'timeslot', 'cktName', 'tenantName']].head(5)

    # changing to timeslot_date, since timeslot is now in UTC
    history_pd['ds'] = history_pd['timeslot_date']

    # 09/25/2023 - change for backfill
    # curr_day = datetime.date.today()
    curr_day = datetime.date.today() - datetime.timedelta(days=7)
    today = np.datetime64(curr_day)
    week1 = np.datetime64(curr_day + datetime.timedelta(days=7))

    # keep only history strictly before the (shifted) "today"
    history_pd = history_pd[history_pd['ds'] < today]
    # history_pd = history_pd.dropna(thresh=2)

    print(" latest date in history_pd (ascending=False) ")
    print(history_pd.sort_values(by='timeslot', ascending=False).head(1))
    print(" earliest date in history_pd (ascending=True) ")
    print(history_pd.sort_values(by='timeslot', ascending=True).head(1))

    curr_pd = None
    for pred in ['sentOctets', 'recvdOctets']:
        history_pd['y'] = history_pd[pred].astype('int64')
        # print(" history_pd.shape :: ", history_pd.shape, " history_pd.shape[0] :: ", history_pd.shape[0])

        # instantiate the model, configure the parameters
        model = Prophet()
        model.fit(history_pd)

        # configure predictions: 100800 periods at 10-minute frequency = 700 days of future points
        future_pd = model.make_future_dataframe(
            periods=100800,
            freq='10min',
            include_history=True
        )
        # This is needed instead of just selecting a smaller period because
        # empty frames due to a failure in the streaming job cause old data
        # to be generated; keep only the 7-day backfill window [today, week1].
        future_pd = future_pd[future_pd['ds'] >= today]
        future_pd = future_pd[future_pd['ds'] <= week1]

        ....... (Additional Code)
results = (
    all_DF
    .dropna()
    .withColumn('total_rows', f_count('applianceName').over(window))
    .where(col('total_rows') > lit(100))
    .drop('total_rows')
    .groupBy('applianceName', 'cktName')
    .apply(self.forecast_sent_recv)
    .withColumn('training_date', current_date())
)
all_DF.dropna() is called before the pandas_udf forecast_sent_recv() is applied, so it should drop every row that has a None in any column.
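For reference, a minimal sketch of the dropna() behaviour I'm relying on, using toy data (hypothetical values, not the real schema):

from pyspark.sql import SparkSession

# toy data, hypothetical values - just to confirm that dropna() with default
# arguments removes every row that has a null in any column
spark = SparkSession.builder.master("local[1]").getOrCreate()
toy = spark.createDataFrame(
    [("a1", 10), ("a2", None), (None, 30)],
    ["applianceName", "sentOctets"],
)
toy.dropna().show()  # only the ("a1", 10) row survives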
However, I get the following error when I run this on the full dataset:
count in all_DF :: 1657436
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/group_ops.py:81: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
23/09/29 18:10:40 INFO org.apache.arrow.memory.BaseAllocator: Debug mode disabled.
23/09/29 18:10:40 INFO org.apache.arrow.memory.DefaultAllocationManagerOption: allocation manager type not specified, using netty as the default type
23/09/29 18:10:40 INFO org.apache.arrow.memory.CheckAllocator: Using DefaultAllocationManager at memory-netty-2.0.0.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
23/09/29 18:10:50 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 144.0 in stage 20.0 (TID 560) (versa-intf-rate-predict-backfill-v1-w-3.c.versa-sml-googl.internal executor 12): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
for batch in iterator:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 266, in init_stream_yield_batches
for series in iterator:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 429, in mapper
return f(keys, vals)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 170, in <lambda>
return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 158, in wrapped
result = f(key, pd.concat(value_series, axis=1))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
return f(*args, **kwargs)
File "/tmp/2eec4e36-fc45-4190-a2b8-a3422444f5c3/predictions_backfill_v1.py", line 157, in forecast_sent_recv
File "/opt/conda/default/lib/python3.8/site-packages/prophet/forecaster.py", line 1116, in fit
raise ValueError('Dataframe has less than 2 non-NaN rows.')
ValueError: Dataframe has less than 2 non-NaN rows.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Note: when I run this for a single applianceId, the code goes through.
Also, to backfill, the following changes are made:
# curr_day is set to: today - 7 days
curr_day = datetime.date.today() - datetime.timedelta(days=7)
today = np.datetime64(curr_day)
week1 = np.datetime64(curr_day + datetime.timedelta(days=7))

# include_history is set to True
future_pd = model.make_future_dataframe(
    periods=100800,
    freq='10min',
    include_history=True
)
Any ideas on how to fix/debug this?
TIA!
I had faced a similar issue a couple of days ago. Maybe you can just skip the applianceId groups where the count of non-NaN rows is less than 2; it worked for me:
# Check if there are at least two non-NaN rows
if history_pd['applianceId'].notnull().sum() < 2:
    print(f"Skipping {compartment} due to insufficient data.")
    continue  # Skip to the next iteration
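In a GROUPED_MAP pandas UDF there is no loop to continue out of, so the equivalent of skipping a group is returning an empty frame. A rough sketch, with hypothetical result column names that would have to match whatever result_schema defines:

import pandas as pd

# hypothetical result columns - replace with the fields of result_schema
RESULT_COLUMNS = ['applianceName', 'cktName', 'ds', 'yhat']

def empty_result():
    # an empty frame with the expected columns: Spark simply emits no rows
    # for this group, which is the grouped-map equivalent of 'continue'
    return pd.DataFrame(columns=RESULT_COLUMNS)

def has_enough_rows(history_pd, min_rows=2):
    # Prophet.fit() raises "Dataframe has less than 2 non-NaN rows." when a
    # group ends up with fewer than 2 usable rows after the date filtering
    return history_pd['ds'].notnull().sum() >= min_rows

Inside forecast_sent_recv(), the guard would go right after the history_pd['ds'] < today filter: if not has_enough_rows(history_pd): return empty_result().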