Source code for physbo.search.discrete._policy

# SPDX-License-Identifier: MPL-2.0
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import numpy as np
import copy
import pickle as pickle
import itertools
import time

from ._history import History
from .. import utility
from .. import score as search_score
from ...gp import Predictor as gp_predictor
from ...blm import Predictor as blm_predictor
from ...misc import SetConfig

from ..._variable import Variable


[docs] class Policy: """Single objective Bayesian optimization with discrete search space""" def __init__(self, test_X, config=None, initial_data=None, comm=None): """ Parameters ---------- test_X: numpy.ndarray or physbo.Variable The set of candidates. Each row vector represents the feature vector of each search candidate. config: SetConfig object (physbo.misc.SetConfig) initial_data: tuple[np.ndarray, np.ndarray] The initial training datasets. The first elements is the array of actions and the second is the array of value of objective functions comm: MPI.Comm, optional MPI Communicator """ self.predictor = None self.training = Variable() self.new_data = None self.test = self._make_variable_X(test_X) self.actions = np.arange(0, self.test.X.shape[0]) self.history = History() if config is None: self.config = SetConfig() else: self.config = config if initial_data is not None: if len(initial_data) != 2: msg = "ERROR: initial_data should be 2-elements tuple or list (actions and objectives)" raise RuntimeError(msg) actions, fs = initial_data if len(actions) != len(fs): msg = "ERROR: len(initial_data[0]) != len(initial_data[1])" raise RuntimeError(msg) self.write(actions, fs) self.actions = np.array(sorted(list(set(self.actions) - set(actions)))) if comm is None: self.mpicomm = None self.mpisize = 1 self.mpirank = 0 else: self.mpicomm = comm self.mpisize = comm.size self.mpirank = comm.rank self.actions = np.array_split(self.actions, self.mpisize)[self.mpirank] self.config.learning.is_disp = ( self.config.learning.is_disp and self.mpirank == 0 )
[docs] def set_seed(self, seed): """ Setting a seed parameter for np.random. Parameters ---------- seed: int seed number ------- """ self.seed = seed np.random.seed(self.seed)
[docs] def write( self, action, t, X=None, time_total=None, time_update_predictor=None, time_get_action=None, time_run_simulator=None, ): """ Writing history (update history, not output to a file). Parameters ---------- action: numpy.ndarray Indexes of actions. t: numpy.ndarray N dimensional array. The negative energy of each search candidate (value of the objective function to be optimized). X: numpy.ndarray N x d dimensional matrix. Each row of X denotes the d-dimensional feature vector of each search candidate. time_total: numpy.ndarray N dimenstional array. The total elapsed time in each step. If None (default), filled by 0.0. time_update_predictor: numpy.ndarray N dimenstional array. The elapsed time for updating predictor (e.g., learning hyperparemters) in each step. If None (default), filled by 0.0. time_get_action: numpy.ndarray N dimenstional array. The elapsed time for getting next action in each step. If None (default), filled by 0.0. time_run_simulator: numpy.ndarray N dimenstional array. The elapsed time for running the simulator in each step. If None (default), filled by 0.0. Returns ------- Note ---- In this function, (X, t, Z) is added to the training data set (self.training), but not yet to predictor (self.predictor). self.newdata means such data. To add (X, t, Z) to the predictor, call self._update_predictor() after self.write(). """ if X is None: X = self.test.X[action, :] Z = self.test.Z[action, :] if self.test.Z is not None else None else: Z = self.predictor.get_basis(X) if self.predictor is not None else None self.history.write( t, action, time_total=time_total, time_update_predictor=time_update_predictor, time_get_action=time_get_action, time_run_simulator=time_run_simulator, ) self.training.add(X=X, t=t, Z=Z) # remove the selected actions from the list of candidates if exists if len(self.actions) > 0: local_index = np.searchsorted(self.actions, action) local_index = local_index[ np.take(self.actions, local_index, mode="clip") == action ] self.actions = self._delete_actions(local_index) if self.new_data is None: self.new_data = Variable(X=X, t=t, Z=Z) else: self.new_data.add(X=X, t=t, Z=Z)
@staticmethod def _warn_no_predictor(method_name): print("Warning: Since policy.predictor is not yet set,") print(" a GP predictor (num_rand_basis=0) is used for predicting") print(" If you want to use a BLM predictor (num_rand_basis>0),") print(" call bayes_search(max_num_probes=0, num_rand_basis=nrb)") print(" before calling {}.".format(method_name))
[docs] def get_post_fmean(self, xs): """ Calculate mean value of predictor (post distribution) Parameters ---------- xs: physbo.Variable or np.ndarray input parameters to calculate mean value shape is (num_points, num_parameters) Returns ------- fmean: numpy.ndarray Mean value of the post distribution. Returned shape is (num_points). """ X = self._make_variable_X(xs) if self.predictor is None: self._warn_no_predictor("get_post_fmean()") predictor = gp_predictor(self.config) predictor.fit(self.training, 0, comm=self.mpicomm) predictor.prepare(self.training) return predictor.get_post_fmean(self.training, X) else: self._update_predictor() return self.predictor.get_post_fmean(self.training, X)
[docs] def get_post_fcov(self, xs, diag=True): """ Calculate covariance of predictor (post distribution) Parameters ---------- xs: physbo.Variable or np.ndarray input parameters to calculate covariance shape is (num_points, num_parameters) diag: bool If true, only variances (diagonal elements) are returned. Returns ------- fcov: numpy.ndarray Covariance matrix of the post distribution. Returned shape is (num_points) if diag=true, (num_points, num_points) if diag=false. """ X = self._make_variable_X(xs) if self.predictor is None: self._warn_no_predictor("get_post_fcov()") predictor = gp_predictor(self.config) predictor.fit(self.training, 0, comm=self.mpicomm) predictor.prepare(self.training) return predictor.get_post_fcov(self.training, X, diag) else: self._update_predictor() return self.predictor.get_post_fcov(self.training, X, diag)
[docs] def get_permutation_importance(self, n_perm: int, split_features_parallel=False): """ Calculating permutation importance of model Parameters ========== n_perm: int number of permutations split_features_parallel: bool If true, split features in parallel. Returns ======= importance_mean: numpy.ndarray importance_mean importance_std: numpy.ndarray importance_std """ if self.predictor is None: self._warn_no_predictor("get_post_fmean()") predictor = gp_predictor(self.config) predictor.fit(self.training, 0) predictor.prepare(self.training) return predictor.get_permutation_importance( self.training, n_perm, comm=self.mpicomm, split_features_parallel=split_features_parallel, ) else: self._update_predictor() return self.predictor.get_permutation_importance( self.training, n_perm, comm=self.mpicomm, split_features_parallel=split_features_parallel, )
[docs] def get_score( self, mode, *, actions=None, xs=None, predictor=None, training=None, parallel=True, alpha=1, ): """ Calcualte score (acquisition function) Parameters ---------- mode: str The type of aquisition funciton. TS, EI and PI are available. These functions are defined in score.py. actions: array of int actions to calculate score xs: physbo.Variable or np.ndarray input parameters to calculate score predictor: predictor object predictor used to calculate score. If not given, self.predictor will be used. training:physbo.Variable Training dataset. If not given, self.training will be used. parallel: bool Calculate scores in parallel by MPI (default: True) alpha: float Tuning parameter which is used if mode = TS. In TS, multi variation is tuned as np.random.multivariate_normal(mean, cov*alpha**2, size). Returns ------- f: float or list of float Score defined in each mode. Raises ------ RuntimeError If both *actions* and *xs* are given Notes ----- When neither *actions* nor *xs* are given, scores for actions not yet searched will be calculated. When *parallel* is True, it is assumed that the function receives the same input (*actions* or *xs*) for all the ranks. If you want to split the input array itself, set *parallel* be False and merge results by yourself. """ if training is None: training = self.training if training.X is None or training.X.shape[0] == 0: msg = "ERROR: No training data is registered." raise RuntimeError(msg) if predictor is None: if self.predictor is None: self._warn_no_predictor("get_score()") predictor = gp_predictor(self.config) predictor.fit(training, 0, comm=self.mpicomm) predictor.prepare(training) else: self._update_predictor() predictor = self.predictor if xs is not None: if actions is not None: raise RuntimeError("ERROR: both actions and xs are given") test = self._make_variable_X(xs) if parallel and self.mpisize > 1: actions = np.array_split(np.arange(test.X.shape[0]), self.mpisize) test = test.get_subset(actions[self.mpirank]) else: if actions is None: actions = self.actions else: if isinstance(actions, int): actions = [actions] if parallel and self.mpisize > 1: actions = np.array_split(actions, self.mpisize)[self.mpirank] test = self.test.get_subset(actions) f = search_score.score( mode, predictor=predictor, training=training, test=test, alpha=alpha ) if parallel and self.mpisize > 1: fs = self.mpicomm.allgather(f) f = np.hstack(fs) return f
def _get_marginal_score(self, mode, chosen_actions, K, alpha): """ Getting marginal scores. Parameters ---------- mode: str The type of aquision funciton. TS (Thompson Sampling), EI (Expected Improvement) and PI (Probability of Improvement) are available. These functions are defined in score.py. chosen_actions: numpy.ndarray Array of selected actions. K: int The number of samples for evaluating score. alpha: float not used. Returns ------- f: list N dimensional scores (score is defined in each mode) """ f = np.zeros((K, len(self.actions)), dtype=float) # draw K samples of the values of objective function of chosen actions new_test_local = self.test.get_subset(chosen_actions) virtual_t_local = self.predictor.get_predict_samples( self.training, new_test_local, K ) if self.mpisize == 1: new_test = new_test_local virtual_t = virtual_t_local else: new_test = Variable() for nt in self.mpicomm.allgather(new_test_local): new_test.add(X=nt.X, t=nt.t, Z=nt.Z) virtual_t = np.concatenate(self.mpicomm.allgather(virtual_t_local), axis=1) # virtual_t = self.predictor.get_predict_samples(self.training, new_test, K) for k in range(K): predictor = copy.deepcopy(self.predictor) train = copy.deepcopy(self.training) virtual_train = new_test virtual_train.t = virtual_t[k, :] if virtual_train.Z is None: train.add(virtual_train.X, virtual_train.t) else: train.add(virtual_train.X, virtual_train.t, virtual_train.Z) predictor.update(train, virtual_train) f[k, :] = self.get_score( mode, predictor=predictor, training=train, parallel=False ) return np.mean(f, axis=0) def _get_actions(self, mode, N, K, alpha): """ Getting next candidates Parameters ---------- mode: str The type of aquisition funciton. TS (Thompson Sampling), EI (Expected Improvement) and PI (Probability of Improvement) are available. These functions are defined in score.py. N: int The total number of actions to return. K: int The total number of samples to evaluate marginal score alpha: float Tuning parameter which is used if mode = TS. In TS, multi variation is tuned as np.random.multivariate_normal(mean, cov*alpha**2, size). Returns ------- chosen_actions: numpy.ndarray An N-dimensional array of actions selected in each search process. """ f = self.get_score( mode, predictor=self.predictor, training=self.training, alpha=alpha, parallel=False, ) champion, local_champion, local_index = self._find_champion(f) if champion == -1: return np.zeros(0, dtype=int) if champion == local_champion: self.actions = self._delete_actions(local_index) chosen_actions = [champion] for n in range(1, N): f = self._get_marginal_score(mode, chosen_actions[0:n], K, alpha) champion, local_champion, local_index = self._find_champion(f) if champion == -1: break if champion == local_champion: self.actions = self._delete_actions(local_index) chosen_actions.append(champion) return np.array(chosen_actions) def _find_champion(self, f): if len(f) == 0: local_fmax = -float("inf") local_index = -1 local_champion = -1 else: local_fmax = np.max(f) local_index = np.argmax(f) local_champion = self.actions[local_index] if self.mpisize == 1: champion = local_champion else: local_champions = self.mpicomm.allgather(local_champion) local_fs = self.mpicomm.allgather(local_fmax) champion_rank = np.argmax(local_fs) champion = local_champions[champion_rank] return champion, local_champion, local_index def _get_random_action(self, N): """ Getting indexes of actions randomly. Parameters ---------- N: int Total number of search candidates. Returns ------- action: numpy.ndarray Indexes of actions selected randomly from search candidates. """ if self.mpisize == 1: n = len(self.actions) if n <= N: index = np.arange(0, n) else: index = np.random.choice(len(self.actions), N, replace=False) action = self.actions[index] self.actions = self._delete_actions(index) else: nactions = self.mpicomm.gather(len(self.actions), root=0) local_indices = [[] for _ in range(self.mpisize)] if self.mpirank == 0: hi = np.add.accumulate(nactions) lo = np.roll(hi, 1) lo[0] = 0 if hi[-1] <= N: index = np.arange(0, hi[-1]) else: index = np.random.choice(hi[-1], N, replace=False) ranks = np.searchsorted(hi, index, side="right") for r, i in zip(ranks, index): local_indices[r].append(i - lo[r]) local_indices = self.mpicomm.scatter(local_indices, root=0) local_actions = self.actions[local_indices] self.actions = self._delete_actions(local_indices) action = self.mpicomm.allgather(local_actions) action = itertools.chain.from_iterable(action) action = np.array(list(action)) return action
[docs] def save(self, file_history, file_training=None, file_predictor=None): """ Saving history, training and predictor into the corresponding files. Parameters ---------- file_history: str The name of the file that stores the information of the history. file_training: str The name of the file that stores the training dataset. file_predictor: str The name of the file that stores the predictor dataset. Returns ------- """ if self.mpirank == 0: self.history.save(file_history) if file_training is not None: self.training.save(file_training) if file_predictor is not None: with open(file_predictor, "wb") as f: pickle.dump(self.predictor, f)
[docs] def load(self, file_history, file_training=None, file_predictor=None): """ Loading files about history, training and predictor. Parameters ---------- file_history: str The name of the file that stores the information of the history. file_training: str The name of the file that stores the training dataset. file_predictor: str The name of the file that stores the predictor dataset. Returns ------- """ self.history.load(file_history) if file_training is None: N = self.history.total_num_search X = self.test.X[self.history.chosen_actions[0:N], :] t = self.history.fx[0:N] self.training = Variable(X=X, t=t) else: self.training = Variable() self.training.load(file_training) if file_predictor is not None: with open(file_predictor, "rb") as f: self.predictor = pickle.load(f) N = self.history.total_num_search visited = self.history.chosen_actions[:N] local_index = np.searchsorted(self.actions, visited) local_index = local_index[ np.take(self.actions, local_index, mode="clip") == visited ] self.actions = self._delete_actions(local_index)
[docs] def export_predictor(self): """ Returning the predictor dataset Returns ------- """ return self.predictor
[docs] def export_training(self): """ Returning the training dataset Returns ------- """ return self.training
[docs] def export_history(self): """ Returning the information of the history. Returns ------- """ return self.history
def _init_predictor(self, is_rand_expans): """ Initialize predictor. Parameters ---------- is_rand_expans: bool If true, physbo.blm.predictor is selected. If false, physbo.gp.Predictor is selected. """ if is_rand_expans: self.predictor = blm_predictor(self.config) else: self.predictor = gp_predictor(self.config) def _learn_hyperparameter(self, num_rand_basis): self.predictor.fit(self.training, num_rand_basis, comm=self.mpicomm) self.test.Z = self.predictor.get_basis(self.test.X) self.training.Z = self.predictor.get_basis(self.training.X) self.predictor.prepare(self.training) self.new_data = None def _update_predictor(self): """ Updating predictor by using new_data, which means a subset of the trained data (self.training) that is not yet added to the predictor. """ if self.new_data is not None: self.predictor.update(self.training, self.new_data) self.new_data = None def _make_variable_X(self, test_X): """ Make a new *Variable* with X=test_X Parameters ---------- test_X: numpy.ndarray or physbo.Variable The set of candidates. Each row vector represents the feature vector of each search candidate. Returns ------- test_X: numpy.ndarray or physbo.Variable The set of candidates. Each row vector represents the feature vector of each search candidate. """ if isinstance(test_X, np.ndarray): test = Variable(X=test_X) elif isinstance(test_X, Variable): test = test_X else: raise TypeError("The type of test_X must be ndarray or physbo.Variable") return test def _delete_actions(self, index, actions=None): """ Returns remaining actions Notes ----- This method itself does not modify *self* Parameters ---------- index: int Index of an action to be deleted. actions: numpy.ndarray Array of actions. Returns ------- actions: numpy.ndarray Array of actions which does not include action specified by index. """ if actions is None: actions = self.actions return np.delete(actions, index)
def _run_simulator(simulator, action, comm=None): if comm is None: return simulator(action) if comm.rank == 0: t = simulator(action) else: t = 0.0 return comm.bcast(t, root=0)