# SPDX-License-Identifier: MPL-2.0
#
# ODAT-SE -- an open framework for data analysis
# Copyright (C) 2020- The University of Tokyo
#
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Dict, List, Optional
import pathlib
from os import PathLike
from collections import namedtuple
import numpy as np
from odatse import mpi
Entry = namedtuple("Entry", ["step", "walker", "fx", "xs"])
[docs]
def separateT(
Ts: np.ndarray,
nwalkers: int,
output_dir: PathLike,
comm: Optional[mpi.Comm],
use_beta: bool,
buffer_size: int = 10000,
) -> None:
"""
Separates and processes temperature data for quantum beam diffraction experiments.
Parameters
----------
Ts : np.ndarray
Array of temperature values.
nwalkers : int
Number of walkers.
output_dir : PathLike
Directory to store the output files.
comm : mpi.Comm, optional
MPI communicator for parallel processing.
use_beta : bool
Flag to determine if beta values are used instead of temperature.
buffer_size : int, optional
Size of the buffer for reading input data. Default is 10000.
"""
if comm is None:
mpisize = 1
mpirank = 0
else:
mpisize = comm.size
mpirank = comm.rank
buffer_size = int(np.ceil(buffer_size / nwalkers)) * nwalkers
output_dir = pathlib.Path(output_dir)
proc_dir = output_dir / str(mpirank)
T2idx = {T: i for i, T in enumerate(Ts)}
T2rank = {}
results = []
for rank, Ts_local in enumerate(np.array_split(Ts, mpisize)):
d: Dict[str, List[Entry]] = {}
for T in Ts_local:
T2rank[str(T)] = rank
d[str(T)] = []
results.append(d)
# write file header
for T in Ts[mpirank * nwalkers : (mpirank + 1) * nwalkers]:
idx = T2idx[T]
with open(output_dir / f"result_T{idx}.txt", "w") as f_out:
if use_beta:
f_out.write(f"# beta = {T}\n")
else:
f_out.write(f"# T = {T}\n")
f_in = open(proc_dir / "result.txt")
EOF = False
while not EOF:
for i in range(len(results)):
for key in results[i].keys():
results[i][key] = []
for _ in range(buffer_size):
line = f_in.readline()
if line == "":
EOF = True
break
line = line.split("#")[0].strip()
if len(line) == 0:
continue
words = line.split()
step = int(words[0])
walker = mpirank * nwalkers + int(words[1])
Tstr = words[2]
fx = words[3]
xs = words[4:]
entry = Entry(step=step, walker=walker, fx=fx, xs=xs)
rank = T2rank[Tstr]
results[rank][Tstr].append(entry)
if mpisize > 1:
results2 = comm.alltoall(results)
else:
results2 = results
d = results2[0]
for i in range(1, len(results2)):
for key in d.keys():
d[key].extend(results2[i][key])
for T in Ts[mpirank * nwalkers : (mpirank + 1) * nwalkers]:
idx = T2idx[T]
d[str(T)].sort(key=lambda e: e.step)
with open(output_dir / f"result_T{idx}.txt", "a") as f_out:
for e in d[str(T)]:
f_out.write(f"{e.step} ")
f_out.write(f"{e.walker} ")
f_out.write(f"{e.fx} ")
for x in e.xs:
f_out.write(f"{x} ")
f_out.write("\n")
[docs]
def calculate_statistics_from_separated_files(
Ts: np.ndarray,
output_dir: PathLike,
thermalization_steps: int,
comm: Optional[mpi.Comm],
) -> None:
"""
Calculate and save statistical quantities (means and errors of f(x) and partition function) from separated files generated by separateT.
This function reads the separated files, ``result_T<Tindex>.txt`` in ``output_dir``, generated by separateT.
The output file is ``fx.txt`` in ``output_dir``. The format is described as a header as follows:
.. code-block::
# $1: 1/T
# $2: mean of f(x)
# $3: standard error of f(x)
# $4: number of replicas [Not used for exchange MC]
# $5: log(Z/Z0)
# $6: acceptance ratio
Parameters
----------
Ts : np.ndarray
Array of temperature values.
output_dir : PathLike
Directory to store the output files.
thermalization_steps : int
Number of steps to discard for thermalization.
comm : mpi.Comm, optional
MPI communicator for parallel processing.
"""
if comm is None:
mpisize = 1
mpirank = 0
else:
mpisize = comm.size
mpirank = comm.rank
output_dir = pathlib.Path(output_dir)
numT = len(Ts)
T_is_ascending = Ts[0] < Ts[1] if numT > 1 else True
local_Tindices = np.array_split(np.arange(numT), mpisize)[mpirank]
fx_means = np.zeros(numT)
fx_errors = np.zeros(numT)
dlogZs = np.zeros(numT)
acceptances = np.zeros(numT)
use_log_sum = True
for Tindex in local_Tindices:
T = Ts[Tindex]
if T_is_ascending:
lowerTindex = Tindex - 1
else:
lowerTindex = Tindex + 1
if 0 <= lowerTindex < numT:
lowerT = Ts[lowerTindex]
dbeta = 1.0 / lowerT - 1.0 / T
else:
lowerTindex = -1
dbeta = 0.0
dlogZ = 0.0
f_base = None
fx_sum = 0.0
fx_sum2 = 0.0
accepted = 0
old_x = None
N = 0
with open(output_dir / f"result_T{Tindex}.txt", "r") as f_in:
for line in f_in:
line = line.split("#")[0].strip()
if len(line) == 0:
continue
words = line.split()
step = int(words[0])
fx = float(words[2])
x = np.array([float(x) for x in words[3:]])
if step < thermalization_steps:
old_x = x
continue
N += 1
if not np.allclose(x, old_x):
accepted += 1
old_x = x
fx_sum += fx
fx_sum2 += fx*fx
if lowerTindex < 0:
continue
if f_base is None:
f_base = fx
continue
if use_log_sum:
## for A > B > 0,
## log(A+B) = log(A) + log(1+B/A) = log(A) + log(1+exp(log(B)-log(A)))
logA = dlogZ
logB = -dbeta * (fx - f_base)
if logB > logA:
logA, logB = logB, logA
dlogZ = logA + np.log1p(np.exp(logB - logA))
else:
dlogZ += np.exp(-dbeta * (fx - f_base))
mean = fx_sum / N
fx_means[Tindex] = mean
err = np.sqrt((fx_sum2/N - mean**2)/(N-1))
fx_errors[Tindex] = err
acceptances[Tindex] = accepted / N
if lowerTindex < 0:
continue
if use_log_sum:
dlogZ -= np.log(N)
else:
dlogZ = np.log(dlogZ/N)
dlogZ += -dbeta * f_base
dlogZs[lowerTindex] = dlogZ
if comm is not None and mpisize > 1:
buffer = np.zeros(numT)
comm.Allreduce(dlogZs, buffer)
dlogZs[:] = buffer
comm.Allreduce(fx_means, buffer)
fx_means[:] = buffer
comm.Allreduce(fx_errors, buffer)
fx_errors[:] = buffer
comm.Allreduce(acceptances, buffer)
acceptances[:] = buffer
if mpirank == 0:
dlogZ = 0.0
with open(output_dir / "fx.txt", "w") as f_out:
f_out.write("# $1: 1/T\n")
f_out.write("# $2: mean of f(x)\n")
f_out.write("# $3: standard error of f(x)\n")
f_out.write("# $4: number of replicas [Not used for exchange MC]\n")
f_out.write("# $5: log(Z/Z0)\n")
f_out.write("# $6: acceptance ratio\n")
Tindices = np.arange(numT)
if T_is_ascending:
Tindices = Tindices[::-1]
for Tindex in Tindices:
dlogZ += dlogZs[Tindex]
T = Ts[Tindex]
f_out.write(f"{1/T}")
f_out.write(f" {fx_means[Tindex]}")
f_out.write(f" {fx_errors[Tindex]}")
f_out.write(f" {numT}")
f_out.write(f" {dlogZ}")
f_out.write(f" {acceptances[Tindex]}")
f_out.write(f"\n")