Source code for pyiron_atomistics.sphinx.interactive

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

import os
import subprocess
import time
import warnings

import numpy as np
import scipy.constants

from pyiron_atomistics.atomistics.job.interactive import (
    GenericInteractive,
    GenericInteractiveOutput,
)
from pyiron_atomistics.sphinx.base import Group, Output, SphinxBase
from pyiron_atomistics.vasp.potential import VaspPotentialSetter

BOHR_TO_ANGSTROM = (
    scipy.constants.physical_constants["Bohr radius"][0] / scipy.constants.angstrom
)
HARTREE_TO_EV = scipy.constants.physical_constants["Hartree energy in eV"][0]
HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM = HARTREE_TO_EV / BOHR_TO_ANGSTROM

__author__ = "Osamu Waseda, Jan Janssen"
__copyright__ = (
    "Copyright 2021, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2017"


[docs] class SphinxInteractive(SphinxBase, GenericInteractive):
[docs] def __init__(self, project, job_name): super(SphinxInteractive, self).__init__(project, job_name) self._interactive_write_input_files = True self._interactive_library_read = None self._interactive_fetch_completed = True self.interactive_flush_frequency = 1 self.output = SphinxOutput(job=self)
@property def structure(self): return GenericInteractive.structure.fget(self) @structure.setter def structure(self, structure): GenericInteractive.structure.fset(self, structure) if structure is not None: self._potential = VaspPotentialSetter( element_lst=structure.get_species_symbols().tolist() ) def _get_structure(self, frame=-1, wrap_atoms=True): return GenericInteractive._get_structure( self, frame=frame, wrap_atoms=wrap_atoms ) def interactive_energy_tot_getter(self): return self.interactive_energy_pot_getter() def interactive_energy_pot_getter(self): self._interactive_pipe_write("get energy") return float(self._interactive_library_read.readline()) * HARTREE_TO_EV def interactive_forces_getter(self): self._interactive_pipe_write("get forces") ff = [] for _ in range(len(self.structure)): line = self._interactive_library_read.readline().split() ff.append( [ float(line[i]) * HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM for i in range(3) ] ) ff = np.array(ff)[self.id_spx_to_pyi] return ff def interactive_cells_getter(self): self._interactive_pipe_write("get cell") cc = [] for _ in range(3): line = self._interactive_library_read.readline().split() cc.append([float(line[i]) * BOHR_TO_ANGSTROM for i in range(3)]) return np.array(cc) def interactive_positions_getter(self): self._interactive_pipe_write("get structure") xx = [] for _ in range(len(self.structure)): line = self._interactive_library_read.readline().split() xx.append([float(line[i]) * BOHR_TO_ANGSTROM for i in range(3)]) xx = np.array(xx)[self.id_spx_to_pyi] return xx def interactive_positions_setter(self, positions): self._interactive_pipe_write("set structure") positions = positions[self.id_pyi_to_spx] positions = np.reshape(positions, 3 * len(self.structure)) / BOHR_TO_ANGSTROM self._interactive_pipe_write(positions.tolist()) def interactive_spins_getter(self): self._logger.debug("get spins - start ...") self._interactive_pipe_write("get atomspin") mm = [] for _ in range(len(self.structure)): line = self._interactive_library_read.readline().split() mm.append(float(line[0])) mm = np.array(mm)[self.id_spx_to_pyi] # self.interactive_cache['atom_spins'].append(mm) self._logger.debug("get spins - done.") return mm def interactive_spin_constraints_setter(self, spins): if self._generic_input["fix_spin_constraint"]: self._logger.debug("set spin constraints - start ...") self._spin_constraint_enabled = True self._interactive_pipe_write("set spinconstraint") spins = np.array(spins)[self.id_pyi_to_spx] self._spin_constraints = np.array(spins) self._interactive_pipe_write(spins.tolist()) # self.interactive_cache['atom_spin_constraints'].append(spins) self._logger.debug("set spin constraints - done.") else: warnings.warn("Spin constraint not set -> set fix_spin_constraint = True") def interactive_spin_constraints_getter(self): return self._spin_constraints # return self.interactive_cache['atom_spin_constraints'][-1] def interactive_magnetic_forces_getter(self): if self._generic_input["fix_spin_constraint"]: self._interactive_pipe_write("get nu") nn = [] for _ in range(len(self.structure)): line = self._interactive_library_read.readline().split() nn.append(HARTREE_TO_EV * float(line[0])) nn = np.array(nn)[self.id_spx_to_pyi] return nn else: return None def interactive_initialize_interface(self): self.server.threads = self.input["THREADS"] if self.executable.executable_path == "": self.status.aborted = True raise ValueError("No executable set!") if self.server.cores == 1 or not self.executable.mpi: out = subprocess.Popen( str(self.executable), cwd=self.project_hdf5.working_directory, shell=True, stderr=subprocess.STDOUT, universal_newlines=True, ) else: out = subprocess.Popen( [ self.executable.executable_path, str(self.server.cores), str(self.server.threads), ], cwd=self.project_hdf5.working_directory, shell=False, stderr=subprocess.STDOUT, universal_newlines=True, ) while not self._interactive_pipes_initialized: time.sleep(1) self._logger.debug("open interactive interface!") self._interactive_library = open( os.path.join(self.working_directory, "sxctrl"), "w" ) self._interactive_library_read = open( os.path.join(self.working_directory, "sxres"), "r" ) self._logger.debug("interactive interface is opened!") if ( not self.structure.has("initial_magmoms") and "atom_spins" in self.interactive_cache.keys() ): del self.interactive_cache["atom_spins"] if self._generic_input["fix_spin_constraint"]: self.interactive_spin_constraints_setter( self._structure_current.get_initial_magnetic_moments() ) else: if "magnetic_forces" in self.interactive_cache.keys(): del self.interactive_cache["magnetic_forces"] if "atom_spin_constraints" in self.interactive_cache.keys(): del self.interactive_cache["atom_spin_constraints"] if len(self.restart_file_list) > 0: self._logger.debug("restarting; interactive run - start ...") self._interactive_pipe_write("run restart") self.interactive_fetch() def _output_interactive_to_generic(self): with self.project_hdf5.open("output") as h5: if "interactive" in h5.list_groups(): for key in ["positions", "indices", "cells", "forces"]: self.output.generic[key] = h5["interactive/" + key] if "atom_spin_constraints" in h5["interactive"].list_nodes(): self.output.generic.dft.atom_spin_constraints = h5[ "interactive/atom_spin_constraints" ] self.output.generic.to_hdf(hdf=self.project_hdf5)
[docs] def collect_output(self, force_update=False, compress_files=True): super(SphinxInteractive, self).collect_output( force_update=force_update, compress_files=compress_files ) self._output_interactive_to_generic()
[docs] def interactive_close(self): if self.interactive_is_activated(): self._interactive_pipe_write("end") self._interactive_library.close() self._interactive_library_read.close() self.status.collect = True if self["energy.dat"] is not None: self.run() self._output_interactive_to_generic() super(SphinxInteractive, self).interactive_close()
[docs] def calc_minimize( self, electronic_steps=None, ionic_steps=None, max_iter=None, pressure=None, ionic_energy_tolerance=None, ionic_force_tolerance=None, ionic_energy=None, ionic_forces=None, volume_only=False, ): if ( self.server.run_mode.interactive or self.server.run_mode.interactive_non_modal ): raise NotImplementedError( "calc_minimize() is not implemented for the interactive mode use calc_static()!" ) else: super(SphinxInteractive, self).calc_minimize( electronic_steps=electronic_steps, ionic_steps=ionic_steps, max_iter=max_iter, pressure=pressure, ionic_energy_tolerance=ionic_energy_tolerance, ionic_force_tolerance=ionic_force_tolerance, volume_only=volume_only, )
[docs] def run_if_interactive(self): super(SphinxInteractive, self).run_if_interactive() self._logger.debug("interactive run - start ...") self._interactive_pipe_write("run electronicminimization") self.interactive_fetch()
[docs] def run_if_interactive_non_modal(self): if not self._interactive_fetch_completed: print("Warning: interactive_fetch being effectuated") self.interactive_fetch() super(SphinxInteractive, self).run_if_interactive() self._logger.debug("interactive run - start ...") self._interactive_pipe_write("run electronicminimization") self._interactive_fetch_completed = False
[docs] def interactive_fetch(self): if ( self._interactive_fetch_completed and self.server.run_mode.interactive_non_modal ): print("First run and then fetch") else: self.interactive_collect() self._logger.debug("interactive run - done")
@property def _interactive_pipes_initialized(self): return os.path.exists( os.path.join(self.working_directory, "sxctrl") ) and os.path.exists(os.path.join(self.working_directory, "sxres")) def _interactive_pipe_write(self, line): if isinstance(line, str) or isinstance(line, int) or isinstance(line, float): self._interactive_library.write(str(line) + "\n") self._interactive_library.flush() elif isinstance(line, list): for subline in line: self._interactive_pipe_write(subline) else: raise TypeError("only lists, strings and numbers are supported!") def _interactive_pipe_read(self): return self._interactive_library_read.readline()
[docs] def load_main_group(self): main_group = Group() if ( self.server.run_mode.interactive or self.server.run_mode.interactive_non_modal ): commands = Group( [ { "id": '"restart"', "scfDiag": self.get_scf_group( maxSteps=10, keepRhoFixed=True, dEnergy=1.0e-4 ), }, { "id": '"electronicminimization"', "scfDiag": self.get_scf_group(), }, ] ) self.input.sphinx.main.extControl = Group() self.input.sphinx.main.extControl.create_group("bornOppenheimer") self.input.sphinx.main.extControl.bornOppenheimer = commands else: super(SphinxInteractive, self).load_main_group()
[docs] class SphinxOutput(Output, GenericInteractiveOutput): """ Handles the output from a SPHInX simulation. """
[docs] def check_band_occupancy(self, plot=True): """ Check whether there are still empty bands available. args: plot (bool): plots occupancy of the last step returns: True if there are still empty bands """ import matplotlib.pylab as plt try: elec_dict = self._job.content["output/generic/dft"]["n_valence"] except ValueError: raise AssertionError("Number of electrons not parsed") from None n_elec = np.sum( [elec_dict[k] for k in self._job.structure.get_chemical_symbols()] ) n_elec = int(np.ceil(n_elec / 2)) bands = self._job.content["output/generic/dft"]["bands_occ"][-1] bands = bands.reshape(-1, bands.shape[-1]) max_occ = np.sum(~np.isclose(bands, 0), axis=-1).max() n_bands = bands.shape[-1] if plot: xticks = np.arange(1, n_bands + 1) plt.xlabel("Electron number") plt.ylabel("Occupancy") if n_bands < 20: plt.xticks(xticks) plt.axvline(n_elec, label="#electrons: {}".format(n_elec)) plt.axvline(max_occ, color="red", label="Max occupancy: {}".format(max_occ)) plt.axvline( n_bands, color="green", label="Number of bands: {}".format(n_bands) ) plt.plot(xticks, bands.T, "x", color="black") plt.legend() if max_occ < n_bands: return True else: return False
[docs] class SphinxInt2(SphinxInteractive):
[docs] def __init__(self, project, job_name): warnings.warn("Please use SphinxInt instead of SphinxInt2") super(SphinxInt2, self).__init__(project=project, job_name=job_name)