Parallelisation for scores estimation
Implement parallel processing for the _estimate_scores function to speed up the estimation of reliability scores.
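For context, the loop to parallelise looks roughly like the following serial sketch (illustrative only; the actual _estimate_scores implementation differs): each iteration independently draws a subsample, fits a model, and records the out-of-sample loss, which is what makes the work embarrassingly parallel.

# illustrative serial baseline (not the actual _estimate_scores implementation):
# every iteration is independent of the others
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({"x1": rng.random(100), "x2": rng.random(100), "target": rng.random(100)})
n_jobs, n_samples = 10, 3
losses = []
for _ in range(n_jobs):
    in_sample = df.sample(n=n_samples)                    # draw a subsample
    X, y = in_sample[["x1", "x2"]], in_sample["target"]
    params = np.linalg.pinv(X.values) @ y.values          # least-squares fit
    out = df.drop(in_sample.index)                        # out-of-sample rows
    losses.append(np.abs(out["target"] - out[["x1", "x2"]] @ params))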
The attached code snippet implements parallelised score estimation using @fugue-project. Huge thanks to @kvnkho, who generously helped out with this during a SciPy sprint session!
# Parallelization with Fugue
# imports
import pandas as pd
import numpy as np
import pickle
import fugue_dask
from fugue import transform
from typing import Any, Dict, Iterable
# Dataframe Prep
n_rows = 1000
df = pd.DataFrame({"x1": np.random.random(n_rows),
                   "x2": np.random.random(n_rows),
                   "target": np.random.random(n_rows)})
df = df.reset_index()
# parallelization jobs
n_jobs = 100
jobs = pd.DataFrame({"jobs": list(range(n_jobs))})
# set the key for merge
jobs['key'] = 0
df['key'] = 0
# cross join df with jobs so every job gets a full copy of the data
jobs_df = df.merge(jobs, on='key', how='outer').drop("key", axis=1).sort_values(["jobs", "index"])
jobs_df = jobs_df[["jobs", "index", "x1", "x2", "target"]]
# determine samples
n_samples = 3
# define function to estimate scores
# returns loss and indices for one iteration that needs to run in parallel
def rsr_one(df: pd.DataFrame, n_samples, loss_function) -> Iterable[Dict[str, Any]]:
    # reset to a clean positional index so label- and position-based indexing agree
    df = df.reset_index(drop=True)
    # get job id
    job = df.iloc[0]["jobs"]
    # resample until the feature matrix has full column rank
    cond = False
    while not cond:
        in_sample = df.sample(n=n_samples)  # sample using pandas
        temp = in_sample.drop(['jobs', 'index', 'target'], axis=1)  # get the features
        if np.linalg.matrix_rank(temp) == temp.shape[1]:
            cond = True
    # estimate the model params via the normal equations (OLS)
    target = in_sample.target
    features = in_sample.drop(['jobs', 'index', 'target'], axis=1)
    params = np.linalg.inv(features.T @ features) @ (features.T @ target)
    # positions in and out of the subsample
    in_indices = in_sample.index.values
    out_indices = np.setdiff1d(df.index.values, in_indices)
    # out-of-bag sample, predictions, and loss
    out_sample = df.iloc[out_indices]
    out_features = out_sample.drop(['jobs', 'index', 'target'], axis=1)
    out_target = out_sample.target
    preds = out_features @ params
    loss = loss_function(out_target, preds)
    # pickle the outputs so they fit the binary schema columns; report the
    # original row ids from the "index" column, not the partition-local index
    yield {"jobs": job, "index": pickle.dumps(list(out_sample["index"])), "loss": pickle.dumps(loss)}
# run one job per partition in parallel via a Fugue transform on the Dask engine
resampling_loss = transform(jobs_df,
                            rsr_one,
                            schema="jobs:int,index:binary,loss:binary",
                            params={"n_samples": n_samples, "loss_function": lambda y, yhat: np.abs(y - yhat)},
                            partition={"by": "jobs"},
                            engine="dask"
                            )
resampling_loss = resampling_loss.compute()  # materialise the Dask DataFrame as Pandas
# loss matrix: one row per original observation, one column per job
out_loss = np.zeros([n_rows, n_jobs])
# collect results
for _index, row in resampling_loss.iterrows():
out_loss[pickle.loads(row['index']), row["jobs"]] = pickle.loads(row['loss'])
# transform to reliability scores
average_loss = out_loss.mean(axis=1)  # average over the losses row-wise
# reverse the relationship and scale between 0 and 1 for scores
# take absolute value to prevent -0
scores = np.abs((average_loss - np.max(average_loss)) /
                (np.min(average_loss) - np.max(average_loss)))
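A quick sanity check on the result (a minimal sketch, assuming the snippet above has run): there should be one score per original row, scaled to [0, 1], with low average loss mapping to high scores.

# sanity checks on the reliability scores
assert scores.shape == (n_rows,)
assert scores.min() >= 0.0 and scores.max() <= 1.0
# the row with the largest average loss gets score 0, the smallest gets 1
print(scores[np.argmax(average_loss)], scores[np.argmin(average_loss)])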
This was a fun SciPy sprint! Passing engine=None will run on Pandas without changes, so you can just do:

if parallel:
    engine = "dask"
else:
    engine = None

before the transform call if you want to integrate this into your code.
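Concretely, the toggle could be wired in like this (a sketch reusing the names from the snippet above; parallel is a hypothetical flag):

# choose the execution engine with a flag; everything else stays identical
parallel = True  # hypothetical flag
engine = "dask" if parallel else None
resampling_loss = transform(jobs_df,
                            rsr_one,
                            schema="jobs:int,index:binary,loss:binary",
                            params={"n_samples": n_samples, "loss_function": lambda y, yhat: np.abs(y - yhat)},
                            partition={"by": "jobs"},
                            engine=engine
                            )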
If you use the Dask engine, the output is a Dask DataFrame; otherwise it is a Pandas DataFrame. So you might need an if/else based on the engine to convert from the Dask DataFrame to Pandas, which is just the compute() method.
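For example (a sketch, assuming engine was set as above):

import dask.dataframe as dd

# the Dask engine returns a Dask DataFrame; Pandas output comes back as-is
if isinstance(resampling_loss, dd.DataFrame):
    resampling_loss = resampling_loss.compute()  # materialise as Pandas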
Thanks for pointing this out! I'll keep that in mind when integrating the above snippet into the existing structure. Thanks again for your help, it was indeed a fun sprint!