Source code for mdacli.libcli

#!/usr/bin/env python3
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
#
# Copyright (c) 2021 Authors and contributors
#
# Released under the GNU Public Licence, v2 or any higher version
# SPDX-License-Identifier: GPL-2.0-or-later

"""Functionalities that support the creation of the command lines interface."""
import argparse
import ast
import importlib
import inspect
import json
import logging
import os
import re
import warnings
from typing import List

import MDAnalysis as mda
from MDAnalysis.transformations.boxdimensions import set_dimensions

from .colors import Emphasise
from .save import save
from .utils import convert_str_time, parse_callable_signature, parse_docs


logger = logging.getLogger(__name__)

# Serves the CLI factory: `create_cli` looks up the type string of every
# parsed parameter in this table to decide how the argument is registered
# with argparse. Type strings missing from the table make `create_cli` fall
# back to `str`.
STR_TYPE_DICT = {
    "bool": bool,
    "str": str,
    "list": list,
    "tuple": tuple,
    "dict": dict,
    "int": int,
    "float": float,
    "complex": complex,
    # NOTE: maps to the `None` object itself, not to `type(None)`.
    "NoneType": None,
    # Short and fully qualified spellings point to the same MDAnalysis class.
    "AtomGroup": mda.AtomGroup,
    "MDAnalysis.core.groups.AtomGroup": mda.AtomGroup,
    "list[AtomGroup]": List[mda.AtomGroup],
    "MDAnalysis.core.universe.Universe": mda.Universe,
    "Universe": mda.Universe,
    }


def _warning(message, *args, **kwargs):
    """Log `message` as an emphasised warning through the module logger.

    Used as a replacement for ``warnings.showwarning``; every argument
    except `message` is accepted for signature compatibility and ignored.
    """
    emphasised = Emphasise.warning(message)
    logger.warning(emphasised)


# Route everything emitted through the `warnings` module to the logger.
warnings.showwarning = _warning


class KwargsDict(argparse.Action):
    """
    Argparse action turning a command line string into a dictionary.

    A value enclosed in curly braces is decoded as inline JSON using
    ``json.loads``. Every other value is taken as the path of a file whose
    content is read with ``json.load``.
    """

    def __call__(self, parser, namespace, value, option_string=None):
        """Decode `value` and store the result on `namespace`."""
        is_inline_json = value.startswith("{") and value.endswith("}")

        if not is_inline_json:
            # Anything not wrapped in braces is treated as a file name.
            with open(value, 'r') as handle:
                parsed = json.load(handle)
        else:
            try:
                parsed = json.loads(value)
            except json.decoder.JSONDecodeError as err:
                # Re-raise with a message naming the offending argument.
                message = (
                    f"An error ocurred when reading {self.dest!r} argument."
                )
                raise json.decoder.JSONDecodeError(
                    message, err.doc, err.pos) from None

        setattr(namespace, self.dest, parsed)
def find_classes_in_modules(cls, *module_names):
    """
    Find classes that belong to cls in modules.

    A series of names can be given as arguments.

    Parameters
    ----------
    cls : single class or list/tuple of classes
        parent reference class type(s) to search for
    module_names : str
        modules to import, in absolute or relative terms
        (e.g. either pkg.mod or ..mod).

    Returns
    -------
    list
        list of found class objects in the order of `module_names`; a
        class reachable from several modules is listed once per module.
        If no classes are found, return None.
    """
    # ``issubclass`` needs a tuple. ``isinstance`` (instead of an exact
    # ``type`` comparison) also recognises list/tuple subclasses.
    parents = tuple(cls) if isinstance(cls, (list, tuple)) else (cls,)

    members = []
    for name in module_names:
        module = importlib.import_module(name)
        members.extend(
            member
            for _, member in inspect.getmembers(module, inspect.isclass)
            # The parent classes themselves are not reported.
            if issubclass(member, parents) and member not in parents
            )

    return members or None
def find_cls_members(cls, modules, ignore_warnings=False):
    """Find members of a certain class in modules.

    Parameters
    ----------
    cls : class or list of classes
        parent reference class or list of classes to be searched for
    modules : list
        list of modules for which members should be searched for
    ignore_warnings : bool
        If True, warnings raised while importing `modules` are suppressed.

    Returns
    -------
    list or None
        The result of `find_classes_in_modules`: the found classes, or
        None if there are none.
    """
    with warnings.catch_warnings():
        # Silence warnings only when explicitly requested. The filter is
        # restored when the ``catch_warnings`` context is left.
        if ignore_warnings:
            warnings.simplefilter('ignore')
        return find_classes_in_modules(cls, *modules)
def split_argparse_into_groups(parser, namespace):
    """
    Split the populated namespace of argparse into groups.

    Based on
    https://stackoverflow.com/questions/31519997/is-it-possible-to-only-parse-one-argument-groups-parameters-with-argparse

    Parameters
    ----------
    parser : `argparse.ArgumentParser`
        argument parser instance
    namespace : `argparse.Namespace`
        instance storing the parameters

    Returns
    -------
    arg_grouped_dict : dict
        Maps every group title of `parser` to a dictionary
        ``{dest: value}`` of the actions of that group. Destinations
        missing in `namespace` are stored as None.
    """
    def values_of(group):
        # One entry per action registered in this argument group.
        return {
            action.dest: getattr(namespace, action.dest, None)
            for action in group._group_actions
            }

    return {group.title: values_of(group) for group in parser._action_groups}
def add_run_group(analysis_class_parser):
    """Add the run group to a given argparse.ArgumentParser instance.

    The group "Analysis Run Parameters" provides the arguments `start`
    (``-b``), `stop` (``-e``), `step` (``-dt``) and `verbose` (``-v``).

    Parameters
    ----------
    analysis_class_parser : argparse.ArgumentParser
        The ArgumentParser instance to which the run group is added
    """
    group_description = (
        "General parameters specific for running the analysis. "
        "Parameters can be given in terms of frames (e.g. 12) "
        "or as a time ('12ps'). Allowed time units are "
        "'ps', 'fs' and 'ns'."
        )
    run_group = analysis_class_parser.add_argument_group(
        title="Analysis Run Parameters",
        description=group_description,
        )

    # (flag, dest, default, help) of the string valued frame/time options
    frame_options = (
        ("-b", "start", None,
         "start frame or time for evaluation (default: %(default)s)"),
        ("-e", "stop", None,
         "end frame or time for evaluation (default: %(default)s)"),
        ("-dt", "step", "1",
         "step or time step for evaluation (default: %(default)s)"),
        )
    for flag, dest, default, help_text in frame_options:
        run_group.add_argument(
            flag, dest=dest, type=str, default=default, help=help_text)

    run_group.add_argument(
        "-v", dest="verbose", action="store_true", help="Be loud and noisy")
def add_output_group(analysis_class_parser):
    """Add the output group to an argparse.ArgumentParser instance.

    The group "Output Parameters" provides the arguments `output_prefix`
    (``-pre``) and `output_directory` (``-o``).

    Parameters
    ----------
    analysis_class_parser : argparse.ArgumentParser
        The ArgumentParser instance to which the output group is added
    """
    output_group = analysis_class_parser.add_argument_group(
        title="Output Parameters",
        description="Genereal parameters specific for the result output.",
        )

    # (flag, dest, default, help) of the string valued output options
    output_options = (
        ("-pre", "output_prefix", "",
         "Additional prefix for all output files. Files will be "
         " automatically named by the used module (default: %(default)s)"),
        ("-o", "output_directory", ".",
         "Directory in which the output files produced will be stored."
         "(default: %(default)s)"),
        )
    for flag, dest, default, help_text in output_options:
        output_group.add_argument(
            flag, dest=dest, type=str, default=default, help=help_text)
def add_cli_universe(parser, name=''):
    """Add universe parameters to a given argparse parser or group.

    The arguments `topology` (``-s``), `topology_format` (``-top``),
    `atom_style` (``-atom_style``), `coordinates` (``-f``),
    `trajectory_format` (``-traj``) and `dimensions` (``-dimensions``)
    are added to `parser`.

    Parameters
    ----------
    parser : argparse.ArgumentParser
        The parser (or argument group) to which the arguments are added
    name : str
        If not empty, ``_<name>`` is appended to every flag and every
        destination.
    """
    suffix = f'_{name}' if name else ''

    topology_formats = ", ".join(mda._PARSERS.keys())
    parser.add_argument(
        f"-s{suffix}",
        dest=f"topology{suffix}",
        type=str,
        default="topol.tpr",
        help=f"The topolgy file. The FORMATs {topology_formats} "
             "are implemented in MDAnalysis.",
        )

    parser.add_argument(
        f"-top{suffix}",
        dest=f"topology_format{suffix}",
        type=str,
        default=None,
        help="Override automatic topology type detection. "
             "See topology for implemented formats.",
        )

    parser.add_argument(
        f"-atom_style{suffix}",
        dest=f"atom_style{suffix}",
        type=str,
        default=None,
        help="Manually set the atom_style information"
             "(currently only LAMMPS parser). "
             "E.g. atom_style='id type x y z'.",
        )

    coordinate_formats = ", ".join(mda._READERS.keys())
    parser.add_argument(
        f"-f{suffix}",
        dest=f"coordinates{suffix}",
        type=str,
        default=None,
        nargs="+",
        help="A single or multiple coordinate files. "
             f"The FORMATs {coordinate_formats} "
             "are implemented in MDAnalysis.",
        )

    parser.add_argument(
        f"-traj{suffix}",
        dest=f"trajectory_format{suffix}",
        type=str,
        default=None,
        help="Override automatic trajectory type detection. "
             "See trajectory for implemented formats.",
        )

    parser.add_argument(
        f"-dimensions{suffix}",
        dest=f"dimensions{suffix}",
        type=float,
        default=None,
        nargs="+",
        help="Manually set/overwrite the simulation box dimensions to a "
             "vector containing unit cell dimensions "
             "[a, b, c, alpha, beta, gamma], lengths a, b, c are in "
             "angstrom, and angles alpha, beta, gamma are in degrees. "
             "Providing only three parameters will assume a rectengular "
             "simulation box (alpha = beta = gamma = 90°).",
        )
def create_cli(sub_parser, interface_name, parameters):
    """
    Add a sub-parser for one analysis interface to `sub_parser`.

    The arguments of the new sub-parser are divided into these groups:

    1. Analysis Run Parameters
        start, stop, step and verbosity (see `add_run_group`)
    2. Output Parameters
        output_prefix and output_directory (see `add_output_group`); only
        added if the callable has no truthy ``save`` attribute
    3. Mandatory Parameters
        entries of ``parameters["positional"]``, exposed as *required*
        named arguments
    4. Optional Parameters
        entries of ``parameters["optional"]``; the group is only created
        if such entries exist
    5. Reference Universe Parameters
        A reference Universe for selection commands. Created once, and
        only if AtomGroup arguments exist.

    All CLI parameters are named parameters (``-<name>``).

    Parameters
    ----------
    sub_parser : argparse.sub_parser
        A sub parser where the new parser will be added.
    interface_name : str
        Name of the interface. Its lowercase form is registered as alias.
    parameters : dict
        Parameters needed to fill the argparse requirements for the
        CLI interface. The keys ``"desc"``, ``"desc_long"``,
        ``"callable"``, ``"positional"`` and ``"optional"`` are read.
        Every entry of ``"positional"``/``"optional"`` is a dictionary
        with the key ``"desc"`` and, optionally, ``"type"`` and
        ``"default"``.

    Returns
    -------
    None
    """
    # creates the subparser
    analysis_class_parser = sub_parser.add_parser(
        interface_name,
        aliases=[interface_name.lower()],
        help=parameters["desc"],
        description=f"{parameters['desc']}\n\n{parameters['desc_long']}",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        )

    # Store the callable as a default of the sub-parser so that it ends up
    # in the parsed namespace of whichever analysis was selected.
    analysis_class_parser.set_defaults(
        analysis_callable=parameters["callable"])

    add_run_group(analysis_class_parser)

    # The output group is only needed for the generic save function.
    if not getattr(parameters['callable'], 'save', False):
        logger.debug("No save method found. Use generic one.")
        # TODO: add our save function as method. Avoids try except later...
        add_output_group(analysis_class_parser)
    # TODO: else add parameters from save function to parser

    # Positional and optional arguments, each sorted by parameter name.
    pos_ = sorted(parameters["positional"].items(), key=lambda x: x[0])
    opt_ = sorted(parameters["optional"].items(), key=lambda x: x[0])
    parameters_to_parse = pos_ + opt_

    MANDATORY_TITLE = "Mandatory Parameters"
    mandatory_parameters_group = analysis_class_parser.add_argument_group(
        title=MANDATORY_TITLE,
        description="Mandatory parameters of this Analysis",
        )
    # One group entry per parameter, aligned with `parameters_to_parse`.
    groups = len(pos_) * [mandatory_parameters_group]

    # Only create the optional group if optional arguments exist
    if opt_:
        optional_parameters_group = analysis_class_parser.add_argument_group(
            title="Optional Parameters",
            description="Optional parameters specific of this Analysis",
            )
        groups += len(opt_) * [optional_parameters_group]

    # Created lazily by the first AtomGroup argument and then shared.
    reference_universe_group = None

    for group, (name, args_dict) in zip(groups, parameters_to_parse):
        # prepares parameters before add_argument
        try:
            type_ = STR_TYPE_DICT[args_dict["type"]]
        except KeyError:
            # unknown or missing type: treat the argument as a string
            type_ = str

        # numpydocs allows for choices. Check if this is the case.
        # NOTE(review): the character class excludes "]" rather than "}",
        # so the match runs up to the *last* closing brace - confirm this
        # is intended.
        try:
            match = re.search(r"\{(?P<CAST>[^\]]*)\}", args_dict["type"])
        except KeyError:
            match = None
            type_ = str

        if match is not None:
            # Parameter can only assume one of a fixed set of values.
            # No type is given in this format, so we use ast.literal_eval
            # to infer the correct type.
            values = [
                ast.literal_eval(obj.strip(" `"))
                for obj in match.group(1).split(",")
                ]
            # prepare type for later
            type_ = 'enum'

        try:
            default = args_dict["default"]
        except KeyError:
            default = None

        flag = f"-{name}"
        arg_params = dict(dest=name, help=args_dict["desc"], default=default)

        if group.title == MANDATORY_TITLE:
            arg_params["required"] = True

        if type_ is dict:
            arg_params["default"] = None
            arg_params["action"] = KwargsDict

        elif type_ is bool:
            # A flag can only switch a boolean away from its default.
            if default:
                flag = f"-no-{name}"
                arg_params["action"] = "store_false"
            else:
                arg_params["action"] = "store_true"

        elif type_ in (mda.AtomGroup, List[mda.AtomGroup]):
            if type_ == List[mda.AtomGroup]:
                arg_params["nargs"] = "+"

            # AtomGroups are given as selection strings on the CLI.
            arg_params["type"] = str
            arg_params["help"] += " Use a MDAnalysis selection string."

            # Create one reference Universe argument for atom selection
            if reference_universe_group is None:
                reference_universe_group = \
                    analysis_class_parser.add_argument_group(
                        title="Reference Universe Parameters",
                        description="Parameters specific for loading "
                                    "the reference topology and trajectory"
                                    " used for atom selection.")
                add_cli_universe(reference_universe_group)

        elif type_ is mda.Universe:
            # A Universe is represented by its own set of suffixed
            # arguments; no `-<name>` argument is added.
            add_cli_universe(group, name)
            continue

        elif type_ == 'enum':
            arg_params["choices"] = values
            arg_params["type"] = type(values[0])

        else:
            if type_ in (list, tuple):
                arg_params["nargs"] = "+"
            arg_params["type"] = type_
            arg_params["help"] += " (default: %(default)s)"

        group.add_argument(flag, **arg_params)
def create_universe(topology,
                    coordinates=None,
                    topology_format=None,
                    trajectory_format=None,
                    atom_style=None,
                    dimensions=None):
    """
    Initialize a MDAnalysis universe instance.

    Parameters
    ----------
    topology : str, stream, `~MDAnalysis.core.topology.Topology`, `np.ndarray`
        Passed as first argument to `MDAnalysis.Universe`; defines the
        list of atoms (e.g. a CHARMM/XPLOR PSF topology file, PDB file or
        Gromacs GRO file).
    coordinates : str, stream, list of str, list of stream
        If not None, loaded into the universe with ``Universe.load_new``
        (e.g. a single frame file, a list of single frames or a
        trajectory file).
    topology_format : str, None
        File format of the topology file, passed to `MDAnalysis.Universe`.
        ``None`` guesses it from the file extension.
    trajectory_format : str or list or object
        File format of the coordinate or trajectory file, passed as
        ``format`` to ``Universe.load_new``. ``None`` guesses it from the
        file extension.
    atom_style : str
        Customised LAMMPS `atom_style` information.
        Only works with `topology_format = data`
    dimensions : iterable of floats
        Unit cell lengths and, optionally, the three angles. If only
        three values are given, all angles are set to 90 degrees.

    Raises
    ------
    IndexError
        If `dimensions` contains neither 3 nor 6 entries.

    Returns
    -------
    `MDAnalysis.Universe`
    """
    universe = mda.Universe(topology,
                            topology_format=topology_format,
                            atom_style=atom_style)

    if coordinates is not None:
        universe.load_new(coordinates, format=trajectory_format)

    if dimensions is None:
        return universe

    n_entries = len(dimensions)
    if n_entries == 3:
        # Only box lengths given: complete them with right angles.
        dimensions = [*dimensions, 90, 90, 90]
    elif n_entries != 6:
        raise IndexError(
            "The dimensions must contain at least 3 entries for "
            "the box length and possibly 3 more entries for the angles.")

    universe.trajectory.add_transformations(set_dimensions(dimensions))
    return universe
def run_analysis(analysis_callable,
                 mandatory_analysis_parameters,
                 optional_analysis_parameters=None,
                 reference_universe_parameters=None,
                 run_parameters=None,
                 output_parameters=None):
    """Perform main client logic.

    Parameters
    ----------
    analysis_callable : function
        Analysis class for which the analysis is performed.
    mandatory_analysis_parameters : dict
        Mandatory parameters for executing the analysis. Converted in
        place by `convert_analysis_parameters`.
    optional_analysis_parameters : dict
        Optional parameters for executing the analysis. Converted in
        place by `convert_analysis_parameters`.
    reference_universe_parameters : dict
        Keyword arguments for `create_universe`. The resulting universe
        is used for the atom selections and, if the analysis parameters
        do not define a universe themselves, for the time step.
    run_parameters : dict
        time frame parameters: start, stop, step, verbose. The given
        dictionary is not modified.
    output_parameters : dict
        output_prefix and output_directory

    Returns
    -------
    `MDAnalysis.analysis.base.AnalysisBase`
        AnalysisBase instance of the given ``analysis_callable`` after run.
    """
    if optional_analysis_parameters is None:
        optional_analysis_parameters = {}
    if output_parameters is None:
        output_parameters = {}

    # Work on a copy: `verbose` is removed and the remaining values are
    # converted below, which must not leak into the caller's dictionary.
    run_parameters = dict(run_parameters) if run_parameters is not None \
        else {}
    verbose = run_parameters.pop("verbose", False)

    if reference_universe_parameters is not None:
        reference_universe = create_universe(**reference_universe_parameters)
    else:
        reference_universe = None

    # Initialize analysis callable
    universe = convert_analysis_parameters(analysis_callable,
                                           mandatory_analysis_parameters,
                                           reference_universe)
    convert_analysis_parameters(analysis_callable,
                                optional_analysis_parameters,
                                reference_universe)

    # Fall back to the reference universe for the time step conversion.
    if universe is None:
        universe = reference_universe

    ac = analysis_callable(**mandatory_analysis_parameters,
                           **optional_analysis_parameters)

    # Run the analysis; empty values (e.g. None) are passed on unchanged.
    for key, value in run_parameters.items():
        if value:
            run_parameters[key] = convert_str_time(value,
                                                   universe.trajectory.dt)
    ac.run(verbose=verbose, **run_parameters)

    # Save results. Test for the method explicitly instead of catching
    # AttributeError, so that an AttributeError raised *inside* an
    # existing `save` is not mistaken for a missing method.
    if hasattr(ac, "save"):
        ac.save()
    else:
        directory = output_parameters.get("output_directory", "")
        fname = output_parameters.get("output_prefix", "")
        fname = f"{fname}_{analysis_callable.__name__}" if fname \
            else analysis_callable.__name__
        save(ac.results, os.path.join(directory, fname))

    return ac
def _select_nonempty_atomgroup(reference_universe, param_name, sel):
    """Return the atoms selected by `sel`; raise ValueError if there are none."""
    atomgrp = reference_universe.select_atoms(sel)
    if not atomgrp:
        raise ValueError(f"AtomGroup `-{param_name}` with "
                         f"string of the selection {sel}` "
                         f"does not contain any atoms.")
    return atomgrp


def convert_analysis_parameters(analysis_callable,
                                analysis_parameters,
                                reference_universe=None):
    """
    Convert parameters from the command line suitable for analysis.

    Special types (i.e AtomGroups, Universes) are converted from the
    command line strings into the correct format. Parameters are changed
    inplace. Note that only existing keys are converted; no new key is
    added for a parameter that is present in the doc of the
    `analysis_callable` but not in the `analysis_parameters` dict.
    AtomGroup selections that are None are ignored since these could be
    default arguments.

    The following types are converted:

    * AtomGroup: Select atoms based on ``universe.select_atoms``
    * list[AtomGroup]: Select atoms based on ``universe.select_atoms``
      for every element in list
    * Universe: Created with `create_universe` from the keys
      ``<argument>_<parameter name>``, where ``<argument>`` runs over the
      arguments of `create_universe`. These keys are removed and replaced
      by the single key ``<parameter name>``.

    Parameters
    ----------
    analysis_callable : function
        Analysis class for which the analysis should be performed.
    analysis_parameters : dict
        parameters to be processed
    reference_universe : `MDAnalysis.Universe`
        Universe from which the AtomGroup selections are done.

    Returns
    -------
    universe : Universe
        The (last) universe created from the analysis parameters or
        None if no one is created

    Raises
    ------
    ValueError
        If an AtomGroup does not contain any atoms
    """
    params = parse_docs(analysis_callable)[2]
    universe = None
    universe_arguments = tuple(inspect.signature(create_universe).parameters)

    for param_name, dictionary in params.items():
        type_name = dictionary.get('type')

        if type_name in ("AtomGroup", "MDAnalysis.core.groups.AtomGroup"):
            # Match on the real key. Deriving the name from the last
            # "_"-separated token of each key would miss every parameter
            # whose own name contains an underscore.
            sel = analysis_parameters.get(param_name)

            # Do not try to parse `None` value
            # They could be default arguments of a function
            if sel is None:
                continue

            analysis_parameters[param_name] = _select_nonempty_atomgroup(
                reference_universe, param_name, sel)

        elif type_name == "list[AtomGroup]":
            selections = analysis_parameters.get(param_name)
            if selections is None:
                continue

            # Replace every selection string in place.
            for i, sel in enumerate(selections):
                selections[i] = _select_nonempty_atomgroup(
                    reference_universe, param_name, sel)

        elif type_name in ("Universe", "MDAnalysis.core.universe.Universe"):
            # A Universe parameter is spread over several keys, one per
            # argument of `create_universe`, suffixed by the parameter name.
            universe_keys = {
                arg: f"{arg}_{param_name}" for arg in universe_arguments
                }
            if not any(key in analysis_parameters
                       for key in universe_keys.values()):
                continue

            universe_parameters = {
                arg: analysis_parameters.pop(key)
                for arg, key in universe_keys.items()
                }
            universe = create_universe(**universe_parameters)
            analysis_parameters[param_name] = universe

    return universe
def setup_clients(ap, title, members):
    """
    Set up analysis clients for an ArgumentParser instance.

    Every member is registered as its own sub-command, named after the
    member's ``__name__``.

    Parameters
    ----------
    ap : `argparse.ArgumentParser`
        Argument parser instance
    title : str
        title of the sub-parsers section
    members : list
        list containing Analysis classes for setting up the parser
    """
    cli_subparser = ap.add_subparsers(title=title)

    # Parse all signatures before the first sub-parser is created.
    analysis_interfaces = {}
    for member in members:
        analysis_interfaces[member.__name__] = \
            parse_callable_signature(member)

    # adds each Analysis class/function as a CLI under 'cli_subparser'
    for member_name in analysis_interfaces:
        create_cli(sub_parser=cli_subparser,
                   interface_name=member_name,
                   parameters=analysis_interfaces[member_name])
def init_base_argparse(name, version, description):
    """Create a basic `ArgumentParser`.

    The parser offers the options ``--version``, ``--debug``, ``-nt``
    (number of threads) and ``--logfile``. Only the options are added
    here; acting on them (debug mode, threading, writing the log file) is
    left to the caller.

    Parameters
    ----------
    name : str
        Name of the cli program
    version : str
        Version of the cli program
    description : str
        Description of the cli program

    Returns
    -------
    `ArgumentParser`
    """
    base_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=description,
        )

    version_string = f"{name} {version}"
    base_parser.add_argument(
        '--version', action='version', version=version_string)

    base_parser.add_argument(
        '--debug', action='store_true', help="Run with debug options.")

    base_parser.add_argument(
        "-nt",
        dest="num_threads",
        type=int,
        default=0,
        help="Total number of threads to start (0 is guess)",
        )

    base_parser.add_argument(
        '--logfile',
        dest='logfile',
        action='store',
        help='Logfile (optional)',
        )

    return base_parser