Refactoring My PyTorch Hyperparameter Search Using Evolutionary Optimization Demo

One of the advantages that experienced developers have compared to early-career developers is subjective intuition. Several days ago, I implemented a program that searches for PyTorch neural network hyperparameter values (number of hidden nodes, batch size, and so on) using an evolutionary optimization algorithm. The demo worked but my intuition told me the demo needed refactoring.

The refactoring effort took me about 12 hours, but I was satisfied with the resulting demo. I tidied up a lot of details, but the primary change was switching to a completely object-oriented design. This makes the main() function look very simple because all the details are hidden:


  # 1. create train_ds and test_ds datasets

  # 2. find best model, save to Models directory
  print("Creating EO Searcher object ")
  scr = Searcher(train_ds, test_ds, pop_sz=10, dim=6,
    max_gen=100, p_mutate=0.5, seed=0)
  print("Searching for best hyperparams ")
  scr.search()

  # 3. display results

An encoded solution is an array of 6 integers, each a value from 0 to 9. For example, the best solution found in my demo is [4 5 6 2 7 5]. Each integer represents a hyperparameter value, in this case:

4 : num hidden nodes = 12
5 : hidden activation = relu
6 : batch size = 12
2 : learn rate = 0.0080
7 : max epochs = 800
5 : optimizer = adam

I used one of my standard synthetic datasets where the goal is to predict a person’s political leaning (conservative = 0, moderate = 1, liberal = 2) from sex, age, State of residence, and annual income.

The resulting model had 90.50% accuracy on the 200-item training data, and 80.00% accuracy on the 40-item test dataset.

My motivation for hyperparameter search using evolutionary optimization is to apply it to complex neural systems that use a Transformer component.



A home remodeling project is analogous to code refactoring. Here are two examples of shower remodels that I’d rate as not entirely successful.


Demo code. Replace “lt” (less-than), etc. with the corresponding comparison operator symbols. The data is at jamesmccaffrey.wordpress.com/2022/09/01/multi-class-classification-using-pytorch-1-12-1-on-windows-10-11/.


# people_evo_hyperparameter_2.py

# PyTorch 2.0.0-CPU Anaconda3-2022.10  Python 3.9.13
# Windows 10/11 

import numpy as np
import torch as T
import pickle
from datetime import datetime

# Target device for this script: the dataset tensors and the network
# are moved here with .to(device). Only the CPU is used.
device = T.device('cpu')  # apply to Tensor or Module

# -----------------------------------------------------------

class PeopleDataset(T.utils.data.Dataset):
  """In-memory Dataset built from a tab-delimited People data file.

  Each data line holds seven numeric columns:
    sex  age    state    income   politics
    -1   0.27   0  1  0   0.7610   2
    +1   0.19   0  0  1   0.6550   0
  sex: -1 = male, +1 = female
  state: three 0/1 columns (michigan, nebraska, oklahoma)
  politics: conservative, moderate, liberal  (the class label)

  The first six columns become the float32 predictors (x_data) and
  the seventh becomes the int64 class label (y_data).
  """

  def __init__(self, src_file):
    # lines beginning with "#" are treated as comments by loadtxt
    raw = np.loadtxt(src_file, usecols=range(0,7),
      delimiter="\t", comments="#", dtype=np.float32)
    features = raw[:, :6]  # columns 0..5
    labels = raw[:, 6]     # column 6, 1-D

    self.x_data = T.tensor(features, dtype=T.float32).to(device)
    self.y_data = T.tensor(labels, dtype=T.int64).to(device)

  def __len__(self):
    # number of data items (rows)
    return len(self.x_data)

  def __getitem__(self, idx):
    # (predictors, target) pair; idx may be an int or a slice
    return self.x_data[idx], self.y_data[idx]

# -----------------------------------------------------------

class Net(T.nn.Module):
  """6-(n_hid-n_hid)-3 fully connected classifier.

  Two hidden layers of equal width share one activation function;
  the output layer applies log_softmax, so the network is meant to
  be trained with NLLLoss().

  Args:
    n_hid: number of nodes in each of the two hidden layers.
    activ: hidden-layer activation, either 'tanh' or 'relu'.

  Raises:
    ValueError: if activ is not one of the supported names.
      (Previously an unknown name left self.activ undefined and
      only failed later, inside forward().)
  """

  def __init__(self, n_hid, activ='tanh'):
    super().__init__()
    # layers are created in this order so seeded default weight
    # initialization is reproducible
    self.hid1 = T.nn.Linear(6, n_hid)  # 6-(nh-nh)-3
    self.hid2 = T.nn.Linear(n_hid, n_hid)
    self.oupt = T.nn.Linear(n_hid, 3)

    if activ == 'tanh':
      self.activ = T.nn.Tanh()
    elif activ == 'relu':
      self.activ = T.nn.ReLU()
    else:
      raise ValueError(
        "activ must be 'tanh' or 'relu', got " + repr(activ))

    # use default weight init

  def forward(self, x):
    # x is expected to be 2-D (items, 6) because log_softmax
    # normalizes along dim=1
    z = self.activ(self.hid1(x))
    z = self.activ(self.hid2(z))
    z = T.log_softmax(self.oupt(z), dim=1)  # NLLLoss()
    return z

# -----------------------------------------------------------

def train(net, ds, bs, lr, me, opt, verbose=False):
  """Train net in place on dataset ds using NLLLoss.

  Args:
    net: network whose output is log-probabilities (log_softmax).
    ds: Dataset yielding (inputs, class-label) pairs.
    bs: batch size.
    lr: learning rate.
    me: maximum number of epochs.
    opt: optimizer name, 'sgd' or 'adam'.
    verbose: if True, print the summed epoch loss about 4 times
      over the course of training.

  Raises:
    ValueError: if opt is not a supported optimizer name.
      (Previously this surfaced later as an unbound-variable error.)
  """
  train_ldr = T.utils.data.DataLoader(ds, batch_size=bs,
    shuffle=True)
  loss_func = T.nn.NLLLoss()  # log_softmax() activation
  if opt == 'sgd':
    optimizer = T.optim.SGD(net.parameters(), lr=lr)
  elif opt == 'adam':
    optimizer = T.optim.Adam(net.parameters(), lr=lr)
  else:
    raise ValueError(
      "opt must be 'sgd' or 'adam', got " + repr(opt))

  if verbose: print("\nStarting training ")
  # log interval: 4 log prints. Clamp to 1 so that me < 4 does not
  # produce a zero interval (modulo by zero) in verbose mode.
  le = max(1, me // 4)
  for epoch in range(0, me):
    epoch_loss = 0.0  # summed over all batches of this epoch
    for batch in train_ldr:
      X = batch[0]  # inputs
      Y = batch[1]  # correct class/label/politics

      optimizer.zero_grad()
      oupt = net(X)
      loss_val = loss_func(oupt, Y)  # a tensor
      epoch_loss += loss_val.item()  # accumulate
      loss_val.backward()
      optimizer.step()

    if verbose and epoch % le == 0:
      print("epoch = %5d  |  loss = %10.4f" % \
        (epoch, epoch_loss))
  if verbose: print("Done ")

# -----------------------------------------------------------

def accuracy_q(model, dataset):
  """Return classification accuracy of model on the whole dataset.

  All items are fetched with one slice and scored in a single
  forward pass; the predicted class is the argmax of each output
  row. The caller is expected to have put the model in eval() mode.
  The result is a Python float (fraction of correct predictions).
  """
  n_items = len(dataset)
  everything = dataset[0:n_items]  # (inputs, targets)
  inputs = everything[0]
  targets = everything[1]

  with T.no_grad():  # no gradients needed for scoring
    outputs = model(inputs)

  predicted = T.argmax(outputs, dim=1)
  n_correct = T.sum(targets == predicted)
  return (n_correct * 1.0 / n_items).item()

# -----------------------------------------------------------

class Searcher():
  """Evolutionary-optimization search over encoded hyperparameters.

  A solution is an integer array of length dim whose cells are in
  0..9; evaluate() decodes each cell through a hard-coded lookup
  table (hidden nodes, activation, batch size, learn rate, max
  epochs, optimizer), trains a Net and scores it. The population
  is a list of (solution, error) tuples kept sorted by error, and
  every solution ever produced is recorded in self.used so that no
  encoding is trained twice.

  Assumes Net(), train(), accuracy_q() and device exist at module
  level.
  """

  def __init__(self, trn_ds, tst_ds, pop_sz, dim, max_gen,
    p_mutate, seed):
    """Store the search settings; no training happens here.

    Args:
      trn_ds: training Dataset.
      tst_ds: test Dataset.
      pop_sz: number of solutions in the population.
      dim: solution length (evaluate() decodes 6 cells).
      max_gen: number of generations run by search().
      p_mutate: per-cell mutation probability.
      seed: seed for the search's own random generator.
    """
    self.train_ds = trn_ds
    self.test_ds = tst_ds
    self.pop_size = pop_sz
    self.dim = dim  # 6
    self.max_gen = max_gen
    self.p_mutate = p_mutate
    self.rnd = np.random.RandomState(seed)

    self.pop = []   # list of (soln, err) tuples
    self.used = {}  # avoid duplicating a solution

    self.best_soln = np.array([0,0,0,0,0,0], dtype=int)
    self.best_err = 10.0
    self.best_train_acc = 0.0
    self.best_test_acc = 0.0

  # ---------------------------------------------------------

  @staticmethod
  def _soln_key(soln):
    # digits joined into a string, e.g. [4 5 6 2 7 5] -> "456275"
    return "".join(str(x) for x in soln)

  # ---------------------------------------------------------

  def make_rnd_soln(self):
    """Return a random solution that has not been generated before.

    The solution is recorded in self.used before returning.
    """
    while True:
      soln = self.rnd.randint(low=0, high=10, size=self.dim,
        dtype=int)
      soln_key = self._soln_key(soln)
      if soln_key not in self.used:
        break

    self.used[soln_key] = 1
    return soln  # not used before

  # ---------------------------------------------------------

  def make_child(self, parent_idxs):
    """Return a child built by one-point crossover of two parents.

    parent_idxs holds two indices into self.pop. The child takes
    the left half from the first parent and the right half from
    the second. The child may duplicate a used solution; mutate()
    resolves that.
    """
    i = parent_idxs[0]
    j = parent_idxs[1]
    parent1 = self.pop[i][0]
    parent2 = self.pop[j][0]
    half = self.dim // 2
    child_soln = np.zeros(self.dim, dtype=int)
    child_soln[:half] = parent1[:half]                  # left half
    child_soln[half:self.dim] = parent2[half:self.dim]  # right half
    return child_soln  # possible dup -- mutate() will handle

  # ---------------------------------------------------------

  def _mutate_once(self, child_soln):
    # one pass: each cell is redrawn with probability p_mutate
    for k in range(self.dim):
      q = self.rnd.random()  # [0.0, 1.0)
      if q < self.p_mutate:
        # index [0] so a scalar, not a 1-element array, is stored
        # (assigning a size-1 array to a cell is deprecated in
        # recent NumPy); the random draw itself is unchanged
        child_soln[k] = self.rnd.randint(0, 10, size=1,
          dtype=int)[0]

  def mutate(self, child_soln):
    """Mutate child_soln in place until it is a never-used solution.

    At least one mutation pass is always applied; further passes
    run while the result is already in self.used. The final
    solution is recorded in self.used. Returns None.
    """
    self._mutate_once(child_soln)
    child_key = self._soln_key(child_soln)

    while child_key in self.used:
      self._mutate_once(child_soln)  # mutate again
      child_key = self._soln_key(child_soln)

    self.used[child_key] = 1
    return  # in-place modification

  # ---------------------------------------------------------

  def evaluate(self, soln, verbose=False):
    """Decode soln, train a network with it, and score it.

    Returns:
      (acc_train, acc_test, error) where error is
      1 - (acc_train + 3 * acc_test) / 4.
    """
    # [n_hid, activ, bs, lr, me, opt]
    #   [0]    [1]   [2] [3] [4] [5]
    v = verbose

    # hard-coded. modify as needed
    n_hids = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    activs = ['tanh', 'tanh','tanh','tanh','tanh',
      'relu', 'relu', 'relu', 'relu', 'relu']
    b_szs = [1, 2, 4, 6, 8, 10, 12, 14, 16, 20]
    rates = [0.001, 0.005, 0.008, 0.01, 0.02, 0.03, 0.05,
      0.08, 0.10, 0.12]
    max_eps = [50, 100, 200, 300, 400, 500, 600, 700,
      800, 1000]
    opts = ['sgd', 'sgd', 'sgd', 'sgd', 'sgd',
      'adam', 'adam', 'adam', 'adam', 'adam']

    n_hid = n_hids[soln[0]]
    activ = activs[soln[1]]
    bs = b_szs[soln[2]]
    lr = rates[soln[3]]
    me = max_eps[soln[4]]
    opt = opts[soln[5]]

    # reseed so every candidate is trained from the same starting
    # conditions; self.rnd (the EO generator) is not affected
    T.manual_seed(1)  # controls weight init, not EO
    np.random.seed(1)

    net = Net(n_hid, activ).to(device)  # create NN
    net.train()
    if v: print("\nsoln: " + str(soln))
    train(net, self.train_ds, bs, lr, me, opt, verbose)

    net.eval()
    acc_train = accuracy_q(net, self.train_ds)
    acc_test = accuracy_q(net, self.test_ds)
    acc_weighted = ((1 * acc_train) + (3 * acc_test)) / 4
    error = 1.0 - acc_weighted  # [0.0, 1.0]
    if v: print("train acc = %0.4f " % acc_train)
    if v: print("test_acc = %0.4f " % acc_test)
    return (acc_train, acc_test, error)

  # ---------------------------------------------------------

  def save_info(self):
    """Write the current best solution and accuracies to a file.

    The file is Models/date_time_soln_trainAcc_testAcc.txt under
    the current directory. The Models directory is created if it
    is missing, and the file is closed even if a write fails.
    """
    import os  # local import: only this method needs it

    dt = datetime.now().strftime('%Y-%m-%d_%H-%M')
    ss = self._soln_key(self.best_soln)  # soln str
    trna = str("%0.4f" % self.best_train_acc)
    tsta = str("%0.4f" % self.best_test_acc)

    models_dir = os.path.join(".", "Models")
    os.makedirs(models_dir, exist_ok=True)
    fn = os.path.join(models_dir,
      dt + "_" + ss + "_" + trna + "_" + tsta + ".txt")
    with open(fn, "w") as f:
      f.write("soln = " + ss + "\n")
      f.write("train acc = " + trna + "\n")
      f.write("test acc = " + tsta + "\n")

  # ---------------------------------------------------------

  def create_pop(self):
    """Create, evaluate and sort the initial population.

    Tracks the best solution seen and saves it via save_info().
    """
    for i in range(self.pop_size):
      soln = self.make_rnd_soln()  # unique soln, not yet used
      trn_acc, tst_acc, err = self.evaluate(soln, verbose=True)
      self.pop.append( (soln,err) )
      if err < self.best_err:
        self.best_err = err
        self.best_soln = soln.copy()
        self.best_train_acc = trn_acc
        self.best_test_acc = tst_acc

    self.pop.sort(key=lambda tup: tup[1])  # by err, best first
    self.save_info()

  # ---------------------------------------------------------

  def search(self):
    """Run the evolutionary search for max_gen generations.

    Each generation breeds one child from a parent in the better
    half and a parent in the worse half of the population, mutates
    it, evaluates it, and overwrites a randomly chosen member of
    the worse half with it. Results are left in best_soln,
    best_err, best_train_acc and best_test_acc.
    """
    print("\nCreating size = " + \
      str(self.pop_size) + " initial population ")
    self.create_pop()

    half = self.pop_size // 2
    for gen in range(self.max_gen):
      print("\ngeneration = " + str(gen))

      # 4a. pick two parents
      first = self.rnd.randint(0, half)  # good one
      second = self.rnd.randint(half, self.pop_size)
      flip = self.rnd.randint(2)  # 0 or 1
      if flip == 0:
        parent_idxs = (first, second)
      else:
        parent_idxs = (second, first)

      # 4b. make a child
      child_soln = self.make_child(parent_idxs)

      # 4c. mutate child (and avoid duplicate)
      self.mutate(child_soln)

      # 4d. evaluate child soln
      (trn_acc, tst_acc, child_err) = \
        self.evaluate(child_soln, verbose=True)
      if child_err < self.best_err:
        print("New best solution found in gen " + str(gen))
        self.best_soln = child_soln.copy()
        self.best_err = child_err
        self.best_train_acc = trn_acc
        self.best_test_acc = tst_acc
        self.save_info()

      # 4e. replace weak pop soln with child
      idx = self.rnd.randint(half, self.pop_size)
      self.pop[idx] = (child_soln, child_err)  # Tuple
      self.pop.sort(key=lambda tup: tup[1])

    print("\nEnd evolution ")

  # ---------------------------------------------------------    

# -----------------------------------------------------------

def show_soln_to_hyperparams(soln):
  """Print the hyperparameter values that an encoded solution maps to.

  soln is a sequence of six integers in 0..9, ordered as
  [n_hid, activ, batch size, learn rate, max epochs, optimizer].

  The lookup tables below must stay identical to the ones in
  Searcher.evaluate(), because those are the values the networks
  are actually trained with. The n_hids and max_eps tables here
  previously differed from evaluate(), so the printed values did
  not describe the model that was trained.
  """
  # hard-coded. modify as needed (keep in sync with evaluate())
  n_hids = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
  activs = ['tanh', 'tanh','tanh','tanh','tanh',
    'relu', 'relu', 'relu', 'relu', 'relu']
  b_szs = [1, 2, 4, 6, 8, 10, 12, 14, 16, 20]
  rates = [0.001, 0.005, 0.008, 0.01, 0.02, 0.03, 0.05,
    0.08, 0.10, 0.12]
  max_eps = [50, 100, 200, 300, 400, 500, 600, 700,
    800, 1000]
  opts = ['sgd', 'sgd', 'sgd', 'sgd', 'sgd',
    'adam', 'adam', 'adam', 'adam', 'adam']

  n_hid = n_hids[soln[0]]
  activ = activs[soln[1]]
  bs = b_szs[soln[2]]
  lr = rates[soln[3]]
  me = max_eps[soln[4]]
  opt = opts[soln[5]]

  print("num hidden nodes = " + str(n_hid))
  print("hidden activation = " + str(activ))
  print("batch size = " + str(bs))
  print("learn rate = %0.4f " % lr)
  print("max epochs = " + str(me))
  print("optimizer = " + str(opt))

# -----------------------------------------------------------

def main():
  """Run the demo: load the data, search for hyperparameters, report.

  Reads the People train and test files from the Data directory,
  runs a Searcher with a population of 10 for 100 generations, and
  prints the best solution found with its decoded hyperparameters.
  """
  # 0. get started
  print("\nBegin People politics EO parameter search ")
  T.manual_seed(1)  # is reset in evaluate()
  np.random.seed(1)

  # 1. create Dataset objects
  print("\nCreating People train and test Datasets ")
  train_ds = PeopleDataset(".\\Data\\people_train.txt")  # 200 rows
  test_ds = PeopleDataset(".\\Data\\people_test.txt")    # 40 rows

  # 2. find best model, save to Models directory
  print("\nCreating EO Searcher object ")
  searcher = Searcher(train_ds, test_ds, pop_sz=10, dim=6,
    max_gen=100, p_mutate=0.5, seed=0)
  print("\nSearching for best hyperparams ")
  searcher.search()

  # 3. display results
  best = searcher.best_soln
  print("\nBest solution found = " + str(best))
  print("Best train accuracy = %0.4f " % searcher.best_train_acc)
  print("Best test accuracy = %0.4f " % searcher.best_test_acc)
  print("\nHyperparameters are: \n ")
  show_soln_to_hyperparams(best)

  print("\nEnd evolutionary parameter search ")

if __name__ == "__main__":
  main()

Read more here: Source link