python – scikit-learn RandomizedSearchCV doesn’t utilise all available cores

I’m using RandomizedSearchCV to optimize and evaluate several ML models, and I’m trying to parallelize the optimization across models with joblib’s Parallel(). However, when I run my code on an HPC node with 16 cores, only two of them are used.

    jobs = 16
    x = Parallel(n_jobs=jobs, backend='multiprocessing', verbose=3)(
        delayed(fit)(m, X_train, y_train,
                     n_iter=3,
                     cv=3,
                     return_train_score=True,
                     scoring='r2',
                     refit='r2',
                     n_jobs=4,
                     verbose=3)
        for m in models)

EDIT: With two models in models, Parallel() uses only two cores and the other 14 sit idle (0.0% utilisation). The iterations/folds inside fit() appear to run as threads on those same two cores.
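
One thing worth ruling out first is the HPC scheduler pinning the job to fewer cores than requested. A small check (a sketch; os.sched_getaffinity is Linux-only) shows how many cores the process is actually allowed to use:

    import os
    import joblib

    # Number of cores the OS scheduler allows this process to run on (Linux-only)
    print("affinity:", len(os.sched_getaffinity(0)))
    # Number of cores joblib will use for n_jobs=-1
    print("joblib cpu_count:", joblib.cpu_count())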

Question: Is there a way to nest the parallel execution, so that all cores are used?
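
For reference, here are the two variants I’m considering, as a sketch under my current understanding (not verified on the HPC): the 'multiprocessing' backend uses daemonic workers, which cannot spawn child processes, so the inner n_jobs=4 apparently falls back to sequential execution, whereas the default 'loky' backend is, as far as I can tell, able to handle nested parallel calls. With only two models, dropping the outer loop entirely is also an option:

    # (a) No outer Parallel: let each RandomizedSearchCV fan out over all cores
    x = [fit(m, X_train, y_train, n_iter=3, cv=3, return_train_score=True,
             scoring='r2', refit='r2', n_jobs=16, verbose=3) for m in models]

    # (b) Keep the outer loop, but use loky so the inner n_jobs can nest
    x = Parallel(n_jobs=len(models), backend='loky', verbose=3)(
        delayed(fit)(m, X_train, y_train, n_iter=3, cv=3,
                     return_train_score=True, scoring='r2', refit='r2',
                     n_jobs=8, verbose=3)
        for m in models)

Variant (a) keeps the result structure identical (a list of (search, name) tuples), so the downstream evaluation loop is unchanged.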


Full working example:

import sys
import os
import yaml
import time
import numpy as np
import pandas as pd
import math

from sklearn.model_selection import RandomizedSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, Matern
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score


import matplotlib.pyplot as plt

from scipy.stats import loguniform, uniform, truncnorm, randint

from joblib import parallel_backend, Parallel, delayed

rng = np.random.default_rng()

def gen_data(n, p, rng):
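    # Simulate child genotypes in {0, 1, 2}: draw two parental allele matrices
    # and recombine them with random inheritance masks r_m and r_f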
    prob = 0.5
    xm_a1 = rng.binomial(1, prob, size=(n, p))
    xm_a2 = rng.binomial(1, prob, size=(n, p))
    Xm = xm_a1 + xm_a2
    xf_a1 = rng.binomial(1, prob, size=(n, p))
    xf_a2 = rng.binomial(1, prob, size=(n, p))
    r_m = rng.binomial(1, prob, size=(n, p))
    r_f = rng.binomial(1, prob, size=(n, p))
    xc_a1 = r_m * xm_a2 + (1 - r_m) * xm_a1
    xc_a2 = r_f * xf_a2 + (1 - r_f) * xf_a1
    Xc = xc_a1 + xc_a2
    return Xc

def predict_r2_score(x, X, y):
    # Evaluate the refit best estimator on held-out data
    y_pred = x.best_estimator_.predict(X)
    return r2_score(y, y_pred)

def fit(mod,X ,y , **kwargs):
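    # Run a single randomised hyper-parameter search for one model spec and
    # return the fitted search object together with the model name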
    grid_search = RandomizedSearchCV(mod['estimator'], mod['param'], **kwargs)
    grid_search.fit(X, y)
    return grid_search, mod['model']

def generate_phenotype(X, n_features, n_samples, var_add, var_dom, var_epi, chunks=512):
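    # Compose a phenotype from additive (uA), dominance (uD) and pairwise
    # epistatic (uAA) effects plus Gaussian noise; the requested variances
    # are split evenly over features / feature pairs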
    
    sample = rng.choice(range(X.shape[1]), n_features, replace=False)
    X = X[0:n_samples,sample]
    
    MAF = (X.mean(axis=0)/2).reshape(1,n_features) #+ 1e-3
    
    # Drop monomorphic features (MAF of 0 or 1): they carry no signal and the
    # standardisation below would divide by zero. These are columns, so axis=1.
    elim = np.where((MAF == 0) | (MAF == 1))[1]
    X = np.delete(X, elim, axis=1)
    
    print(X.shape)
    n_features = X.shape[1]
    MAF = (X.mean(axis=0)/2).reshape(1,n_features)

    e = 1 - (var_add + var_dom + var_epi)
    var_add = var_add/n_features
    var_dom = var_dom/n_features
    var_epi = var_epi/(n_features*(n_features-1)//2)
    
    uA = rng.normal(0, np.sqrt(var_add), n_features)
    uD = rng.normal(0, np.sqrt(var_dom), n_features)
    uAA = rng.normal(0, np.sqrt(var_epi), n_features*(n_features-1)//2)
    e = rng.normal(0, np.sqrt(e), n_samples)
    
    
    xD = np.where(X == 1, 2*MAF, X)
    xD = np.where(xD == 2, 4*MAF-2, xD)

    wA = (X - 2*MAF)/np.sqrt(2*MAF*(1-MAF))
    wD = (xD - 2*(MAF**2))/(2*MAF*(1-MAF))
    
    wAA = np.zeros((n_samples, ))
    start, end = 0, 0
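    # Accumulate the pairwise (i, j > i) epistatic contribution for every sample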
    for i in range(n_features-1):
        end += n_features-i-1
        wAA = wAA + np.sum(wA[:,i].reshape(n_samples,1)*wA[:,i+1:n_features]*uAA[start:end],1)
        start = end

    Y = np.matmul(wA, uA) + np.matmul(wD, uD) + wAA
    
    return Y + e


if __name__ == "__main__":

    test_size = 0.2
    n_informative = 1000
    n_features = 10000
    n_samples = 50000
    heritability = 0.7
    var_dom = 0.1
    var_epi = 0.1
    var_add = heritability - var_epi - var_dom
    
    G = gen_data(n_samples,n_features,rng)
    
    y = generate_phenotype(G, n_informative, n_samples, var_add, var_dom, var_epi)

    models = [
        {"model": "LinearRegression",
         "estimator": LinearRegression(),
         "param": [{}]},
        {"model": "GaussianProcessRegressor",
         "estimator": GaussianProcessRegressor(),
         "param": [{'copy_X_train': [False],
                    'n_restarts_optimizer': [10],
                    "alpha": loguniform(1e-5, 1e-2),
                    "kernel": [RBF(l, length_scale_bounds=(1e-20, 100000.0))
                               for l in np.logspace(-10, 1, 10)]},
                   {'copy_X_train': [False],
                    'n_restarts_optimizer': [10],
                    "alpha": loguniform(1e-10, 1e-2),
                    "kernel": [Matern(l, length_scale_bounds=(1e-20, 100000.0))
                               for l in np.logspace(-10, 1, 10)]}]},
    ]

    X_train, X_test, y_train, y_test = train_test_split(G, y.reshape(-1, 1), test_size=test_size, random_state=42, shuffle=True)
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Use a separate scaler for the target instead of refitting the X scaler
    y_scaler = StandardScaler()
    y_train = y_scaler.fit_transform(y_train.reshape(-1, 1))
    y_test = y_scaler.transform(y_test.reshape(-1, 1))

    y_train = y_train.ravel()
    y_test = y_test.ravel()

    jobs = 16
    
    print("Start model fitting ...")
            
    x = Parallel(n_jobs=jobs, backend='multiprocessing', verbose=3)(
        delayed(fit)(m, X_train, y_train,
                     n_iter=3,
                     cv=3,
                     return_train_score=True,
                     scoring='r2',
                     refit='r2',
                     n_jobs=4,
                     verbose=3)
        for m in models)
    print("Done model fitting")

    res = []
    for search, name in x:
        r2 = predict_r2_score(search, X_test, y_test)
        res.append([name, r2, n_samples, n_features, n_informative,
                    heritability, var_add, var_dom, var_epi])

    df = pd.DataFrame(res, columns=['models', 'r2','sample', 'features', 'informative', 'heritability','add', 'dom', 'epi'])

    print(df)