Source code for abics.applications.latgas_abinitio_interface.qe

# ab-Initio Configuration Sampling tool kit (abICS)
# Copyright (C) 2019- The University of Tokyo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

"""
To deal with QuantumESPRESSO

"""

from collections import namedtuple
import xml.etree.ElementTree as ET
import operator
import os
import os.path

import numpy as np
import scipy.constants as spc
from pymatgen import Structure
from qe_tools.parsers import PwInputFile

from .base_solver import SolverBase
from ...util import expand_path
from ...exception import InputError


hartree2eV = spc.value("Hartree energy in eV")
Bohr2AA = spc.value("Bohr radius") * 1e10


[docs]class QESolver(SolverBase): """ Solver class dealing with QuantumESPRESSO (with new XML format). """
[docs] def __init__(self, path_to_solver, *, parallel_level=None): """ Initialize the solver. Parameters ---------- path_to_solver : str Path to the solver. parallel_level : dict Parallelization levels """ super(QESolver, self).__init__(path_to_solver) self.path_to_solver = path_to_solver self.input = QESolver.Input(parallel_level=parallel_level) self.output = QESolver.Output("pwscf")
[docs] def name(self): """ Returns ------- solver_name : str Name of solver, "QuantumESPRESSO". """ return "QuantumESPRESSO"
[docs] class Input(object): """ QE input files manager. Attributes ---------- pwi : qe_tools.parsers.PwInputFile QE input. datadir : str Path to the data directory of QE. filetocheck : str Name of the file to be used for check finished. parallel_level : dict Parallelization levels """ def __init__(self, *, parallel_level=None): self.pwi = None self.datadir = "pwscf.save" self.filetocheck = "data-file-schema.xml" if parallel_level: self.parallel_level = parallel_level else: self.parallel_level = {}
[docs] def cleanup(self, workdir): """ Remove file for check finished. Parameters ---------- workdir : str Path to the working directory. """ checkfile = os.path.join(workdir, self.datadir, self.filetocheck) if os.path.exists(checkfile): os.remove(checkfile)
[docs] def check_finished(self, workdir): """ Check if the solver program has finished or not. Parameters ---------- workdir : str Path to the working directory. """ f = os.path.join(workdir, self.datadir, self.filetocheck) if not os.path.exists(f): return False try: tree = ET.parse(f) except ET.ParseError: return False root = tree.getroot() control = root.find("input").find("control_variables") calculation = control.find("calculation").text if calculation == "scf": return True elif calculation == "relax": scf_conv = root.find("output").find("convergence_info").find("scf_conv") scf_converged = ( scf_conv.find("convergence_achieved").text.lower() == "true" ) # scf_maxstep = int(root.find("input").find("electron_control").find("max_nstep").text) # scf_nstep = int(scf_conv.find("n_scf_steps").text) if not scf_converged: # scf does not converged and QE stopped return True opt_maxstep = int(control.find("nstep").text) opt_conv = root.find("output").find("convergence_info").find("opt_conv") opt_converged = ( opt_conv.find("convergence_achieved").text.lower() == "true" ) if opt_converged: return True opt_nstep = int(opt_conv.find("n_opt_steps").text) return opt_nstep == opt_maxstep else: raise InputError( "calculation is {}, but this is not yet supported".format( calculation ) )
[docs] def from_directory(self, base_input_dir): """ Initialize information from files in base_input_dir. Parameters ---------- base_input_dir : str Path to the directory including base input files. """ inputfile = os.path.join(os.getcwd(), base_input_dir, "scf.in") self.pwi = PwInputFile(inputfile) for name in ["CONTROL", "SYSTEM"]: if name not in self.pwi.namelists: raise InputError("{} cannot found in {}".format(name, inputfile)) calculation = self.pwi.namelists["CONTROL"]["calculation"] if calculation not in ("scf", "relax"): raise InputError("Sorry, {} is not yet supported".format(calculation)) for name in ["ELECTRONS", "IONS", "CELL"]: if name not in self.pwi.namelists: self.pwi.namelists[name] = {} self.pwi.namelists["CONTROL"]["prefix"] = "pwscf" self.pwi.namelists["CONTROL"]["pseudo_dir"] = expand_path( self.pwi.namelists["CONTROL"]["pseudo_dir"], os.getcwd() )
[docs] def update_info_by_structure(self, structure): """ Update information by atomic structure. Parameters ---------- structure : pymatgen.Structure Atomic structure """ A = structure.lattice.matrix self.pwi.cell_parameters = {"units": "alat", "cell": A} nat = len(structure.sites) self.pwi.namelists["SYSTEM"]["ibrav"] = 0 self.pwi.namelists["SYSTEM"]["a"] = 1.0 self.pwi.namelists["SYSTEM"]["ntyp"] = len(structure.composition.elements) self.pwi.namelists["SYSTEM"]["nat"] = nat self.pwi.atomic_positions["names"] = [""] * nat self.pwi.atomic_positions["positions"] = [[0.0, 0.0, 0.0]] * nat self.pwi.atomic_positions["fixed_coords"] = [[False, False, False]] * nat for i, site in enumerate(structure.sites): self.pwi.atomic_positions["names"][i] = site.specie self.pwi.atomic_positions["positions"][i] = [site.a, site.b, site.c] self.pwi.atomic_positions["fixed_coords"][i] = [False, False, False] if "seldyn" in structure.site_properties: seldyn_arr = structure.site_properties["seldyn"] for idx, dyn_info in enumerate(seldyn_arr): self.pwi.atomic_positions["fixed_coords"][idx] = list( map(operator.not_, dyn_info) )
[docs] def update_info_from_files(self, output_dir, rerun): """ Do nothing. """ pass
[docs] def write_input(self, output_dir): """ Generate input files of the solver program. Parameters ---------- output_dir : str Path to working directory. """ self.pwi.namelists["CONTROL"]["outdir"] = output_dir os.makedirs(output_dir, exist_ok=True) with open(os.path.join(output_dir, "scf.in"), "w") as f: for section in self.pwi.namelists.keys(): f.write("&{}\n".format(section)) for k, v in self.pwi.namelists[section].items(): if isinstance(v, str): f.write(' {} = "{}"\n'.format(k, v)) elif isinstance(v, bool): f.write( " {} = {}\n".format(k, ".true." if v else ".false.") ) else: f.write(" {} = {}\n".format(k, v)) f.write("/\n") f.write( "CELL_PARAMETERS {}\n".format(self.pwi.cell_parameters["units"]) ) for row in self.pwi.cell_parameters["cell"]: for elem in row: f.write(" {}".format(elem)) f.write("\n") f.write("ATOMIC_SPECIES\n") for name, mass, pfile in zip( self.pwi.atomic_species["names"], self.pwi.atomic_species["masses"], self.pwi.atomic_species["pseudo_file_names"], ): f.write(" {} {} {}\n".format(name, mass, pfile)) f.write( "ATOMIC_POSITIONS {}\n".format(self.pwi.atomic_positions["units"]) ) for name, pos, if_pos in zip( self.pwi.atomic_positions["names"], self.pwi.atomic_positions["positions"], self.pwi.atomic_positions["fixed_coords"], ): f.write(" {}".format(name)) for p in pos: f.write(" {}".format(p)) for p in if_pos: if p: f.write(" 0") else: f.write(" 1") f.write("\n") f.write("K_POINTS {}\n".format(self.pwi.k_points["type"])) if "points" in self.pwi.k_points: for kp in self.pwi.k_points["points"]: f.write(" {}".format(kp)) for offset in self.pwi.k_points["offset"]: f.write(" {}".format(int(2 * offset))) f.write("\n")
[docs] def file_for_check_finished(self, workdir): return os.path.join(workdir, self.datadir, self.filetocheck)
[docs] def cl_args(self, nprocs, nthreads, workdir): """ Generate command line arguments of the solver program. Parameters ---------- nprocs : int The number of processes. nthreads : int The number of threads. workdir : str Path to the working directory. Returns ------- args : list[str] Arguments of command """ ret = [] nprocs_per_image = nprocs nprocs_per_pool = nprocs pl = [] for name in ("nimage", "npools", "nband", "ntg", "ndiag"): if name in self.parallel_level: v = self.parallel_level[name] pl.append(v) ret.append("-{}".format(name)) ret.append(str(v)) else: pl.append(1) ret.append("-in") ret.append(os.path.join(workdir, "scf.in")) # check for each parallel level # image if pl[0] > nprocs: raise InputError( "Too many parallel images, {} is given but {} is available".format( pl[0], nprocs ) ) nprocs_per_image = nprocs // pl[0] # pool if pl[1] > nprocs_per_image: raise InputError( "Too many parallel pools, {} is given but {} is available".format( pl[1], nprocs_per_image ) ) nprocs_per_pool = nprocs_per_image // pl[1] # band if pl[2] > nprocs_per_pool: raise InputError( "Too many parallel band groups, {} is given but {} is available".format( pl[2], nprocs_per_pool ) ) # task if pl[3] > nprocs_per_pool: raise InputError( "Too many parallel FFT task groups, {} is given but {} is available".format( pl[3], nprocs_per_pool ) ) # diag if pl[4] > nprocs: raise InputError( "Too many parallel linear-algebra groups, {} is given but {} is available".format( pl[4], nprocs ) ) return ret
[docs] class Output(object): """ Output manager. """ def __init__(self, prefix): pass
[docs] def get_results(self, workdir): """ Get energy and structure obtained by the solver program. Parameters ---------- workdir : str Path to the working directory. Returns ------- phys : named_tuple("energy", "structure") Total energy and atomic structure. The energy is measured in the units of eV and coordinates is measured in the units of Angstrom. """ # Read results from files in output_dir and calculate values tree = ET.parse(os.path.join(workdir, "pwscf.save", "data-file-schema.xml")) root = tree.getroot() A = np.zeros((3, 3)) child = root.find("input").find("atomic_structure").find("cell") for i in range(3): A[:, i] = list(map(float, child.find("a{}".format(i + 1)).text.split())) A *= Bohr2AA species = [] positions = [] output = root.find("output") child = output.find("atomic_structure").find("atomic_positions") for itr in child.iter("atom"): species.append(itr.attrib["name"]) pos = list(map(lambda x: float(x) * Bohr2AA, itr.text.split())) positions.append(pos) structure = Structure(A, species, positions, coords_are_cartesian=True) child = output.find("total_energy").find("etot") ene = hartree2eV * float(child.text) Phys = namedtuple("PhysValues", ("energy", "structure")) return Phys(np.float64(ene), structure)
[docs] def solver_run_schemes(self): """ Returns ------- schemes : tuple[str] Implemented runner schemes. """ return "mpi_spawn"