facebook/prophet

Dataframe has less than 2 non-NaN rows

karanalang opened this issue · 1 comments

Hello,
we are using FB Prophet for forecasting, and it is working fine. Forecasting runs every 5 days.

I'm trying to enable backfill, i.e. if there is a processing issue resulting in gaps in forecasting, I want to be able to get the forecast for past days.

Here is the code for this:

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
    def forecast_sent_recv(self, history_pd):

        print(" in forecast_sent_recv ")
        print(" history_pd ", history_pd[['applianceName','timeslot','cktName','tenantName']])

        history_pd[['applianceName','timeslot','cktName','tenantName']].head(5)

        # changing to timeslot_date, since timeslot is now in utc
        history_pd['ds'] = history_pd['timeslot_date']
        # 09/25/2023 - change for change
        #curr_day = datetime.date.today()
        curr_day = datetime.date.today() - datetime.timedelta(days=7)
        today = np.datetime64(curr_day)
        week1 = np.datetime64(curr_day + datetime.timedelta(days=7))

        history_pd = history_pd[history_pd['ds'] < today]
        # history_pd = history_pd.dropna(thresh=2)
        print(" earliest date in history_pd, ascending=False ")
        print(history_pd.sort_values(by='timeslot', ascending=False).head(1))

        print(" latest date in history_pd, ascending=True ")
        print(history_pd.sort_values(by='timeslot', ascending=True).head(1))

        curr_pd = None
        for pred in ['sentOctets', 'recvdOctets']:
            history_pd['y'] = history_pd[pred].astype('int64')

            # print(" history_pd.shape :: ", history_pd.shape, " history_pd.shape[0] :: ", history_pd.shape[0])

            # instantiate the model, configure the parameters
            model = Prophet()

            model.fit(history_pd)

            # configure predictions for y days
            future_pd = model.make_future_dataframe(
                periods=100800,
                freq='10min',
                include_history=True
            )

            # This is needed instead of just selecting lesser period because
            # empty frames due to a failure in streaming job causes old data
            # to be generated
            future_pd = future_pd[future_pd['ds'] >= today]
            future_pd = future_pd[future_pd['ds'] <= week1]
            
            ....... (Additional Code)



results = (
            all_DF
                .dropna()
                .withColumn('total_rows', f_count('applianceName').over(window))
                .where(col('total_rows') > lit(100))
                .drop('total_rows')
                .groupBy('applianceName', 'cktName')
                .apply(self.forecast_sent_recv)
                .withColumn('training_date', current_date())
        )


all_DF.dropna() is called before the pandas_udf forecast_sent_recv() is called. This should drop all rows that have a null value in any column.

However, I get the following error when I run this on the full dataset.

count in all_DF :: 1657436
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/group_ops.py:81: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
23/09/29 18:10:40 INFO org.apache.arrow.memory.BaseAllocator: Debug mode disabled.
23/09/29 18:10:40 INFO org.apache.arrow.memory.DefaultAllocationManagerOption: allocation manager type not specified, using netty as the default type
23/09/29 18:10:40 INFO org.apache.arrow.memory.CheckAllocator: Using DefaultAllocationManager at memory-netty-2.0.0.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
23/09/29 18:10:50 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 144.0 in stage 20.0 (TID 560) (versa-intf-rate-predict-backfill-v1-w-3.c.versa-sml-googl.internal executor 12): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 266, in init_stream_yield_batches
    for series in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 429, in mapper
    return f(keys, vals)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 170, in <lambda>
    return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 158, in wrapped
    result = f(key, pd.concat(value_series, axis=1))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/2eec4e36-fc45-4190-a2b8-a3422444f5c3/predictions_backfill_v1.py", line 157, in forecast_sent_recv
  File "/opt/conda/default/lib/python3.8/site-packages/prophet/forecaster.py", line 1116, in fit
    raise ValueError('Dataframe has less than 2 non-NaN rows.')
ValueError: Dataframe has less than 2 non-NaN rows.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Note: when I run this for a single applianceId, the code goes through.

Also, to backfill - the following change is made:

# curr_day is set to : today - 7 days 
curr_day = datetime.date.today() - datetime.timedelta(days=7)
today = np.datetime64(curr_day)
week1 = np.datetime64(curr_day + datetime.timedelta(days=7))


# include_history is set to true
future_pd = model.make_future_dataframe(
                periods=100800,
                freq='10min',
                include_history=True
            )

Any ideas on how to fix/debug this?

tia!

I faced a similar issue a couple of days ago; maybe you can just skip any applianceId where the count of non-NaN rows is less than 2. It worked for me.

Check if there are at least two non-NaN rows

  if history_pd['applianceId'].notnull().sum() < 2:
       print(f"Skipping {compartment} due to insufficient data.")
       continue  # Skip to the next iteration