Source code for odatse.algorithm.exchange

# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

from typing import Dict

from io import open
import copy
import time
import itertools
import sys

import numpy as np

import odatse
import odatse.algorithm.montecarlo
from odatse.util.read_ts import read_Ts
from odatse.util.separateT import separateT
from odatse.util.data_writer import DataWriter


[docs] class Algorithm(odatse.algorithm.montecarlo.AlgorithmBase): """ Replica Exchange Monte Carlo (REMC) Algorithm Implementation. This class implements the Replica Exchange Monte Carlo algorithm, also known as Parallel Tempering. The algorithm runs multiple replicas of the system at different temperatures and periodically attempts to swap configurations between adjacent temperature levels. Attributes ---------- x : np.ndarray Current configuration state for all walkers. xmin : np.ndarray Minimum allowed values for parameters. xmax : np.ndarray Maximum allowed values for parameters. xstep : np.ndarray Step sizes for parameter updates. numsteps : int Total number of Monte Carlo steps to perform. numsteps_exchange : int Number of steps between exchange attempts. fx : np.ndarray Current energy/objective function values. istep : int Current step number. nreplica : int Total number of replicas across all processes. Tindex : np.ndarray Temperature indices for current replicas. rep2T : np.ndarray Mapping from replica index to temperature index. T2rep : np.ndarray Mapping from temperature index to replica index. exchange_direction : bool Direction for attempting exchanges (alternates between True/False). """ x: np.ndarray xmin: np.ndarray xmax: np.ndarray #xunit: np.ndarray xstep: np.ndarray numsteps: int numsteps_exchange: int fx: np.ndarray istep: int nreplica: int Tindex: np.ndarray rep2T: np.ndarray T2rep: np.ndarray exchange_direction: bool
[docs] def __init__(self, info: odatse.Info, runner: odatse.Runner = None, run_mode: str = "initial" ) -> None: """ Initialize the Algorithm class. Parameters ---------- info : odatse.Info Information object containing algorithm parameters. runner : odatse.Runner, optional Runner object for executing the algorithm. run_mode : str, optional Mode to run the algorithm in, by default "initial". """ time_sta = time.perf_counter() info_exchange = info.algorithm["exchange"] nwalkers = info_exchange.get("nreplica_per_proc", 1) super().__init__(info=info, runner=runner, nwalkers=nwalkers, run_mode=run_mode) self.nreplica = self.mpisize * self.nwalkers self.input_as_beta, self.betas = read_Ts(info_exchange, numT=self.nreplica) self.numsteps = info_exchange["numsteps"] self.numsteps_exchange = info_exchange["numsteps_exchange"] self.export_combined_files = info_exchange.get("export_combined_files", False) self.separate_T = info_exchange.get("separate_T", True) time_end = time.perf_counter() self.timer["init"]["total"] = time_end - time_sta
[docs] def _initialize(self) -> None: """ Initialize the algorithm parameters and state. """ # Initialize base class first super()._initialize() # Set up temperature indices for each walker # Each process handles a contiguous block of temperature indices # based on its rank and number of walkers self.Tindex = np.arange( self.mpirank * self.nwalkers, (self.mpirank + 1) * self.nwalkers ) # Initialize mappings between replica and temperature indices # Initially, replica i has temperature i self.rep2T = np.arange(self.nreplica) # Maps replica index -> temperature index self.T2rep = np.arange(self.nreplica) # Maps temperature index -> replica index # Initialize exchange direction - alternates between True/False # to ensure all adjacent pairs get chance to exchange self.exchange_direction = True self.istep = 0 self._show_parameters()
[docs] def _run(self) -> None: """ Run the algorithm. """ # Validate run mode is set if self.mode is None: raise RuntimeError("mode unset") # Determine whether to restore RNG state from checkpoint restore_rng = not self.mode.endswith("-resetrand") # Initialize or restore simulation state based on mode if self.mode.startswith("init"): self._initialize() elif self.mode.startswith("resume"): self._load_state(self.checkpoint_file, mode="resume", restore_rng=restore_rng) elif self.mode.startswith("continue"): self._load_state(self.checkpoint_file, mode="continue", restore_rng=restore_rng) else: raise RuntimeError("unknown mode {}".format(self.mode)) # Get current beta (inverse temperature) values for each replica beta = self.betas[self.Tindex] # Set up output file writers write_mode = "w" if self.mode.startswith("init") else "a" item_list = [ "step", "walker", ("beta" if self.input_as_beta else "T"), "fx", *self.label_list, ] # Create writers for both trial moves and accepted results fp_trial = DataWriter("trial.txt", mode=write_mode, item_list=item_list, combined=self.export_combined_files) fp_result = DataWriter("result.txt", mode=write_mode, item_list=item_list, combined=self.export_combined_files) self._set_writer(fp_trial, fp_result) # For new runs, evaluate initial configuration if self.mode.startswith("init"): self.fx = self._evaluate(self.state) self._write_result(fp_trial) self._write_result(fp_result) self.istep += 1 # Track best solution found minidx = np.argmin(self.fx) self.best_x = copy.copy(self.state.x[minidx, :]) self.best_fx = np.min(self.fx[minidx]) self.best_istep = 0 self.best_iwalker = 0 # Set up checkpointing intervals next_checkpoint_step = self.istep + self.checkpoint_steps next_checkpoint_time = time.time() + self.checkpoint_interval # Main simulation loop while self.istep < self.numsteps: # Attempt replica exchange periodically if self.istep % self.numsteps_exchange == 0: time_sta = time.perf_counter() if self.nreplica > 1: self._exchange(self.exchange_direction) # Alternate exchange direction for next attempt if self.nreplica > 2: self.exchange_direction = not self.exchange_direction time_end = time.perf_counter() self.timer["run"]["exchange"] += time_end - time_sta # Update beta values after exchange beta = self.betas[self.Tindex] # Perform local Monte Carlo updates self.local_update(beta) self.istep += 1 # Handle checkpointing if enabled if self.checkpoint: time_now = time.time() if self.istep >= next_checkpoint_step or time_now >= next_checkpoint_time: self._save_state(self.checkpoint_file) next_checkpoint_step = self.istep + self.checkpoint_steps next_checkpoint_time = time_now + self.checkpoint_interval # Clean up file handles fp_trial.close() fp_result.close() print("complete main process : rank {:08d}/{:08d}".format(self.mpirank, self.mpisize)) # Save final state for possible continuation if self.checkpoint: self._save_state(self.checkpoint_file)
[docs] def _exchange(self, direction: bool) -> None: """ Attempt temperature exchanges between replicas. This method implements the core replica exchange logic, attempting to swap temperatures between adjacent replicas based on the Metropolis criterion: P(accept) = min(1, exp((β_j - β_i)(E_i - E_j))) Parameters ---------- direction : bool If True, attempt exchanges between even-odd pairs. If False, attempt exchanges between odd-even pairs. """ if self.nwalkers == 1: self.__exchange_single_walker(direction) else: self.__exchange_multi_walker(direction)
def __exchange_single_walker(self, direction: bool) -> None: """ Handle temperature exchanges for single walker per process case. This method implements the exchange logic when each process has only one walker, requiring MPI communication to coordinate exchanges between processes. Parameters ---------- direction : bool If True, attempt exchanges between even-odd pairs. If False, attempt exchanges between odd-even pairs. """ if self.mpisize > 1: self.mpicomm.Barrier() if direction: if self.Tindex[0] % 2 == 0: other_index = self.Tindex[0] + 1 is_main = True else: other_index = self.Tindex[0] - 1 is_main = False else: if self.Tindex[0] % 2 == 0: other_index = self.Tindex[0] - 1 is_main = False else: other_index = self.Tindex[0] + 1 is_main = True ibuf = np.zeros(1, dtype=np.int64) fbuf = np.zeros(1, dtype=np.float64) if 0 <= other_index < self.nreplica: other_rank = self.T2rep[other_index] if is_main: self.mpicomm.Recv(fbuf, source=other_rank, tag=1) other_fx = fbuf[0] beta = self.betas[self.Tindex[0]] other_beta = self.betas[self.Tindex[0] + 1] logp = (other_beta - beta) * (other_fx - self.fx[0]) if logp >= 0.0 or self.rng.rand() < np.exp(logp): ibuf[0] = self.Tindex self.mpicomm.Send(ibuf, dest=other_rank, tag=2) self.Tindex[0] += 1 else: ibuf[0] = self.Tindex + 1 self.mpicomm.Send(ibuf, dest=other_rank, tag=2) else: fbuf[0] = self.fx[0] self.mpicomm.Send(fbuf, dest=other_rank, tag=1) self.mpicomm.Recv(ibuf, source=other_rank, tag=2) self.Tindex[0] = ibuf[0] self.mpicomm.Barrier() if self.mpirank == 0: self.T2rep[self.Tindex[0]] = self.mpirank for other_rank in range(1, self.nreplica): self.mpicomm.Recv(ibuf, source=other_rank, tag=0) self.T2rep[ibuf[0]] = other_rank else: ibuf[0] = self.Tindex self.mpicomm.Send(ibuf, dest=0, tag=0) self.mpicomm.Bcast(self.T2rep, root=0) def __exchange_multi_walker(self, direction: bool) -> None: """ Handle temperature exchanges for multiple walkers per process case. This method implements the exchange logic when each process has multiple walkers, requiring collective MPI operations to coordinate exchanges across all processes. Parameters ---------- direction : bool If True, attempt exchanges between even-odd pairs. If False, attempt exchanges between odd-even pairs. """ comm = self.mpicomm if self.mpisize > 1: fx_all = comm.allgather(self.fx) fx_all = np.array(fx_all).flatten() else: fx_all = self.fx rep2T_diff = [] T2rep_diff = [] for irep in range( self.mpirank * self.nwalkers, (self.mpirank + 1) * self.nwalkers ): iT = self.rep2T[irep] if iT % 2 != 0: continue jT = iT + 1 if direction else iT - 1 if jT < 0 or jT == self.nreplica: continue jrep = self.T2rep[jT] fdiff = fx_all[jrep] - fx_all[irep] bdiff = self.betas[jT] - self.betas[iT] logp = fdiff * bdiff if logp >= 0.0 or self.rng.rand() < np.exp(logp): rep2T_diff.append((irep, jT)) # this means self.rep2T[irep] = jT rep2T_diff.append((jrep, iT)) T2rep_diff.append((iT, jrep)) T2rep_diff.append((jT, irep)) if self.mpisize > 1: rep2T_diff = comm.allgather(rep2T_diff) rep2T_diff = list(itertools.chain.from_iterable(rep2T_diff)) # flatten T2rep_diff = comm.allgather(T2rep_diff) T2rep_diff = list(itertools.chain.from_iterable(T2rep_diff)) # flatten for diff in rep2T_diff: self.rep2T[diff[0]] = diff[1] for diff in T2rep_diff: self.T2rep[diff[0]] = diff[1] self.Tindex = self.rep2T[ self.mpirank * self.nwalkers : (self.mpirank + 1) * self.nwalkers ]
[docs] def _prepare(self) -> None: """ Prepare the algorithm for execution. """ self.timer["run"]["submit"] = 0.0 self.timer["run"]["exchange"] = 0.0
[docs] def _post(self) -> Dict: """ Post-process the results of the algorithm. """ # Separate results by temperature if requested if self.separate_T and not self.export_combined_files: if self.mpirank == 0: print(f"start separateT {self.mpirank}") sys.stdout.flush() # Convert beta to temperature if needed Ts = self.betas if self.input_as_beta else 1.0 / self.betas # Organize results by temperature separateT( Ts=Ts, nwalkers=self.nwalkers, output_dir=self.output_dir, comm=self.mpicomm, use_beta=self.input_as_beta, buffer_size=10000, ) # Gather best results from all processes if self.mpisize > 1: # NOTE: # ``gather`` seems not to work with many processes (say, 32) in some MPI implementation. # ``Gather`` and ``allgather`` seem to work fine. # Since the performance is not so important here, we use ``allgather`` for simplicity. best_fx = self.mpicomm.allgather(self.best_fx) best_x = self.mpicomm.allgather(self.best_x) best_istep = self.mpicomm.allgather(self.best_istep) best_iwalker = self.mpicomm.allgather(self.best_iwalker) else: best_fx = [self.best_fx] best_x = [self.best_x] best_istep = [self.best_istep] best_iwalker = [self.best_iwalker] # Find process with best overall solution best_rank = np.argmin(best_fx) # Write best result to file (rank 0 only) if self.mpirank == 0: with open("best_result.txt", "w") as f: f.write(f"nprocs = {self.nreplica}\n") f.write(f"rank = {best_rank}\n") f.write(f"step = {best_istep[best_rank]}\n") f.write(f"walker = {best_iwalker[best_rank]}\n") f.write(f"fx = {best_fx[best_rank]}\n") for label, x in zip(self.label_list, best_x[best_rank]): f.write(f"{label} = {x}\n") # Print summary to stdout print("Best Result:") print(f" rank = {best_rank}") print(f" step = {best_istep[best_rank]}") print(f" walker = {best_iwalker[best_rank]}") print(f" fx = {best_fx[best_rank]}") for label, x in zip(self.label_list, best_x[best_rank]): print(f" {label} = {x}") # Return best solution information return { "x": best_x[best_rank], "fx": best_fx[best_rank], "nprocs": self.nreplica, "rank": best_rank, "step": best_istep[best_rank], "walker": best_iwalker[best_rank], }
[docs] def _save_state(self, filename) -> None: """ Save the current algorithm state to a checkpoint file. Saves all necessary data to resume the simulation later, including: - MPI configuration - Random number generator state - Timer information - Current configurations and energies - Best solutions found - Replica exchange state information Parameters ---------- filename : str Path to the checkpoint file to write. """ data = { #-- _algorithm "mpisize": self.mpisize, "mpirank": self.mpirank, "rng": self.rng.get_state(), "timer": self.timer, "info": self.info, #-- montecarlo "x": self.state, "fx": self.fx, #"inode": self.inode, "istep": self.istep, "best_x": self.best_x, "best_fx": self.best_fx, "best_istep": self.best_istep, "best_iwalker": self.best_iwalker, "naccepted": self.naccepted, "ntrial": self.ntrial, #-- exchange "nreplica": self.nreplica, "Tindex": self.Tindex, "rep2T": self.rep2T, "T2rep": self.T2rep, "exchange_direction": self.exchange_direction, } self._save_data(data, filename)
[docs] def _load_state(self, filename, mode="resume", restore_rng=True): """ Load algorithm state from a checkpoint file. Restores all necessary data to resume a previous simulation run. Parameters ---------- filename : str Path to the checkpoint file to read. mode : str, optional Loading mode - either "resume" or "continue", by default "resume". restore_rng : bool, optional Whether to restore the random number generator state, by default True. Set to False to use a fresh RNG state. Raises ------ AssertionError If loaded state doesn't match current MPI configuration. """ data = self._load_data(filename) if not data: print("ERROR: Load status file failed") sys.exit(1) #-- _algorithm assert self.mpisize == data["mpisize"] assert self.mpirank == data["mpirank"] if restore_rng: self.rng = np.random.RandomState() self.rng.set_state(data["rng"]) self.timer = data["timer"] info = data["info"] self._check_parameters(info) #-- montecarlo self.state = data["x"] self.fx = data["fx"] #self.inode = data["inode"] self.istep = data["istep"] self.best_x = data["best_x"] self.best_fx = data["best_fx"] self.best_istep = data["best_istep"] self.best_iwalker = data["best_iwalker"] self.naccepted = data["naccepted"] self.ntrial = data["ntrial"] #-- exchange assert self.nreplica == data["nreplica"] self.Tindex = data["Tindex"] self.rep2T = data["rep2T"] self.T2rep = data["T2rep"] self.exchange_direction = data["exchange_direction"] #-- restore rng state in statespace self.statespace.rng = self.rng