Dataframe has less than 2 non-NaN rows
karanalang opened this issue · 1 comments
Hello,
We are using FB Prophet for forecasting, and it is working fine. Forecasting runs every 5 days.
I'm trying to enable backfill, i.e. if a processing issue leaves gaps in the forecasts, I want to be able to generate the forecast for past days.
Here is the code for this:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_sent_recv(self, history_pd):
    print(" in forecast_sent_recv ")
    print(" history_pd ", history_pd[['applianceName','timeslot','cktName','tenantName']])
    # changing to timeslot_date, since timeslot is now in utc
    history_pd['ds'] = history_pd['timeslot_date']
    # 09/25/2023 - change for change
    #curr_day =
    curr_day = - datetime.timedelta(days=7)
    today = np.datetime64(curr_day)
    week1 = np.datetime64(curr_day + datetime.timedelta(days=7))
    history_pd = history_pd[history_pd['ds'] < today]
    # history_pd = history_pd.dropna(thresh=2)
    print(" earliest date in history_pd, ascending=False ")
    print(history_pd.sort_values(by='timeslot', ascending=False).head(1))
    print(" latest date in history_pd, ascending=True ")
    print(history_pd.sort_values(by='timeslot', ascending=True).head(1))
    curr_pd = None
    for pred in ['sentOctets', 'recvdOctets']:
        history_pd['y'] = history_pd[pred].astype('int64')
        # print(" history_pd.shape :: ", history_pd.shape, " history_pd.shape[0] :: ", history_pd.shape[0])
        # instantiate the model, configure the parameters
        model = Prophet()
        # configure predictions for y days
        future_pd = model.make_future_dataframe(
        # This is needed instead of just selecting lesser period because
        # empty frames due to a failure in streaming job causes old data
        # to be generated
        future_pd = future_pd[future_pd['ds'] >= today]
        future_pd = future_pd[future_pd['ds'] <= week1]
....... (Additional Code)
results = (
.withColumn('total_rows', f_count('applianceName').over(window))
.where(col('total_rows') > lit(100))
.groupBy('applianceName', 'cktName')
.withColumn('training_date', current_date())
all_DF.dropna() is called before the pandas_udf forecast_sent_recv() is called. This should drop all rows with a None in any column.
However, I get the following error when I run this on the full dataset.
count in all_DF :: 1657436
23/09/29 18:10:50 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 144.0 in stage 20.0 (TID 560) (versa-intf-rate-predict-backfill-v1-w-3.c.versa-sml-googl.internal executor 12): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
Note: when I run this for a single applianceId, the code goes through.
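Since a single applianceId runs cleanly, the failure is probably tied to specific groups rather than to the code path as a whole. One possible cause: the total_rows > 100 filter counts every row in a group, while the UDF later keeps only rows with ds before the cutoff, so a group whose data is mostly recent can pass the filter and still reach Prophet with fewer than two rows. A minimal pandas sketch for locating such groups, assuming the column names from the code above; groups_below_min_rows is a hypothetical helper, not part of Prophet or Spark:

```python
import pandas as pd


def groups_below_min_rows(df, cutoff,
                          keys=("applianceName", "cktName"),
                          value_cols=("sentOctets", "recvdOctets"),
                          min_rows=2):
    """List groups with fewer than min_rows usable rows before the cutoff.

    A row counts as usable when its timeslot_date is earlier than the
    cutoff and none of the value columns is null.
    """
    ds = pd.to_datetime(df["timeslot_date"])
    usable = (ds < cutoff) & df[list(value_cols)].notna().all(axis=1)
    # Summing the boolean mask per group keeps groups with zero usable rows.
    counts = usable.groupby([df[k] for k in keys]).sum()
    return counts[counts < min_rows].rename("usable_rows").reset_index()
```

This runs on a pandas frame, so on the Spark side it would need a sample pulled with toPandas() or an equivalent groupBy aggregation.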
Also, to backfill - the following change is made:
# curr_day is set to : today - 7 days
curr_day = - datetime.timedelta(days=7)
today = np.datetime64(curr_day)
week1 = np.datetime64(curr_day + datetime.timedelta(days=7))
# include_history is set to true
future_pd = model.make_future_dataframe(
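The right-hand side of the curr_day assignment appears truncated in the snippet above. Going by the comment ("today - 7 days"), a sketch of how the window bounds could be computed, assuming curr_day is meant to be the current time minus seven days; backfill_window and its parameters are hypothetical names used only for illustration:

```python
import datetime

import numpy as np


def backfill_window(now, offset_days=7, horizon_days=7):
    """Return (start, end) as numpy datetime64 values.

    start is `now` shifted back by offset_days; end is start plus
    horizon_days. With the defaults, end lands back on `now`.
    """
    curr_day = now - datetime.timedelta(days=offset_days)
    start = np.datetime64(curr_day)
    end = np.datetime64(curr_day + datetime.timedelta(days=horizon_days))
    return start, end
```

Passing the reference time in as an argument, rather than calling datetime.datetime.now() inside, makes it possible to re-run a backfill for a fixed past date.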
Any ideas on how to fix or debug this?
I faced a similar issue a couple of days ago. Maybe you can just skip any ApplianceId where the count of non-NaN rows is less than 2. It worked for me.
# Check if there are at least two non-NaN rows
if history_pd['applianceId'].notnull().sum() < 2:
    print(f"Skipping {compartment} due to insufficient data.")
    continue  # Skip to the next iteration
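The check above tests applianceId, which is presumably an identifier column and so rarely null. Prophet's fit raises this error when fewer than two rows have a non-null y, so a guard on the target column, applied after the date cutoff, may be closer to what is needed. A minimal sketch, assuming the column names from the question; usable_history is a hypothetical helper:

```python
import pandas as pd


def usable_history(history_pd, cutoff, target, min_rows=2):
    """Return the rows a model could train on, or None if there are too few.

    Keeps rows dated before the cutoff whose target value is not null,
    then copies the target into an int64 'y' column.
    """
    out = history_pd.copy()
    out["ds"] = pd.to_datetime(out["timeslot_date"])
    out = out[(out["ds"] < cutoff) & out[target].notna()].copy()
    if len(out) < min_rows:
        return None
    out["y"] = out[target].astype("int64")
    return out
```

Inside the UDF, a None result for a given target would be the signal to skip that target instead of calling model.fit. What the UDF should return for a group that is skipped entirely depends on result_schema, which is not shown here.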