physbo.search.range_multi._history のソースコード

# SPDX-License-Identifier: MPL-2.0
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import numpy as np
import pickle
import copy

from .. import pareto

MAX_SEARCH = int(30000)


[ドキュメント] class History(object): def __init__(self, num_objectives, dim): self.dim = dim self.num_objectives = num_objectives self.pareto = pareto.Pareto(num_objectives=self.num_objectives) self.num_runs = int(0) self.total_num_search = int(0) self.fx = np.zeros((MAX_SEARCH, self.num_objectives), dtype=float) self.action_X = np.zeros((MAX_SEARCH, self.dim), dtype=float) self.terminal_num_run = np.zeros(MAX_SEARCH, dtype=int) self._time_total = np.zeros(MAX_SEARCH, dtype=float) self._time_update_predictor = np.zeros(MAX_SEARCH, dtype=float) self._time_get_action = np.zeros(MAX_SEARCH, dtype=float) self._time_run_simulator = np.zeros(MAX_SEARCH, dtype=float) @property def time_total(self): return copy.copy(self._time_total[0 : self.num_runs]) @property def time_update_predictor(self): return copy.copy(self._time_update_predictor[0 : self.num_runs]) @property def time_get_action(self): return copy.copy(self._time_get_action[0 : self.num_runs]) @property def time_run_simulator(self): return copy.copy(self._time_run_simulator[0 : self.num_runs])
[ドキュメント] def write( self, t, action_X, time_total=None, time_update_predictor=None, time_get_action=None, time_run_simulator=None, ): """ Overwrite fx and action_X by t and action_X. Parameters ---------- t: numpy.ndarray N dimensional array. The negative energy of each search candidate (value of the objective function to be optimized). action_X: numpy.ndarray N x d dimensional array. The input of each search candidate. time_total: numpy.ndarray N dimenstional array. The total elapsed time in each step. If None (default), filled by 0.0. time_update_predictor: numpy.ndarray N dimenstional array. The elapsed time for updating predictor (e.g., learning hyperparemters) in each step. If None (default), filled by 0.0. time_get_action: numpy.ndarray N dimenstional array. The elapsed time for getting next action in each step. If None (default), filled by 0.0. time_run_simulator: numpy.ndarray N dimenstional array. The elapsed time for running the simulator in each step. If None (default), filled by 0.0. Returns ------- """ t = np.array(t) if t.ndim == 1: N = 1 if len(t) != self.num_objectives: raise ValueError("t does not match the number of objectives") else: N = t.shape[0] if t.shape[1] != self.num_objectives: raise ValueError("t does not match the number of objectives") st = self.total_num_search en = st + N self.terminal_num_run[self.num_runs] = en self.fx[st:en, :] = t self.action_X[st:en, :] = action_X self.num_runs += 1 self.total_num_search += N # update Pareto set self.pareto.update_front(t) if time_total is None: time_total = np.zeros(N, dtype=float) self._time_total[st:en] = time_total if time_update_predictor is None: time_update_predictor = np.zeros(N, dtype=float) self._time_update_predictor[st:en] = time_update_predictor if time_get_action is None: time_get_action = np.zeros(N, dtype=float) self._time_get_action[st:en] = time_get_action if time_run_simulator is None: time_run_simulator = np.zeros(N, dtype=float) self._time_run_simulator[st:en] = time_run_simulator
[ドキュメント] def export_pareto_front(self): return self.pareto.export_front()
[ドキュメント] def save(self, filename): N = self.total_num_search M = self.num_runs obj = { "num_runs": M, "total_num_search": N, "fx": self.fx[0:N, :], "action_X": self.action_X[0:N, :], "terminal_num_run": self.terminal_num_run[0:M], "pareto": self.pareto, } with open(filename, "wb") as f: pickle.dump(obj, f)
[ドキュメント] def load(self, filename): with open(filename, "rb") as f: data = pickle.load(f) M = int(data["num_runs"]) N = int(data["total_num_search"]) self.num_runs = M self.total_num_search = N self.fx[0:N, :] = data["fx"] self.action_X[0:N, :] = data["action_X"] self.terminal_num_run[0:M] = data["terminal_num_run"] self.pareto = data["pareto"]
[ドキュメント] def show_search_results_mo(self, N, disp_pareto_set=False): n = self.total_num_search pset, step = self.pareto.export_front() def msg_pareto_set_updated(indent=False): prefix = " " if indent else "" if self.pareto.front_updated: print(prefix + "Pareto set updated.") if disp_pareto_set: print( prefix + "current Pareto set = %s (steps = %s) \n" % (str(pset), str(step + 1)) ) else: print( prefix + "the number of Pareto frontiers = %s \n" % str(len(step)) ) if N == 1: print( "%04d-th step: f(x) = %s (X = %s)" % (n, str(self.fx[n - 1]), str(self.action_X[n - 1])) ) msg_pareto_set_updated(indent=True) else: msg_pareto_set_updated() print("list of simulation results") st = self.total_num_search - N en = self.total_num_search for n in range(st, en): print("f(x) = %s (X = %s)" % (str(self.fx[n]), str(self.action_X[n]))) print("\n")