Source code for odatse.algorithm.exchange

# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

from io import open
import copy
import time
import itertools
import sys

import numpy as np

import odatse
import odatse.algorithm.montecarlo
from odatse.algorithm.montecarlo import read_Ts
from odatse.util.separateT import separateT

[docs] class Algorithm(odatse.algorithm.montecarlo.AlgorithmBase): """Replica Exchange Monte Carlo Attributes ========== x: np.ndarray current configuration inode: np.ndarray current configuration index (discrete parameter space) fx: np.ndarray current "Energy" Tindex: np.ndarray current "Temperature" index istep: int current step (or, the number of calculated energies) best_x: np.ndarray best configuration best_fx: float best "Energy" best_istep: int index of best configuration nreplica: int The number of replicas (= the number of procs) T2rep: np.ndarray Mapping from temperature index to replica index rep2T: np.ndarray Reverse mapping from replica index to temperature index exchange_direction: bool Parity of exchange direction """ x: np.ndarray xmin: np.ndarray xmax: np.ndarray #xunit: np.ndarray xstep: np.ndarray numsteps: int numsteps_exchange: int fx: np.ndarray istep: int nreplica: int Tindex: np.ndarray rep2T: np.ndarray T2rep: np.ndarray exchange_direction: bool
[docs] def __init__(self, info: odatse.Info, runner: odatse.Runner = None, run_mode: str = "initial" ) -> None: """ Initialize the Algorithm class. Parameters ---------- info : odatse.Info Information object containing algorithm parameters. runner : odatse.Runner, optional Runner object for executing the algorithm. run_mode : str, optional Mode to run the algorithm in, by default "initial". """ time_sta = time.perf_counter() info_exchange = info.algorithm["exchange"] nwalkers = info_exchange.get("nreplica_per_proc", 1) super().__init__(info=info, runner=runner, nwalkers=nwalkers, run_mode=run_mode) self.nreplica = self.mpisize * self.nwalkers self.input_as_beta, self.betas = read_Ts(info_exchange, numT=self.nreplica) self.numsteps = info_exchange["numsteps"] self.numsteps_exchange = info_exchange["numsteps_exchange"] time_end = time.perf_counter() self.timer["init"]["total"] = time_end - time_sta
[docs] def _print_info(self) -> None: """ Print information about the algorithm. """ if self.mpirank == 0: pass if self.mpisize > 1: self.mpicomm.barrier()
[docs] def _initialize(self) -> None: """ Initialize the algorithm parameters and state. """ super()._initialize() self.Tindex = np.arange( self.mpirank * self.nwalkers, (self.mpirank + 1) * self.nwalkers ) self.rep2T = np.arange(self.nreplica) self.T2rep = np.arange(self.nreplica) self.exchange_direction = True self.istep = 0 self._show_parameters()
[docs] def _run(self) -> None: """ Run the algorithm. """ # print(">>> _run") if self.mode is None: raise RuntimeError("mode unset") restore_rng = not self.mode.endswith("-resetrand") if self.mode.startswith("init"): self._initialize() elif self.mode.startswith("resume"): self._load_state(self.checkpoint_file, mode="resume", restore_rng=restore_rng) elif self.mode.startswith("continue"): self._load_state(self.checkpoint_file, mode="continue", restore_rng=restore_rng) else: raise RuntimeError("unknown mode {}".format(self.mode)) beta = self.betas[self.Tindex] if self.mode.startswith("init"): # first step self._evaluate() file_trial = open("trial.txt", "w") self._write_result_header(file_trial) self._write_result(file_trial) file_result = open("result.txt", "w") self._write_result_header(file_result) self._write_result(file_result) self.istep += 1 minidx = np.argmin(self.fx) self.best_x = copy.copy(self.x[minidx, :]) self.best_fx = np.min(self.fx[minidx]) self.best_istep = 0 self.best_iwalker = 0 else: file_trial = open("trial.txt", "a") file_result = open("result.txt", "a") next_checkpoint_step = self.istep + self.checkpoint_steps next_checkpoint_time = time.time() + self.checkpoint_interval while self.istep < self.numsteps: # print(">>> istep={}".format(self.istep)) # Exchange if self.istep % self.numsteps_exchange == 0: # print(">>> do exchange") time_sta = time.perf_counter() if self.nreplica > 1: self._exchange(self.exchange_direction) if self.nreplica > 2: self.exchange_direction = not self.exchange_direction time_end = time.perf_counter() self.timer["run"]["exchange"] += time_end - time_sta beta = self.betas[self.Tindex] # print(">>> call local_update") self.local_update(beta, file_trial, file_result) self.istep += 1 if self.checkpoint: time_now = time.time() if self.istep >= next_checkpoint_step or time_now >= next_checkpoint_time: self._save_state(self.checkpoint_file) next_checkpoint_step = self.istep + self.checkpoint_steps next_checkpoint_time = time_now + self.checkpoint_interval file_result.close() file_trial.close() print("complete main process : rank {:08d}/{:08d}".format(self.mpirank, self.mpisize)) # store final state for continuation if self.checkpoint: # print(">>> store final state") self._save_state(self.checkpoint_file)
[docs] def _exchange(self, direction: bool) -> None: """ Try to exchange temperatures. Parameters ---------- direction : bool Direction of the exchange. """ if self.nwalkers == 1: self.__exchange_single_walker(direction) else: self.__exchange_multi_walker(direction)
def __exchange_single_walker(self, direction: bool) -> None: """ Exchange temperatures for a single walker. Parameters ---------- direction : bool Direction of the exchange. """ if self.mpisize > 1: self.mpicomm.Barrier() if direction: if self.Tindex[0] % 2 == 0: other_index = self.Tindex[0] + 1 is_main = True else: other_index = self.Tindex[0] - 1 is_main = False else: if self.Tindex[0] % 2 == 0: other_index = self.Tindex[0] - 1 is_main = False else: other_index = self.Tindex[0] + 1 is_main = True ibuf = np.zeros(1, dtype=np.int64) fbuf = np.zeros(1, dtype=np.float64) if 0 <= other_index < self.nreplica: other_rank = self.T2rep[other_index] if is_main: self.mpicomm.Recv(fbuf, source=other_rank, tag=1) other_fx = fbuf[0] beta = self.betas[self.Tindex[0]] other_beta = self.betas[self.Tindex[0] + 1] logp = (other_beta - beta) * (other_fx - self.fx[0]) if logp >= 0.0 or self.rng.rand() < np.exp(logp): ibuf[0] = self.Tindex self.mpicomm.Send(ibuf, dest=other_rank, tag=2) self.Tindex[0] += 1 else: ibuf[0] = self.Tindex + 1 self.mpicomm.Send(ibuf, dest=other_rank, tag=2) else: fbuf[0] = self.fx[0] self.mpicomm.Send(fbuf, dest=other_rank, tag=1) self.mpicomm.Recv(ibuf, source=other_rank, tag=2) self.Tindex[0] = ibuf[0] self.mpicomm.Barrier() if self.mpirank == 0: self.T2rep[self.Tindex[0]] = self.mpirank for other_rank in range(1, self.nreplica): self.mpicomm.Recv(ibuf, source=other_rank, tag=0) self.T2rep[ibuf[0]] = other_rank else: ibuf[0] = self.Tindex self.mpicomm.Send(ibuf, dest=0, tag=0) self.mpicomm.Bcast(self.T2rep, root=0) def __exchange_multi_walker(self, direction: bool) -> None: """ Exchange temperatures for multiple walkers. Parameters ---------- direction : bool Direction of the exchange. """ comm = self.mpicomm if self.mpisize > 1: fx_all = comm.allgather(self.fx) fx_all = np.array(fx_all).flatten() else: fx_all = self.fx rep2T_diff = [] T2rep_diff = [] for irep in range( self.mpirank * self.nwalkers, (self.mpirank + 1) * self.nwalkers ): iT = self.rep2T[irep] if iT % 2 != 0: continue jT = iT + 1 if direction else iT - 1 if jT < 0 or jT == self.nreplica: continue jrep = self.T2rep[jT] fdiff = fx_all[jrep] - fx_all[irep] bdiff = self.betas[jT] - self.betas[iT] logp = fdiff * bdiff if logp >= 0.0 or self.rng.rand() < np.exp(logp): rep2T_diff.append((irep, jT)) # this means self.rep2T[irep] = jT rep2T_diff.append((jrep, iT)) T2rep_diff.append((iT, jrep)) T2rep_diff.append((jT, irep)) if self.mpisize > 1: rep2T_diff = comm.allgather(rep2T_diff) rep2T_diff = list(itertools.chain.from_iterable(rep2T_diff)) # flatten T2rep_diff = comm.allgather(T2rep_diff) T2rep_diff = list(itertools.chain.from_iterable(T2rep_diff)) # flatten for diff in rep2T_diff: self.rep2T[diff[0]] = diff[1] for diff in T2rep_diff: self.T2rep[diff[0]] = diff[1] self.Tindex = self.rep2T[ self.mpirank * self.nwalkers : (self.mpirank + 1) * self.nwalkers ]
[docs] def _prepare(self) -> None: """ Prepare the algorithm for execution. """ self.timer["run"]["submit"] = 0.0 self.timer["run"]["exchange"] = 0.0
[docs] def _post(self) -> None: """ Post-process the results of the algorithm. """ Ts = self.betas if self.input_as_beta else 1.0 / self.betas if self.mpirank == 0: print(f"start separateT {self.mpirank}") sys.stdout.flush() separateT( Ts=Ts, nwalkers=self.nwalkers, output_dir=self.output_dir, comm=self.mpicomm, use_beta=self.input_as_beta, buffer_size=10000, ) if self.mpisize > 1: # NOTE: # ``gather`` seems not to work with many processes (say, 32) in some MPI implementation. # ``Gather`` and ``allgather`` seem to work fine. # Since the performance is not so important here, we use ``allgather`` for simplicity. best_fx = self.mpicomm.allgather(self.best_fx) best_x = self.mpicomm.allgather(self.best_x) best_istep = self.mpicomm.allgather(self.best_istep) best_iwalker = self.mpicomm.allgather(self.best_iwalker) else: best_fx = [self.best_fx] best_x = [self.best_x] best_istep = [self.best_istep] best_iwalker = [self.best_iwalker] best_rank = np.argmin(best_fx) if self.mpirank == 0: with open("best_result.txt", "w") as f: f.write(f"nprocs = {self.nreplica}\n") f.write(f"rank = {best_rank}\n") f.write(f"step = {best_istep[best_rank]}\n") f.write(f"walker = {best_iwalker[best_rank]}\n") f.write(f"fx = {best_fx[best_rank]}\n") for label, x in zip(self.label_list, best_x[best_rank]): f.write(f"{label} = {x}\n") print("Best Result:") print(f" rank = {best_rank}") print(f" step = {best_istep[best_rank]}") print(f" walker = {best_iwalker[best_rank]}") print(f" fx = {best_fx[best_rank]}") for label, x in zip(self.label_list, best_x[best_rank]): print(f" {label} = {x}") return { "x": best_x[best_rank], "fx": best_fx[best_rank], "nprocs": self.nreplica, "rank": best_rank, "step": best_istep[best_rank], "walker": best_iwalker[best_rank], }
[docs] def _save_state(self, filename) -> None: """ Save the current state of the algorithm. Parameters ---------- filename : str The name of the file to save the state to. """ data = { #-- _algorithm "mpisize": self.mpisize, "mpirank": self.mpirank, "rng": self.rng.get_state(), "timer": self.timer, "info": self.info, #-- montecarlo "x": self.x, "fx": self.fx, "inode": self.inode, "istep": self.istep, "best_x": self.best_x, "best_fx": self.best_fx, "best_istep": self.best_istep, "best_iwalker": self.best_iwalker, "naccepted": self.naccepted, "ntrial": self.ntrial, #-- exchange "nreplica": self.nreplica, "Tindex": self.Tindex, "rep2T": self.rep2T, "T2rep": self.T2rep, "exchange_direction": self.exchange_direction, } self._save_data(data, filename)
[docs] def _load_state(self, filename, mode="resume", restore_rng=True): """ Load the state of the algorithm from a file. Parameters ---------- filename : str The name of the file to load the state from. mode : str, optional The mode to load the state in, by default "resume". restore_rng : bool, optional Whether to restore the random number generator state, by default True. """ data = self._load_data(filename) if not data: print("ERROR: Load status file failed") sys.exit(1) #-- _algorithm assert self.mpisize == data["mpisize"] assert self.mpirank == data["mpirank"] if restore_rng: self.rng = np.random.RandomState() self.rng.set_state(data["rng"]) self.timer = data["timer"] info = data["info"] self._check_parameters(info) #-- montecarlo self.x = data["x"] self.fx = data["fx"] self.inode = data["inode"] self.istep = data["istep"] self.best_x = data["best_x"] self.best_fx = data["best_fx"] self.best_istep = data["best_istep"] self.best_iwalker = data["best_iwalker"] self.naccepted = data["naccepted"] self.ntrial = data["ntrial"] #-- exchange assert self.nreplica == data["nreplica"] self.Tindex = data["Tindex"] self.rep2T = data["rep2T"] self.T2rep = data["T2rep"] self.exchange_direction = data["exchange_direction"]