# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List
import sys
import time
import shutil
import copy
from pathlib import Path
import physbo
import numpy as np
import odatse
import odatse.domain
class Algorithm(odatse.algorithm.AlgorithmBase):
"""
    A class implementing the Bayesian optimization algorithm, using PHYSBO as the search backend.
Attributes
----------
    mesh_list : np.ndarray
        The mesh-grid points; the first column holds the grid index and the
        remaining columns hold the parameter coordinates.
    label_list : List[str]
        The list of parameter labels.
    random_max_num_probes : int
        The maximum number of initial random-search probes.
    bayes_max_num_probes : int
        The maximum number of Bayesian-optimization probes.
    score : str
        The scoring (acquisition) method passed to PHYSBO (e.g. "TS").
    interval : int
        The number of probes between re-learnings of the surrogate-model
        hyperparameters.
    num_rand_basis : int
        The number of random basis functions used by the surrogate model.
xopt : np.ndarray
The optimal solution.
best_fx : List[float]
The list of best function values.
best_action : List[int]
The list of best actions.
fx_list : List[float]
The list of function values.
param_list : List[np.ndarray]
The list of parameters.
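
    Notes
    -----
    The candidate points are taken from an ``odatse.domain.MeshGrid`` and the
    search itself is delegated to PHYSBO.  A minimal, illustrative input block
    (example values equal to the defaults; the exact table layout is an
    assumption based on the keys read in ``__init__``, and ``seed`` is optional)
    might look like:

    .. code-block:: toml

        [algorithm]
        seed = 12345

        [algorithm.bayes]
        random_max_num_probes = 20
        bayes_max_num_probes = 40
        score = "TS"
        interval = 5
        num_rand_basis = 5000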
"""
def __init__(self, info: odatse.Info, runner: odatse.Runner = None, domain = None, run_mode: str = "initial") -> None:
"""
Constructs all the necessary attributes for the Algorithm object.
Parameters
----------
info : odatse.Info
The information object.
runner : odatse.Runner, optional
The runner object (default is None).
        domain : odatse.domain.MeshGrid, optional
            The mesh-grid domain to search (default is None; if omitted, it is
            constructed from ``info``).
run_mode : str, optional
The run mode (default is "initial").
"""
super().__init__(info=info, runner=runner, run_mode=run_mode)
info_param = info.algorithm.get("param", {})
info_bayes = info.algorithm.get("bayes", {})
for key in ("random_max_num_probes", "bayes_max_num_probes", "score", "interval", "num_rand_basis"):
if key in info_param and key not in info_bayes:
print(f"WARNING: algorithm.param.{key} is deprecated. Use algorithm.bayes.{key} .")
info_bayes[key] = info_param[key]
self.random_max_num_probes = info_bayes.get("random_max_num_probes", 20)
self.bayes_max_num_probes = info_bayes.get("bayes_max_num_probes", 40)
self.score = info_bayes.get("score", "TS")
self.interval = info_bayes.get("interval", 5)
self.num_rand_basis = info_bayes.get("num_rand_basis", 5000)
if self.mpirank == 0:
print("# parameter")
print(f"random_max_num_probes = {self.random_max_num_probes}")
print(f"bayes_max_num_probes = {self.bayes_max_num_probes}")
print(f"score = {self.score}")
print(f"interval = {self.interval}")
print(f"num_rand_basis = {self.num_rand_basis}")
if domain and isinstance(domain, odatse.domain.MeshGrid):
self.domain = domain
else:
self.domain = odatse.domain.MeshGrid(info)
self.mesh_list = np.array(self.domain.grid)
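        # The first column of the mesh grid holds the grid index; only the coordinate
        # columns are passed to PHYSBO, centered via physbo.misc.centering as expected
        # by its discrete-search policy.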
X_normalized = physbo.misc.centering(self.mesh_list[:, 1:])
comm = self.mpicomm if self.mpisize > 1 else None
self.policy = physbo.search.discrete.policy(test_X=X_normalized, comm=comm)
if "seed" in info.algorithm:
seed = info.algorithm["seed"]
self.policy.set_seed(seed)
self.file_history = "history.npz"
self.file_training = "training.npz"
self.file_predictor = "predictor.dump"
def _initialize(self):
"""
Initializes the algorithm parameters and timers.
"""
self.istep = 0
self.param_list = []
self.fx_list = []
self.timer["run"]["random_search"] = 0.0
self.timer["run"]["bayes_search"] = 0.0
self._show_parameters()
def _run(self) -> None:
"""
Runs the Bayesian optimization process.
"""
runner = self.runner
mesh_list = self.mesh_list
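        # The simulator below closes over _run's local variables: `runner` and
        # `mesh_list` here, plus `fx_list` and `param_list`, which are bound further
        # down before the first probe is evaluated.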
class simulator:
def __call__(self, action: np.ndarray) -> float:
"""
Simulates the function evaluation for a given action.
Parameters
----------
action : np.ndarray
The action to be evaluated.
Returns
-------
float
The negative function value.
"""
a = int(action[0])
args = (a, 0)
x = mesh_list[a, 1:]
fx = runner.submit(x, args)
fx_list.append(fx)
param_list.append(mesh_list[a])
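                # PHYSBO maximizes the simulator's return value, so the objective fx
                # (which this algorithm minimizes) is returned with its sign flipped.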
return -fx
if self.mode is None:
raise RuntimeError("mode unset")
restore_rng = not self.mode.endswith("-resetrand")
if self.mode.startswith("init"):
self._initialize()
elif self.mode.startswith("resume"):
self._load_state(self.checkpoint_file, mode="resume", restore_rng=restore_rng)
elif self.mode.startswith("continue"):
self._load_state(self.checkpoint_file, mode="continue", restore_rng=restore_rng)
else:
raise RuntimeError("unknown mode {}".format(self.mode))
fx_list = self.fx_list
param_list = self.param_list
if self.mode.startswith("init"):
time_sta = time.perf_counter()
res = self.policy.random_search(
max_num_probes=self.random_max_num_probes, simulator=simulator()
)
time_end = time.perf_counter()
self.timer["run"]["random_search"] = time_end - time_sta
if self.checkpoint:
self._save_state(self.checkpoint_file)
else:
if self.istep >= self.bayes_max_num_probes:
res = copy.deepcopy(self.policy.history)
next_checkpoint_step = self.istep + self.checkpoint_steps
next_checkpoint_time = time.time() + self.checkpoint_interval
while self.istep < self.bayes_max_num_probes:
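            # With max_num_probes=1, interval=0 asks PHYSBO to (re)learn the surrogate
            # hyperparameters at this probe, while a negative interval skips learning;
            # the net effect is a hyperparameter refresh every `self.interval` steps.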
intv = 0 if self.istep % self.interval == 0 else -1
time_sta = time.perf_counter()
res = self.policy.bayes_search(
max_num_probes=1,
simulator=simulator(),
score=self.score,
interval=intv,
num_rand_basis=self.num_rand_basis,
)
time_end = time.perf_counter()
self.timer["run"]["bayes_search"] += time_end - time_sta
self.istep += 1
if self.checkpoint:
time_now = time.time()
if self.istep >= next_checkpoint_step or time_now >= next_checkpoint_time:
self.fx_list = fx_list
self.param_list = param_list
self._save_state(self.checkpoint_file)
next_checkpoint_step = self.istep + self.checkpoint_steps
next_checkpoint_time = time_now + self.checkpoint_interval
self.best_fx, self.best_action = res.export_all_sequence_best_fx()
self.xopt = mesh_list[int(self.best_action[-1]), 1:]
self.fx_list = fx_list
self.param_list = param_list
if self.checkpoint:
self._save_state(self.checkpoint_file)
def _prepare(self) -> None:
"""
Prepares the algorithm for execution.
"""
pass
def _post(self) -> None:
"""
Finalizes the algorithm execution and writes the results to a file.
"""
label_list = self.label_list
if self.mpirank == 0:
with open("BayesData.txt", "w") as file_BD:
file_BD.write("#step")
for label in label_list:
file_BD.write(f" {label}")
file_BD.write(" fx")
for label in label_list:
file_BD.write(f" {label}_action")
file_BD.write(" fx_action\n")
for step, fx in enumerate(self.fx_list):
file_BD.write(str(step))
best_idx = int(self.best_action[step])
for v in self.mesh_list[best_idx][1:]:
file_BD.write(f" {v}")
file_BD.write(f" {-self.best_fx[step]}")
for v in self.param_list[step][1:]:
file_BD.write(f" {v}")
file_BD.write(f" {fx}\n")
print("Best Solution:")
for x, y in zip(label_list, self.xopt):
print(x, "=", y)
return {"x": self.xopt, "fx": self.best_fx}
def _save_state(self, filename):
"""
Saves the current state of the algorithm to a file.
Parameters
----------
filename : str
The name of the file to save the state.
"""
data = {
"mpisize": self.mpisize,
"mpirank": self.mpirank,
"rng": self.rng.get_state(),
"timer": self.timer,
"info": self.info,
"istep": self.istep,
"param_list": self.param_list,
"fx_list": self.fx_list,
"file_history": self.file_history,
"file_training": self.file_training,
"file_predictor": self.file_predictor,
"random_number": np.random.get_state(),
}
self._save_data(data, filename)
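        # PHYSBO's own state (search history, training data, trained predictor) is
        # written separately to the output directory through its save interface.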
self.policy.save(file_history=Path(self.output_dir, self.file_history),
file_training=Path(self.output_dir, self.file_training),
file_predictor=Path(self.output_dir, self.file_predictor))
def _load_state(self, filename, mode="resume", restore_rng=True):
"""
Loads the state of the algorithm from a file.
Parameters
----------
filename : str
The name of the file to load the state from.
mode : str, optional
The mode to load the state (default is "resume").
restore_rng : bool, optional
Whether to restore the random number generator state (default is True).
"""
data = self._load_data(filename)
if not data:
print("ERROR: Load status file failed")
sys.exit(1)
assert self.mpisize == data["mpisize"]
assert self.mpirank == data["mpirank"]
if restore_rng:
self.rng = np.random.RandomState()
self.rng.set_state(data["rng"])
np.random.set_state(data["random_number"])
self.timer = data["timer"]
info = data["info"]
self._check_parameters(info)
self.istep = data["istep"]
self.param_list = data["param_list"]
self.fx_list = data["fx_list"]
self.policy.load(file_history=Path(self.output_dir, self.file_history),
file_training=Path(self.output_dir, self.file_training),
file_predictor=Path(self.output_dir, self.file_predictor))