Source code for odatse.util.separateT

# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

from typing import Dict, List, Optional
import pathlib
from os import PathLike
from collections import namedtuple

import numpy as np

from odatse import mpi

Entry = namedtuple("Entry", ["step", "walker", "fx", "xs"])


[docs] def separateT( Ts: np.ndarray, nwalkers: int, output_dir: PathLike, comm: Optional[mpi.Comm], use_beta: bool, buffer_size: int = 10000, ) -> None: """ Separates and processes temperature data for quantum beam diffraction experiments. Parameters ---------- Ts : np.ndarray Array of temperature values. nwalkers : int Number of walkers. output_dir : PathLike Directory to store the output files. comm : mpi.Comm, optional MPI communicator for parallel processing. use_beta : bool Flag to determine if beta values are used instead of temperature. buffer_size : int, optional Size of the buffer for reading input data. Default is 10000. """ if comm is None: mpisize = 1 mpirank = 0 else: mpisize = comm.size mpirank = comm.rank buffer_size = int(np.ceil(buffer_size / nwalkers)) * nwalkers output_dir = pathlib.Path(output_dir) proc_dir = output_dir / str(mpirank) T2idx = {T: i for i, T in enumerate(Ts)} T2rank = {} results = [] for rank, Ts_local in enumerate(np.array_split(Ts, mpisize)): d: Dict[str, List[Entry]] = {} for T in Ts_local: T2rank[str(T)] = rank d[str(T)] = [] results.append(d) # write file header for T in Ts[mpirank * nwalkers : (mpirank + 1) * nwalkers]: idx = T2idx[T] with open(output_dir / f"result_T{idx}.txt", "w") as f_out: if use_beta: f_out.write(f"# beta = {T}\n") else: f_out.write(f"# T = {T}\n") f_in = open(proc_dir / "result.txt") EOF = False while not EOF: for i in range(len(results)): for key in results[i].keys(): results[i][key] = [] for _ in range(buffer_size): line = f_in.readline() if line == "": EOF = True break line = line.split("#")[0].strip() if len(line) == 0: continue words = line.split() step = int(words[0]) walker = mpirank * nwalkers + int(words[1]) Tstr = words[2] fx = words[3] xs = words[4:] entry = Entry(step=step, walker=walker, fx=fx, xs=xs) rank = T2rank[Tstr] results[rank][Tstr].append(entry) if mpisize > 1: results2 = comm.alltoall(results) else: results2 = results d = results2[0] for i in range(1, len(results2)): for key in d.keys(): d[key].extend(results2[i][key]) for T in Ts[mpirank * nwalkers : (mpirank + 1) * nwalkers]: idx = T2idx[T] d[str(T)].sort(key=lambda e: e.step) with open(output_dir / f"result_T{idx}.txt", "a") as f_out: for e in d[str(T)]: f_out.write(f"{e.step} ") f_out.write(f"{e.walker} ") f_out.write(f"{e.fx} ") for x in e.xs: f_out.write(f"{x} ") f_out.write("\n")
[docs] def calculate_statistics_from_separated_files( Ts: np.ndarray, output_dir: PathLike, thermalization_steps: int, comm: Optional[mpi.Comm], ) -> None: """ Calculate and save statistical quantities (means and errors of f(x) and partition function) from separated files generated by separateT. This function reads the separated files, ``result_T<Tindex>.txt`` in ``output_dir``, generated by separateT. The output file is ``fx.txt`` in ``output_dir``. The format is described as a header as follows: .. code-block:: # $1: 1/T # $2: mean of f(x) # $3: standard error of f(x) # $4: number of replicas [Not used for exchange MC] # $5: log(Z/Z0) # $6: acceptance ratio Parameters ---------- Ts : np.ndarray Array of temperature values. output_dir : PathLike Directory to store the output files. thermalization_steps : int Number of steps to discard for thermalization. comm : mpi.Comm, optional MPI communicator for parallel processing. """ if comm is None: mpisize = 1 mpirank = 0 else: mpisize = comm.size mpirank = comm.rank output_dir = pathlib.Path(output_dir) numT = len(Ts) T_is_ascending = Ts[0] < Ts[1] if numT > 1 else True local_Tindices = np.array_split(np.arange(numT), mpisize)[mpirank] fx_means = np.zeros(numT) fx_errors = np.zeros(numT) dlogZs = np.zeros(numT) acceptances = np.zeros(numT) use_log_sum = True for Tindex in local_Tindices: T = Ts[Tindex] if T_is_ascending: lowerTindex = Tindex - 1 else: lowerTindex = Tindex + 1 if 0 <= lowerTindex < numT: lowerT = Ts[lowerTindex] dbeta = 1.0 / lowerT - 1.0 / T else: lowerTindex = -1 dbeta = 0.0 dlogZ = 0.0 f_base = None fx_sum = 0.0 fx_sum2 = 0.0 accepted = 0 old_x = None N = 0 with open(output_dir / f"result_T{Tindex}.txt", "r") as f_in: for line in f_in: line = line.split("#")[0].strip() if len(line) == 0: continue words = line.split() step = int(words[0]) fx = float(words[2]) x = np.array([float(x) for x in words[3:]]) if step < thermalization_steps: old_x = x continue N += 1 if not np.allclose(x, old_x): accepted += 1 old_x = x fx_sum += fx fx_sum2 += fx*fx if lowerTindex < 0: continue if f_base is None: f_base = fx continue if use_log_sum: ## for A > B > 0, ## log(A+B) = log(A) + log(1+B/A) = log(A) + log(1+exp(log(B)-log(A))) logA = dlogZ logB = -dbeta * (fx - f_base) if logB > logA: logA, logB = logB, logA dlogZ = logA + np.log1p(np.exp(logB - logA)) else: dlogZ += np.exp(-dbeta * (fx - f_base)) mean = fx_sum / N fx_means[Tindex] = mean err = np.sqrt((fx_sum2/N - mean**2)/(N-1)) fx_errors[Tindex] = err acceptances[Tindex] = accepted / N if lowerTindex < 0: continue if use_log_sum: dlogZ -= np.log(N) else: dlogZ = np.log(dlogZ/N) dlogZ += -dbeta * f_base dlogZs[lowerTindex] = dlogZ if comm is not None and mpisize > 1: buffer = np.zeros(numT) comm.Allreduce(dlogZs, buffer) dlogZs[:] = buffer comm.Allreduce(fx_means, buffer) fx_means[:] = buffer comm.Allreduce(fx_errors, buffer) fx_errors[:] = buffer comm.Allreduce(acceptances, buffer) acceptances[:] = buffer if mpirank == 0: dlogZ = 0.0 with open(output_dir / "fx.txt", "w") as f_out: f_out.write("# $1: 1/T\n") f_out.write("# $2: mean of f(x)\n") f_out.write("# $3: standard error of f(x)\n") f_out.write("# $4: number of replicas [Not used for exchange MC]\n") f_out.write("# $5: log(Z/Z0)\n") f_out.write("# $6: acceptance ratio\n") Tindices = np.arange(numT) if T_is_ascending: Tindices = Tindices[::-1] for Tindex in Tindices: dlogZ += dlogZs[Tindex] T = Ts[Tindex] f_out.write(f"{1/T}") f_out.write(f" {fx_means[Tindex]}") f_out.write(f" {fx_errors[Tindex]}") f_out.write(f" {numT}") f_out.write(f" {dlogZ}") f_out.write(f" {acceptances[Tindex]}") f_out.write(f"\n")