# ab-Initio Configuration Sampling tool kit (abICS)
# Copyright (C) 2019- The University of Tokyo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
from math import exp
# import SFMT_cython.sfmt_random as sfmt_random
# from multiprocessing import Process, Queue, Pool, TimeoutError
import os
import sys
import numpy as np
import numpy.random as rand
verylargeint = sys.maxsize
"""Defines base classes for Monte Carlo simulations"""
[docs]class model:
""" This class defines a model whose energy equals 0 no matter the configuration, and the configuration
never changes.
This is a base template for building useful models."""
model_name = None
# def __init__(self):
[docs] def energy(self, config):
"""
Calculate energy of configuration: input: config
Parameters
----------
config: config object
configuration
Returns
-------
"""
return 0.0
[docs] def trialstep(self, config, energy):
"""
Define a trial step on config. Returns dconfig, which can contain the minimal information for
constructing the trial configuration from config to be used in newconfig().
Make sure that config is unchanged.
Parameters
----------
config: config object
configuration
energy: float
energy
Returns
-------
dconfig: config object
The minimal information for constructing the trial configuration from config to be used in newconfig()
dE: float
Energy diffence
"""
dE = 0.0
dconfig = None
# Return only change in configuration dconfig so that
# you don't have to copy entire configurations,
# which can sometimes be costly
return dconfig, dE
[docs] def newconfig(self, config, dconfig):
"""
Update config by using the trial step, dconfig
Parameters
----------
config: config object
Original configuration
This may be mutated through this function
dconfig: config object
Difference of configuration
Returns
-------
config: config object
updated configuration
"""
return config
[docs]class grid_1D:
[docs] def __init__(self, dx, minx, maxx):
"""
Parameters
----------
dx: float
interval
minx: float
minimum value of x
maxx: float
maximum value of x
"""
self.dx = dx
self.x = np.arange(minx, maxx, dx)
[docs]def binning(x, nlevels):
"""
Parameters
----------
x: list
Coordinates
nlevels: int
Number to determine the number of measurements
Returns
-------
error_estimate: list
error estimation
"""
error_estimate = []
x = np.array(x, dtype=np.float64)
# assert 2**nlevels*10 < len(x)
throwout = len(x) % (2 ** nlevels)
if throwout != 0:
# The number of measurements must be divisible by 2**nlevels
# If not, throw out initial measurements
x = x[throwout:]
error_estimate.append(np.sqrt(np.var(x, ddof=1) / len(x)))
for lvl in range(1, nlevels + 1):
x_tmp = x
x = (x_tmp[0::2] + x_tmp[1::2]) / 2.0
error_estimate.append(np.sqrt(np.var(x, ddof=1) / len(x)))
return error_estimate
empty_array = np.array([])
# @profile
[docs]def obs_encode(*args):
"""
Parameters
----------
args: list
Returns
-------
obs_array: numpy array
"""
# nargs = np.array([len(args)])
# args_length_list = []
obs_array = empty_array
for arg in args:
# Inelegant way to make everything a 1D array
arg = np.array([arg])
arg = arg.ravel()
obs_array = np.concatenate((obs_array, arg))
# args_length_list.append(len(arg))
# args_length_array = np.array(args_length_list)
# args_info = np.concatenate((nargs, args_length_array))
return obs_array
[docs]def args_info(*args):
"""
Parameters
----------
args: list
Returns
-------
args_info: numpy array
"""
nargs = np.array([len(args)])
args_length_list = []
for arg in args:
# Inelegant way to make everything a 1D array
arg = np.array([arg])
arg = arg.ravel()
args_length_list.append(len(arg))
args_length_array = np.array(args_length_list)
args_info = np.concatenate((nargs, args_length_array))
return args_info
[docs]def obs_decode(args_info, obs_array):
"""
Parameters
----------
args_info: numpy array
obs_array: numpy array
Returns
-------
args: list
"""
nargs = args_info[0]
args_length_array = args_info[1 : nargs + 1]
args = []
idx = 0
for i in range(nargs):
length = args_length_array[i]
if length == 1:
args.append(obs_array[idx])
else:
args.append(obs_array[idx : idx + length])
idx += length
return args
[docs]class observer_base:
def __init__(self):
self.lprintcount = 0
[docs] def obs_info(self, calc_state):
"""
Parameters
----------
calc_state: MonteCarlo algorithm object
MonteCarlo algorithm
Returns
-------
args_info: numpy array
"""
obs_log = self.logfunc(calc_state)
if not isinstance(obs_log, tuple):
obs_log = (obs_log,)
obs_ND = []
obs_save = self.savefuncs(calc_state)
if obs_save is not None:
if isinstance(obs_save, tuple):
for obs in obs_save:
obs_ND.append(obs)
else:
obs_ND.append(obs_save)
return args_info(*obs_log, *obs_ND)
[docs] def logfunc(self, calc_state):
"""
Parameters
----------
calc_state: MonteCarlo algorithm object
MonteCarlo algorithm
Returns
-------
calc_state.energy: tuple
(Energy)
"""
return (calc_state.energy,)
[docs] def savefuncs(self, calc_state):
"""
Parameters
----------
calc_state: MonteCarlo algorithm object
MonteCarlo algorithm
Returns
-------
"""
return None
[docs] def writefile(self, calc_state):
"""
Parameters
----------
calc_state: MonteCarlo algorithm object
MonteCarlo algorithm
Returns
-------
"""
return None
[docs] def observe(self, calc_state, outputfi, lprint=True):
"""
Parameters
----------
calc_state: MonteCarlo algorithm object
MonteCarlo algorithm
outputfi: _io.TextIOWrapper
TextIOWrapper for output
lprint: boolean
if true, log info is outputted to TextIOWrapper
Returns
-------
obs_log: numpy array
log information about observation
"""
obs_log = np.atleast_1d(self.logfunc(calc_state))
if lprint:
outputfi.write(
str(self.lprintcount)
+ "\t"
+ str(calc_state.kT)
+ "\t"
+ "\t".join([str(x) for x in obs_log])
+ "\n"
)
outputfi.flush()
self.writefile(calc_state)
self.lprintcount += 1
obs_save = self.savefuncs(calc_state)
if obs_save is not None:
obs_save = np.atleast_1d(obs_save)
obs_save = obs_save.ravel()
print(obs_log.shape, obs_save.shape)
return np.concatenate((obs_log, obs_save))
else:
return obs_log
[docs]class CanonicalMonteCarlo:
[docs] def __init__(self, model, kT, config):
"""
Parameters
----------
model: dft_latgas object
DFT lattice gas mapping model
kT: float
Temperature
config: config object
Configuration
"""
self.model = model
self.config = config
self.kT = kT
self.obs_save = []
# @profile
[docs] def MCstep(self):
dconfig, dE = self.model.trialstep(self.config, self.energy)
# if self.energy == float("inf"):
# self.config = self.model.newconfig(self.config, dconfig)
# self.energy = dE
if dE < 0.0:
self.config = self.model.newconfig(self.config, dconfig)
self.energy += dE
else:
accept_probability = exp(-dE / self.kT)
if rand.random() <= accept_probability:
self.config = self.model.newconfig(self.config, dconfig)
self.energy += dE
[docs] def run(
self,
nsteps,
sample_frequency=verylargeint,
print_frequency=verylargeint,
observer=observer_base(),
save_obs=False,
):
"""
Parameters
----------
nsteps: int
The number of Monte Carlo steps for running.
sample_frequency: int
The number of Monte Carlo steps for observation of physical quantities.
print_frequency: int
The number of Monte Carlo steps for saving physical quantities.
observer: observer object
save_obs: boolean
Returns
-------
obs_decode: list
"""
observables = 0.0
nsample = 0
self.energy = self.model.energy(self.config)
output = open("obs.dat", "a")
with open(os.devnull, 'w') as f:
if hasattr(observer.observe(self, f), "__add__"):
observe = True
else:
observe = False
for i in range(1, nsteps + 1):
self.MCstep()
sys.stdout.flush()
if observe and i % sample_frequency == 0:
obs_step = observer.observe(self, output, i % print_frequency == 0)
observables += obs_step
nsample += 1
if save_obs:
self.obs_save.append(obs_step)
output.close()
if save_obs:
np.save(open("obs_save.npy", "wb"), np.array(self.obs_save))
if nsample > 0:
observables /= nsample
args_info = observer.obs_info(self)
return obs_decode(args_info, observables)
else:
return None
# def swap_configs(MCreplicas, rep, accept_count):
# """
#
# Parameters
# ----------
# MCreplicas:
# rep:
# accept_count:
#
# Returns
# -------
#
# """
# # swap configs, energy
# tmp = MCreplicas[rep + 1].config
# tmpe = MCreplicas[rep + 1].energy
# MCreplicas[rep + 1].config = MCreplicas[rep].config
# MCreplicas[rep + 1].energy = MCreplicas[rep].energy
# MCreplicas[rep].config = tmp
# MCreplicas[rep].energy = tmpe
# accept_count += 1
# return MCreplicas, accept_count
"""
class TemperatureReplicaExchange:
def __init__(self, model, kTs, configs, MCalgo, swap_algo=swap_configs):
assert len(kTs) == len(configs)
self.model = model
self.kTs = kTs
self.betas = 1.0/kTs
self.n_replicas = len(kTs)
self.MCreplicas = []
self.accept_count = 0
self.swap_algo = swap_algo
self.writefunc = writefunc
self.configs = configs
for i in range(self.n_replicas):
self.MCreplicas.append(MCalgo(model, kTs[i], configs[i], writefunc))
def Xtrial(self):
# pick a replica
rep = randrange(self.n_replicas - 1)
delta = (self.betas[rep + 1] - self.betas[rep]) \
*(self.MCreplicas[rep].energy -
self.MCreplicas[rep+1].energy)
#print self.MCreplicas[rep].energy, self.model.energy(self.MCreplicas[rep].config)
if delta < 0.0:
self.MCreplicas, self.accept_count = self.swap_algo(self.MCreplicas,
rep, self.accept_count)
print("RXtrial accepted")
else:
accept_probability = exp(-delta)
#print accept_probability, "accept prob"
if rand.random() <= accept_probability:
self.MCreplicas, self.accept_count = self.swap_algo(self.MCreplicas,
rep, self.accept_count)
print("RXtrial accepted")
else:
print("RXtrial rejected")
def run(self, nsteps, RXtrial_frequency, pool, sample_frequency=0, subdirs=False):
self.accept_count = 0
outerloop = nsteps//RXtrial_frequency
for i in range(outerloop):
self.MCreplicas = MultiProcessReplicaRun(self.MCreplicas, RXtrial_frequency, pool, sample_frequency, subdirs)
self.Xtrial()
self.configs = [MCreplica.config for MCreplica in self.MCreplicas]
#print self.accept_count
#self.accept_count = 0
"""