Source code for physbo.search.discrete.policy

import numpy as np
import copy
import pickle as pickle
import itertools

from .results import history, history_mo
from .. import utility

import physbo
import physbo.gp
import physbo.blm
import physbo.misc
import physbo.search.score
from physbo.variable import variable

MAX_SEACH = int(20000)


def run_simulator(simulator, action, comm=None):
    """Evaluate the simulator for the given actions.

    Under MPI (comm is not None), the simulator is called only on rank 0
    and the result is broadcast to all ranks.
    """
    if comm is None:
        return simulator(action)
    if comm.rank == 0:
        t = simulator(action)
    else:
        t = 0.0
    return comm.bcast(t, root=0)
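

# Minimal sketch of calling run_simulator under MPI (illustrative only; it
# requires mpi4py, and `toy_simulator` below is a hypothetical objective).
# Every rank receives the same broadcast result, so all ranks stay in sync.
def _run_simulator_mpi_sketch():
    from mpi4py import MPI

    def toy_simulator(actions):
        return -np.asarray(actions, dtype=float) ** 2

    comm = MPI.COMM_WORLD
    t = run_simulator(toy_simulator, np.array([3, 1, 4]), comm=comm)
    return t  # identical on every rank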


class policy:
    def __init__(self, test_X, config=None, initial_data=None, comm=None):
        """
        Parameters
        ----------
        test_X: numpy.ndarray or physbo.variable
            The set of candidates. Each row vector represents
            the feature vector of each search candidate.
        config: set_config object (physbo.misc.set_config)
        initial_data: tuple[np.ndarray, np.ndarray]
            The initial training dataset.
            The first element is the array of actions and
            the second is the array of values of the objective function.
        comm: MPI.Comm, optional
            MPI communicator. If given, the search candidates are split
            over the MPI processes.
        """
        self.predictor = None
        self.training = variable()
        self.test = self._set_test(test_X)
        self.actions = np.arange(0, self.test.X.shape[0])
        self.history = history()
        self.config = self._set_config(config)

        if initial_data is not None:
            if len(initial_data) != 2:
                msg = "ERROR: initial_data should be a 2-element tuple or list (actions and objectives)"
                raise RuntimeError(msg)
            actions, fs = initial_data
            if len(actions) != len(fs):
                msg = "ERROR: len(initial_data[0]) != len(initial_data[1])"
                raise RuntimeError(msg)
            self.write(actions, fs)
            self.actions = sorted(list(set(self.actions) - set(actions)))

        if comm is None:
            self.mpicomm = None
            self.mpisize = 1
            self.mpirank = 0
        else:
            self.mpicomm = comm
            self.mpisize = comm.size
            self.mpirank = comm.rank
            self.actions = np.array_split(self.actions, self.mpisize)[self.mpirank]
            self.config.learning.is_disp = self.mpirank == 0

    def set_seed(self, seed):
        """
        Setting a seed parameter for np.random.

        Parameters
        ----------
        seed: int
            seed number
        """
        self.seed = seed
        np.random.seed(self.seed)

    def delete_actions(self, index, actions=None):
        """
        Deleting actions

        Parameters
        ----------
        index: int or array-like of int
            Index (or indices) of the actions to be deleted.
        actions: numpy.ndarray
            Array of actions.

        Returns
        -------
        actions: numpy.ndarray
            Array of actions which does not include the actions specified by index.
        """
        actions = self._set_unchosen_actions(actions)
        return np.delete(actions, index)

    def write(self, action, t, X=None):
        """
        Writing history (updates the history; does not write to a file).

        Parameters
        ----------
        action: numpy.ndarray
            Indexes of actions.
        t: numpy.ndarray
            N-dimensional array. The negative energy of each search candidate
            (value of the objective function to be optimized).
        X: numpy.ndarray
            N x d dimensional matrix. Each row of X denotes the d-dimensional
            feature vector of each search candidate.
        """
        if X is None:
            X = self.test.X[action, :]
            Z = self.test.Z[action, :] if self.test.Z is not None else None
        else:
            Z = self.predictor.get_basis(X) if self.predictor is not None else None

        self.new_data = variable(X, t, Z)
        self.history.write(t, action)
        self.training.add(X=X, t=t, Z=Z)

    def get_score(self, mode, predictor=None, training=None, alpha=1):
        """
        Getting the score.

        Parameters
        ----------
        mode: str
            The type of acquisition function. TS, EI and PI are available.
            These functions are defined in score.py.
        predictor: predictor object
            Base class is defined in physbo.predictor.
        training: physbo.variable
            Training dataset.
        alpha: float
            Tuning parameter which is used if mode = TS.
            In TS, the posterior sample is drawn as
            np.random.multivariate_normal(mean, cov*alpha**2, size).

        Returns
        -------
        f: float or list of float
            Score defined in each mode.
        """
        predictor = self._set_predictor(predictor)
        training = self._set_training(training)
        actions = self.actions
        test = self.test.get_subset(actions)

        if mode == "EI":
            f = physbo.search.score.EI(predictor, training, test)
        elif mode == "PI":
            f = physbo.search.score.PI(predictor, training, test)
        elif mode == "TS":
            f = physbo.search.score.TS(predictor, training, test, alpha)
        else:
            raise NotImplementedError("mode must be EI, PI or TS.")
        return f

    def get_marginal_score(self, mode, chosen_actions, K, alpha):
        """
        Getting marginal scores.

        Parameters
        ----------
        mode: str
            The type of acquisition function.
            TS (Thompson Sampling), EI (Expected Improvement) and
            PI (Probability of Improvement) are available.
            These functions are defined in score.py.
        chosen_actions: numpy.ndarray
            Array of selected actions.
        K: int
            The number of samples drawn from the predictor to estimate
            the marginal score.
        alpha: float
            not used.

        Returns
        -------
        f: numpy.ndarray
            Scores averaged over the K samples (the score is defined in each mode).
        """
        f = np.zeros((K, len(self.actions)))
        new_test_local = self.test.get_subset(chosen_actions)
        if self.mpisize == 1:
            new_test = new_test_local
        else:
            new_test = variable()
            for nt in self.mpicomm.allgather(new_test_local):
                new_test.add(X=nt.X, t=nt.t, Z=nt.Z)
        virtual_t = self.predictor.get_predict_samples(self.training, new_test, K)

        for k in range(K):
            predictor = copy.deepcopy(self.predictor)
            train = copy.deepcopy(self.training)
            virtual_train = new_test
            virtual_train.t = virtual_t[k, :]

            if virtual_train.Z is None:
                train.add(virtual_train.X, virtual_train.t)
            else:
                train.add(virtual_train.X, virtual_train.t, virtual_train.Z)

            predictor.update(train, virtual_train)
            f[k, :] = self.get_score(mode, predictor, train)
        return np.mean(f, axis=0)

    def get_actions(self, mode, N, K, alpha):
        """
        Getting actions.

        Parameters
        ----------
        mode: str
            The type of acquisition function.
            TS (Thompson Sampling), EI (Expected Improvement) and
            PI (Probability of Improvement) are available.
            These functions are defined in score.py.
        N: int
            The total number of actions to return.
        K: int
            The total number of samples used to evaluate the marginal score.
        alpha: float
            Tuning parameter which is used if mode = TS.
            In TS, the posterior sample is drawn as
            np.random.multivariate_normal(mean, cov*alpha**2, size).

        Returns
        -------
        chosen_actions: numpy.ndarray
            An N-dimensional array of actions selected in each search process.
        """
        f = self.get_score(mode, self.predictor, self.training, alpha)
        champion, local_champion, local_index = self._find_champion(f)
        if champion == local_champion:
            self.actions = self.delete_actions(local_index)

        chosen_actions = np.zeros(N, dtype=int)
        chosen_actions[0] = champion

        for n in range(1, N):
            f = self.get_marginal_score(mode, chosen_actions[0:n], K, alpha)
            champion, local_champion, local_index = self._find_champion(f)
            if champion == local_champion:
                self.actions = self.delete_actions(local_index)
            chosen_actions[n] = champion
        return chosen_actions

    def _find_champion(self, f):
        local_fmax = np.max(f)
        local_index = np.argmax(f)
        local_champion = self.actions[local_index]
        if self.mpisize == 1:
            return local_champion, local_champion, local_index
        else:
            local_champions = self.mpicomm.allgather(local_champion)
            local_fs = self.mpicomm.allgather(local_fmax)
            champion_rank = np.argmax(local_fs)
            champion = local_champions[champion_rank]
            return champion, local_champion, local_index

    def get_random_action(self, N):
        """
        Getting indexes of actions randomly.

        Parameters
        ----------
        N: int
            The number of actions to select at random.

        Returns
        -------
        action: numpy.ndarray
            Indexes of actions selected randomly from the search candidates.
        """
        action = np.zeros(N, dtype=int)
        if self.mpisize == 1:
            index = np.random.choice(len(self.actions), N, replace=False)
            action = self.actions[index]
            self.actions = self.delete_actions(index)
        else:
            nactions = self.mpicomm.gather(len(self.actions), root=0)
            local_indices = [[] for _ in range(self.mpisize)]
            if self.mpirank == 0:
                hi = np.add.accumulate(nactions)
                lo = np.roll(hi, 1)
                lo[0] = 0
                index = np.random.choice(hi[-1], N, replace=False)
                ranks = np.searchsorted(hi, index, side="right")
                for r, i in zip(ranks, index):
                    local_indices[r].append(i - lo[r])
            local_indices = self.mpicomm.scatter(local_indices, root=0)
            local_actions = self.actions[local_indices]
            self.actions = self.delete_actions(local_indices)
            action = self.mpicomm.allgather(local_actions)
            action = itertools.chain.from_iterable(action)
            action = np.array(list(action))
        return action

    def save(self, file_history, file_training=None, file_predictor=None):
        """
        Saving history, training and predictor into the corresponding files.

        Parameters
        ----------
        file_history: str
            The name of the file that stores the information of the history.
        file_training: str
            The name of the file that stores the training dataset.
        file_predictor: str
            The name of the file that stores the predictor dataset.
        """
        self.history.save(file_history)
        if file_training is not None:
            self.training.save(file_training)
        if file_predictor is not None:
            with open(file_predictor, "wb") as f:
                pickle.dump(self.predictor, f)

    def load(self, file_history, file_training=None, file_predictor=None):
        """
        Loading files about history, training and predictor.

        Parameters
        ----------
        file_history: str
            The name of the file that stores the information of the history.
        file_training: str
            The name of the file that stores the training dataset.
        file_predictor: str
            The name of the file that stores the predictor dataset.
        """
        self.history.load(file_history)

        if file_training is None:
            N = self.history.total_num_search
            X = self.test.X[self.history.chosen_actions[0:N], :]
            t = self.history.fx[0:N]
            self.training = variable(X=X, t=t)
        else:
            self.training = variable()
            self.training.load(file_training)

        if file_predictor is not None:
            with open(file_predictor, "rb") as f:
                self.predictor = pickle.load(f)

    def export_predictor(self):
        """
        Returning the predictor dataset.

        Returns
        -------
        predictor: predictor object
        """
        return self.predictor

    def export_training(self):
        """
        Returning the training dataset.

        Returns
        -------
        training: physbo.variable
        """
        return self.training

    def export_history(self):
        """
        Returning the information of the history.

        Returns
        -------
        history: history object
        """
        return self.history

    def _set_predictor(self, predictor=None):
        """
        Set the predictor if defined.

        Parameters
        ----------
        predictor: predictor object
            Base class is defined in physbo.predictor.

        Returns
        -------
        predictor: predictor object
        """
        if predictor is None:
            predictor = self.predictor
        return predictor

    def _init_predictor(self, is_rand_expans, predictor=None):
        """
        Setting the initial predictor.

        Parameters
        ----------
        is_rand_expans: bool
            If true, physbo.blm.predictor is selected.
            If false, physbo.gp.predictor is selected.
        predictor: predictor object
            Base class is defined in physbo.predictor.

        Returns
        -------
        predictor: predictor object
            Base class is defined in physbo.predictor.
        """
        self.predictor = self._set_predictor(predictor)
        if self.predictor is None:
            if is_rand_expans:
                self.predictor = physbo.blm.predictor(self.config)
            else:
                self.predictor = physbo.gp.predictor(self.config)
        return self.predictor

    def _set_training(self, training=None):
        """
        Set the training dataset.

        Parameters
        ----------
        training: physbo.variable
            Training dataset.

        Returns
        -------
        training: physbo.variable
            Training dataset.
        """
        if training is None:
            training = self.training
        return training

    def _set_unchosen_actions(self, actions=None):
        """
        Parameters
        ----------
        actions: numpy.ndarray
            An array of indexes of the actions which are not chosen.

        Returns
        -------
        actions: numpy.ndarray
            An array of indexes of the actions which are not chosen.
        """
        if actions is None:
            actions = self.actions
        return actions

    def _set_test(self, test_X):
        """
        Set the test candidates.

        Parameters
        ----------
        test_X: numpy.ndarray or physbo.variable
            The set of candidates. Each row vector represents
            the feature vector of each search candidate.

        Returns
        -------
        test: physbo.variable
            The set of candidates.
        """
        if isinstance(test_X, np.ndarray):
            test = variable(X=test_X)
        elif isinstance(test_X, variable):
            test = test_X
        else:
            raise TypeError("The type of test_X must be numpy.ndarray or physbo.variable")
        return test

    def _set_config(self, config=None):
        """
        Set the configuration information.

        Parameters
        ----------
        config: set_config object (physbo.misc.set_config)

        Returns
        -------
        config: set_config object (physbo.misc.set_config)
        """
        if config is None:
            config = physbo.misc.set_config()
        return config
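

# Minimal usage sketch for `policy` (illustrative only; `toy_objective` and the
# candidate grid below are hypothetical, not part of physbo). It drives random
# sampling by hand with get_random_action / run_simulator / write, which are
# the methods defined above.
def _policy_usage_sketch():
    test_X = np.linspace(-1.0, 1.0, 101).reshape(-1, 1)  # candidate feature vectors

    def toy_objective(actions):
        # physbo maximizes, so return the negative squared distance from 0.3
        return -((test_X[actions, 0] - 0.3) ** 2)

    pol = policy(test_X=test_X)
    pol.set_seed(12345)
    for _ in range(10):
        action = pol.get_random_action(1)          # pick one untested candidate
        t = run_simulator(toy_objective, action)   # evaluate the objective
        pol.write(action, t)                       # record it in history/training
    return pol.export_history()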


class policy_mo(policy):
    def __init__(self, test_X, num_objectives, comm=None, config=None, initial_actions=None):
        self.num_objectives = num_objectives
        self.history = history_mo(num_objectives=self.num_objectives)

        self.training_list = [variable() for _ in range(self.num_objectives)]
        self.predictor_list = [None for _ in range(self.num_objectives)]
        self.test_list = [self._set_test(test_X) for _ in range(self.num_objectives)]
        self.new_data_list = [None for _ in range(self.num_objectives)]

        self.actions = np.arange(0, test_X.shape[0])
        self.config = self._set_config(config)
        self.TS_candidate_num = None

        if initial_actions is not None:
            self.actions = sorted(list(set(self.actions) - set(initial_actions)))

        if comm is None:
            self.mpicomm = None
            self.mpisize = 1
            self.mpirank = 0
        else:
            self.mpicomm = comm
            self.mpisize = comm.size
            self.mpirank = comm.rank
            self.actions = np.array_split(self.actions, self.mpisize)[self.mpirank]

    def write(self, action, t, X=None):
        self.history.write(t, action)
        action = np.array(action)
        t = np.array(t)

        for i in range(self.num_objectives):
            test = self.test_list[i]
            predictor = self.predictor_list[i]

            if X is None:
                X = test.X[action, :]
                Z = test.Z[action, :] if test.Z is not None else None
            else:
                Z = predictor.get_basis(X) if predictor is not None else None

            self.new_data_list[i] = variable(X, t[:, i], Z)
            self.training_list[i].add(X=X, t=t[:, i], Z=Z)

    def _switch_model(self, i):
        self.training = self.training_list[i]
        self.predictor = self.predictor_list[i]
        self.test = self.test_list[i]
        self.new_data = self.new_data_list[i]

    def get_actions(self, mode, N, K, alpha):
        f = self.get_score(
            mode, self.predictor_list, self.training_list, self.history.pareto, alpha
        )
        champion, local_champion, local_index = self._find_champion(f)
        chosen_actions = np.zeros(N, dtype=int)
        chosen_actions[0] = champion
        if champion == local_champion:
            self.actions = self.delete_actions(local_index)

        for n in range(1, N):
            f = self.get_score(
                mode,
                self.predictor_list,
                self.training_list,
                self.history.pareto,
                alpha,
            )
            champion, local_champion, local_index = self._find_champion(f)
            chosen_actions[n] = champion
            if champion == local_champion:
                self.actions = self.delete_actions(local_index)
        return chosen_actions

    def get_score(self, mode, predictor_list, training_list, pareto, alpha=1):
        actions = self.actions
        test = self.test.get_subset(actions)

        if mode == "EHVI":
            fmean, fstd = self.get_fmean_fstd(predictor_list, training_list, test)
            f = physbo.search.score.EHVI(fmean, fstd, pareto)
        elif mode == "HVPI":
            fmean, fstd = self.get_fmean_fstd(predictor_list, training_list, test)
            f = physbo.search.score.HVPI(fmean, fstd, pareto)
        elif mode == "TS":
            f = physbo.search.score.TS_MO(
                predictor_list,
                training_list,
                test,
                alpha,
                reduced_candidate_num=self.TS_candidate_num,
            )
        else:
            raise NotImplementedError("mode must be EHVI, HVPI or TS.")
        return f

    def get_fmean_fstd(self, predictor_list, training_list, test):
        fmean = [
            predictor.get_post_fmean(training, test)
            for predictor, training in zip(predictor_list, training_list)
        ]
        fcov = [
            predictor.get_post_fcov(training, test)
            for predictor, training in zip(predictor_list, training_list)
        ]

        # shape: (N, n_obj)
        fmean = np.array(fmean).T
        fstd = np.sqrt(np.array(fcov)).T
        return fmean, fstd

    def load(self, file_history, file_training_list=None, file_predictor_list=None):
        self.history.load(file_history)

        if file_training_list is None:
            N = self.history.total_num_search
            X = self.test_list[0].X[self.history.chosen_actions[0:N], :]
            t = self.history.fx[0:N]
            self.training_list = [
                variable(X=X, t=t[:, i]) for i in range(self.num_objectives)
            ]
        else:
            self.load_training_list(file_training_list)

        if file_predictor_list is not None:
            self.load_predictor_list(file_predictor_list)

    def save_predictor_list(self, file_name):
        with open(file_name, "wb") as f:
            pickle.dump(self.predictor_list, f, 2)

    def save_training_list(self, file_name):
        obj = [
            {"X": training.X, "t": training.t, "Z": training.Z}
            for training in self.training_list
        ]
        with open(file_name, "wb") as f:
            pickle.dump(obj, f, 2)

    def load_predictor_list(self, file_name):
        with open(file_name, "rb") as f:
            self.predictor_list = pickle.load(f)

    def load_training_list(self, file_name):
        with open(file_name, "rb") as f:
            data_list = pickle.load(f)

        self.training_list = [variable() for _ in range(self.num_objectives)]
        for data, training in zip(data_list, self.training_list):
            training.X = data["X"]
            training.t = data["t"]
            training.Z = data["Z"]
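

# Minimal usage sketch for `policy_mo` with two objectives (illustrative only;
# `toy_objectives` and the random candidate set below are hypothetical). Each
# call to write() takes an (n, num_objectives) array of objective values.
def _policy_mo_usage_sketch():
    test_X = np.random.rand(200, 3)  # 200 candidates with 3 features

    def toy_objectives(actions):
        x = test_X[actions, :]
        # two competing objectives, stacked column-wise: shape (n, 2)
        return np.column_stack(
            [-np.sum(x ** 2, axis=1), -np.sum((x - 1.0) ** 2, axis=1)]
        )

    pol = policy_mo(test_X=test_X, num_objectives=2)
    pol.set_seed(0)
    for _ in range(5):
        action = pol.get_random_action(1)
        t = run_simulator(toy_objectives, action)
        pol.write(action, t)
    return pol.export_history()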