Source code for odatse.algorithm.montecarlo

# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

import typing
from typing import TextIO, Union, List, Tuple
import copy
import time
from pathlib import Path

import numpy as np

import odatse
import odatse.domain
from odatse import mpi
from odatse.algorithm.state import ContinuousStateSpace, DiscreteStateSpace
from odatse.util.data_writer import DataWriter


class AlgorithmBase(odatse.algorithm.AlgorithmBase):
    """
    Base class for Monte Carlo algorithms implementing common functionality.

    This class provides the foundation for various Monte Carlo methods,
    handling both continuous and discrete parameter spaces. It supports
    parallel execution with multiple walkers and temperature-based sampling
    methods.

    Implementation Details
    ----------------------
    The class handles two types of parameter spaces:

    1. Continuous: Uses real-valued parameters within specified bounds
    2. Discrete: Uses node-based parameters with defined neighbor relationships

    For continuous problems:
    - Parameters are bounded by xmin and xmax
    - Steps are controlled by xstep for each dimension

    For discrete problems:
    - Parameters are represented as nodes in a graph
    - Transitions are only allowed between neighboring nodes
    - Neighbor relationships must form a connected, bidirectional graph

    The sampling process:
    1. Initializes walkers in valid positions
    2. Proposes moves based on the parameter space type
    3. Evaluates the objective function ("Energy")
    4. Accepts/rejects moves based on the Monte Carlo criterion
    5. Tracks the best solution found

    Key Methods
    -----------
    _initialize() :
        Sets up initial walker positions and counters
    propose() :
        Generates candidate moves for walkers
    local_update() :
        Performs one Monte Carlo step
    _evaluate() :
        Computes objective function values
    """

    nwalkers: int

    iscontinuous: bool

    # # continuous problem
    # x: np.ndarray
    # xmin: np.ndarray
    # xmax: np.ndarray
    # xstep: np.ndarray

    # # discrete problem
    # inode: np.ndarray
    # nnodes: int
    # node_coordinates: np.ndarray
    # neighbor_list: List[List[int]]
    # ncandidates: np.ndarray  # len(neighbor_list[i])-1

    # state: Union[ContinuousState, DiscreteState]

    numsteps: int

    fx: np.ndarray
    istep: int
    best_x: np.ndarray
    best_fx: float
    best_istep: int
    best_iwalker: int
    betas: np.ndarray
    input_as_beta: bool
    Tindex: np.ndarray

    ntrial: int
    naccepted: int
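    # Illustrative note (not part of the original source): one plausible layout
    # for the discrete-case neighbor list, consistent with the
    # "len(neighbor_list[i])-1" comment above, lists each node first followed
    # by its neighbors. For three nodes on a line,
    #
    #     neighbor_list = [[0, 1], [1, 0, 2], [2, 1]]
    #
    # a walker sitting on node 1 may hop to node 0 or node 2; the graph is
    # bidirectional (0<->1, 1<->2) and connected, as required.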
    def __init__(self,
                 info: odatse.Info,
                 runner: odatse.Runner = None,
                 domain = None,
                 nwalkers: int = 1,
                 run_mode: str = "initial") -> None:
        """
        Initialize the AlgorithmBase class.

        Parameters
        ----------
        info : odatse.Info
            Information object containing algorithm parameters.
        runner : odatse.Runner, optional
            Runner object for executing the algorithm, by default None.
        domain : odatse.domain.Domain, optional
            Domain object defining the problem space, by default None.
        nwalkers : int, optional
            Number of walkers to use in the simulation, by default 1.
        run_mode : str, optional
            Mode of the run, e.g., "initial", by default "initial".

        Raises
        ------
        ValueError
            If an unsupported domain type is provided or required parameters are missing.

        Examples
        --------
        >>> info = odatse.Info(config_file_path)
        >>> runner = odatse.Runner()
        >>> algorithm = AlgorithmBase(info, runner, nwalkers=100)
        """
        time_sta = time.perf_counter()

        super().__init__(info=info, runner=runner, run_mode=run_mode)

        self.nwalkers = nwalkers

        # algorithm parameters are referenced below when constructing the state space,
        # regardless of whether a domain object is passed in explicitly
        info_param = info.algorithm["param"]

        if domain:
            if isinstance(domain, odatse.domain.MeshGrid):
                self.iscontinuous = False
            elif isinstance(domain, odatse.domain.Region):
                self.iscontinuous = True
            else:
                raise ValueError("ERROR: unsupported domain type {}".format(type(domain)))
            self.domain = domain
        else:
            if "mesh_path" in info_param:
                self.iscontinuous = False
                self.domain = odatse.domain.MeshGrid(info)
            elif "use_grid" in info_param and info_param["use_grid"] == True:
                self.iscontinuous = False
                self.domain = odatse.domain.MeshGrid(info)
            else:
                self.iscontinuous = True
                self.domain = odatse.domain.Region(info)

        if self.iscontinuous:
            self.statespace = ContinuousStateSpace(self.domain, info_param, limitation=self.runner.limitation, rng=self.rng)
        else:
            self.statespace = DiscreteStateSpace(self.domain, info_param, rng=self.rng)

        time_end = time.perf_counter()
        self.timer["init"]["total"] = time_end - time_sta

        self.Tindex = 0
        self.input_as_beta = False

        #-- writer
        self.fp_trial = None
        self.fp_result = None
    def _initialize(self):
        """
        Initialize the Monte Carlo simulation state.

        For continuous problems:
        - Uses domain.initialize to generate valid initial positions
        - Respects any additional limitations from the runner

        For discrete problems:
        - Randomly assigns walkers to valid nodes
        - Maps node indices to actual coordinate positions

        Also initializes:
        - Objective function values (fx) to zero
        - Best solution tracking variables
        - Acceptance counters for monitoring convergence
        """
        self.state = self.statespace.initialize(self.nwalkers)

        self.fx = np.zeros(self.nwalkers)
        # keep a copy of a current position so that np.copyto() in local_update()
        # has a valid target when a better solution is found
        self.best_x = copy.copy(self.state.x[0, :])
        self.best_fx = 0.0
        self.best_istep = 0
        self.best_iwalker = 0
        self.naccepted = 0
        self.ntrial = 0
    def _evaluate(self, state, in_range: np.ndarray = None) -> np.ndarray:
        """
        Evaluate the objective function for the current walker positions.

        Optimization Features:
        - Skips evaluation for out-of-bounds positions
        - Tracks evaluation timing statistics
        - Supports parallel evaluation across walkers

        Parameters
        ----------
        state :
            Walker state whose positions (state.x) are evaluated.
        in_range : np.ndarray, optional
            Boolean mask indicating valid positions.
            True = position is valid and should be evaluated.
            False = position is invalid and will be assigned inf.

        Returns
        -------
        np.ndarray
            Array of objective function values.
            Invalid positions are assigned inf.
        """
        fx = np.zeros(self.nwalkers, dtype=np.float64)
        for iwalker in range(self.nwalkers):
            x = state.x[iwalker, :]
            if in_range is None or in_range[iwalker]:
                args = (self.istep, iwalker)

                time_sta = time.perf_counter()
                fx[iwalker] = self.runner.submit(x, args)
                time_end = time.perf_counter()
                self.timer["run"]["submit"] += time_end - time_sta
            else:
                fx[iwalker] = np.inf
        return fx
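    # Illustrative note (not part of the original source): with nwalkers = 3 and
    # in_range = np.array([True, False, True]), only walkers 0 and 2 are passed
    # to runner.submit(); walker 1 gets fx[1] = np.inf, and its proposal is later
    # rejected in local_update() because `accepted` is initialized from in_range.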
    def local_update(
        self,
        beta: Union[float, np.ndarray],
        extra_info_to_write: Union[List, Tuple] = None,
    ) -> None:
        """
        Perform one step of the Monte Carlo algorithm.

        Algorithm Flow:
        1. Generate proposed moves for all walkers
        2. Check if proposals are within valid bounds
        3. Evaluate objective function for valid proposals
        4. Apply Metropolis acceptance criterion:
           ``P(accept) = min(1, exp(-beta * (f_new - f_old)))``
        5. For discrete case, adjust acceptance probability by:
           ``P *= (n_neighbors_old / n_neighbors_new)``
        6. Update positions and energies
        7. Track best solution found
        8. Log results if writers are configured

        Parameters
        ----------
        beta : Union[float, np.ndarray]
            Inverse temperature(s) controlling the acceptance probability.
            Can be a single value or an array (one per walker).
        extra_info_to_write : Union[List, Tuple], optional
            Additional data to log with the results.

        Notes
        -----
        - Handles numerical overflow in the exponential calculation
        - Maintains detailed acceptance statistics
        - Supports both single and multiple temperature values
        - Preserves the best solution across all steps
        """
        # make candidate
        old_state = copy.deepcopy(self.state)
        old_fx = copy.deepcopy(self.fx)

        new_state, in_range, weight = self.statespace.propose(old_state)
        #self.state = new_state

        # evaluate "Energy"s
        new_fx = self._evaluate(new_state, in_range)

        #XXX
        self.state = new_state
        self.fx = new_fx
        self._write_result(self.fp_trial, extras=extra_info_to_write)

        #print(old_fx, new_fx)

        fdiff = new_fx - old_fx

        # Ignore an overflow warning in np.exp(x) for x >~ 710
        # and an invalid operation warning in exp(nan) (nan came from 0 * inf)
        # Note: fdiff (fx) becomes inf when x is out of range
        # old_setting = np.seterr(over="ignore")
        old_setting = np.seterr(all="ignore")
        probs = np.exp(-beta * fdiff)
        #probs[np.isnan(probs)] = 0.0
        if weight is not None:
            probs *= weight
        np.seterr(**old_setting)

        tocheck = in_range & (probs < 1.0)
        num_check = np.count_nonzero(tocheck)

        accepted = in_range.copy()
        accepted[tocheck] = self.rng.rand(num_check) < probs[tocheck]

        self.naccepted += accepted.sum()
        self.ntrial += accepted.size

        # update
        self.state = self.statespace.choose(accepted, new_state, old_state)
        self.fx = np.where(accepted, new_fx, old_fx)

        minidx = np.argmin(self.fx)
        if self.fx[minidx] < self.best_fx:
            np.copyto(self.best_x, self.state.x[minidx, :])
            self.best_fx = self.fx[minidx]
            self.best_istep = self.istep
            self.best_iwalker = typing.cast(int, minidx)

        self._write_result(self.fp_result, extras=extra_info_to_write)
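    # Illustrative note (not part of the original source): for a single walker
    # the accept/reject step above reduces to
    #
    #     p = exp(-beta * (f_new - f_old))        # continuous case
    #     p = exp(-beta * (f_new - f_old)) * w    # discrete case, w = n_neighbors_old / n_neighbors_new
    #     accept if uniform(0, 1) < p
    #
    # where the weight w (returned by statespace.propose) is the Hastings
    # correction that keeps detailed balance on a neighbor graph whose nodes
    # have different numbers of neighbors.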
    def _set_writer(self, fp_trial, fp_result):
        self.fp_trial = fp_trial
        self.fp_result = fp_result

    def _write_result(self, writer, extras=None):
        for iwalker in range(self.nwalkers):
            if isinstance(self.Tindex, int):
                beta = self.betas[self.Tindex]
            else:
                beta = self.betas[self.Tindex[iwalker]]
            if self.input_as_beta:
                tval = beta
            else:
                tval = 1.0 / beta
            data = [self.istep, iwalker, tval, self.fx[iwalker], *self.state.x[iwalker, :]]
            if extras:
                for extra in extras:
                    data.append(extra[iwalker])
            writer.write(*data)
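
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the ODAT-SE package): a minimal, self-
# contained Metropolis loop for several independent walkers on a toy quadratic
# objective, mirroring the propose/evaluate/accept flow of
# AlgorithmBase.local_update above. The names `toy_objective`, `nwalkers`,
# `xstep`, and the parameter values are hypothetical and chosen only for this
# example.

if __name__ == "__main__":
    rng = np.random.RandomState(12345)

    def toy_objective(x: np.ndarray) -> np.ndarray:
        # simple convex "energy": f(x) = sum_i x_i^2, evaluated per walker
        return np.sum(x ** 2, axis=1)

    nwalkers, ndim = 4, 2
    beta = 10.0      # inverse temperature
    xstep = 0.1      # proposal width

    x = rng.uniform(-1.0, 1.0, size=(nwalkers, ndim))
    fx = toy_objective(x)

    for istep in range(1000):
        # propose Gaussian moves for all walkers at once
        x_new = x + xstep * rng.normal(size=(nwalkers, ndim))
        fx_new = toy_objective(x_new)

        # Metropolis criterion: accept with probability min(1, exp(-beta * df))
        probs = np.exp(-beta * (fx_new - fx))
        accepted = rng.rand(nwalkers) < probs

        # keep the new position where accepted, the old one otherwise
        x = np.where(accepted[:, np.newaxis], x_new, x)
        fx = np.where(accepted, fx_new, fx)

    print("final f(x) per walker:", fx)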