# ab-Initio Configuration Sampling tool kit (abICS)
# Copyright (C) 2019- The University of Tokyo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
from shutil import move
import sys
from mpi4py import MPI
import numpy as np
import numpy.random as rand
from abics.mc import observer_base, obs_decode, verylargeint
from abics.util import pickle_dump, pickle_load, numpy_save, numpy_load
[docs]class RXParams:
""" Parameter set for replica exchange Monte Carlo
Attributes
----------
nreplicas : int
The number of replicas
nprocs_per_replica : int
The number of processes which a replica uses
kTstart : float
The lower bound of temperature range
kTend : float
The upper bound of temperature range
nsteps : int
The number of MC steps
RXtrial_frequency :
The number of MC steps between replica exchange operations
sample_frequency :
The number of MC steps between measurements observables
print_frequency :
The number of MC steps between show information
reload : bool
Whether to restart simulation or not
seed : int
The seed of the random number generator
If 0, some random number is used (e.g., system time or some random noise).
"""
def __init__(self):
self.nreplicas = None
self.nprocs_per_replica = 1
self.kTstart = None
self.kTend = None
self.nsteps = None
self.RXtrial_frequency = 1
self.sample_frequency = 1
self.print_frequency = 1
self.reload = False
self.seed = 0
[docs] @classmethod
def from_dict(cls, d):
"""
Read information from dictionary
Parameters
----------
d: dict
Dictionary including parameters for replica exchange Monte Carlo method
Returns
-------
params: DFTParams object
self
"""
if 'replica' in d:
d = d['replica']
params = cls()
params.nreplicas = d["nreplicas"]
params.nprocs_per_replica = d["nprocs_per_replica"]
params.kTstart = d["kTstart"]
params.kTend = d["kTend"]
params.nsteps = d["nsteps"]
params.RXtrial_frequency = d.get("RXtrial_frequency", 1)
params.sample_frequency = d.get("sample_frequency", 1)
params.print_frequency = d.get("print_frequency", 1)
params.reload = d.get("reload", False)
params.seed = d.get("seed", 0)
return params
[docs] @classmethod
def from_toml(cls, fname):
"""
Read information from toml file
Parameters
----------
f: str
The name of input toml File
Returns
-------
DFTParams: DFTParams object
self
"""
import toml
return cls.from_dict(toml.load(fname))
[docs]def RX_MPI_init(rxparams):
"""
Parameters
----------
rxparams: RXParams
Parameters for replica exchange Monte Carlo method.
Returns:
-------
comm: comm world
MPI communicator
"""
nreplicas = rxparams.nreplicas
commworld = MPI.COMM_WORLD
worldrank = commworld.Get_rank()
worldprocs = commworld.Get_size()
if rxparams.seed > 0:
rand.seed(rxparams.seed + worldrank * 137)
else:
rand_seeds = [rand.random() for i in range(worldprocs)]
rand.seed(rand_seeds[worldrank])
if worldprocs < nreplicas:
if worldrank == 0:
print("ERROR! Please run with at least as many MPI processes as the number of replicas")
sys.exit(1)
if worldprocs > nreplicas:
if worldrank == 0:
print(
"Setting number of replicas smaller than MPI processes; I hope you"
+ " know what you're doing..."
)
sys.stdout.flush()
if worldrank >= nreplicas:
# belong to comm that does nothing
comm = commworld.Split(color=1, key=worldrank)
comm.Free()
sys.exit() # Wait for MPI_finalize
else:
comm = commworld.Split(color=0, key=worldrank)
else:
comm = commworld
return comm
[docs]class ParallelMC(object):
[docs] def __init__(self, comm, MCalgo, model, configs, kTs, subdirs=True):
"""
Parameters
----------
comm: comm world
MPI communicator
MCalgo: object for MonteCarlo algorithm
MonteCarlo algorithm
model: dft_latgas object
DFT lattice gas mapping model
configs: config object
Configurations
kTs: list
Temperature list
subdirs: boolean
if true, working directory for this rank is made
"""
self.comm = comm
self.rank = self.comm.Get_rank()
self.procs = self.comm.Get_size()
self.kTs = kTs
self.model = model
self.subdirs = subdirs
self.nreplicas = len(configs)
if not (self.procs == self.nreplicas == len(self.kTs)):
if self.rank == 0:
print(
"ERROR: You have to set the number of replicas equal to the"
+ "number of temperatures equal to the number of processes"
)
sys.exit(1)
myconfig = configs[self.rank]
mytemp = kTs[self.rank]
self.mycalc = MCalgo(model, mytemp, myconfig)
[docs] def run(self, nsteps, sample_frequency, observer=observer_base()):
"""
Parameters
----------
nsteps: int
Number of Monte Carlo steps for running.
sample_frequency: int
Number of Monte Carlo steps for running.
observer: observer object
Returns
-------
obs_buffer: numpy array
Observables
"""
if self.subdirs:
# make working directory for this rank
try:
os.mkdir(str(self.rank))
except FileExistsError:
pass
os.chdir(str(self.rank))
observables = self.mycalc.run(nsteps, sample_frequency, observer)
pickle_dump(self.mycalc.config, "config.pickle")
if self.subdirs:
os.chdir("../")
if sample_frequency:
obs_buffer = np.empty([self.procs, len(observables)])
self.comm.Allgather(observables, obs_buffer)
return obs_buffer
[docs]class TemperatureRX_MPI(ParallelMC):
[docs] def __init__(self, comm, MCalgo, model, configs, kTs, subdirs=True):
"""
Parameters
----------
comm: comm world
MPI communicator
MCalgo: object for MonteCarlo algorithm
MonteCarlo algorithm
model: dft_latgas
DFT lattice gas mapping model
configs: config object
Configuration
kTs: list
Temperature list
subdirs: boolean
If true, working directory for this rank is made
"""
super(TemperatureRX_MPI, self).__init__(
comm, MCalgo, model, configs, kTs, subdirs
)
self.betas = 1.0 / np.array(kTs)
self.rank_to_T = np.arange(0, self.procs, 1, dtype=np.int)
self.float_buffer = np.array(0.0, dtype=np.float)
self.int_buffer = np.array(0, dtype=np.int)
self.obs_save = []
self.Trank_hist = []
self.kT_hist = []
self.Lreload = False
[docs] def reload(self):
self.rank_to_T = pickle_load("rank_to_T.pickle")
self.mycalc.kT = self.kTs[self.rank_to_T[self.rank]]
self.mycalc.config = pickle_load(os.path.join(str(self.rank), "calc.pickle"))
self.obs_save0 = numpy_load(os.path.join(str(self.rank), "obs_save.npy"))
self.mycalc.energy = self.obs_save0[-1,0]
self.Trank_hist0 = numpy_load(os.path.join(str(self.rank), "Trank_hist.npy"))
self.kT_hist0 = numpy_load(os.path.join(str(self.rank), "kT_hist.npy"))
rand_state = pickle_load(os.path.join(str(self.rank), "rand_state.pickle"))
rand.set_state(rand_state)
self.Lreload = True
[docs] def find_procrank_from_Trank(self, Trank):
"""
Parameters
----------
Trank: int
Temperature rank
Returns
-------
procrank: int
"""
i = np.argwhere(self.rank_to_T == Trank)
if i is None:
sys.exit("Internal error in TemperatureRX_MPI.find_procrank_from_Trank")
else:
return i
[docs] def Xtrial(self, XCscheme=-1):
"""
Parameters
----------
XCscheme: int
Returns:
-------
"""
# What is my temperature rank?
myTrank = self.rank_to_T[self.rank]
if (myTrank + XCscheme) % 2 == 0 and myTrank == self.procs - 1:
self.comm.Allgather(self.rank_to_T[self.rank], self.rank_to_T)
return
if XCscheme == 1 and myTrank == 0:
self.comm.Allgather(self.rank_to_T[self.rank], self.rank_to_T)
return
if (myTrank + XCscheme) % 2 == 0:
myTrankp1 = myTrank + 1
# Get the energy from the replica with higher temperature
exchange_rank = self.find_procrank_from_Trank(myTrankp1)
self.comm.Recv(self.float_buffer, source=exchange_rank, tag=1)
delta = (self.betas[myTrankp1] - self.betas[myTrank]) * (
self.mycalc.energy - self.float_buffer
)
if delta < 0.0:
# Exchange temperatures!
self.comm.Send(
[self.rank_to_T[self.rank], 1, MPI.INT], dest=exchange_rank, tag=2
)
self.rank_to_T[self.rank] = myTrankp1
else:
accept_probability = np.exp(-delta)
# print accept_probability, "accept prob"
if rand.random() <= accept_probability:
self.comm.Send(
[self.rank_to_T[self.rank], 1, MPI.INT],
dest=exchange_rank,
tag=2,
)
self.rank_to_T[self.rank] = myTrankp1
else:
# print("RXtrial rejected")
self.comm.Send(
[self.rank_to_T[exchange_rank], 1, MPI.INT],
dest=exchange_rank,
tag=2,
)
else:
myTrankm1 = myTrank - 1
exchange_rank = self.find_procrank_from_Trank(myTrankm1)
self.comm.Send(self.mycalc.energy, dest=exchange_rank, tag=1)
self.comm.Recv([self.int_buffer, 1, MPI.INT], source=exchange_rank, tag=2)
self.rank_to_T[self.rank] = self.int_buffer
self.comm.Allgather(self.rank_to_T[self.rank], self.rank_to_T)
self.mycalc.kT = self.kTs[self.rank_to_T[self.rank]]
return
[docs] def run(
self,
nsteps,
RXtrial_frequency,
sample_frequency=verylargeint,
print_frequency=verylargeint,
observer=observer_base(),
subdirs=True,
save_obs=True,
):
"""
Parameters
----------
nsteps: int
The number of Monte Carlo steps for running.
RXtrial_frequency: int
The number of Monte Carlo steps for replica exchange.
sample_frequency: int
The number of Monte Carlo steps for observation of physical quantities.
print_frequency: int
The number of Monte Carlo steps for saving physical quantities.
observer: observer object
subdirs: boolean
If true, working directory for this rank is made
save_obs: boolean
Returns
-------
obs_list: list
Observation list
"""
if subdirs:
try:
os.mkdir(str(self.rank))
except FileExistsError:
pass
os.chdir(str(self.rank))
self.accept_count = 0
if not self.Lreload:
self.mycalc.energy = self.mycalc.model.energy(self.mycalc.config)
with open(os.devnull, "w") as f:
test_observe = observer.observe(self.mycalc, f, lprint=False)
if hasattr(test_observe, "__iter__"):
obs_len = len(test_observe)
obs = np.zeros([len(self.kTs), obs_len])
if hasattr(test_observe, "__add__"):
observe = True
else:
observe = False
nsample = 0
XCscheme = 0
with open("obs.dat", "a") as output:
for i in range(1, nsteps + 1):
self.mycalc.MCstep()
if i % RXtrial_frequency == 0:
self.Xtrial(XCscheme)
XCscheme = (XCscheme + 1) % 2
if observe and i % sample_frequency == 0:
obs_step = observer.observe(
self.mycalc, output, i % print_frequency == 0
)
obs[self.rank_to_T[self.rank]] += obs_step
if save_obs:
self.obs_save.append(obs_step)
self.Trank_hist.append(self.rank_to_T[self.rank])
self.kT_hist.append(self.mycalc.kT)
nsample += 1
self.comm.Barrier()
# save information for restart
pickle_dump(self.mycalc.config, "calc.pickle")
rand_state = rand.get_state()
pickle_dump(rand_state, "rand_state.pickle")
if save_obs:
if hasattr(self, "obs_save0"):
obs_save_ = np.concatenate((self.obs_save0, np.array(self.obs_save)))
Trank_hist_ = np.concatenate(
(self.Trank_hist0, np.array(self.Trank_hist))
)
kT_hist_ = np.concatenate((self.kT_hist0, np.array(self.kT_hist)))
else:
obs_save_ = np.array(self.obs_save)
Trank_hist_ = np.array(self.Trank_hist)
kT_hist_ = np.array(self.kT_hist)
numpy_save(obs_save_, "obs_save.npy")
numpy_save(Trank_hist_, "Trank_hist.npy")
numpy_save(kT_hist_, "kT_hist.npy")
if subdirs:
os.chdir("../")
if self.rank == 0:
pickle_dump(self.rank_to_T, "rank_to_T.pickle")
numpy_save(self.kTs, "kTs.npy")
if subdirs:
os.chdir(str(self.rank))
if subdirs:
os.chdir("../")
if nsample != 0:
obs = np.array(obs)
obs_buffer = np.empty(obs.shape)
obs /= nsample
self.comm.Allreduce(obs, obs_buffer, op=MPI.SUM)
obs_list = []
args_info = observer.obs_info(self.mycalc)
for i in range(len(self.kTs)):
obs_list.append(obs_decode(args_info, obs_buffer[i]))
return obs_list