# Source code for pdb2pqr.inputgen
"""Create an APBS input file using :mod:`psize` data.
.. codeauthor:: Todd Dolinsky
.. codeauthor:: Nathan Baker
"""
import logging
import sys
from argparse import (
ArgumentDefaultsHelpFormatter,
ArgumentError,
ArgumentParser,
Namespace,
)
from pathlib import Path
from typing import List
from pdb2pqr.process_cli import check_file
from .psize import Psize
from .config import (
ApbsCalcType,
FilePermission,
LogLevels,
COARSE_GRID_FACTOR,
FINE_GRID_ADD,
GRID_SPACING,
BYTES_PER_GRID,
MEMORY_CEILING_MB,
PARTITION_OVERLAP,
FOCUS_FACTOR,
TITLE_STR,
)
from .elec import Elec
_LOGGER = logging.getLogger(__name__)
class Input:
    """Each object of this class is one APBS input file."""

    def __init__(
        self, pqrpath, size, method, asyncflag, istrng=0, potdx=False
    ):
        """Initialize the input file class.

        Each input file contains a PQR name, a list of elec objects, and a
        list of strings containing print statements.

        For starters, assume two ELEC statements are needed, one for the
        inhomogeneous and the other for the homogeneous dielectric
        calculations.  When ``potdx`` is set only the first ELEC statement
        is generated.

        .. note::
            This assumes you have already run psize, either by
            :func:`size.run_psize(...)` or :func:`size.parse_string(...)`
            followed by :func:`size.set_all()`.

        :param pqrpath: path to PQR file
        :type pqrpath: str
        :param size: parameter sizing object
        :type size: Psize
        :param method: solution method (e.g., mg-para, mg-auto, etc.)
        :type method: str
        :param asyncflag: perform an asynchronous parallel focusing
            calculation
        :type asyncflag: bool
        :param istrng: ionic strength/concentration (M)
        :type istrng: float
        :param potdx: whether to write out potential information in DX format
        :type potdx: bool
        """
        self.pqrpath = Path(pqrpath)
        self.pqrname = self.pqrpath.name
        pqr_stem: str = self.pqrpath.stem
        self.asyncflag = asyncflag
        # Initialize variables to default elec values
        elec1 = Elec(pqr_stem, method, size, asyncflag, istrng, potdx)
        # Only real Elec objects are stored: print_input_files() assigns an
        # attribute on every entry, which fails on a placeholder string.
        self.elecs: List[Elec] = [elec1]
        if potdx:
            self.prints = ["print elecEnergy 1 end"]
        else:
            # Second ELEC statement: same setup with ``sdie`` overridden and
            # no write statements.
            elec2 = Elec(pqr_stem, method, size, asyncflag, istrng, potdx)
            elec2.sdie = 2.0
            elec2.write = []
            self.elecs.append(elec2)
            self.prints = ["print elecEnergy 2 - 1 end"]

    def __str__(self):
        """Return the complete text of the APBS input file."""
        parts = ["read\n", f" mol pqr {self.pqrname}\n", "end\n"]
        parts.extend(str(elec) for elec in self.elecs)
        parts.extend(self.prints)
        parts.append("\nquit\n")
        return "".join(parts)

    def print_input_files(self, output_path) -> List[Path]:
        """Generate the input file(s) associated with this object.

        Without the async flag a single file is written to ``output_path``.
        With the async flag a ``<stem>-para.in`` file is written next to
        ``output_path`` and then passed to :func:`split_input`, which
        produces the per-processor files.

        :param output_path: location for generated files
        :type output_path: str
        :return: List of file paths which were generated
        :rtype: List[pathlib.Path]
        """
        file_list = []
        path = Path(output_path)
        if not self.asyncflag:
            with open(path, "wt", encoding="utf-8") as out_file:
                out_file.write(str(self))
            file_list.append(path)
            return file_list

        base_name = path.stem
        # Disable the async flag on each ELEC before rendering the parallel
        # file.  NOTE: the flag is not restored afterwards.
        for elec in self.elecs:
            elec.asyncflag = False
        # Join the bare stem (not an already-joined path) onto the parent so
        # relative paths such as "dir/name.in" are not doubled to
        # "dir/dir/name-para.in".
        para_name = path.parent / f"{base_name}-para.in"
        with open(para_name, "wt", encoding="utf-8") as out_file:
            out_file.write(str(self))
        file_list.append(para_name)
        # Now make the async files
        file_list.extend(split_input(para_name, base_name))
        return file_list
def split_input(filename: Path, stem: str = None) -> List[Path]:
    """Split the parallel input file into multiple async input files.

    The number of processors is the product of the three integers on the
    ``pdime`` line (the last such line wins if there are several).  One
    output file is written per processor, next to ``filename``, with an
    ``async <n>`` line inserted after every ``mg-para`` line.

    :param filename: the path to the original parallel input file
    :type filename: str or pathlib.Path
    :param stem: an optional file prefix for name; defaults to the stem of
        ``filename``
    :type stem: str
    :return: List of file paths which were generated
    :rtype: List[pathlib.Path]
    :raises RuntimeError: if no ``pdime`` line gives a non-zero processor
        count
    """
    # The documented type is ``str`` but ``.stem``/``.parent`` are needed
    # below, so accept either form.
    filename = Path(filename)
    nproc = 0
    lines = []
    with open(filename, "rt", encoding="utf-8") as file_:
        for line in file_:
            lines.append(line)
            stripped = line.strip()
            if stripped.startswith("pdime"):  # Get # Procs
                words = stripped.split()
                nproc = int(words[1]) * int(words[2]) * int(words[3])
    if nproc == 0:
        raise RuntimeError(
            f"{filename} is not a valid APBS parallel input file!\n"
            "The inputgen script was unable to asynchronize this file!"
        )
    text = "".join(lines)
    if stem is None:
        stem = filename.stem
    file_list = []
    for iproc in range(nproc):
        outname = filename.parent / f"{stem}-PE{iproc}.in"
        outtext = text.replace("mg-para\n", f"mg-para\n async {iproc}\n")
        with open(outname, "w", encoding="utf-8") as outfile:
            outfile.write(outtext)
        file_list.append(outname)
    return file_list
def get_cli_args(args_str: str = None) -> Namespace:
    """Build the inputgen argument parser and parse the command line.

    :param args_str: whitespace-separated arguments to parse instead of the
        process command line; when empty or ``None`` the process arguments
        are used
    :type args_str: str
    :return: Parsed arguments object
    :rtype: argparse.Namespace
    """
    parser = ArgumentParser(
        description=(
            f"{TITLE_STR}\ninputgen: generating APBS input files since "
            "(at least) 2004"
        ),
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    # Optional arguments, registered in this order (it is the order shown
    # in ``--help``).
    option_table = (
        (
            "--log-level",
            {
                "help": "Set logging level",
                "default": str(LogLevels.INFO),
                "choices": LogLevels.values(),
            },
        ),
        (
            "--asynch",
            {
                "action": "store_true",
                "help": "perform an asynchronous parallel calculation.",
            },
        ),
        (
            "--split",
            {
                "action": "store_true",
                "help": (
                    "split an existing parallel input file to multiple "
                    "async input files."
                ),
            },
        ),
        (
            "--potdx",
            {
                "action": "store_true",
                "help": (
                    "create an input to compute an electrostatic potential map."
                ),
            },
        ),
        (
            "--method",
            {
                "help": (
                    "force output file to write a specific APBS ELEC method."
                ),
                "choices": ApbsCalcType.values(),
                "default": str(ApbsCalcType.MG_AUTO),
                "type": str.lower,
            },
        ),
        (
            "--cfac",
            {
                "type": float,
                "default": COARSE_GRID_FACTOR,
                "help": (
                    "factor by which to expand molecular dimensions to "
                    "get coarse grid dimensions."
                ),
            },
        ),
        (
            "--fadd",
            {
                "type": float,
                "default": FINE_GRID_ADD,
                "help": (
                    "amount to add to molecular dimensions to get fine "
                    "grid dimensions."
                ),
            },
        ),
        (
            "--space",
            {
                "type": float,
                "default": GRID_SPACING,
                "help": "desired fine mesh resolution",
            },
        ),
        (
            "--gmemfac",
            {
                "type": int,
                "default": BYTES_PER_GRID,
                "help": (
                    "number of bytes per grid point required for sequential "
                    "MG calculation"
                ),
            },
        ),
        (
            "--gmemceil",
            {
                "type": int,
                "default": MEMORY_CEILING_MB,
                "help": (
                    "max MB allowed for sequential MG calculation; adjust "
                    "this to force the script to perform faster calculations "
                    "(which require more parallelism)"
                ),
            },
        ),
        (
            "--ofrac",
            {
                "type": float,
                "default": PARTITION_OVERLAP,
                "help": "overlap factor between mesh partitions (parallel)",
            },
        ),
        (
            "--redfac",
            {
                "type": float,
                "default": FOCUS_FACTOR,
                "help": (
                    "the maximum factor by which a domain dimension can "
                    "be reduced during focusing"
                ),
            },
        ),
        (
            "--istrng",
            {
                "type": float,
                "default": 0.0,
                "help": "Ionic strength (M); Na+ and Cl- ions will be used",
            },
        ),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    parser.add_argument("filename")

    # ``None`` makes argparse fall back to the process command line.
    argv = args_str.split() if args_str else None
    try:
        return parser.parse_args(argv)
    except ArgumentError as err:
        _LOGGER.error("Cannot parse CLI: %s", err)
        sys.exit(1)
def main():
    """Command-line entry point for inputgen.

    Parses the CLI arguments and runs :func:`check_file` on the given
    filename.  With ``--split`` the file is handed to :func:`split_input`;
    otherwise psize is run on it and an :class:`Input` is written to
    ``<stem>.in`` alongside it.
    """
    cli = get_cli_args()
    check_file(cli.filename)
    source_path = Path(cli.filename)

    if cli.split:
        # The given file is an existing parallel input file: just split it.
        split_input(source_path)
        return

    output_path = source_path.parent / f"{source_path.stem}.in"
    check_file(output_path, permission=FilePermission.WRITE, overwrite=False)

    sizer = Psize()
    sizer.run_psize(cli.filename)
    apbs_input = Input(
        cli.filename,
        sizer,
        cli.method,
        cli.asynch,
        cli.istrng,
        cli.potdx,
    )
    apbs_input.print_input_files(output_path)


if __name__ == "__main__":
    main()