okasag/samplefit

Parallelisation for scores estimation


Implement parallel processing for the _estimate_scores function to speed up the estimation of reliability scores.

The attached code snippet implements a parallelised version of the scores estimation using @fugue-project. Huge thanks to @kvnkho, who generously helped out with this during a SciPy sprint session!

# Parallelization with Fugue

# imports
import pandas as pd
import numpy as np
import pickle
import fugue_dask
from fugue import transform
from typing import Any, Dict, Iterable

# Dataframe Prep
n_rows = 1000
df = pd.DataFrame({"x1": np.random.random(n_rows),
                   "x2": np.random.random(n_rows),
                   "target": np.random.random(n_rows)})
df = df.reset_index()

# parallelization jobs
n_jobs = 100
jobs = pd.DataFrame({"jobs": list(range(n_jobs))})

# set the key for merge
jobs['key'] = 0
df['key'] = 0

# create jobs dataframe
jobs_df = df.merge(jobs, on='key', how='outer').drop("key", axis=1).sort_values(["jobs", "index"])
jobs_df = jobs_df[["jobs", "index", "x1", "x2", "target"]]

# determine samples
n_samples = 3

# define function to estimate scores
# returns loss and indices for one iteration that needs to run in parallel
def rsr_one(df: pd.DataFrame, n_samples, loss_function) -> Iterable[Dict[str, Any]]:

    # reset to a clean 0..n-1 index so positional and label indexing coincide below
    df = df.reset_index(drop=True)

    # get job id
    job_id = df.iloc[0]["jobs"]

    # resample until the sampled feature matrix has full column rank
    cond = False
    while not cond:
        in_sample = df.sample(n=n_samples)  # sample using pandas
        features = in_sample.drop(['jobs', 'index', 'target'], axis=1)  # get the features
        cond = np.linalg.matrix_rank(features) == features.shape[1]
    
    # estimate the model params via the OLS normal equations
    target = in_sample.target
    params = np.linalg.inv(features.T @ features) @ (features.T @ target)
    
    # positional indices in and out of the subsample
    in_indices = in_sample.index.values
    out_indices = np.setdiff1d(df.index.values, in_indices)

    # out of bag
    out_sample = df.iloc[out_indices]
    out_features = out_sample.drop(['jobs', 'index', 'target'], axis=1)
    out_target = out_sample.target
    preds = out_features @ params

    # get the loss
    loss = loss_function(out_target, preds)

    # output pickled; use the original row indices carried in the "index" column
    yield {"jobs": job_id, "index": pickle.dumps(list(out_sample["index"])), "loss": pickle.dumps(loss)}
    
# define fugue transform to parallelize using dask
resampling_loss = transform(jobs_df,
                            rsr_one,
                            schema="jobs:int,index:binary,loss:binary",
                            params={"n_samples": n_samples, "loss_function": lambda y, yhat: np.abs(y-yhat)},
                            partition={"by": "jobs"},
                            engine="dask"
                            )
resampling_loss = resampling_loss.compute() # materialise the Dask result as a Pandas DataFrame

# output matrix for all losses of original df and over all jobs
out_loss = np.zeros([n_rows, n_jobs])
# collect results
for _index, row in resampling_loss.iterrows():
    out_loss[pickle.loads(row['index']), row["jobs"]] = pickle.loads(row['loss'])
# transform to reliability scores
average_loss = np.array(out_loss.mean(axis=1)) # average over the losses row-wise
# reverse the relationship and scale between 0 and 1 for scores
# take absolute value to prevent -0
scores = np.abs((average_loss - np.max(average_loss))/
                (np.min(average_loss) - np.max(average_loss)))
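
As a quick sanity check (a hypothetical addition, not part of the snippet itself), the resulting scores should contain one entry per original row and lie within the unit interval:

# sanity check: one reliability score per observation, all scaled to [0, 1]
assert scores.shape[0] == n_rows
assert scores.min() >= 0.0 and scores.max() <= 1.0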

This was a fun SciPy sprint! Passing engine=None will run on Pandas without changes, so you can just do:

if parallel:
    engine = "dask"
else:
    engine = None

before the transform call if you want to integrate this into your code.
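
For example, a minimal sketch of what the call would then look like, reusing the names from the snippet above:

# engine is set by the if/else above: "dask" for parallel execution, None for plain Pandas
resampling_loss = transform(jobs_df,
                            rsr_one,
                            schema="jobs:int,index:binary,loss:binary",
                            params={"n_samples": n_samples,
                                    "loss_function": lambda y, yhat: np.abs(y - yhat)},
                            partition={"by": "jobs"},
                            engine=engine)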

If you use the Dask engine, the output is a Dask DataFrame; otherwise it is a Pandas DataFrame. So you might need an if/else based on the engine to convert from a Dask DataFrame to Pandas (which is just the .compute() method).
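
A minimal sketch of that conversion, using the same parallel flag as above:

# the Dask engine returns a Dask DataFrame; materialise it as Pandas
if parallel:
    resampling_loss = resampling_loss.compute()
# with engine=None the result is already a Pandas DataFrame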

Thanks for pointing this out! I'll keep that in mind when integrating the above snippet into the existing structure. Thanks again for your help, it was indeed a fun sprint!