I’m using RandomizedSearchCV to optimize and evaluate several ML models, and I’m trying to run the optimization in parallel with joblib’s Parallel(). But when I run my code on an HPC node with 16 cores, only two of them are used.
jobs = 16
x = Parallel(n_jobs=jobs, backend='multiprocessing', verbose=3)(
    delayed(fit)(m,
                 X_train,
                 y_train,
                 n_iter=3,
                 cv=3,
                 return_train_score=True,
                 scoring='r2',
                 refit='r2',
                 n_jobs=4,
                 verbose=3) for m in models)
EDIT: If m corresponds to the 2 models tested, then Parallel() uses two cores and the other 14 cores sit idle (0.0% load). The iterations/folds of the fit() function seem to run as threads on these two cores.
Question: Is there a way to nest the parallel execution, so that all cores are used?
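For illustration, here is the kind of restructuring I have in mind (a sketch of my current understanding, not a verified fix). The outer Parallel() can never run more tasks than there are models, so with 2 models at most 2 workers are busy, and as far as I can tell the multiprocessing backend's workers cannot spawn child processes, so the nested n_jobs=4 falls back to threads inside them. The sketch below assumes joblib's default loky backend instead and splits the 16 cores explicitly between the two levels; jobs_outer and jobs_inner are names I introduce here for illustration only:

# Sketch only, assuming joblib's default 'loky' backend:
# divide the 16 cores between the outer model loop and the
# inner RandomizedSearchCV instead of oversubscribing.
jobs_outer = len(models)        # at most one busy worker per model
jobs_inner = 16 // jobs_outer   # cores left over for each search

x = Parallel(n_jobs=jobs_outer, verbose=3)(
    delayed(fit)(m, X_train, y_train,
                 n_iter=3, cv=3,
                 return_train_score=True,
                 scoring='r2', refit='r2',
                 n_jobs=jobs_inner,  # inner search parallelism
                 verbose=3)
    for m in models)

Even then, with n_iter=3 and cv=3 each search performs only 9 fits, so the inner level can occupy at most 9 cores per model.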
Full working example:
import sys
import os
import yaml
import time
import numpy as np
import pandas as pd
import math
from sklearn.model_selection import RandomizedSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, Matern
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
from scipy.stats import loguniform, uniform, truncnorm, randint
from joblib import parallel_backend, Parallel, delayed
rng = np.random.default_rng()
def gen_data(n, p, rng):
    # Each offspring genotype is the sum of two alleles, one drawn
    # from each parent's pair of alleles (xm_* maternal, xf_* paternal)
    # via a random recombination mask.
    prob = 0.5
    xm_a1 = rng.binomial(1, prob, size=(n, p))
    xm_a2 = rng.binomial(1, prob, size=(n, p))
    Xm = xm_a1 + xm_a2
    xf_a1 = rng.binomial(1, prob, size=(n, p))
    xf_a2 = rng.binomial(1, prob, size=(n, p))
    r_m = rng.binomial(1, prob, size=(n, p))
    r_f = rng.binomial(1, prob, size=(n, p))
    xc_a1 = r_m * xm_a2 + (1 - r_m) * xm_a1
    xc_a2 = r_f * xf_a2 + (1 - r_f) * xf_a1
    Xc = xc_a1 + xc_a2
    return Xc
def predict_r2_score(x, X, y):
    y_pred = x.best_estimator_.predict(X)
    # score against the y passed in, not a module-level variable
    return r2_score(y, y_pred)
def fit(mod, X, y, **kwargs):
    grid_search = RandomizedSearchCV(mod['estimator'], mod['param'], **kwargs)
    grid_search.fit(X, y)
    return grid_search, mod['model']
def generate_phenotype(X, n_features, n_samples, var_add, var_dom, var_epi, chunks=512):
    sample = rng.choice(range(X.shape[1]), n_features, replace=False)
    X = X[0:n_samples, sample]
    MAF = (X.mean(axis=0)/2).reshape(1, n_features)  # + 1e-3
    # Drop monomorphic markers (MAF of 0 or 1). These are feature
    # columns, so delete along axis=1 in a single pass.
    elim = np.where((MAF == 0) | (MAF == 1))[1]
    X = np.delete(X, elim, axis=1)
    print(X.shape)
    n_features = X.shape[1]
    MAF = (X.mean(axis=0)/2).reshape(1, n_features)
    e = 1 - (var_add + var_dom + var_epi)
    var_add = var_add/n_features
    var_dom = var_dom/n_features
    var_epi = var_epi/(n_features*(n_features-1)//2)
    uA = rng.normal(0, np.sqrt(var_add), n_features)
    uD = rng.normal(0, np.sqrt(var_dom), n_features)
    uAA = rng.normal(0, np.sqrt(var_epi), n_features*(n_features-1)//2)
    e = rng.normal(0, np.sqrt(e), n_samples)
    xD = np.where(X == 1, 2*MAF, X)
    xD = np.where(xD == 2, 4*MAF - 2, xD)
    wA = (X - 2*MAF)/np.sqrt(2*MAF*(1 - MAF))
    wD = (xD - 2*(MAF**2))/(2*MAF*(1 - MAF))
    wAA = np.zeros((n_samples, ))
    start, end = 0, 0
    for i in range(n_features - 1):
        end += n_features - i - 1
        wAA = wAA + np.sum(wA[:, i].reshape(n_samples, 1)*wA[:, i+1:n_features]*uAA[start:end], 1)
        start = end
    Y = np.matmul(wA, uA) + np.matmul(wD, uD) + wAA
    return Y + e
if __name__ == "__main__":
test_size = 0.2
n_informative = 1000
n_features = 10000
n_samples = 50000
heritability = 0.7
var_dom = 0.1
var_epi = 0.1
var_add = heritability - var_epi - var_dom
G = gen_data(n_samples,n_features,rng)
y = generate_phenotype(G, n_informative, n_samples, var_add, var_dom, var_epi)
models = [{"model":"LinearRegression","estimator": LinearRegression(), "param": [{}]},
{"model":"GaussianProcessRegressor","estimator": GaussianProcessRegressor(), "param": [{'copy_X_train':[False],'n_restarts_optimizer': [10],"alpha": loguniform(1e-5, 1e-2),"kernel": [RBF(l,length_scale_bounds=(1e-20, 100000.0)) for l in np.logspace(-10, 1, 10)]},{'copy_X_train':[False],'n_restarts_optimizer': [10],"alpha": loguniform(1e-10, 1e-2),"kernel": [Matern(l,length_scale_bounds=(1e-20, 100000.0)) for l in np.logspace(-10, 1, 10)]}]},]
    X_train, X_test, y_train, y_test = train_test_split(
        G, y.reshape(n_samples, 1), test_size=test_size, random_state=42, shuffle=True)
    # Separate scalers for X and y: reusing a single instance would
    # refit it on y after it was fitted on X.
    x_scaler = StandardScaler()
    X_train = x_scaler.fit_transform(X_train)
    X_test = x_scaler.transform(X_test)
    y_scaler = StandardScaler()
    y_train = y_scaler.fit_transform(y_train.reshape(-1, 1))
    y_test = y_scaler.transform(y_test.reshape(-1, 1))
    y_train = y_train.ravel()
    y_test = y_test.ravel()
    jobs = 16
    print("Start model fitting ...")
    x = Parallel(n_jobs=jobs, backend='multiprocessing', verbose=3)(
        delayed(fit)(m,
                     X_train,
                     y_train,
                     n_iter=3,
                     cv=3,
                     return_train_score=True,
                     scoring='r2',
                     refit="r2",
                     n_jobs=4,
                     verbose=3) for m in models)
print("Done model fitting")
res = []
for i in range (len(x)):
m = models[i]['model']
r = predict_r2_score(x[i][0],X_test,y_test)
res.append([m,r,n_samples, n_features, n_informative, heritability, var_add, var_dom, var_epi])
df = pd.DataFrame(res, columns=['models', 'r2','sample', 'features', 'informative', 'heritability','add', 'dom', 'epi'])
print(df)
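Side note: parallel_backend is imported in the example but never used. One variant I have been considering (a sketch, untested on the HPC, assuming the default loky backend) caps the native thread pools inside each worker so the two levels together roughly match the 16 cores:

# Sketch: 'inner_max_num_threads' limits the thread pools of
# numerical libraries (BLAS/OpenMP) inside each loky worker,
# so 2 workers x 8 threads roughly matches the 16 cores.
with parallel_backend('loky', inner_max_num_threads=8):
    x = Parallel(n_jobs=len(models), verbose=3)(
        delayed(fit)(m, X_train, y_train,
                     n_iter=3, cv=3, return_train_score=True,
                     scoring='r2', refit='r2', n_jobs=8, verbose=3)
        for m in models)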