Multiple anomaly detection models
We want to add the option to use any number of models to compute `anomaly_score`. That is, we would like to have several models (`model`, `model_1`, `model_2`, `model_3`, ..., `model_n`), and for each of them we would like to get a separate `anomaly_score` value (`anomaly_score`, `anomaly_score_1`, `anomaly_score_2`, ..., `anomaly_score_n`). For now, the assumption is that there will be a separate .onnx file for each model.
What would be the best way to accomplish this? Right now I see two ways:
1) Add an additional argument to the `anomaly_score` function that defines the name of the model:
...
model_name = ('model', 'model_1', 'model_2', ..., 'model_n')
ad_col_name = ('anomaly_score', 'anomaly_score_1', 'anomaly_score_2', ..., 'anomaly_score_n')
for model, col_name in zip(model_name, ad_col_name):
    df = df.withColumn(col_name, anomaly_score("lc_features", model=model))
This approach would probably put too much load on the server, right?
2) Modify the function so that it goes through all models and returns the result as an array [anomaly_score, anomaly_score_1, anomaly_score_2, ..., anomaly_score_n] (see the sketch below).
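A rough sketch of what I have in mind for the second option (only an illustration: the model paths, the ONNX input name, and the assumption that `lc_features` is a flat numeric vector are all placeholders, not the actual fink_science code):

```python
# Hypothetical sketch of option 2: a single pandas UDF that loops over all
# models and returns an array of scores per alert.
import numpy as np
import pandas as pd
import onnxruntime as rt
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

MODEL_PATHS = ["model.onnx", "model_1.onnx", "model_2.onnx"]  # one .onnx file per model

@pandas_udf(ArrayType(DoubleType()))
def anomaly_score_multi(lc_features: pd.Series) -> pd.Series:
    # Load every model once per batch; the pre-processing is shared and only
    # the inference is repeated N times.
    sessions = [rt.InferenceSession(path) for path in MODEL_PATHS]
    scores = []
    for feats in lc_features:
        x = np.asarray(feats, dtype=np.float32).reshape(1, -1)
        scores.append([
            float(s.run(None, {s.get_inputs()[0].name: x})[0].ravel()[0])
            for s in sessions
        ])
    return pd.Series(scores)

# df = df.withColumn("anomaly_score", anomaly_score_multi("lc_features"))
```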
Hi @Knispel2 -- thanks for the detailed proposal. I guess both methods would lead to more or less the same memory footprint, as one needs to load all models in memory anyway. The second method would probably be faster, because one would use the same pre-processing code and only run the inference N times. Can you provide a more detailed profiling (timing & memory) for 1 and N (N > 1) models? That would help to understand the best strategy. Let me know if you need help setting up the profiling.
I used the script that you used to debug the AD module and the julienpeloton/fink-ci:dev image. In the script I added an option to set the number of models at startup by passing a command-line argument.
#profiler.py
import time
import sys
from fink_utils.spark.utils import concat_col
from fink_science.ad_features.processor import extract_features_ad
t0 = time.time()
from fink_science.anomaly_detection.processor import anomaly_score
print('Time to load the module: {:.2f} seconds'.format(time.time() - t0))
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import array
import numpy as np
num_test = int(sys.argv[1])
print(f'''==========================================================
{num_test} models test ^^^
=============================================================''')
sc = SparkContext()
spark = SparkSession(sc).builder.config("spark.python.profile", "true").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df = spark.read.format('parquet').load('/fink_profile/fink-science/fink_science/data/alerts/datatest')
print(df.count())
# Retrieve time-series information
to_expand = [
    'jd', 'fid', 'magpsf', 'sigmapsf',
    'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
    'diffmaglim'
]
# Append temp columns with historical + current measurements
prefix = 'c'
for colname in to_expand:
    df = concat_col(df, colname, prefix=prefix)
expanded = [prefix + i for i in to_expand]
models = tuple(['']*num_test)
col = [f'anomaly_score{num}' for num in range(num_test)]
ad_args = [
    'cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId',
    'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'
]
# Perform feature extraction + classification
df = df.withColumn('lc_features', extract_features_ad(*ad_args))
# First case: one UDF call per model, producing one score column per model
for num, model in enumerate(models):
    df = df.withColumn(f'anomaly_score{num}', anomaly_score('lc_features', (model,)))
t0 = time.time()
df_small = df.select(col)
df_small.cache()
df_small.count()
result = time.time() - t0
# Show memory profiling
sc.show_profiles()
spark.stop()
print('Time to execute: {:.2f} seconds'.format(result))
All changes to processor.py of the AD module are in commit 73bdf85.
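For reference, the per-model call above relies on `anomaly_score` accepting a model identifier; very roughly, the kind of change I mean looks like this (a simplified, hypothetical sketch rather than the actual code in the commit; the model path construction and the ONNX input name are made up):

```python
# Hypothetical sketch of option 1: the UDF scores the alerts with a single,
# named model; one call (and one column) per model.
import numpy as np
import pandas as pd
import onnxruntime as rt
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def anomaly_score_single(lc_features: pd.Series, model_name: pd.Series) -> pd.Series:
    # model_name is a constant column (e.g. F.lit('model_1')); use its first value
    session = rt.InferenceSession(f"{model_name.iloc[0]}.onnx")
    input_name = session.get_inputs()[0].name
    out = []
    for feats in lc_features:
        x = np.asarray(feats, dtype=np.float32).reshape(1, -1)
        out.append(float(session.run(None, {input_name: x})[0].ravel()[0]))
    return pd.Series(out)

# df = df.withColumn('anomaly_score_1',
#                    anomaly_score_single('lc_features', F.lit('model_1')))
```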
I ran the script 20 times for the number of models from 1 to 100 in increments of 5:
for i in {1..100..5}; do spark-submit --master local[2] --conf spark.python.profile.memory=true profiler.py $i > test_$i.log; done
I then combined the report for each test into a single file:
cat *.log > first_profile.log
For the second case, I modified the profiler.py code as follows:
import time
import sys
from fink_utils.spark.utils import concat_col
from fink_science.ad_features.processor import extract_features_ad
t0 = time.time()
from fink_science.anomaly_detection.processor import anomaly_score
print('Time to load the module: {:.2f} seconds'.format(time.time() - t0))
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import array
import numpy as np
num_test = int(sys.argv[1])
print(f'''==========================================================
{num_test} models test ^^^
=============================================================''')
sc = SparkContext()
spark = SparkSession(sc).builder.config("spark.python.profile", "true").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df = spark.read.format('parquet').load('/fink_profile/fink-science/fink_science/data/alerts/datatest')
print(df.count())
# Retrieve time-series information
to_expand = [
    'jd', 'fid', 'magpsf', 'sigmapsf',
    'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
    'diffmaglim'
]
# Append temp columns with historical + current measurements
prefix = 'c'
for colname in to_expand:
    df = concat_col(df, colname, prefix=prefix)
expanded = [prefix + i for i in to_expand]
models = tuple(['']*num_test)
col = [f'anomaly_score{num}' for num in range(num_test)]
ad_args = [
    'cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId',
    'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'
]
# Perform feature extraction + classification
df = df.withColumn('lc_features', extract_features_ad(*ad_args))
df = df.withColumn('anomaly_score', anomaly_score('lc_features', models))  # <-- second case: a single UDF call over all models
t0 = time.time()
df_small = df.select(['anomaly_score'])
df_small.cache()
df_small.count()
result = time.time() - t0
# Show memory profiling
sc.show_profiles()
spark.stop()
print('Time to execute: {:.2f} seconds'.format(result))
Here are the final files:
first_profile.log
second_profile.log
The string `{num_test} models test ^^^` is used as a separator for the test results within the files.
I went through these files and plotted the graphs from them.
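For reference, the timing numbers can be pulled out of the combined files with something like this (a simplified sketch that only parses the `Time to execute` lines printed by the scripts above; the memory numbers would need a similar pass over the `sc.show_profiles()` output):

```python
# Hypothetical sketch: extract {number of models: execution time} pairs from
# the concatenated log files, using the "<N> models test ^^^" separator.
import re

def parse_profile(path):
    text = open(path).read()
    pairs = re.findall(
        r"(\d+) models test \^\^\^.*?Time to execute: ([\d.]+) seconds",
        text, flags=re.DOTALL,
    )
    return {int(n): float(t) for n, t in pairs}

print(parse_profile("first_profile.log"))
print(parse_profile("second_profile.log"))
```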
I have some doubts about whether I did it right. I have two questions:
- Why, in the second case, does the memory consumption grow rapidly only at the very beginning and then fluctuate? Could it be because I didn't restart the SparkContext completely, but instead only called spark.stop()? Maybe I needed to restart the entire Docker container for each test?
- In the first case, I plotted the memory consumption of a single process. Do I need to multiply this number by the number of models?