Source code for canopy_factory.cli

import os
import gc
import sys
import copy
import pprint
import argparse
import numpy as np
import inspect
import pytz
import glob
import shutil
import itertools
import uuid
import datetime
from collections import OrderedDict
from canopy_factory import utils, arguments
from canopy_factory.utils import (
    cfg, rapidjson, units, parse_quantity, format_list_for_help,
    get_class_registry, NoDefault, cached_property,
)


[docs] class SetBase(object): r"""Simple wrapper for a set of classes. Args: members (list): Set of instances of the same class. parent (object, optional): Instance containing this group. """ _set_attributes = ['parent', 'members'] def __init__(self, parent, members): self.parent = parent self.members = members @classmethod def _is_set_attribute(cls, k): return (k in cls._set_attributes or hasattr(cls, k)) def __getattr__(self, k): if type(self)._is_set_attribute(k): return super(SetBase, self).__getattr__(k) if not self.members: raise AttributeError(f'{self}: {k}') vals = [getattr(x, k) for x in self.members] out = vals[0] if not all([x == out] for x in vals): raise AttributeError(f'{self}: {k} (values differ {vals})') return out def __setattr__(self, k, v): if type(self)._is_set_attribute(k): return super(SetBase, self).__setattr__(k, v) if not self.members: raise AttributeError(f'{self}: {k}') for x in self.members: setattr(x, k, v)
[docs] class InstrumentedParserSet(SetBase): r"""Simple wrapper for a set of parsers. Args: members (list): Set of parsers. parent (ArgumentParser, optional): Parser containing this group. """
[docs] def add_subparsers(self, group, **kwargs): r"""Add a subparsers group to this parser. Args: group (str): Name of the subparser group. Used to set the default 'title' & 'dest' subparser properties. **kwargs: Additional keyword arguments are passed to the base class's method. Returns: InstrumentedParserSet: Wrapped subparsers group. """ return InstrumentedSubparserSet( self, [x.add_subparsers(group, **kwargs) for x in self.members] )
[docs] def add_argument(self, *args, **kwargs): r"""Add an argument to the parser. Args: *args: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). **kwargs: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). """ return ActionSet( self, [ x.add_argument(*args, **kwargs) for x in self.members ] )
[docs] class InstrumentedSubparserSet(InstrumentedParserSet): r"""Simple wrapper for a set of subparsers.""" @property def choices(self): r"""dict: Set of subparsers in the group.""" return { k: InstrumentedParserSet( self, [x.choices[k] for x in self.members]) for k in self.members[0].choices.keys() }
[docs] def has_parser(self, name): r"""Check if there is a parser for a given name within this subparser group. Args: name (str): ID string for the parser. Returns: bool: True if the name parser exists, False otherwise. """ return all(x.has_parser(name) for x in self.members)
[docs] def add_parser(self, name, *args, **kwargs): r"""Add a parser to the subparser group. Args: name (str): ID string for the parser. *args: Additional arguments are passed to the add_parser method for the underlying subparser group. **kwargs: Additional keyword arguments are passed to the add_parser method for the underlying subparser group. Returns: ArgumentParser: New subparser. """ return InstrumentedParserSet( self, [x.add_parser(name, *args, **kwargs) for x in self.members] )
[docs] def get_parser(self, name, *args, **kwargs): r"""Get a parser for an option in the subparser group. Args: name (str): ID string for the parser. default (object, optional): Default to return if the subparser instance does not exist. add_missing (bool, dict, optional): If True or dictionary, add the subparser if it does not exist. If a dictionary is provided, it will be passed as keyword arguments to the add_parser method. *args: Additional arguments are passed to the get_parser method for the underlying subparser group. **kwargs: Additional keyword arguments are passed to the get_parser method for the underlying subparser group. Returns: ArgumentParser: Subparser. """ return InstrumentedParserSet( self, [x.get_parser(name, *args, **kwargs) for x in self.members] )
[docs] class ActionSet(SetBase): r"""Simple wrapper for a set of actions.""" pass
[docs] class SubParsersAction(argparse._SubParsersAction): def __init__(self, *args, default=None, choices=None, func=None, positional_index=0, subparser_name=None, **kwargs): super(SubParsersAction, self).__init__(*args, **kwargs) self.subparser_default = default self.func = func self._choice_names = [] self._positional_index = positional_index self._subparser_name = subparser_name if choices: for k in choices: self.add_parser(k)
[docs] def has_parser(self, name): r"""Check if there is a parser for a given name within this subparser group. Args: name (str): ID string for the parser. Returns: bool: True if the name parser exists, False otherwise. """ return (name in self._name_parser_map)
[docs] def add_parser(self, name, *args, **kwargs): r"""Add a parser to the subparser group. Args: name (str): ID string for the parser. *args: Additional arguments are passed to the add_parser method for the underlying subparser group. **kwargs: Additional keyword arguments are passed to the add_parser method for the underlying subparser group. Returns: ArgumentParser: New subparser. """ kwargs['positional_index'] = self._positional_index + 1 self._choice_names.append(name) out = super(SubParsersAction, self).add_parser( name, *args, **kwargs) out._subparser_name = name out._subparser_group = self return out
[docs] def get_parser(self, name, default=NoDefault, add_missing=False): r"""Get a parser for an option in the subparser group. Args: name (str): ID string for the parser. default (object, optional): Default to return if the subparser instance does not exist. add_missing (bool, dict, optional): If True or dictionary, add the subparser if it does not exist. If a dictionary is provided, it will be passed as keyword arguments to the add_parser method. Returns: ArgumentParser: Subparser. """ if not self.has_parser(name): if add_missing is True: add_missing = {} if isinstance(add_missing, dict): self.add_parser(name, **add_missing) elif default != NoDefault: return default return self._name_parser_map[name]
[docs] def add_subparsers(self, group, **kwargs): r"""Add a subparsers group to each parser in this subparser group. Args: group (str): Name of the subparser group. Used to set the default 'title' & 'dest' subparser properties. **kwargs: Additional keyword arguments are passed to the base class's method. Returns: InstrumentedParserSet: Set of subparsers for each parser. """ return InstrumentedSubparserSet( self, [ self._name_parser_map[k].add_subparsers(group, **kwargs) for k in self._choice_names ] )
[docs] def find_argument(self, name, default=NoDefault): r"""Find the action that will handle an argument. Args: name (str): Argument name or flag. default (object, optional): Object that should be returned if the arguments cannot be located. Returns: Action: Argument action. Raises: KeyError: If the argument cannot be located and a default is not provided. """ if self.has_argument(name): return ActionSet( self, [ self._name_parser_map[k].find_argument(name) for k in self._choice_names if self._name_parser_map[k].has_argument(name) ] ) if default is NoDefault: raise KeyError(f'No argumented \"{name}\" in subparser. ' f'Available parsers: {self._choice_names}') return default
[docs] def has_argument(self, name): r"""Check to see if the parser has an argument. Args: name (str): Argument name or flag. Returns: bool: True if the parser has an argument matching the name, False otherwise. """ for k in self._choice_names: if self._name_parser_map[k].has_argument(name): return True return False
[docs] def remove_argument(self, name): r"""Remove an argument from the parser. Args: name (str): Name of the argument to remove. """ for k in self._choice_names: self._name_parser_map[k].remove_argument(name)
[docs] def add_argument(self, *args, **kwargs): r"""Add an argument to the parser. Args: *args: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). **kwargs: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). """ return ActionSet( self, [ self._name_parser_map[k].add_argument(*args, **kwargs) for k in self._choice_names ] )
[docs] class InstrumentedParser(argparse.ArgumentParser): r"""Class for parsing arguments allowing arguments to be added to multiple subparsers.""" def __init__(self, *args, parent=None, positional_index=0, **kwargs): self._subparsers_action = None self._subparser_name = None self._subparser_group = None self._child_subparsers = {} self._positional_index = positional_index super(InstrumentedParser, self).__init__(*args, **kwargs) self.register('action', 'parsers', SubParsersAction) self.add_argument( '--hide-irrelevant', default=-1, dest='hide_irrelevant', type=int, const=0, nargs='?', help=('Hide arguments that are diabled by absent or ' 'invalid arguments already specified. This can ' 'be useful for getting more information about ' 'available command line options'), )
[docs] def local_subparsers(self, subparsers=None): r"""Get the subparser instances for the subparser group directly managed by this parser (if there is one). Args: subparsers (list, optional): Names of subparsers that should be included in the returned list. If not provided, all subparsers in the group will be returned. Returns: list: Subparser instances. """ if self._subparsers_action is None: return [] if isinstance(subparsers, dict) and self._subparsers_action: subparsers = subparsers.get(self._subparsers_action.dest, None) if subparsers is None: return list(self._subparsers_action.choices.values()) return [v for k, v in self._subparsers_action.choices.items() if k in subparsers]
[docs] def subparsers(self, group=None, subparsers=None, yield_roots=False): r"""Iterate over all subparser instances within this parser. Args: group (str, optional): Group that subparsers should be returned for. If not provided, all leaf level subparsers will be yielded. subparsers (dict, optional): Maps between the names of subparser groups and the names of subparsers within those groups that should be included. yield_roots (bool, optional): If True, yield the highest level parser that matches the group. Yields: InstrumentedParser: Subparsers. """ if group is None and (self._subparsers_action is None or yield_roots): yield self return if subparsers is None: subparsers = {} if ((self._subparsers_action is not None and self._subparsers_action.dest == group)): if yield_roots: for x in self.local_subparsers(subparsers=subparsers): yield x return group = None # Allow children to be yielded for x in self.local_subparsers(subparsers=subparsers): if yield_roots and group is None: yield x else: for xx in x.subparsers(group=group, subparsers=subparsers, yield_roots=yield_roots): yield xx
def _action_irrelevant(self, action, args): if action.help == argparse.SUPPRESS: return False if not getattr(action, 'dependencies', None): return False out = False for i, x in enumerate(action.dependencies[::-1]): if not x: continue for k, v in x.items(): add_to_parent = (i == 0) argv = getattr(args, k, None) if (((v is True and argv is None) or (v is False and argv is not None) or (isinstance(v, (list, tuple)) and argv not in v))): if args.hide_irrelevant > 0: if i >= args.hide_irrelevant: out = True add_to_parent = (i <= args.hide_irrelevant) else: out = True if not add_to_parent: continue if v is True and argv is None: self.find_argument(k).enables_dependencies.append( action.dest) elif v is False and argv is not None: self.find_argument(k).disables_dependencies.append( action.dest) elif isinstance(v, (list, tuple)) and argv not in v: parent_action = self.find_argument(k) for kdep in v: parent_action.optional_dependencies.setdefault( kdep, []) parent_action.optional_dependencies[kdep].append( action.dest) return out
[docs] def prune_irrelevant_args(self, args): r"""Remove dependent arguments from the parser that are not valid for the provided arguments. Args: args (argparse.Namespace): Arguments parsed so far. Returns: bool: True if arguments were pruned, False otherwise. """ if args.hide_irrelevant == -1: return False changes = False for action in self._actions: if self._action_irrelevant(action, args): action.help = argparse.SUPPRESS action.default = argparse.SUPPRESS changes = True for x in self.local_subparsers(): if x.prune_irrelevant_args(args): changes = True if changes: self._add_irrelevant_args_to_help(args) return changes
def _add_irrelevant_args_to_help(self, args): for action in self._actions: if action.help == argparse.SUPPRESS: continue new_help = [] if getattr(action, 'enables_dependencies', None): vals = format_list_for_help( sorted(list(set(action.enables_dependencies)))) new_help.append(f'Enables {vals}.') action.enables_dependencies = [] if getattr(action, 'disables_dependencies', None): vals = format_list_for_help( sorted(list(set(action.disables_dependencies)))) new_help.append(f'Disables {vals}.') action.disables_dependencies = [] if getattr(action, 'optional_dependencies', None): values = [ f'\'{k}\' enables {format_list_for_help(v)}.' for k, v in action.optional_dependencies.items() ] new_help.append('\n' + '\n'.join(values)) action.optional_dependencies = {} if new_help: if not action.help.endswith('.'): action.help += '.' action.help += ' ' + ' '.join(new_help) for x in self.local_subparsers(): x._add_irrelevant_args_to_help(args)
[docs] def add_subparser_defaults(self, args): if self._subparsers_action is None: return None value = getattr(args, self._subparsers_action.dest, None) if value is None: if self._subparsers_action.subparser_default is None: return None return ( self._subparsers_action._positional_index, self._subparsers_action.subparser_default ) return self._subparsers_action.choices[ value].add_subparser_defaults(args)
def _parse_args_partial(self, args, for_help=False): r"""Parse arguments, allowing for modifications to either the input arguments or argument parser based on the arguments that are present. Args: args (list): Arguments to parse. for_help (bool, optional): If True, the arguments are being parsed to generate help and defaults should not be set for subparsers. Returns: bool: True if either the arguments or argument parser was modified. """ assert isinstance(args, list) changes = False args0, _ = super(InstrumentedParser, self).parse_known_args( args=args) if self.prune_irrelevant_args(args0): changes = True if for_help: pass else: args_supp = self.add_subparser_defaults(args0) if args_supp: changes = True args.insert(*args_supp) return changes
[docs] def parse_known_args(self, args=None, namespace=None, **kwargs): r"""Parse known arguments. Args: args (list, optional): Arguments to parse. Defaults to sys.argv if not provided. namespace (argparse.Namespace, optional): Existing namespace that arguments should be added to. **kwargs: Additional keyword arguments are passed to the parent class's method. Returns: tuple(argparse.Namespace, list): Parsed and unparsed arguments. """ if self._subparser_group is None: if args is None: args = sys.argv[1:] help_flags = ['-h', '--help'] for_help = False for x in help_flags: if x in args: for_help = x args.remove(x) break while self._parse_args_partial(args, for_help=for_help): continue if for_help: args.append(for_help) out, argv = super(InstrumentedParser, self).parse_known_args( args=args, namespace=namespace, **kwargs) return out, argv
[docs] def has_subparser(self, group, name): r"""Check if there is a named subparser within a subparsers group. Args: group (str): Name of the subparsers group to check for. name (str): Name of the parser within the subparsers group to check for. Returns: bool: True if the named parser within the subparsers group exists. """ return (self.has_subparsers(group) and self.get_subparsers(group).has_parser(name))
[docs] def has_subparsers(self, group): r"""Check if there is a subparsers group associated with the specified name. Args: group (str): Name of the subparsers group to check for. Returns: bool: True if the named subparsers group exists. """ return (group in self._child_subparsers)
[docs] def get_subparser(self, group, name, default=NoDefault, add_missing=False, add_missing_group=None): r"""Get a single subparser from a subparser group. Args: group (str): Name of the subparser group that the parser belongs to. name (str): Name of the subparser. default (object, optional): Default to return if the subparser instance does not exist. add_missing (bool, dict, optional): If True or dictionary, add the subparser if it does not exist. If a dictionary is provided, it will be passed as keyword arguments to the add_subparser method. add_missing_group (bool, dict, optional): If True or dictionary, the subparser group will be added if it is missing. If a dictionary is provided it will be passed as keyword arguments to the add_subparsers method. If not provided, add_missing_group will be set to True only if add_missing is True or a dictionary. Returns: InstrumentedParser: Parser. Raises: KeyError: If default not provided and the subparser object does not exist. """ if add_missing_group is None: add_missing_group = ( add_missing or isinstance(add_missing, dict)) subparsers = self.get_subparsers(group, default=default, add_missing=add_missing_group) if not self.has_subparsers(group): return subparsers # Will be default return subparsers.get_parser(name, default=default, add_missing=add_missing)
[docs] def get_subparsers(self, group, default=NoDefault, add_missing=False): r"""Get the subparsers object with the specified group name. Args: group (str): Name of the subparsers instance to retrieve. default (object, optional): Default to return if the subparsers instance does not exist. add_missing (bool, dict, optional): If True or dictionary, add the subparser group if it does not exist. If a dictionary is provided, it will be passed as keyword arguments to the add_subparsers method. Returns: InstrumentedSubparsers: Subparsers instance. Raises: KeyError: If default not provided and the subparsers object does not exist. """ out = default if add_missing is True: add_missing = {} if ((isinstance(add_missing, dict) and group not in self._child_subparsers)): # TODO: Use name of first subparser for default? self.add_subparsers(group, **add_missing) if group in self._child_subparsers: out = self._child_subparsers[group] if out is NoDefault: raise KeyError(group) return out
[docs] def add_subparser(self, group, name, add_missing_group=True, **kwargs): r"""Add a subparser to a subparsers group. Args: group (str): Name of the subparser group that the parser belongs to. name (str): Name of the subparser. add_missing_group (bool, dict, optional): If True or dictionary, the subparser group will be added if it is missing. If a dictionary is provided it will be passed as keyword arguments to the add_subparsers method. **kwargs: Additional keyword argumetns are passed to the add_parser method of the wrapped subparsers group. Returns: InstrumentedParser: Parser. """ assert not self.has_subparser(group, name) subparsers = self.get_subparsers( group, add_missing=add_missing_group) subparsers.add_parser(name, **kwargs) return self.get_subparser(group, name)
[docs] def add_subparsers(self, group, **kwargs): r"""Add a subparsers group to this parser. Args: group (str): Name of the subparser group. Used to set the default 'title' & 'dest' subparser properties. **kwargs: Additional keyword arguments are passed to the base class's method. Returns: InstrumentedSubparsers: Wrapped subparsers group. """ assert group not in self._child_subparsers kwargs.setdefault('dest', group) kwargs.setdefault('title', group) if self._subparsers_action is not None: out = self._subparsers_action.add_subparsers(group, **kwargs) else: positional_index = ( self._positional_index + len(self._get_positional_actions()) ) subparser_name = self._subparser_name out = super(InstrumentedParser, self).add_subparsers( positional_index=positional_index, subparser_name=subparser_name, **kwargs) self._subparsers_action = out self._child_subparsers[group] = out return out
[docs] def subparser_class(self, group, args): r"""Get the subparser class selected by a subparser group. Args: group (str): Name of the subparser group to get the class for. args (argparse.Namespace): Parsed arguments. Returns: type: Subparser class. """ subparser = self.get_subparsers(group) subparser_name = getattr(args, group) if subparser_name is None: subparser_name = subparser.default setattr(args, group, subparser_name) assert subparser.func is None return get_class_registry().get(group, subparser_name)
[docs] def construct_subparser(self, group, args, **kwargs): r"""Construct the subparser selected by a subparser group. Args: group (str): Name of the subparser group to construct. args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the class constructor. Returns: SubparserBase: Subparser instance. """ cls = self.subparser_class(group, args) return cls(args, **kwargs)
[docs] def run_subparser(self, group, args, return_func=False): r"""Run the subparser selected by a subparser group. Args: group (str): Name of the subparser group to run. args (argparse.Namespace): Parsed arguments. return_func (bool, optional): If True, return the subparser run function without calling it. """ subparser = self.get_subparsers(group) subparser_name = getattr(args, group) if subparser_name is None: subparser_name = subparser.default func = subparser.func if func is None: func = get_class_registry().get(group, subparser_name) if return_func: return func print(f"RUNNING {subparser_name}") return func(args)
[docs] def find_argument(self, name, default=NoDefault): r"""Find the action that will handle an argument. Args: name (str): Argument name or flag. default (object, optional): Object that should be returned if the arguments cannot be located. Returns: Action: Argument action. Raises: KeyError: If the argument cannot be located and a default is not provided. """ for action in self._actions: if ((name in action.option_strings or name == action.dest or name.lstrip('-').replace('-', '_') == action.dest)): return action elif (isinstance(action, SubParsersAction) and action.has_argument(name)): return action.find_argument(name) if default is NoDefault: available = set() for action in self._actions: available |= set(action.option_strings + [action.dest]) raise KeyError(f'No argumented \"{name}\". Available ' f'options: {sorted(list(available))}') return default
[docs] def has_argument(self, name): r"""Check to see if the parser has an argument. Args: name (str): Argument name or flag. Returns: bool: True if the parser has an argument matching the name, False otherwise. """ try: self.find_argument(name) return True except KeyError: return False
[docs] def remove_argument(self, name): r"""Remove an argument from the parser. Args: name (str): Name of the argument to remove. """ if isinstance(name, str): action = self.find_argument(name) else: action = name action.container._remove_action(action) for k in action.option_strings: action.container._option_string_actions.pop(k)
[docs] def add_argument(self, *args, **kwargs): r"""Add an argument to the parser. Args: *args: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). **kwargs: All arguments are passed to the add_argument method of the subparsers (if there are any) or the parent class (if there are not any subparsers). """ kwargs.setdefault('dest', args[0].lstrip('-').replace('-', '_')) kwargs = copy.deepcopy(kwargs) dependencies = kwargs.pop('dependencies', None) is_subparser = kwargs.pop("is_subparser", False) subparsers = kwargs.pop('subparsers', None) subparser_options = kwargs.pop('subparser_options', {}) assert not subparsers assert not subparser_options if self._subparsers_action is not None: x = self._subparsers_action else: x = self iargs = copy.deepcopy(args) ikwargs = copy.deepcopy(kwargs) ikwargs = dict(kwargs, **subparser_options.get(x, {})) if is_subparser: group = iargs[0].replace('-', '_').lstrip('_') action = x.add_subparsers(group, **ikwargs) elif isinstance(x, InstrumentedParser): action = super(InstrumentedParser, x).add_argument( *iargs, **ikwargs) else: action = x.add_argument(*iargs, **ikwargs) action.dependencies = dependencies action.enables_dependencies = [] action.disables_dependencies = [] action.optional_dependencies = {} return action
[docs] class CompositeArgument(arguments.RegisteredArgumentClassDict): r"""Container for parsing related arguments. Args: args (argparse.Namespace): Parsed arguments. name_base (str): Name of the base variable that should be used to set defaults. defaults (dict, optional): Defaults that should be used for time arguments before the class _defauls. optional (bool, optional): If True, defaults from the class will not be set when the argument is not defined. args_overwrite (dict, optional): Arguments to overwrite. Class Attributes: _name (str): Name of the base variables that should be used to determine the prefix/suffix and where the instance should be stored on the parsed arguments. _defaults (dict): Argument defaults. _arguments_prefixed (list): Arguments that should be prefixed. _arguments_universal (list): Arguments that should not be prefixed/suffixed. """ _registry_key = 'argument' _is_factory_class = False _name = None _name_as_suffix = False _name_as_prefix = False _defaults = {} _arguments_prefixed = [] _arguments_universal = [] _attributes_kwargs = [ 'optional', ] _attributes_copy = [] name = None prefix = '' suffix = '' description = '' @staticmethod def _on_registration(cls): if cls.name is None or not cls._is_factory_class: cls.name = cls._name if isinstance(cls._arguments_prefixed, list): cls._arguments_prefixed = arguments.ArgumentDescriptionSet( cls._arguments_prefixed) if isinstance(cls._arguments_universal, list): cls._arguments_universal = arguments.ArgumentDescriptionSet( cls._arguments_universal) cls._arguments_universal.modify(universal=True) cls._arguments = ( cls._arguments_prefixed + cls._arguments_universal) cls._composite_argument = None arguments.RegisteredArgumentClassDict._on_registration(cls)
[docs] @classmethod def composite_argument(cls): if ((cls._composite_argument is None or (cls._composite_argument.name != cls.name))): assert cls.name is not None cls._composite_argument = arguments.CompositeArgumentDescription( cls.name, composite_type=cls, prefix=cls.prefix, suffix=cls.suffix, ) return cls._composite_argument
def __init__(self, args, name_base=None, defaults=None, optional=False, args_overwrite=None): if name_base == self.name: name_base = None if defaults is None: defaults = {} self._defaults_set = [] self.name_base = name_base self.base = None self.optional = optional self.defaults = { k: defaults[k] for k in self._arguments.keys() if k in defaults } super(CompositeArgument, self).__init__( args, args_overwrite=args_overwrite)
[docs] @classmethod def is_factory_analogue(cls, other): r"""Check if an instance is an analagous factory class. Args: other (CompositeArgument): Returns: bool: True if the instance is of an analagous class. """ if isinstance(other, cls): return True return (other.name == cls.name)
[docs] @classmethod def class_factory(cls, name, registry_name=None, **kwargs): r"""Create a new class for a modified version of the arguments. Args: name (str): Composite argument name. registry_name (str, optional): Name that should be used to register the generated class. If not provided, one will be generated. **kwargs: Additional keyword arguments are used to modify the set of arguments for the new class. Returns: type: Composite argument class. """ if name == cls.name and not (kwargs or registry_name): return cls if registry_name is None: registry_name = name prefix = cls.get_prefix(name) suffix = cls.get_suffix(name) if kwargs: while get_class_registry().get(cls._registry_key, registry_name, None): registry_name += str(uuid.uuid4()) else: existing = get_class_registry().get( cls._registry_key, registry_name, None) if existing is not None: return existing if cls.description: kwargs.setdefault('description', cls.description) arguments_prefixed = cls._arguments_prefixed.copy( prefix=prefix, suffix=suffix, **kwargs) arguments_universal = cls._arguments_universal.copy(**kwargs) # arguments = arguments_prefixed + arguments_universal argument_name = name argument_prefix = prefix argument_suffix = suffix class FactoryCompositeClass(cls): _registry_name = registry_name _is_factory_class = True # _arguments = arguments _arguments_prefixed = arguments_prefixed _arguments_universal = arguments_universal name = argument_name prefix = argument_prefix suffix = argument_suffix description = kwargs.get('description', '') return FactoryCompositeClass
[docs] def reset(self, args, args_overwrite=None): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. args_overwrite (dict, optional): Arguments to overwrite. """ self._defaults_set.clear() if self.name_base is not None: self.base = self.from_args( args, name=self.name_base, name_base=self.name_base, args_overwrite=args_overwrite, ) super(CompositeArgument, self).reset( args, args_overwrite=args_overwrite)
[docs] def setarg(self, name, value): r"""Set an argument value. Args: name (str): Argument name. value (object): Argument value. """ super(CompositeArgument, self).setarg(name, value) self._defaults_set.clear()
[docs] @classmethod def from_other(cls, other, **kwargs): r"""Create an argument instance from an existing instance of any CompositeArgument subclass. Args: other (CompositeArgument): Instance to copy. **kwargs: Additional keyword arguments are passed to the class constructor. Returns: CompositeArgument: Copy as this class. """ argument_names = cls.argument_names(include='name') name = other.prefix + cls._name + other.suffix if other.name_base and 'name_base' not in kwargs: kwargs['name_base'] = ( other.get_prefix(other.name_base) + cls._name + other.get_suffix(other.name_base) ) if other.defaults and 'defaults' not in kwargs: kwargs['defaults'] = { k: v for k, v in other.defaults.items() if k in argument_names } for k in cls._attributes_kwargs: if ((k not in other._attributes_kwargs or k in kwargs)): continue kwargs[k] = getattr(other, k) args = argparse.Namespace() for k, v in other.args.items(): if k not in argument_names: continue setattr(args, other.prefix + k + other.suffix, v) out = cls.from_args(args, name=name, **kwargs) for k in cls._attributes_copy: if hasattr(other, k): setattr(out, k, getattr(other, k)) return out
[docs] @classmethod def from_args(cls, args, name=None, overwrite=False, dont_update=False, **kwargs): r"""Create an instance on the provided arguments, first checking if one already exists. Args: args (argparse.Namespace): Parsed arguments. name (str, optional): Name that should be used for the composite argument. overwrite (bool, optional): If True, overwrite the existing instance. dont_update (bool, optional): If True, don't set the argument attributes. **kwargs: Additional keyword arguments are passed to the class constructor. Returns: CompositeArgument: New or existing instance. """ if name is not None: return cls.class_factory(name).from_args( args, overwrite=overwrite, dont_update=dont_update, **kwargs) name = cls.name if overwrite: if dont_update: args = copy.deepcopy(args) cls._arguments.reset_args(args) inst = getattr(args, name, None) if not isinstance(inst, cls): if isinstance(inst, CompositeArgument): assert cls.is_factory_analogue(inst) if dont_update and not overwrite: args = copy.deepcopy(args) inst._arguments.reset_args(args) inst = cls(args, **kwargs) if not dont_update: inst.update_args(args) return inst
[docs] @classmethod def parse(cls, x, args, name=None, **kwargs): r"""Parse an argument. Args: x (object): Instance to parse. args (argparse.Namespace): Parsed arguments. name (str, optional): Name that should be used for the composite argument. **kwargs: Additional keyword arguments are passed to the from_args method. Returns: CompositeArgument: The parsed instance. """ if name is not None: return cls.class_factory(name).parse(x, args, **kwargs) return cls.composite_argument().parse(x, args, **kwargs)
[docs] def update_args(self, args, name=None): r"""Update a namespace with the parsed arguments. Args: args (argparse.Namespace): Parsed arguments. name (str, optional): Alternate name where the argument should be stored. """ if name is None: name = self.name setattr(args, self.name, self)
[docs] @classmethod def get_prefix(cls, name): r"""Get the prefix used for a variable. Args: name (str): Variable name. Returns: str: Variable prefix. """ if name == cls._name: out = '' else: out = name.rsplit(cls._name, 1)[0] if cls._name_as_prefix: out += cls._name return out
[docs] @classmethod def get_suffix(cls, name): r"""Get the suffix used for a variable. Args: name (str): Variable name. Returns: str: Variable suffix. """ if name == cls._name: out = '' else: out = name.split(cls._name, 1)[1] if cls._name_as_suffix: out = cls._name + out return out
[docs] def any_set(self, names): r"""Check if any of the specified arguments were set. Args: names (list): Argument names to check. Returns: bool: True if any of the arguments were set. """ return any(self.args[k] is not None for k in names)
[docs] def extract_unused(self, out, name): r"""Extract the equivalent value from an output. Args: out (object): Output. name (str): Argument to extract from out. Returns: object: Argument value. """ raise NotImplementedError(name)
[docs] def check_unused(self, name, output=NoDefault, matches=None, invalid=None): r"""Assert that unused arguments were not set. Args: name (str, list): Argument name(s) to check. output (object, optional): Output to try to extract arguments from. matches (dict, optional): Existing dictionary that matches should be added to. invalid (list, optional): Existing list that used arguments should be added to. Raises: AssertionError: If unused arguments were set and matches or invalid was not provided. """ nested = (matches is not None and invalid is not None) if matches is None: matches = {} if invalid is None: invalid = [] if isinstance(name, list): for k in name: self.check_unused(k, output=output, matches=matches, invalid=invalid) else: k = name if self.args[k] is not None: value = NoDefault if output is not NoDefault: try: value = self.extract_unused(output, k) matches[k] = value except NotImplementedError: pass if value is NoDefault or self.args[k] != value: invalid.append(k) if invalid and (not nested): invalid = { self.prefix + k + self.suffix: self.args[k] for k in invalid } raise AssertionError( f'Unused arguments set:\n{pprint.pformat(invalid)}\n' f'Did not match:\n{pprint.pformat(matches)}' )
[docs] def setdefaults(self, names): r"""Set defaults for missing arguments that were not set. Args: names (list): Argument names to set defaults for. Returns: bool: True if all defaults could be initialized, False otherwise. """ out = True for k in names: value = None if self.ignored(k): assert self.args[k] is None out = False continue elif self.args[k] is not None: continue elif k in self.defaults: value = self.defaults[k] elif self.base and self.base.args[k] is not None: value = self.base.args[k] elif self.optional: out = False continue elif k in self._defaults: value = self._defaults[k] else: out = False continue self._defaults_set.append(k) self.args[k] = value return out
[docs] def raw_args(self, name=None): r"""dict: Set of name/value pairs for arguments related to this argument.""" if name is None: prefix = self.prefix suffix = self.suffix else: prefix = self.get_prefix(name) suffix = self.get_suffix(name) return { (prefix + k + suffix): v for k, v in self.args.items() }
@property def value(self): r"""object: Parsed base argument value.""" if hasattr(self, self._name): return getattr(self, self._name) if not self.setdefaults([self._name]): return None return self.args[self._name] @property def string(self): r"""str: String representation of this variable.""" if self.is_wildcard(self._name): return '*' if not self.setdefaults([self._name]): return None if isinstance(self.args[self._name], str): return self.args[self._name] return None
[docs] def is_wildcard(self, k): r"""Check if an argument is a wildcard. Args: k (str, list): Argument(s) to check. Returns: bool: True if argument is wildcard. """ if isinstance(k, list): return any(self.is_wildcard(kk) for kk in k) return (self.args[k] == '*')
[docs] class OutputArgument(CompositeArgument): r"""Container for output arguments.""" _name = 'output' _defaults = { 'output': False, 'overwrite': False, 'dont_write': False, } _arguments_prefixed = [ (('--output', ), { 'nargs': '?', 'const': True, 'help': ( 'File where {description} should be loaded from or ' 'saved to. If passed without a filename, the filename ' 'will be generated based on other arguments.' ), }), (('--overwrite', ), { 'nargs': '?', 'const': True, 'choices': [True, 'force', 'local', 'force_local'], 'help': ( 'Overwrite any existing {prefix_dst}output{suffix_dst} ' 'file generated or passed to ' '\"--{prefix_arg}output{suffix_arg}\".'), }), (('--dont-write', ), { 'action': 'store_true', 'help': ( 'Don\'t write any output to ' '{prefix_dst}output{suffix_dst} on disk (even if an ' 'explict path is provided). The generated output will ' 'still be available during the Python session.' ), }), (('--make-test', ), { 'action': 'store_true', 'help': ( 'Once generated, copy the output file into the test ' 'data directory' ), }), ] _arguments_universal = [ (('--overwrite-all', ), { 'nargs': '?', 'const': True, 'choices': [True, 'force', 'local', 'force_local'], 'help': 'Overwrite all child components of the task', }), (('--dont-write-all', ), { 'action': 'store_true', 'help': 'Don\'t write any output to disk', }), ] _attributes_kwargs = CompositeArgument._attributes_kwargs + [ 'ext', 'base_output', 'base_prefix', 'base_suffix', 'directory', 'upstream', 'downstream', 'composite_param', 'merge_all', 'merge_all_output', 'exclude_dynamic_suffix_param', 'parts_generators', ] def __init__(self, args, ext=None, base_output=None, base_prefix=None, base_suffix=None, directory=None, upstream=None, downstream=None, composite_param=None, merge_all=None, merge_all_output=None, exclude_dynamic_suffix_param=None, parts_generators=None, **kwargs): if upstream is None: upstream = [] if downstream is None: downstream = {} if merge_all_output is None: merge_all_output = None if parts_generators is None: parts_generators = {} self.ext = ext self.base_output = base_output self.base_prefix = base_prefix self.base_suffix = base_suffix self._generated_path = None self.upstream = upstream self.downstream = downstream self._directory = directory self.composite_param = composite_param self.merge_all = merge_all self.merge_all_output = merge_all_output self.exclude_dynamic_suffix_param = exclude_dynamic_suffix_param self._parts_generators = parts_generators self._uncached_args = {} self.output_name = self.suffix.strip('_') if self.base_prefix is True: self.base_prefix = f'{self.output_name}_' if self.base_suffix is True: self.base_suffix = f'_{self.output_name}' if 'output' not in kwargs.get('defaults', {}): kwargs.setdefault('defaults', {}) if kwargs.get('optional', False): kwargs['defaults']['output'] = False else: kwargs['defaults']['output'] = True super(OutputArgument, self).__init__(args, **kwargs)
[docs] def reset(self, args, **kwargs): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the base method. """ super(OutputArgument, self).reset(args, **kwargs) self._uncached_args.clear() if self.base_output is not None: if isinstance(self.base_output, OutputArgument): base_output = getattr( args, f'output_{self.base_output.output_name}') assert base_output is self.base_output else: base_output = getattr(args, f'output_{self.base_output}') self.base_output = base_output # If this fails it means that this output was initialized # before its base or the base was initialized with a copy # of args assert isinstance(self.base_output, OutputArgument) if self.composite_param is None: self.composite_param = ( [] if self.base_output is None else self.base_output.composite_param ) if self.merge_all is None: self._merge_all = ( False if self.base_output is None else self.base_output.merge_all ) if self._directory is None: assert self.suffix self._uncached_args['directory'] = os.path.join( args.output_dir, self.output_name) for k in self.composite_param: self._uncached_args[k] = getattr(args, k, None) if not self.generated: raise Exception(f'Filepath for \"{self.output_name}\" ' f'not generated: {self.path} ' f'{self.args["output"]}, ' f'{type(self.args["output"])}, ' f'{isinstance(self.args["output"], str)}') if self.generated: self.generate(args, reset=True)
@property def value(self): r"""object: Parsed base argument value.""" if not self.generated: return self.path return self.enabled @property def directory(self): r"""str: Directory where the generated file name should reside.""" if self._directory is not None: return self._directory assert self._uncached_args['directory'] return self._uncached_args['directory'] @property def is_test(self): r"""bool: True if the output points to the test directory.""" if isinstance(self.path, str): return self.path.startswith(cfg['directories']['test_output']) return False @property def make_test(self): r"""bool: True if the output is used as test_data.""" self.setdefaults(['make_test']) return self.args['make_test']
[docs] @classmethod def record_tests(cls, fname): r"""Record any tests associated with the provided file. Args: fname (str): File or file pattern to check for test counterparts. Returns: bool: True if there is a corresponding test. """ if (not fname.startswith(cfg['directories']['output'])): return False base = os.path.relpath(fname, cfg['directories']['output']) testfiles = cfg.getjson('files', 'testdata') if base in testfiles: return True fname_test = os.path.join( cfg['directories']['test_output'], base) files = glob.glob(fname_test) if files: cfg_updated = False for k in files: kbase = os.path.relpath( k, cfg['directories']['test_output']) if kbase in testfiles: continue testfiles.append(kbase) cfg_updated = True if cfg_updated: cfg.set('files', 'testdata', sorted(testfiles)) cfg.write() return True return False
[docs] @classmethod def create_test(cls, fname, overwrite=False): r"""Create a test by copying the provided output file to the corresponding test data directory. Args: fname (str): Output file to copy to the test directory. overwrite (bool, optional): If True, overwrite the existing file. """ assert os.path.isfile(fname) if not fname.startswith(cfg['directories']['output']): raise AssertionError( f"Cannot create a copy of the output \"{fname}\" " f"in the test directory because it is not in the " f"output directory \"{cfg['directories']['output']}\"" ) fname_test = fname.replace( cfg['directories']['output'], cfg['directories']['test_output'], ) if (not overwrite) and os.path.isfile(fname_test): return shutil.copy2(fname, fname_test) print(f'Created test data \"{fname_test}\"') assert cls.record_tests(fname) # Update cfg
@cached_property def enabled(self): r"""str: Output file""" self.setdefaults(['output']) return bool(self.args['output']) @cached_property def iterating_param(self): r"""list: Set of parameters that the output is a composite of.""" return [k for k in self.composite_param if self.is_iterating(k)] @cached_property def unmerged_param(self): r"""list: Set of parameters that the output is unmerged for.""" return [k for k in self.composite_param if self.is_unmerged(k)] @cached_property def merged_param(self): r"""list: Set of parameters that the output is merged for.""" return [k for k in self.composite_param if self.is_merged(k)]
[docs] def is_iterating(self, k): r"""Check if a parameter's value indicates the output is an composite of other outputs. Args: k (str): Parameter name. Returns: bool: True if the output is an composite of other outputs. """ if k not in self.composite_param: return False return (self._uncached_args[k] == 'all' or (isinstance(self.merge_all, str) and self._uncached_args[k] == self.merge_all))
[docs] def is_unmerged(self, k): r"""Check if a parameter's value indicates the output is an unmerged composite of other outputs. Args: k (str): Parameter name. Returns: bool: True if the output is an unmerged composite of other outputs. """ if k not in self.composite_param: return False if self.merge_all is False: return (self._uncached_args[k] == 'all') elif self.merge_all is True: return False else: return (self._uncached_args[k] == 'all' and self._uncached_args[k] != self.merge_all)
[docs] def is_merged(self, k): r"""Check if a parameter's value indicates the output is an merged composite of other outputs. Args: k (str): Parameter name. Returns: bool: True if the output is an merged composite of other outputs. """ if k not in self.composite_param: return False if self.merge_all is False: return False elif self.merge_all is True: return (self._uncached_args[k] == 'all') else: return (self._uncached_args[k] == self.merge_all)
@cached_property def generated(self): r"""bool: True if the file name is generated.""" self.setdefaults(['output']) return (not isinstance(self.args['output'], str)) @cached_property def parts_generators(self): r"""dict: Generator methods for file name parts.""" task = self.task_class out = self._parts_generators for k in ['directory', 'base', 'suffix', 'ext']: if k in out: continue if hasattr(task, f'_output_{k}'): out[k] = getattr(task, f'_output_{k}') # TODO: Handle case of using non-generated file name # if 'base' not in out and self.base_output is not None: # out['base'] = self._base_generator # # if 'suffix' in out: # # base_suffix = self.base_output.parts_generators.get( # # 'suffix', None) return out @cached_property def parts_defaults(self): r"""dict: Defaults for file name parts.""" assert self.directory out = { 'directory': self.directory, 'ext': self.ext, } return out
[docs] def assert_age_in_name(self, args): r"""Assert that the file name contains age information. Args: args (argparse.Namespace): Parsed arguments containing age. Raises: AssertionError: If the age string is not present. """ if args.age.string == 'maturity' or not self.path: return if args.age.string not in self.path: raise AssertionError(f'\"{self.output_name}\" filename does ' f'not contain age string ' f'\"{args.age.string}\": {self.path}')
@property def overwrite(self): r"""bool: True if overwrite set.""" self.setdefaults(['overwrite_all', 'overwrite']) if self.args['overwrite_all']: return self.args['overwrite_all'] return self.args['overwrite'] @property def overwrite_downstream(self): r"""bool: True if downstream files should be overwritten.""" if self.overwrite in ['local', 'force_local']: return False return (self.overwrite and self.downstream)
[docs] def clear_overwrite(self, args=None): r"""Clear the overwrite parameter. Args: args (argparse.Namespace, optional): Parsed arguments to clear. """ self.args['overwrite_all'] = False self.args['overwrite'] = False if args is not None: # setattr(args, 'overwrite_all', False) setattr(args, f'{self.prefix}overwrite{self.suffix}', False)
@property def dont_write(self): r"""bool: True if dont_write set.""" self.setdefaults(['dont_write_all', 'dont_write']) if self.args['dont_write_all']: return True return self.args['dont_write'] @property def path(self): r"""str: File name.""" if self.generated: if self._generated_path is not None: assert isinstance(self._generated_path, str) return self._generated_path assert isinstance(self.args['output'], (str, bool)) return self.args['output'] @property def exists(self): r"""bool: True if the file exists.""" fname = self.path if self.generated and fname is True: raise Exception(f'{self.name} file not initialized') return isinstance(fname, str) and os.path.isfile(fname)
[docs] def reset_generated(self, value=None): r"""Reset the generated path. Args: value (str, optional): New value for the generated path. """ if value is False: value = None if value is not None: assert self.generated assert isinstance(value, str) self._generated_path = value
[docs] def depends(self, arguments): r"""Check if the generated path depends on any of the listed arguments. Args: arguments (list): Set of arguments to check dependence on. Returns: bool: True if the path depends on any of the listed arguments. """ if isinstance(arguments, str): arguments = [arguments] if not (self.generated and arguments): return False suffix = self.parts_generators.get('suffix', None) suffix_args = suffix.depends(self.output_name) if bool(set(suffix_args) & set(arguments)): return True base = self.base_output if base is None: return False return base.depends(arguments)
@cached_property def task_class(self): r"""type: The task class that produces this output.""" return TaskBase.get_output_task(self.output_name) @cached_property def default_args(self): r"""argparse.Namespace: Default arguments.""" return self.task_class.copy_external_args(initialize=True)
[docs] def remove_downstream(self, args, removed=None, wildcards=None, args_copied=False, **kwargs): r"""Remove downstream outputs. Args: args (argparse.Namespace): Parsed arguments. removed (list, optional): List of outputs that have already been removed (to prevent duplication of effort). wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. args_copied (bool, optional): If True, the arguments have already been copied. **kwargs: Additional keyword arguments are passed to nested calls to 'remove' for downstream outputs. """ if not self.downstream: return if removed is None: removed = [] if wildcards is None: wildcards = [] wildcards = wildcards + self.iterating_param task = self.task_class task.log_class(f"REMOVING DOWNSTREAM {self.output_name}:\n" f"{pprint.pformat(self.downstream)}") wildcards = set(wildcards) args_base = set(task.argument_names()) # Use task for name? for ext, outputs in self.downstream.items(): if all(k in removed for k in outputs): continue ext_wildcards = list( wildcards | (set(ext.argument_names()) - args_base) ) ext_args = args if getattr(args, f'output_{ext._name}', None) is None: if args_copied: ext.complete_external_args(ext_args) else: ext_args = ext.copy_external_args( args, initialize=True, ) args_copied = True for k in outputs: if k in removed: continue output = getattr(ext_args, f'output_{k}') output.remove( args=ext_args, wildcards=ext_wildcards, **kwargs ) removed.append(k) output.remove_downstream( ext_args, removed=removed, wildcards=ext_wildcards, args_copied=args_copied, **kwargs )
[docs] def remove(self, args=None, wildcards=None, skipped=None, force=False, skip_test_output=False): r"""Remove the output file. Args: args (argparse.Namespace, optional): Parsed arguments. Only required if the path has not been set or wildcards are provided. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. skipped (list, optional): List of arguments that should be skipped in the generated output file name. force (bool, optional): If True, remove without asking for user input. skip_test_output (bool, optional): If True, don't include both test output and generated file name. """ if wildcards is None: wildcards = [] wildcards = wildcards + self.iterating_param if wildcards or skipped or self.path is None: assert args is not None fname = self.generate(args, wildcards=wildcards, skipped=skipped) else: fname = self.path if not fname: return fnames = [fname] if not skip_test_output: self.record_tests(fname) if fname.startswith(cfg['directories']['output']): fnames.append(fname.replace( cfg['directories']['output'], cfg['directories']['test_output'] )) elif fname.startswith(cfg['directories']['test_output']): fnames.append(fname.replace( cfg['directories']['test_output'], cfg['directories']['output'] )) files = [] for x in fnames: files += glob.glob(x) if not files: return if not (force or self.overwrite in ['force', 'force_local']): if not utils.input_yes_or_no( f'Remove existing \"{self.output_name}\" ' f'output(s):\n{pprint.pformat(files)}?'): return for x in files: os.remove(x)
def _base_generator(self, args, name, **kwargs): assert name == self.output_name base = self.base_output assert base is not None return base.generate(args, **kwargs)
[docs] def generate(self, args, reset=False, wildcards=None, skipped=None): r"""Generate the file name. Args: args (argparse.Namespace): Parsed arguments that file name should be generated from. reset (bool, optional): If True, reset the generated path on the instance. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. skipped (list, optional): List of arguments that should be skipped in the generated output file name. Returns: str: Generated file name. """ from canopy_factory.utils import FilenameGenerationError task = self.task_class assert self.generated assert self.output_name in task._outputs_local try: fname = self._generate(args, wildcards=wildcards, skipped=skipped) except FilenameGenerationError as e: task.log_class(f"FILENAME GENERATION ERROR: {task} {e}") self.args['dont_write'] = True fname = False if reset: assert not wildcards self.reset_generated(value=fname) # task.log_class(f'Generated \"{self.output_name}\" ' # f'path: {self.path}') return fname
def _generate(self, args, wildcards=None, skipped=None): from canopy_factory.utils import FilenameGenerationError if self.exclude_dynamic_suffix_param: var = getattr(args, self.exclude_dynamic_suffix_param) assert isinstance(var, str) if skipped is None: skipped = [] skipped = skipped + [var] iparts = {} for k, v in self.parts_generators.items(): if k in iparts: continue iparts[k] = v(args, self.output_name, wildcards=wildcards, skipped=skipped) if iparts[k] is None: del iparts[k] for k, v in self.parts_defaults.items(): if k not in iparts: iparts[k] = v disabled = [k for k, v in iparts.items() if v is False] if disabled: raise FilenameGenerationError( f'\"{self.output_name}\" output disabled by ' f'\"{disabled}\"' ) if self.base_prefix: iparts['prefix'] = self.base_prefix + iparts.get('prefix', '') if self.base_suffix: iparts['suffix'] = iparts.get('suffix', '') + self.base_suffix base = iparts.pop('base', '') if ((iparts.get('prefix', '') and not base and ((not iparts.get('suffix', '')) or iparts.get('suffix', '').startswith('_')))): iparts['prefix'] = iparts['prefix'].rstrip('_') if iparts.get('suffix', '') and not (base or iparts.get('prefix', '')): iparts['suffix'] = iparts['suffix'].lstrip('_') fname = utils.generate_filename(base, **iparts) if wildcards: while '**' in fname: fname = fname.replace('**', '*') return fname
[docs] def complete_output(self, fname, created=False): r"""Perform tasks to finalize output. Args: fname (str): Full path to created file. created (bool, optional): If True, the output has just been created. """ if not (self.make_test or self.record_tests(fname)): return self.create_test(fname, overwrite=created)
[docs] class ColorArgument(CompositeArgument): r"""Container for color arguments.""" _name = 'color' _defaults = { 'color': 'transparent', } _arguments_prefixed = [ (('--color', ), { 'type': utils.parse_color, 'help': ( 'Color name or tuple of RGBA values expressed as float ' 'in the range [0, 1]{description}.' ), }), ] @cached_property def color(self): r"""tuple: RGBA color tuple.""" if not self.setdefaults(['color']): return None return utils.parse_color(self.args['color'], convert_names=True)
[docs] class ColorMapArgument(CompositeArgument): r"""Container for colormap arguments.""" _name = 'colormap' _defaults = { 'colormap': 'YlGn_r', 'colormap_scaling': 'linear', } _arguments_prefixed = [ (('--colormap', ), { 'type': str, # 'choices': plt.colormaps(), 'help': ( 'Name of the matplotlib color map that should be ' 'used to map values to colors{description}' ), }), (('--color-vmin', ), { 'type': parse_quantity, 'help': ( 'Value that should be mapped to the lowest ' 'color in the colormap{description}' ), }), (('--color-vmax', ), { 'type': parse_quantity, 'help': ( 'Value that should be mapped to the highest ' 'color in the colormap{description}' ), }), (('--colormap-scaling', ), { 'type': str, 'choices': ['linear', 'log'], 'help': ( 'Scaling that should be used to map values ' 'to colors in the colormap{description}' ), }), ] @property def colormap(self): r"""str: Colormap name.""" assert self.setdefaults(['colormap']) return self.args['colormap'] @cached_property def scaling(self): r"""str: Colormap scaling.""" assert self.setdefaults(['colormap_scaling']) return self.args['colormap_scaling'] @property def vmin(self): r"""units.Quantity: Minimum value mapped to colormap.""" if not self.setdefaults(['color_vmin']): return None return self.args['color_vmin'] @vmin.setter def vmin(self, value): self.args['color_vmin'] = value @property def vmax(self): r"""units.Quantity: Maximum value mapped to colormap.""" if not self.setdefaults(['color_vmax']): return None return self.args['color_vmax'] @vmax.setter def vmax(self, value): self.args['color_vmax'] = value @property def limits_defined(self): r"""bool: True if the limits are defined.""" return self.setdefaults(['color_vmin', 'color_vmax'])
[docs] class AxisArgument(CompositeArgument): r"""Container for parsing an axis argument.""" _name = 'axis' _arguments_prefixed = [ (('--axis', ), { 'type': 'str', 'help': 'Name or vector{description}', }), ] @property def string(self): r"""str: String representation of this variable.""" if not self.setdefaults(['axis']): return None if self.args['axis'] in utils._axis_map: return self.args['axis'] return None @cached_property def value(self): r"""object: Parsed base argument value.""" if not self.setdefaults(['axis']): return None return utils.parse_axis(self.args['axis'])
# class AxesArgument(CompositeArgument): # r"""Container for parsing axes arguments.""" # _name = 'axes' # _defaults = { # 'up': 'y', # 'north': 'x', # 'east': 'z', # } # _arguments_prefixed = [ # (('--axis-up', ), { # 'type': AxisArgument, # 'help': ( # 'Axis name or vector for up direction{description}.' # ), # }), # (('--axis-east', ), { # 'type': AxisArgument, # 'help': ( # 'Axis name or vector for east direction{description}.' # ), # }), # (('--axis-north', ), { # 'type': AxisArgument, # 'help': ( # 'Axis name or vector for north direction{description}.' # ), # }), # ] # # def __init__(self, args, **kwargs): # # super(AxesArgument, self).__init__(args, **kwargs) # # if self.args['axis_up'] # class CameraArgument(CompositeArgument): # r"""Container for parsing camera arguments.""" # _name = 'camera' # _name_as_prefix = True # _defaults = { # 'lens': 'projection', # 'direction': 'downsoutheast', # 'fov_width': utils.parse_quantity(45.0, 'degrees'), # 'fov_height': utils.parse_quantity(45.0, 'degrees'), # } # _arguments_prefixed = [ # (('--lens', ), { # 'type': str, # 'choices': ['projection', 'orthographic'], # 'spherical'], # 'help': 'Type of camera{description}', # }), # (('--direction', ), { # 'type': str, # 'help': ( # 'Direction that camera should face. If not ' # 'provided, the camera will point to the center of ' # 'the scene from its location.', # ), # }), # (('--fov-width', ), { # 'units': 'degrees', # 'help': ( # 'Angular width of the camera\'s field of view (in ' # 'degrees) for a projection camera.' # ), # }), # (('--fov-height', ), { # 'units': 'degrees', # 'help': ( # 'Angular height of the camera\'s field of view (in ' # 'degrees) for a projection camera.' # ), # }), # (('--up', ), { # 'type': str, # 'help': ( # 'Up direction for the camera. If not provided, the ' # 'up direction for the scene will be assumed.' # ), # }), # (('--location', ), { # 'type': str, # 'help': ( # 'Location of the camera. If not provided, one will ' # 'be determined that captures the entire scene from ' # 'the provided camera direction. If a direction is ' # 'also not provided, the camera will be centered ' # 'on the center of the scene facing down, ' # 'southeast at a distance that captures the entire ' # 'scene. If \"maturity\" is specified, the location will ' # 'be set for the mature plant (only valid for generated ' # 'meshes).' # ), # }), # ] # def __init__(self, args, **kwargs): # super(CameraArgument, self).__init__(args, **kwargs) # if ((self.args['direction'] is None # and self.args['location'] is None)): # self.setdefaults(['direction']) # # TODO # @cached_property # def direction(self): # r"""np.ndarray: Unit vector for camera's pointing direction.""" # if self.args['direction'] is not None: # return
[docs] class AgeArgument(CompositeArgument): r"""Container for parsing age arguments.""" _name = 'age' _defaults = { 'age': 'maturity', } _age_strings = [ 'planting', 'maturity', 'senesce', 'remove', ] _arguments_prefixed = [ (('--age', ), { 'units': 'days', 'choices': _age_strings, 'help': 'Plant age (in days since planting){description}.', }), ] _attributes_copy = [ '_parameter_inst', '_parameter_inst_function', ] def __init__(self, args, **kwargs): self._time_argument = None self._parameter_inst = None self._parameter_inst_function = None super(AgeArgument, self).__init__(args, **kwargs)
[docs] def reset(self, args, **kwargs): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the base method. """ super(AgeArgument, self).reset(args, **kwargs) self._parameter_inst = None self._parameter_inst_function = getattr( args, '_parameter_inst_function', None)
[docs] @classmethod def is_crop_age(cls, x): r"""Check if a string is a named crop age. Args: x (str): Value to check. Returns: bool: True if x is a named crop age. """ return (x in cls._age_strings)
@property def parameter_inst(self): r"""ParametrizeCropTask: Instance for calculating ages.""" if ((self._parameter_inst is None and self._parameter_inst_function is not None)): self._parameter_inst = self._parameter_inst_function() assert self._parameter_inst is not None return self._parameter_inst @property def requires_parameter_inst(self): r"""bool: True if a parameter instance is required.""" return self.is_crop_age(self.args['age'])
[docs] def get_crop_age(self, x, return_quantity=False): r"""Get a named crop age as a timedelta. Args: x (str): Named crop age. Returns: timedelta: Age as a time delta. """ out = self.parameter_inst.get_age(x) if return_quantity: return out return utils.quantity2timedelta(out)
@cached_property def crop_age_string(self): r"""str: Crop-based age string.""" self.setdefaults(['age']) if self.is_crop_age(self.args['age']): return self.args['age'] return None @cached_property def string(self): r"""str: String representation of the age.""" if self.is_wildcard('age'): return '*' if self.crop_age_string: return self.crop_age_string if self.age is None: return None return f'{int(self.age.to("days"))}days' @cached_property def age(self): r"""units.Quantity: Days since planting.""" if self._time_argument is not None: return self._time_argument.age if not self.setdefaults(['age']): return None if self.is_crop_age(self.args['age']): return self.get_crop_age(self.args['age'], return_quantity=True) return self.args['age']
[docs] class TimeArgument(AgeArgument): r"""Container for parsing time arguments.""" _name = 'time' _defaults = dict( AgeArgument._defaults, **{ 'hour': 'noon', 'timezone': pytz.timezone("America/Chicago"), # 'doy': 169, # 06/17 # 'doy': 173, # 06/21 'doy': 'summer_solstice', 'year': 2024, } ) _arguments_prefixed = AgeArgument._arguments_prefixed + [ (('--time', '-t'), { 'type': utils.DatetimeArgument( ["now"] + utils.SolarModel._solar_times + utils.SolarModel._solar_dates), 'help': ( 'Date time (in any ISO 8601 format){description}.' 'If time information is not provided, the provided ' '\"--{prefix_arg}hour{suffix_arg}\" will be used. ' 'If \"now\" is specified, the current date and time ' 'will be used. ' 'The any of the solar string values for ' '\"--{prefix_arg}hour{suffix_arg}\" or ' '\"--{prefix_arg}doy{suffix_arg}\" is specified, ' 'it will be transfered to the corresponding argument.' ), }), (('--hour', '--hr', ), { 'type': utils.parse_solar_time, 'help': ( 'Hour{description}. If provided with ' '\"--{prefix_arg}time{suffix_arg}\", any hour ' 'information in the specified time will be overwritten. ' 'If any of ' ) + ( ", ".join( [f'"{x}"' for x in utils.SolarModel._solar_times]) ) + ( ' are provided, the time will be calculated from the ' 'provided date and location. ' 'Defaults to \"noon\" if ' '\"--{prefix_arg}doy{suffix_arg}\" is ' 'provided, but \"--{prefix_arg}hour{suffix_arg}\" is ' 'not.' ), }), (('--date', ), { 'type': utils.DatetimeArgument( ["now"] + utils.SolarModel._solar_dates), 'help': ( 'Date that should be used with ' '\"--{prefix_arg}hour{suffix_arg}\" or ' '\"--{prefix_arg}time{suffix_arg}\" ' 'if a string is provided describing the time of day ' '(e.g. \"sunrise\", \"noon\", \"sunset\"). ' 'If provided, \"--{prefix_arg}doy{suffix_arg}\" and ' '\"--{prefix_arg}year{suffix_arg}\" will not be used. ' ), }), (('--doy', ), { 'type': utils.parse_solar_date, 'help': ( 'Day of the year{description}. ' 'If any of ' ) + ( ", ".join( [f'"{x}"' for x in utils.SolarModel._solar_dates]) ) + ( ' are provided, the date will be calculated from the ' 'provided year and location.' ), }), (('--year', ), { 'type': int, 'help': ( 'Year{description}. If provided ' 'with \"--{prefix_arg}time{suffix_arg}\", ' 'the year in the time string(s) ' 'will be overwritten. Defaults to the current year ' 'if \"--{prefix_arg}doy{suffix_arg}\" is provided, ' 'but \"--{prefix_arg}year{suffix_arg}\" is not.' ), }), ] _arguments_universal = AgeArgument._arguments_universal + [ (('--planting-date', ), { 'type': utils.parse_datetime, 'help': ( 'Date time (in any ISO 8601 format) on which the crop ' 'was planted (used to calculate age from time or time ' 'from age.' ), }), (('--timezone', '--tz', ), { 'type': utils.ChoiceArgument(pytz.timezone, type=datetime.tzinfo), 'help': ( 'Name of timezone (as accepted by pytz){description}. ' 'If provided ' 'with \"--{prefix_arg}time{suffix_arg}\", ' 'any timezone information in the ' 'specified time(s) will be overwritten. Defaults ' 'to \"America/Chicago\" if \"--doy\" is provided, ' 'but \"--timezone\" is not.' ), }), ] _attributes_copy = AgeArgument._attributes_copy + [ 'ignore_date', 'ignore_age', ] def __init__(self, args, **kwargs): self.ignore_date = False self.ignore_age = False self.location = None super(TimeArgument, self).__init__(args, **kwargs)
[docs] def reset(self, args, **kwargs): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the base method. """ super(TimeArgument, self).reset(args, **kwargs) self.location = None if getattr(args, 'location', None): assert isinstance(args.location, LocationArgument) self.location = args.location self.args['timezone'] = args.location.timezone if self.is_solar_time(self.args['time']): assert self.args['hour'] in [None, self.args['time']] self.args['hour'] = self.args['time'] self.args['time'] = None elif self.is_solar_date(self.args['time']): assert self.args['doy'] is None self.args['doy'] = self.args['time'] self.args['time'] = None elif self.args['time'] == 'now': self.args['time'] = datetime.datetime.now() elif ((isinstance(self.args['time'], str) and not self.is_wildcard('time'))): self.args['time'] = datetime.datetime.fromisoformat( self.args['time']) if utils.is_date(self.args['time']): assert self.args['date'] is None self.args['date'] = self.args['time'] self.args['time'] = None if self.is_solar_date(self.args['date']): assert self.args['doy'] is None self.args['doy'] = self.args['date'] self.args['date'] = None elif self.args['date'] == 'now': self.args['date'] = utils.to_date(datetime.datetime.now()) elif self.is_crop_age(self.args['date']): assert not self.ignored('age') assert self.args['age'] is None or 'age' in self._defaults_set self.args['age'] = self.args['date'] if self.args['age'] == 'planting': self.defaults['hour'] = 'noon' self.args['date'] = None if ((isinstance(self.args['timezone'], str) and not self.is_wildcard('timezone'))): self.args['timezone'] = pytz.timezone(self.args['timezone']) if ((self.args['age'] is None and self.args['planting_date'] is None)): self.setdefaults(['age']) if (((not self.ignored('planting_date')) and isinstance(self.args['planting_date'], str) and not self.is_wildcard('planting_date'))): self.args['planting_date'] = datetime.datetime.fromisoformat( self.args['planting_date']) assert utils.is_date(self.args['planting_date']) if ((self.crop_age_string == 'planting' and self.args['planting_date'] is None)): self.args['planting_date'] = self.date
[docs] def setarg(self, name, value): r"""Set an argument value. Args: name (str): Argument name. value (object): Argument value. """ if name == 'crop_age_string': name = 'age' elif name == 'solar_date_string': name = 'doy' elif name == 'solar_time_string': name = 'hour' super(TimeArgument, self).setarg(name, value)
[docs] def getarg(self, name, default=NoDefault): r"""Get an argument value. Args: name (str): Argument name. default (object, optional): Value to return if argument does not exist. Returns: object: Argument value. Raises: KeyError: If the argument does not exist and default is not provided. """ if name == 'time' and self.solar_time_string is not None: name = 'hour' return super(TimeArgument, self).getarg(name, default=default)
[docs] @classmethod def reset_args(cls, name, args, value=NoDefault): r"""Reset the arguments for a variable. Args: name (str): Variable name. args (argparse.Namespace): Parsed arguments. value (object, optional): Value that the parsed variable should be reset to. """ age_value = NoDefault if value is NoDefault else None super(TimeArgument, cls).reset_args(name, args, value=value) prefix = cls.get_prefix(name) suffix = cls.get_suffix(name) AgeArgument.reset_args(prefix + 'age' + suffix, args, value=age_value)
[docs] def update_args(self, args, name=None): r"""Update a namespace with the parsed time arguments. Args: args (argparse.Namespace): Parsed arguments. name (str, optional): Alternate name where the argument should be stored. """ if name is None: prefix = self.prefix suffix = self.suffix else: prefix = self.get_prefix(name) suffix = self.get_suffix(name) super(TimeArgument, self).update_args(args, name=name) if not self.ignored('age'): age_inst = AgeArgument.from_other(self) age_inst._time_argument = self setattr(args, f'{prefix}age{suffix}', age_inst)
[docs] def extract_unused(self, out, name): r"""Extract the equivalent value from an output. Args: out (object): Output. name (str): Argument to extract from out. Returns: object: Argument value. """ if isinstance(out, datetime.datetime): if name in ['year', 'hour']: return getattr(out, name) elif name == 'timezone': return out.tzinfo elif name == 'doy': doy = int(out.strftime('%j')) if self.solar_date_string and self.location is not None: solar_model = self.location.create_solar_model(out) solar_date = solar_model.solar_date( self.solar_date_string) if doy == int(solar_date.strftime('%j')): return self.solar_date_string return doy elif name == 'date': return utils.to_date(out) return super(TimeArgument, self).extract_unused(out, name)
[docs] def iteration_args(self, dt=None, include_bookends=False, dont_age=False): r"""Arguments that should be passed to represent this time in an iteration. Args: dt (units.Quantity, optional): Time interval to apply. include_bookends (bool, optional): If True, the keys in the returned arguments should include the prefix & suffix. dont_age (bool, optional): If True, don't change the age. Returns: dict: Arguments. """ if dt is None: dt = units.Quantity(0.0, 'days') dt_null = (dt == units.Quantity(0.0, 'days')) dt_days = ((dt % units.Quantity(1.0, 'days')) == units.Quantity(0.0, 'days')) out = { 'time': self.time + utils.quantity2timedelta(dt), 'date': None, 'age': None, 'planting_date': None, } for k in self._arguments_prefixed.keys(): out.setdefault(k, None) if self.solar_time_string and (dt_null or dt_days): out.update( time=self.solar_time_string, ) if self.solar_date_string and dt_null: out.update( date=self.solar_date_string, ) else: out.update( date=(self.date + utils.quantity2timedelta(dt)), ) if self.crop_age_string and (dt_null or dont_age): out.update( age=self.crop_age_string, ) elif dont_age: out.update( age=self.age, ) elif self.args['age'] is not None: out.update( age=(self.age + dt), ) else: out.update( planting_date=self.planting_date, ) if not include_bookends: return out return {f'{self.prefix}{k}{self.suffix}': v for k, v in out.items()}
[docs] @classmethod def is_solar_time(cls, x): r"""Check if a string is a named solar time. Args: x (str): Time to check. Returns: bool: True if x is a named solar time. """ return (x in utils.SolarModel._solar_times)
[docs] @classmethod def is_solar_date(cls, x): r"""Check if a string is a named solar date. Args: x (str): Time to check. Returns: bool: True if x is a named solar date. """ return (x in utils.SolarModel._solar_dates)
@cached_property def solar_time_string(self): r"""str: Solar time string.""" if self.args['time'] is not None: return None if not self.setdefaults(['hour']): return None if self.is_solar_time(self.args['hour']): return self.args['hour'] return None @cached_property def solar_date_string(self): r"""str: Solar date string.""" if ((self.args['time'] is None and self.args['date'] is None and ((self.args['planting_date'] is not None and self.args['age'] is not None) or (not self.setdefaults(['doy']))))): return None if self.is_solar_date(self.args.get('doy', None)): return self.args['doy'] return None @cached_property def string(self): r"""str: String representation of the time.""" out = '' if self.is_wildcard(['time', 'date', 'hour', 'year', 'timezone']): out += '*' return out if self.time is None: return None x = self.time.replace(microsecond=0) if self.location is not None: x = x.replace(tzinfo=None) time_string = x.timetz().isoformat().replace(':', '-') date_string = x.date().isoformat() sep = 'T' if self.solar_time_string: time_string = self.solar_time_string sep = '-' if self.solar_date_string: date_string = self.solar_date_string sep = '-' if not self.ignore_date: out += date_string + sep out += time_string if not (self.ignored('age') or self.ignore_age): out += '_' + super(TimeArgument, self).string return out @cached_property def date(self): r"""datetime.datetime: Parsed date instance.""" if self.args['time'] is not None: return utils.to_date(self.time) elif self.args['date'] is not None: if isinstance(self.args['date'], str): self.args['date'] = datetime.datetime.fromisoformat( self.args['date']) out = self.args['date'] assert utils.is_date(out) self.check_unused(['year', 'doy'], out) return out elif ((self.args['planting_date'] is not None and self.args['age'] is not None)): out = self.planting_date + utils.quantity2timedelta(self.age) out = utils.to_date(out) self.check_unused(['year', 'doy'], out) return out elif self.base and not self.any_set(['year', 'doy']): return copy.deepcopy(self.base.date) if not self.setdefaults(['year', 'doy']): return None if self.solar_date_string: assert self.location is not None if not self.setdefaults(['timezone']): return None date = datetime.datetime( month=1, day=1, # Month & day unused year=self.args['year'], tzinfo=self.args['timezone'], ) solar_model = self.location.create_solar_model(date) return solar_model.solar_date(self.solar_date_string) return datetime.datetime.strptime( f"{self.args['year']}-{self.args['doy']}", "%Y-%j" ) @cached_property def time(self): r"""datetime.datetime: Parsed time instance.""" if self.args['time'] is not None: if self.args['time'].tzinfo is None: self.setdefaults(['timezone']) self.args['time'] = self.args['time'].astimezone( self.args['timezone']) else: self.setdefaults(['timezone']) x = self.args['time'].astimezone(self.args['timezone']) assert x == self.args['time'] self.args['time'] = x out = self.args['time'] self.check_unused(['date', 'hour', 'year', 'doy'], out) return out date = self.date if date is None or not self.setdefaults(['timezone', 'hour']): return None date = date.astimezone(self.args['timezone']) if self.solar_time_string: assert self.location is not None solar_model = self.location.create_solar_model(date) return solar_model.solar_time(self.solar_time_string) return date.replace(hour=self.args['hour']) @cached_property def age(self): r"""units.Quantity: Days since planting.""" if self.args['age'] is not None: if self.is_crop_age(self.args['age']): return self.get_crop_age(self.args['age'], return_quantity=True) return self.args['age'] if self.args['planting_date'] is None: return None if self.date is None: return None diff = self.date - self.planting_date return utils.timedelta2quantity(diff) @cached_property def planting_date(self): r"""datetime.datetime: Planting date.""" if self.args['planting_date'] is not None: return self.args['planting_date'] if self.time is None or self.age is None: return None out = self.time - utils.quantity2timedelta(self.age) return utils.to_date(out) @cached_property def solar_model(self): r"""SolarModel: Solar model.""" if self.time is None or self.location is None: return None return self.location.create_solar_model(self.time)
[docs] class LocationArgument(CompositeArgument): r"""Container for parsing location arguments.""" _name = 'location' _defaults = { 'location': 'Champaign', 'latitude': utils.parse_quantity(40.1164, 'degrees'), 'longitude': utils.parse_quantity(-88.2434, 'degrees'), 'temperature': utils.parse_quantity(12.0, 'degC'), # Move this? 'altitude': utils.parse_quantity(224.0, 'meters'), } _arguments_prefixed = [ (('--location', ), { 'type': str, 'choices': sorted(list(utils.read_locations().keys())), 'help': ('Name of a registered location that should be used ' 'to set the location dependent properties: ' 'timezone, altitude, longitude, latitude'), }), (('--latitude', '--lat', ), { 'units': 'degrees', 'help': ('Latitude (in degrees) at which the sun should be ' 'modeled. Defaults to the latitude of Champaign ' 'IL.'), }), (('--longitude', '--long', ), { 'units': 'degrees', 'help': ('Longitude (in degrees) at which the sun should be ' 'modeled. Defaults to the longitude of Champaign ' 'IL.'), }), (('--altitude', '--elevation', ), { 'units': 'meters', 'help': ('Altitude (in meters) that should be used for ' 'solar light calculations. If not provided, it ' 'will be calculated from \"pressure\", if it is ' 'provided, and the elevation of Champaign, IL ' 'otherwise.'), }), (('--pressure', ), { 'units': 'Pa', 'help': ('Air pressure (in Pa) that should be used for ' 'solar light calculations. If not provided, it ' 'will be calculated from \"altitude\".'), }), # TODO: This should depend on time (('--temperature', ), { 'units': 'degC', 'help': ('Air temperature (in degrees C) that should be ' 'used for solar light calculations.'), }), ] def __init__(self, args, **kwargs): self.timezone = None super(LocationArgument, self).__init__(args, **kwargs)
[docs] def reset(self, args, **kwargs): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the base method. """ super(LocationArgument, self).reset(args, **kwargs) if self.ignored('location'): return self.setdefaults(['location']) self.timezone = None if ((isinstance(self.args['location'], str) and not self.is_wildcard('location'))): location_data = utils.read_locations()[ self.args['location']] for k, v in location_data.items(): if k == 'name': continue elif k == 'timezone': self.timezone = pytz.timezone(v) continue self.args[k] = self._arguments[k].parse(v)
@property def is_northern_hemisphere(self): r"""bool: True if the latitude is in the norther hemisphere.""" if not self.setdefaults(['latitude']): return True latitude = self.args['latitude'] if not isinstance(latitude, units.Quantity): latitude = units.Quantity(latitude, "degrees") return (latitude > units.Quantity(0, 'degrees'))
[docs] def create_solar_model(self, time, **kwargs): r"""Create a solar model for this location. Args: time (datetime.datetime): Time that the model should be created for. **kwargs: Additional keyword arguments are passed to the utils.SolarModel constructor after being augmented with missing location data from this argument. Returns: utils.SolarModel: Solar model. """ if not (self.args['altitude'] or self.args['pressure']): self.setdefaults(['altitude']) self.setdefaults(['latitude', 'longitude', 'temperature']) for k in ['latitude', 'longitude', 'altitude', 'pressure', 'temperature']: kwargs.setdefault(k, self.args[k]) latitude = kwargs.pop('latitude') longitude = kwargs.pop('longitude') return utils.SolarModel(latitude, longitude, time, **kwargs)
@cached_property def string(self): r"""str: String representation of the location.""" out = '' if ((self.args['location'] is not None and self.args['location'] != self._defaults['location'])): out += f"{self.args['location']}-" return out
[docs] class LightArgument(CompositeArgument): r"""Container for parsing light arguments.""" _name = 'light' _defaults = {} _arguments_prefixed = [ (('--radiant-flux', '--power', ), { 'units': 'W', 'help': 'Amount power{description} emitted as radiation.', }), (('--luminous-flux', ), { 'units': 'lm', 'help': ( 'Perceived amount of visible light{description} ' 'emitted (in lumens)' ) }), (('--luminous-efficacy', ), { 'units': 'lm/W', 'help': 'Ratio of luminous flux to radiant flux{description}', }), (('--eta-par', ), { 'type': float, 'help': ( 'Fraction of radiant flux{description} that is ' 'photosynthetically active (wavelengths 400–700 nm).' ) }), (('--eta-photon', ), { 'type': float, 'help': ( 'Average number of photons per photosynthetically ' 'activate unit of radiation (in µmol s−1 W−1)' '{description}.' ), }), (('--par-flux', '--par'), { 'units': 'W', 'help': ( 'Amount of radiant flux{description} emitted as ' 'photosynthetically active radiation (wavelengths ' '400–700 nm, in W).' ), }), (('--irradiance', ), { 'units': 'W m-2', 'help': ( 'Flux density{description} (in W m-2)' ) }), (('--par-irradiance', ), { 'units': 'W m-2', 'help': ( 'Flux density{description} emitted as ' 'photosynthetically active radiation (wavelengths ' '400–700 nm, in W m-2)' ) }), (('--ppf', ), { 'units': 'µmol s−1', 'help': ( 'Flux of photosynthetically reactive photons' '{description} (wavelengths 400–700 nm, in µmol s−1).' ) }), (('--ppfd', ), { 'units': 'µmol s−1 m-2', 'help': ( 'Flux density of photosynthetically reactive photons' '{description} (wavelengths 400–700 nm, in ' 'µmol s−1 m-1).' ) }), (('--incident-area', ), { 'units': 'm-2', 'help': ( 'Area that the flux is spread over (in m-2)' ), }), (('--spectrum', ), { 'type': utils.parse_existing_file, 'help': ( 'Path to a csv file containing the spectrum{description}.' ) }), ]
[docs] def reset(self, args, **kwargs): r"""Reinitialize the arguments used by this instance. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the base method. """ super(LightArgument, self).reset(args, **kwargs) if not self.ignored('spectrum'): self.setdefaults(['spectrum']) if ((isinstance(self.args['spectrum'], str) and os.path.isfile(self.args['spectrum']))): for k, v in self.parse_spectrum(self.args['spectrum']).items(): self.args[k] = self._arguments[k].parse(v)
# TODO: Add check_unused for various other paths
[docs] def check_available(self, name, others=None): r"""Check if a variable is available. Args: name (str): Variable to check. others (list, optional): Auxillary variables to check via setdefaults. """ if self.check_calculating(name): return False if others is not None: if not self.setdefaults(others): return False out = getattr(self, name) if out is None: delattr(self, name) return False return True
@cached_property def ppf(self): r"""units.Quantity: Photosynthetically active photon flux.""" if self.args['ppf'] is not None: out = self.args['ppf'] self.check_unused(['ppfd', 'par_flux', 'radiant_flux', 'luminous_flux', 'par_irradiance', 'irradiance'], out) return out elif self.check_available('ppfd', ['incident_area']): return self.ppfd * self.args['incident_area'] elif self.check_available('par_flux', ['eta_photon']): return self.par_flux * self.args['eta_photon'] return None @cached_property def ppfd(self): r"""units.Quantity: Photosynthetically active photon flux density.""" if self.args['ppfd'] is not None: out = self.args['ppfd'] self.check_unused(['ppf', 'par_flux', 'radiant_flux', 'luminous_flux', 'par_irradiance', 'irradiance'], out) return out elif self.check_available('ppf', ['incident_area']): return self.ppf / self.args['incident_area'] elif self.check_available('par_irradiance', ['eta_photon']): return self.par_irradiance * self.args['eta_photon'] return None @cached_property def par_flux(self): r"""units.Quantity: Photosynthetically active flux.""" if self.args['par_flux'] is not None: out = self.args['par_flux'] self.check_unused(['ppf', 'ppfd', 'radiant_flux', 'luminous_flux', 'par_irradiance', 'irradiance'], out) return out elif self.check_available('ppf', ['eta_photon']): return self.ppf / self.args['eta_photon'] elif self.check_available('radiant_flux', ['eta_par']): return self.radiant_flux * self.args['eta_par'] elif self.check_available('par_irradiance', ['incident_area']): return self.par_irradiance * self.args['incident_area'] return None @cached_property def radiant_flux(self): r"""units.Quantity: Radiated power.""" if self.args['radiant_flux'] is not None: out = self.args['radiant_flux'] self.check_unused(['ppf', 'ppfd', 'par_flux', 'luminous_flux', 'par_irradiance', 'irradiance'], out) return out elif self.check_available('par_flux', ['eta_par']): return self.par_flux / self.args['eta_par'] elif self.check_available('luminous_flux', ['luminous_efficacy']): return self.luminous_flux / self.args['luminous_efficacy'] elif self.check_available('irradiance', ['incident_area']): return self.irradiance * self.args['incident_area'] return None @cached_property def luminous_flux(self): r"""units.Quantity: Perceived amount of visible light.""" if self.args['luminous_flux'] is not None: out = self.args['luminous_flux'] self.check_unused(['ppf', 'ppfd', 'par_flux', 'radiant_flux', 'par_irradiance', 'irradiance'], out) return out elif self.check_available('radiant_flux', ['luminous_efficacy']): return self.radiant_flux * self.args['luminous_efficacy'] return None @cached_property def par_irradiance(self): r"""units.Quantity: Photosynthetically active flux density.""" if self.args['par_irradiance'] is not None: out = self.args['par_irradiance'] self.check_unused(['ppf', 'ppfd', 'par_flux', 'radiant_flux', 'luminous_flux', 'irradiance'], out) return out elif self.check_available('irradiance', ['eta_par']): return self.irradiance * self.args['eta_par'] elif self.check_available('ppfd', ['eta_photon']): return self.ppfd / self.args['eta_photon'] elif self.check_available('par_flux', ['incident_area']): return self.par_flux / self.args['incident_area'] return None @cached_property def irradiance(self): r"""units.Quantity: Flux density.""" if self.args['irradiance'] is not None: out = self.args['irradiance'] self.check_unused(['ppf', 'ppfd', 'par_flux', 'radiant_flux', 'luminous_flux', 'par_irradiance'], out) return out elif self.check_available('par_irradiance', ['eta_par']): return self.par_irradiance / self.args['eta_par'] elif self.check_available('radiant_flux', ['incident_area']): return self.radiant_flux / self.args['incident_area'] return None
[docs] @classmethod def parse_spectrum(cls, fname): # TODO: # Read # integrate total # integrate 400-700nm to get eta_par # integrate 380-750nm w/ photopic luminosity function to get # luminous_efficacy # - scale by 683.002 lm/W # - photopic luminosity function roughly gaussian w/ # peak at 555nm raise NotImplementedError
[docs] @classmethod def integrate_spectrum(cls, spectrum, start, stop): raise NotImplementedError
[docs] class RepeatIterationError(BaseException): r"""Error can be raised if a step should be repeated.""" def __init__(self, args_overwrite=None): if args_overwrite is None: args_overwrite = {} self.args_overwrite = args_overwrite super(RepeatIterationError, self).__init__()
[docs] class SubparserBase(arguments.RegisteredArgumentClassBase): r"""Base class for tasks associated with subparsers. Args: args (argparse.Namespace, optional): Parsed arguments. If not provided, additional keyword arguments are parsed to create args and keyword arguments that are not used by the parser are passed to the run method. **kwargs: Additional keyword arguments are passed to the run method. """ _name = None _help = None _default = None
[docs] class TaskBase(SubparserBase): r"""Base class for tasks. Args: args (argparse.Namespace): Parsed arguments. root (TaskBase, optional): Top level task. cached_outputs (dict, optional): Outputs that have been cached in memory. Class Attributes: _output_info (dict): Properties of task outputs. _outputs_local (list): Outputs produced by this task. _outputs_required (list): Required local outputs. _outputs_optional (list): Optional local outputs. _outputs_external (dict): Mapping between outputs produced by external tasks that are used by this task and the external task that produces it. _outputs_total (list): All outputs produced by this task or the external tasks that are used by this task. _external_tasks (dict): Mapping of external tasks used by this task and information about how arguments should be adopted by this task. _dont_inherit_base (bool): If True, arguments of the base class will not be inherited. """ _registry_key = 'task' _default = 'generate' _output_info = {} _outputs_local = [] _outputs_required = [] _outputs_optional = [] _outputs_external = {} _outputs_total = [] _external_tasks = {} _dont_inherit_base = False _arguments = [ (('--verbose', ), { 'action': 'store_true', 'help': 'Show log messages' }), (('--debug', ), { 'action': 'store_true', 'help': ('Run in debug mode, setting break points for debug ' 'messages and errors') }), (('--output-dir', ), { 'type': str, 'default': cfg['directories']['output'], 'help': 'Base directory where output should be stored.', }), (('--figure-dpi', ), { 'type': int, 'default': 300, 'help': 'DPI for generated figures', }), (('--figure-font-weight', ), { 'type': str, 'default': 'bold', 'help': 'Font weight for generated figures', }), (('--figure-linewidth', ), { 'type': float, 'default': 2.0, 'help': 'Line width for plots', }), (('--figure-color-by', ), { 'type': str, 'choices': ['id', 'idbase', 'idvar', 'location'], 'help': 'How colors should be mapped to the plot data', }), (('--figure-linestyle-by', ), { 'type': str, 'choices': ['id', 'idbase', 'idvar', 'location'], 'help': 'How line styles should be mapped to the plot data', }), ] _lineprop_map = { 'linestyle': { 'class': ['-', '--', ':', '-.'], 'idvar': { 'WT': '-', 'rdla': '--', }, 'idbase': { 'B73': '-', 'PHKW3': '--', }, 'location': { 'interior': '-', 'exterior': '--', }, }, 'color': { 'class': ['blue', 'orange', 'purple', 'teal'], 'idvar': { 'WT': 'blue', 'rdla': 'orange', }, 'idbase': { 'B73': 'blue', 'PHKW3': 'orange', }, 'location': { 'interior': 'blue', 'exterior': 'green', }, }, } def __init__(self, args=None, root=None, cached_outputs=None): if args is None: args = self.copy_external_args( args_overwrite={self._registry_key: self._name}, set_defaults=True, ) if root is None: root = self self.root = root self._outputs = {} if root is self: if cached_outputs is None: cached_outputs = {} self._cached_outputs = cached_outputs else: assert cached_outputs is None self._cached_outputs = root._cached_outputs self.external_tasks = {} super(TaskBase, self).__init__(args) for cls in self._external_tasks.keys(): if cls._name in self.external_tasks: continue else: self.external_tasks[cls._name] = cls( self.args, root=self.root, ) if self.is_root: args._parameter_inst_function = self.get_parameter_inst self.finalize()
[docs] def finalize(self, dont_overwrite=False): r"""Perform steps to finalize the class for use. Args: dont_overwrite (bool, optional): If True, don't remove overwritten files. """ self.adjust_args(self.args) if not dont_overwrite: self.overwrite_outputs()
@property def is_root(self): r"""bool: True if this is the root task.""" return (self.root is self)
[docs] def get_parameter_inst(self): r"""Get the instance of ParametrizeCropTask used by this task. Returns: ParametrizeCropTask: Instance that parametrizes geometries. """ out = self.output_task('parametrize', from_root=True) out.ensure_initialized() return out
[docs] @classmethod def task_hierarchy(cls): r"""list: Order that outputs should be initialized in.""" out = [] for v in cls._external_tasks: out += [x for x in v.task_hierarchy() if x not in out] out.append(cls._name) return out
[docs] @classmethod def adjust_args(cls, args, subset=None): r"""Adjust the parsed arguments including setting defaults that depend on other provided arguments. Args: args (argparse.Namespace): Parsed arguments. subset (list, optional): Set of arguments that should be adjusted. If not provided, all of the available arguments identified by the available options will be performed.:: 'internal': Internal arguments will be adjusted by this task. 'external': External arguments will be adjusted by external tasks. 'outputs': Output arguments will be adjusted. """ if subset is None: subset = ['internal', 'external', 'outputs'] if 'internal' in subset: cls.adjust_args_internal(args) if 'external' in subset: # Omit outputs so that all arguments are initialized before # OutputArgument instances are created cls.adjust_args_external( args, subset=['internal', 'external'], ) if 'outputs' in subset: cls.adjust_args_output(args)
[docs] @classmethod def adjust_args_internal(cls, args, skip=None): r"""Adjust the parsed arguments including setting defaults that depend on other provided arguments. Args: args (argparse.Namespace): Parsed arguments. skip (list, optional): Set of arguments to skip. """ if skip is None: skip = [] output_args = [ v.dest for v in cls._arguments.values() if v.is_output ] if args.figure_color_by is None: if ((getattr(args, 'crop', None) == 'maize' and getattr(args, 'id', None) == 'all')): args.figure_color_by = 'idbase' else: args.figure_color_by = 'class' if args.figure_linestyle_by is None: if ((getattr(args, 'crop', None) == 'maize' and getattr(args, 'id', None) == 'all')): args.figure_linestyle_by = 'idvar' super(TaskBase, cls).adjust_args(args, skip=(skip + output_args), skip_root=True)
[docs] @classmethod def adjust_args_external(cls, args, **kwargs): r"""Adjust the parsed arguments including setting defaults that depend on other provided arguments for external tasks. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to adjust_args for each external task. """ for ext in cls._external_tasks.keys(): ext.adjust_args(args, **kwargs)
[docs] @classmethod def adjust_args_output(cls, args): r"""Create the output arguments (assumes that adjust_args has already been called to initialize the other arguments. Args: args (argparse.Namespace): Parsed arguments. """ for ext in cls._external_tasks.keys(): ext.adjust_args_output(args) for v in cls._arguments.values(): if v.is_output: v.adjust_args(args)
[docs] @classmethod def complete_external_args(cls, args): r"""Add missing arguments to the provided argument set. Args: args (argparse.Namespace): Parsed arguments to complete. """ setattr(args, cls._registry_key, cls._name) # TODO: This may be redundant for v in cls._arguments.flatten(): if hasattr(args, v.dest): continue if v.default is NoDefault: setattr(args, v.dest, None) else: setattr(args, v.dest, v.default) cls.adjust_args(args)
[docs] @classmethod def copy_external_args(cls, args=None, args_overwrite=None, args_external=None, set_defaults=False, initialize=False, return_dict=False, verbose=False): r"""Extract arguments for this task from a set for another task. Args: args (argparse.Namespace, optional): Parsed arguments to copy. args_overwrite (dict, optional): Arguments to overwrite. args_external (list, optional): External arguments that should be preserved. set_defaults (bool, optional): If True, don't set missing arguments to defaults. initialize (bool, optional): If True, initialize arguments by applying adjust_args (implies set_defaults == True). return_dict (bool, optional): Return a dictionary instead of argparse.Namespace. verbose (bool, optional): If True, write copied arguments to a log message. Returns: argparse.Namespace: Arguments. """ if initialize: set_defaults = True if args is None: args = argparse.Namespace() if args_overwrite is None: args_overwrite = {} if args_external is None: args_external = [] args_overwrite.setdefault(cls._registry_key, cls._name) args_changing = list(args_overwrite.keys()) if args_changing: for v in cls._arguments.values(): if not (isinstance(v, arguments.CompositeArgumentDescription) and v.dest in args_changing): continue args_changing += [ x for x in v.argument_names(include='prefixed') if x not in args_changing ] if 'id' not in args_changing: args_external += ['_parameter_inst_function'] out = {cls._registry_key: args_overwrite[cls._registry_key]} for arg in cls._arguments.flatten(): k = arg.dest if k in args_overwrite: out[k] = args_overwrite[k] elif k in out: continue elif hasattr(args, k): out[k] = getattr(args, k) elif set_defaults and arg.default is not NoDefault: out[k] = arg.default else: out[k] = None v = out[k] if isinstance(v, CompositeArgument): for kk, vv in v.raw_args(k).items(): if kk in args_overwrite: if kk not in out: out[kk] = args_overwrite[kk] continue out[kk] = vv continue missing = [] for k in args_overwrite.keys(): if k not in out: missing.append(k) if missing: raise Exception(f'Members of args_overwrite were unused: ' f'{missing}') for k in args_external: assert k not in out if hasattr(args, k): out[k] = getattr(args, k) if verbose: cls.log_class(pprint.pformat(out)) if return_dict: assert not initialize return out out = arguments.ArgumentDescription.kwargs2args(out) if initialize: cls.adjust_args(out) return out
[docs] @classmethod def from_kwargs(cls, kws, **kwargs): r"""Create an instance from the provided arguments. Args: kws (dict): Keyword arguments that should be parsed into arguments via the parse function. **kwargs: Additional keyword arguments are passed to the class constructor. """ kws[cls._registry_key] = cls._name args = cls.copy_external_args(args_overwrite=kws, set_defaults=True) return cls(args, **kwargs)
[docs] @classmethod def from_external_args(cls, args, args_overwrite=None, args_external=None, copy_outputs_from=None, verbose=False, **kwargs): r"""Create an instance from a set of external arguments. Args: args (argparse.Namespace): Parsed arguments. args_ovewrite (dict, optional): Argument values to set for the run after copying the current argument namespace. args_external (list, optional): External arguments that should be preserved. copy_outputs_from (TaskBase, optional): Existing instance that matching outputs should be copied from. verbose (bool, optional): If True, write copied arguments to a log message. **kwargs: Additional keyword arguments are passed to the class constructor. Returns: TaskBase: New task instance. """ args = cls.copy_external_args( args, args_overwrite=args_overwrite, args_external=args_external, set_defaults=True, verbose=verbose, ) out = cls(args, **kwargs) if copy_outputs_from is not None: out.copy_matching_outputs(copy_outputs_from) return out
# Methods for building arguments @staticmethod def _build_arguments(cls): SubparserBase._build_arguments(cls) for k in ['_output_info', '_outputs_required', '_outputs_local']: setattr(cls, k, copy.deepcopy(getattr(cls, k))) if not cls._dont_inherit_base: base = inspect.getmro(cls)[1] if base != SubparserBase: TaskBase._copy_external_arguments(cls, base) cls._dont_inherit_base = False if cls._name is not None and cls._name not in cls._outputs_required: cls._outputs_required.insert(0, cls._name) cls._outputs_required = [ k for k, v in cls._output_info.items() if not v.get('optional', False) ] cls._outputs_optional = [ k for k, v in cls._output_info.items() if v.get('optional', False) ] cls._outputs_local = cls._outputs_required + cls._outputs_optional cls._outputs_external = {} for ext, props in cls._external_tasks.items(): TaskBase._copy_external_arguments(cls, ext, **props) cls._outputs_total = ( cls._outputs_local + list(cls._outputs_external.keys()) ) output_bases = {} for k, v in cls._output_info.items(): if k in cls._outputs_external: continue if (('base_output' in v and v['base_output'] not in v.get('upstream', []))): v.setdefault('upstream', []) v['upstream'].append(v['base_output']) output_bases.setdefault(v['base_output'], []) output_bases[v['base_output']].append(k) else: output_bases.setdefault(None, []) output_bases[None].append(k) for x in v.get('upstream', []): xsrc = cls while x not in xsrc._outputs_local: xsrc = xsrc._outputs_external[x] xsrc._output_info[x].setdefault('downstream', {}) xsrc._output_info[x]['downstream'].setdefault(cls, []) if k not in xsrc._output_info[x]['downstream'][cls]: xsrc._output_info[x]['downstream'][cls].append(k) if xsrc._output_info[x].get('composite_param', []): v.setdefault('composite_param', []) for kcomp in xsrc._output_info[x]['composite_param']: if ((kcomp not in v['composite_param'] and cls._arguments.hasnested(kcomp))): v['composite_param'].append(kcomp) koutput = f'output_{k}' if koutput not in cls._arguments: karg = arguments.CompositeArgumentDescription( koutput, 'output', **v) cls._arguments.append(karg) cls._arguments.modify(append_suffix_outputs=output_bases) cls._arguments.sort_by_suffix() cls._arguments.modify(suffix_index=0) @staticmethod def _copy_external_arguments(dst, src, include=None, exclude=None, modifications=None, optional=False, increment_suffix_index=None, **kwargs): if modifications is None: modifications = {} modifications = copy.deepcopy(modifications) if include is not None: include += [f'output_{k}' for k in src._outputs_local] if optional: for k in src._outputs_local: modifications.setdefault(f'output_{k}', {}) modifications[f'output_{k}'].setdefault( 'default', False) if increment_suffix_index is None: increment_suffix_index = -1 src_arguments = src._arguments.copy( modifications=modifications, include=include, exclude=exclude, strip_classes=True, increment_suffix_index=increment_suffix_index, **kwargs ) new_args = arguments.ArgumentDescriptionSet([]) for i, v in enumerate(src_arguments.values()): vnested = dst._arguments.findnested(v, None) if vnested is not None: if vnested.generates_suffix: if v.suffix_param['index'] < vnested.suffix_param['index']: # print("SUFFIX_INDEX", dst, src, # vnested.dest, # vnested.suffix_param['index'], # v.suffix_param['index']) vnested.modify( suffix_index=v.suffix_param['index']) continue vmod = {} if v.is_output: vmod['cls_kwargs'] = copy.deepcopy(v.cls_kwargs) vmod['cls_kwargs'].setdefault('defaults', {}) vmod['cls_kwargs']['defaults']['output'] = False new_args.append(v.copy(**vmod)) dst._arguments.append(new_args, dont_copy=True) dst._outputs_external.update(src._outputs_external) for k in src._outputs_total: dst._outputs_external[k] = src
[docs] @classmethod def get_line_properties(cls, args, **kws): r"""Get keyword arguments for a line plot based on properties passed via the command line arguments. Args: args (argparse.Namespace): Parsed arguments. **kws: Additional keyword arguments are properties that should be used when selecting line styles & colors. Returns: dict: Keyword arguments for line plot. """ id = kws.get('id', args.id) out = {} if args.figure_linewidth: out['linewidth'] = args.figure_linewidth if '_' in id: kws['idbase'], kws['idvar'] = id.split('_') else: kws['idvar'] = id for k in ['color', 'linestyle']: karg = getattr(args, f'figure_{k}_by') if karg is None: continue if karg in cls._lineprop_map[k] and karg != 'class': out[k] = cls._lineprop_map[k][karg][kws[karg]] else: v = kws.get(karg, getattr(args, karg, None)) if not hasattr(args, f'_used_figure_{k}'): setattr(args, f'_used_figure_{k}', []) used = getattr(args, f'_used_figure_{k}') if v in used: idx = used.index(v) else: idx = len(used) used.append(v) out[k] = cls._lineprop_map[k]['class'][idx] for k in ['linewidth', 'color', 'linestyle']: if k in kws: out[k] = kws[k] return out
# Methods for managing task I/O @classmethod def _read_output(cls, args, name, fname): r"""Load an output file produced by this task. Args: args (argparse.Namespace): Parsed arguments. name (str): Name of the output to read. fname (str): Path of file that should be read from. Returns: object: Contents of the output file. """ ext = os.path.splitext(fname)[-1] if ext == '.json': with open(fname, 'r') as fd: return rapidjson.load(fd) elif ext == '.csv': return utils.read_csv(fname, verbose=args.verbose) elif ext in ['.obj', '.ply']: return utils.read_3D( fname, file_format=getattr(args, 'mesh_format', None), verbose=args.verbose, # include_units=getattr(args, 'include_units', False), ) elif ext == '.png': return utils.read_png(fname, verbose=args.verbose) elif ext in ['.lpy']: with open(fname, 'r') as fd: return fd.read() raise NotImplementedError(f'{name}: {fname}')
[docs] def read_output(self, name=None, fname=None): r"""Load an output file produced by this task. Args: name (str, optional): Name of the output to read. fname (str, optional): File to read if different than the generated file name. Returns: object: Contents of the output file. """ def _read_output(task, name, fname): if fname is None: fname = task.output_file(name, return_disabled=True) if not os.path.isfile(fname): raise RuntimeError(f'\"{name}\" output file does not ' f'exist: {fname}') return task._read_output(task.args, name, fname) args = (fname, ) return self._call_output_task(self, _read_output, name, args)
@classmethod def _write_output(cls, args, name, fname, output): r"""Write to an output file. Args: args (argparse.Namespace): Parsed arguments. name (str): Name of the output to write. fname (str): Path of the file that should be written to. output (object): Output object to write to file. """ ext = os.path.splitext(fname)[-1] if ext == '.json': assert output with open(fname, 'w') as fd: rapidjson.dump(output, fd, write_mode=rapidjson.WM_PRETTY) return elif ext == '.csv': utils.write_csv(output, fname, verbose=args.verbose) return elif ext in ['.obj', '.ply']: utils.write_3D(output, fname, file_format=args.mesh_format, verbose=args.verbose) return elif ext == '.png': from matplotlib.figure import Figure if isinstance(output, Figure): output.savefig(fname, dpi=args.figure_dpi) return utils.write_png(output, fname, verbose=args.verbose) return elif ext in ['.lpy']: with open(fname, 'w') as fd: fd.write(output) return raise NotImplementedError(f'{name}: {fname}')
[docs] def write_output(self, name, output, overwrite=False): r"""Write to an output file. Args: name (str): Name of the output to write. output (object): Output object to write to file. overwrite (bool, optional): If True, overwrite existing output. """ def _write_output(task, name): if not task.output_enabled(name, for_write=True): raise RuntimeError(f'Write to disk not enabled for ' f'\"{name}\" output.') fname = task.output_file(name) output_arg = task._output_argument(task.args, name) assert isinstance(fname, str) if (not overwrite) and os.path.isfile(fname): output_arg.complete_output(fname) return fdir = os.path.dirname(fname) task.log(f'Writing \"{name}\" output: {fname}', force=True) if not os.path.isdir(self.args.output_dir): os.mkdir(self.args.output_dir) if not os.path.isdir(fdir): os.mkdir(fdir) task._write_output(task.args, name, fname, output) output_arg.complete_output(fname, created=True) return self._call_output_task(self, _write_output, name)
[docs] def output_task(self, name, default=NoDefault, from_root=False): r"""Get the task instance responsible for producing an output. Args: name (str): Name of output to get task for. default (object, optional): Value to return if the task instance cannot be located. from_root (bool, optional): If True, start from the root task. Returns: TaskBase: Task that produces the output. """ if from_root: return self.root._get_output_task(self.root, name, default=default) return self._get_output_task(self, name, default=default)
[docs] @classmethod def get_output_task(cls, name, default=NoDefault): r"""Determine which task class produces the named output. Args: name (str): Output name. default (object, optional): Value to return if the task class cannot be located. Returns: type: Task class. """ for v in get_class_registry().values('task'): if name in v._outputs_local: return v if default is not NoDefault: return default raise KeyError(name)
@staticmethod def _get_output_task(cls, name=None, default=NoDefault, initialize_missing=False): r"""Get the task class responsible for producing an output. Args: name (str, list, optional): Name(s) of one or more outputs to get task(s) for. default (object, optional): Value to return if the task instance cannot be located. initialize_missing (bool, optional): If the task instance does not exist, create it. Returns: type: Task class. """ if isinstance(name, list): return {k: cls._get_output_task(cls, k) for k in name} if name is None: name = cls._name if name in cls._outputs_local: return cls out = cls._outputs_external[name] if not isinstance(cls, type): if out._name not in cls.external_tasks: if initialize_missing: cls.external_tasks[out._name] = out(cls.args) elif default is not NoDefault: return default out = cls.external_tasks[out._name] if name not in out._outputs_local: return cls._get_output_task( out, name, default=default, initialize_missing=initialize_missing, ) return out @staticmethod def _call_output_task(cls, method, name, args=None): if args is None: args = tuple([]) if isinstance(name, list): return {k: cls._call_output_task(cls, method, k, args) for k in name} if name is None: name = cls._name task = cls._get_output_task(cls, name) return method(task, name, *args)
[docs] def enabled_outputs(self, for_write=False): r"""Get the set of outputs enabled by the provided arguments. Args: args (argparse.Namespace): Parsed arguments. for_write (bool, optional): If True, only return outputs for which write is enabled. Returns: list: Names of enabled outputs. """ return [k for k in self._outputs_local if self.output_enabled(k, for_write=for_write)]
[docs] def output_exists(self, name=None): r"""Check if a task output exists. Args: name (str, list, optional): Name(s) of one or more outputs to check for. Returns: bool: True if the output file exists. """ if isinstance(name, list): return all(self.output_exists(name=x) for x in name) if name is None: name = self._name task = self._get_output_task(self, name) output = getattr(task.args, f'output_{name}') if (not output.overwrite) and name in task._outputs: return True return ((not output.overwrite) and output.exists)
[docs] def output_missing(self, name=None): r"""Check if a task output is enabled, but does not exist. Args: name (str, list, optional): Name(s) of one or more outputs to check for. Returns: bool: True if the output file exists. """ if isinstance(name, list): return any(self.output_missing(name=x) for x in name) if name is None: name = self._name task = self._get_output_task(self, name) output = task.output_file(name) if not output: return False return (not task.output_exists(name))
@classmethod def _output_enabled(cls, args, name=None, for_write=False): r"""Check if an output is enabled by the current arguments. Args: args (argparse.Namespace): Parsed arguments. name (str, list, optional): Name(s) of one or more outputs to check. for_write (bool, optional): If True, only return True if output is enabled and write is not disabled. Returns: bool: True if the output is enabled, False otherwise. """ def _output_enabled(task, name): output = getattr(args, f'output_{name}') if for_write and (output.dont_write or output.unmerged_param): return False return output.enabled return cls._call_output_task(cls, _output_enabled, name)
[docs] def output_enabled(self, name=None, for_write=False): r"""Check if an output is enabled by the current arguments. Args: name (str, list, optional): Name(s) of one or more outputs to check. for_write (bool, optional): If True, only return True if output is enabled and write is not disabled. Returns: bool: True if the output is enabled, False otherwise. """ def _output_enabled(task, name): return task._output_enabled(task.args, name, for_write=for_write) return self._call_output_task(self, _output_enabled, name)
@classmethod def _output_names(cls, args, exclude_local=False, include_external=False, exclude_optional=False, exclude_required=False, include_disabled=False, for_write=False): r"""Get the list of outputs that will be generated. Args: args (argparse.Namespace): Parsed arguments. exclude_local (bool, optional): If True, don't include local outputs. include_external (bool, optional): If True, include outputs produced by external tasks. exclude_optional (bool, optional): If True, don't include optional outputs. exclude_required (bool, optional): If True, don't include required outputs. include_disabled (bool, optional): If True, include disabled outputs. for_write (bool, optional): If True, only include outputs that will be written to disk Returns: list: Output names. """ out = [] if not exclude_local: if not exclude_optional: out += cls._outputs_optional if not exclude_required: out += cls._outputs_required if not include_disabled: out = [k for k in out if cls._output_enabled(args, k, for_write=for_write)] if include_external: for k, v in cls._external_tasks.items(): if exclude_optional and v.get('optional', False): continue out += [ x for x in k._output_names( args, include_external=include_external, exclude_optional=exclude_optional, exclude_required=exclude_required, include_disabled=include_disabled, for_write=for_write) if x not in out ] return out
[docs] def output_names(self, **kwargs): r"""Get the list of outputs that will be generated. Args: **kwargs: Keyword arguments are passed to _output_names. Returns: list: Output names. """ return self._output_names(self.args, **kwargs)
@classmethod def _output_depends(cls, args, name, variables): r"""Determine if an output is dependent on a variable. Args: args (argparse.Namespace): Parsed arguments. name (str): Name of output to check dependency of. variables (list): List of arguments to check if the affect the named output. Returns: bool: True if the output depends on the named variables. """ output = cls._output_argument(args, name) return output.depends(variables)
[docs] def output_depends(self, name, variables): r"""Determine if an output is dependent on a variable. Args: name (str): Name of output to check dependency of. variables (list): List of arguments to check if the affect the named output. Returns: bool: True if the output depends on the named variables. """ return self._output_depends(self.args, name, variables)
@classmethod def _output_file(cls, args, name=None, return_disabled=False, regenerate=False, wildcards=None, skipped=None, copy_args=False): r"""Get the filename for an output. Args: args (argparse.Namespace): Parsed arguments. name (str, list, optional): Name(s) of one or more outputs to get the files for. return_disabled (bool, optional): If True, return the generated filename even if the output is disabled. regenerate (bool, optional): If True, regenerate all output file names. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. skipped (list, optional): List of arguments that should be skipped in the generated output file name. copy_args (bool, optional): If True, the args should be copied and initialized. Returns: str: Name of output file. If True, this indicates the output should be generated, but not written. If False, the output should not be generated. """ def _output_file(task, name): output = task._output_argument(args, name, copy_args=copy_args) # if ((copy_args # or getattr(args, f'output_{name}', None) is None)): # args_local = task.copy_external_args( # args, initialize=True, # ) # else: # args_local = args # output = getattr(args_local, f'output_{name}') if not output.generated: cls.log_class(f'Filename for \"{name}\" output ' f'was not generated: {output.path}') assert output.generated return output.path if output.path and not (regenerate or wildcards or skipped): return output.path if (not return_disabled) and (not output.enabled): return False # return output.generate(args_local, wildcards=wildcards, # skipped=skipped) return output.generate(args, wildcards=wildcards, skipped=skipped) return cls._call_output_task(cls, _output_file, name)
[docs] def output_file(self, name=None, return_disabled=False, regenerate=False, wildcards=None, skipped=None, copy_args=False): r"""Get the filename for an output. Args: name (str, list, optional): Name(s) of one or more outputs to get the files for. return_disabled (bool, optional): If True, return the generated filename even if the output is disabled. regenerate (bool, optional): If True, regenerate all output file names. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. skipped (list, optional): List of arguments that should be skipped in the generated output file name. copy_args (bool, optional): If True, the args should be copied and initialized. Returns: str: Name of output file. If True, this indicates the output should be generated, but not written. If False, the output should not be generated. """ def _output_file(task, name): return task._output_file( task.args, name, return_disabled=return_disabled, wildcards=wildcards, skipped=skipped, regenerate=regenerate, copy_args=copy_args, ) return self._call_output_task(self, _output_file, name)
@classmethod def _output_argument(cls, args, name=None, copy_args=False): r"""Get the OutputArgument instance for an output. Args: args (argparse.Namespace): Parsed arguments. name (str, optional): Name of the output to return the argument instance for. copy_args (bool, optional): If True, the args should be copied and initialized. Returns: OutputArgument: Argument instance controling output. """ if name is None: name = cls._name if getattr(args, f'output_{name}', None) is None: task = cls._get_output_task(cls, name) args = task.copy_external_args(args, initialize=True) getattr(args, f'output_{name}')._copied_args = args # if copy_args or getattr(args, f'output_{name}', None) is None: # task = cls._get_output_task(cls, name) # args = task.copy_external_args(args, initialize=True) # args._copied_args = args return getattr(args, f'output_{name}')
[docs] def output_argument(self, name=None): r"""Get the OutputArgument instance for an output. Args: name (str, optional): Name of the output to return the argument instance for. Returns: OutputArgument: Argument instance controling output. """ return self._output_argument(self.args, name=name)
@classmethod def _remove_output(cls, args, name=None, wildcards=None, skipped=None, copy_args=False, force=False, skip_test_output=False, skip_downstream=False): r"""Remove existing output file. Args: args (argparse.Namespace): Parsed arguments. name (str, list, optional): Name(s) of one or more outputs to remove. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file name. skipped (list, optional): List of arguments that should be skipped in the generated output file name. copy_args (bool, optional): If True, the args should be copied and initialized. force (bool, optional): If True, remove without asking for user input. skip_test_output (bool, optional): If True, don't include both test output and generated file name. skip_downstream (bool, optional): If True, downstream outputs will not be removed. """ assert not copy_args def _remove_output(task, name): output = task._output_argument(args, name) output.remove(args=args, wildcards=wildcards, skipped=skipped, force=force, skip_test_output=skip_test_output) if not skip_downstream: output.remove_downstream( args, removed=[output.output_name], wildcards=wildcards, skipped=skipped, force=force, skip_test_output=skip_test_output, ) cls._call_output_task(cls, _remove_output, name)
[docs] def remove_output(self, name=None, remove_local=False, **kwargs): r"""Remove existing output file. Args: name (str, list, optional): Name(s) of one or more outputs to remove. remove_local (bool, optional): If True, remove any in-memory output. **kwargs: Additional keyword arguments are passed to _remove_output. """ def _remove_output(task, name): if remove_local: task._outputs.pop(name, None) return task._remove_output(task.args, name=name, **kwargs) self._call_output_task(self, _remove_output, name)
[docs] def cache_output(self, name=None): r"""Cache output on the root task for later use. Args: name (str, list, optional): Name(s) of one or more outputs to remove. """ def _cache_output(task, name): fname = task.output_file(name, return_disabled=True) task._cached_outputs[fname] = task.get_output(name) self._call_output_task(self, _cache_output, name)
[docs] def overwrite_outputs(self, downstream=None, wildcards=None, skipped=None): r"""Remove existing files that should be overwritten. Args: downstream (list, optional): Existing list that output names should be added to that downstream files need to be removed for. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file names. skipped (list, optional): List of arguments that should be skipped in the generated output file name. """ skip_downstream = (downstream is not None) if downstream is None: downstream = [] for name in self._outputs_local: if self.overwrite_output(name, wildcards=wildcards, skipped=skipped): downstream.append(name) for task in self.external_tasks.values(): task.overwrite_outputs(downstream=downstream, wildcards=wildcards, skipped=skipped) if (not skip_downstream) and downstream: removed = [] for k in downstream: koutput = self.output_argument(k) koutput.remove_downstream( self.args, removed=removed, wildcards=wildcards, skipped=skipped, ) if self.is_root: for k in list(self._cached_outputs.keys()): if not os.path.isfile(k): self._cached_outputs.pop(k)
[docs] def overwrite_output(self, name=None, wildcards=None, skipped=None): r"""Prepare an output for a run. If overwrite specified, any existing data/files for the output will be removed. If the output file is not fully specified on the args, it will be set. Args: name (str, list, optional): Name(s) of one or more outputs to handle overwrite for. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file names. skipped (list, optional): List of arguments that should be skipped in the generated output file name. Returns: bool: True if downstream outputs should be removed. """ if name is None: name = self._name task = self._get_output_task(self, name) output = task.output_argument(name) # fname = output.path remove_downstream = False if output.overwrite: task._outputs.pop(name, None) output.remove(task.args, wildcards=wildcards, skipped=skipped) remove_downstream = output.overwrite_downstream # elif fname and not (name in task._outputs or output.exists # or output.dont_write # or output.unmerged_param # or len(output.downstream) == 0): # self.log(f'Output \"{name}\" does not exist, removing ' # f'downstream files ({fname}) ' # f'isfile = {os.path.isfile(fname)}', force=True) # pdb.set_trace() # remove_downstream = bool(output.downstream) output.clear_overwrite(task.args) return remove_downstream
# Methods for executing a run
[docs] def get_output(self, name=None): r"""Get a task output, generating or loading it if necessary. Args: name (str, list, optional): Name(s) of one or more outputs to return. Returns: object: Output. """ def _get_output(task, name): if name in task._outputs: return task._outputs[name] if task.output_exists(name=name): fname = task.output_file(name) if fname in task._cached_outputs: self.log(f'Using cached \"{name}\" output: {fname}', force=True) task._outputs[name] = task._cached_outputs[fname] else: self.log(f'Loading existing \"{name}\" output: ' f'{fname}', force=True) task._outputs[name] = task._read_output( task.args, name, fname) return task._outputs[name] self.log(f'Generating \"{name}\"', force=True) task.generate_output(name) assert name in task._outputs if task.output_enabled(name, for_write=True): task.write_output(name, task._outputs[name]) return task._outputs[name] return self._call_output_task(self, _get_output, name)
[docs] def set_output(self, name, output, overwrite=False): r"""Set an output value for the task. Args: name (str): Name of the output to set. output (object): Output instance. overwrite (bool, optional): If True, overwrite existing output. """ task = self._get_output_task(self, name) if not overwrite: assert name not in task._outputs task._outputs[name] = output if task.output_enabled(name, for_write=True): task.write_output(name, output, overwrite=overwrite)
@property def all_ids(self): r"""list: All crop classes for current data.""" # assert self.args.output_generate.generated # return self.output_task('generate').all_ids if not self.args.data: return utils.DataProcessor.available_ids(self.args.crop) return utils.DataProcessor.from_file(self.args.data).ids @property def all_data_years(self): r"""list: All data years for current crop.""" if not self.args.data: return utils.DataProcessor.available_years(self.args.crop) return [utils.DataProcessor.from_file(self.args.data).year]
[docs] def get_iteration_values(self, k): r"""Get the set of parameter values that should be iterated over when 'all' is specified. Args: k (str): Parameter name to get values for. Return: list: Iteration values. """ if isinstance(k, list): out = {kk: self.get_iteration_values(kk) for kk in k} if 'id' in k and 'data_year' in k: for i in range(len(out['id'])): out['id'][i] = utils.DependentIterationParam( 'id', out['id'][i], data_year=utils.DataProcessor.available_years( self.args.crop, id=out['id'][i]), ) return out if k in ['id', 'data_year']: out = getattr(self, f'all_{k}s') if not out: out = [None] elif k == 'canopy': out = ['unique', 'tile', 'virtual', 'virtual_single'] elif k == 'periodic_canopy': out = [False, 'scene'] # , 'plants', 'rays'] else: out = [kk for kk in self._arguments[k].properties['choices'] if not kk.startswith('all')] return out
[docs] def generate_output(self, name): r"""Generate the specified output value. Args: name (str): Name of the output to generate. Returns: object: Generated output. """ output = getattr(self.args, f'output_{name}') if output.iterating_param: over = self.get_iteration_values(output.iterating_param) merged_param = output.merged_param merge_all_output = output.merge_all_output if merge_all_output is None: merge_all_output = name out = OrderedDict() i = 0 for x in self.run_series(over=over): if merged_param: ikey = tuple([self.args._iteration_param[-1][k] for k in output.merged_param]) out[ikey] = x.get_output(merge_all_output) i += 1 if merged_param: out = self._merge_output(name, out, merged_param) else: out = None else: out = self._generate_output(name) self.set_output(name, out) return out
def _generate_output(self, name): r"""Generate the specified output value. Args: name (str): Name of the output to generate. Returns: object: Generated output. """ raise NotImplementedError def _merge_output(self, name, output, merged_param): r"""Merge the output for multiple sets of parameters values. Args: name (str): Name of the output to generate. output (dict): Mapping from tuples of parameter values to the output for the parameter values. merged_param (tuple): Names of the parameters that are being merged (the parameters specified by the tuple keys in output). Returns: object: Generated output. """ raise NotImplementedError
[docs] def run(self, output_name=None): r"""Run the process associated with this subparser. Args: output_name (str, optional): Name of output that should be returned. Defaults to the output with the same name as the task. Returns: object: Output named by output_name. """ output_names = self.enabled_outputs() if output_name is None: output_name = self._name assert output_name in ['instance'] + self._outputs_local for k in output_names: assert getattr(self.args, f'overwrite_{k}') is False if not self.output_exists(k): self.get_output(k) assert self.output_exists(k) if output_name == 'instance': return self return self.get_output(output_name)
[docs] def run_iteration(self, cls=None, **kwargs): r"""Run an iteration, regenerating output file names. Args: cls (type, optional): Task class that should be run in iteration. Defaults to the type of the current task. **kwargs: Additional keyword arguments are passed to run_iteration_class. Returns: object: Result of the run. """ if cls is None: cls = type(self) kwargs.setdefault('cached_outputs', self._cached_outputs) return cls.run_iteration_class(self.args, **kwargs)
[docs] @classmethod def run_iteration_class(cls, args, **kwargs): r"""Run an iteration, regenerating output file names. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to run_class. Returns: object: Result of the run (defaults to the task instance). """ kwargs.setdefault('output_name', 'instance') while True: try: return cls.run_class(args, **kwargs) except RepeatIterationError as e: kwargs['args_overwrite'] = dict( kwargs.get('args_overwrite', {}), **e.args_overwrite)
[docs] @classmethod def run_class(cls, args, output_name=None, args_preserve=None, cache_outputs=None, **kwargs): r"""Run the process associated with this subparser. Args: args (argparse.Namespace): Parsed arguments. output_name (str, optional): Name of output that should be returned. Defaults to the output with the same name as the task. If 'instance' is provided, the created task instance will be returned. args_preserve (list, optional): Set of argument names to preserve following a run. cache_outputs (list, optional): Set of outputs that should be cached. **kwargs: Additional keyword arguments are passed to from_external_args. Returns: object: Result of the run. """ assert 'root' not in kwargs self = cls.from_external_args(args, **kwargs) out = self.run(output_name=output_name) if args_preserve: for k in args_preserve: setattr(args, k, getattr(self.args, k)) if cache_outputs: assert 'root' in kwargs or 'cached_outputs' in kwargs self.cache_output(cache_outputs) return out
[docs] def copy_matching_outputs(self, other): r"""Copy existing outputs from another task to prevent repeated I/O when the files match. Args: other (TaskBase): Instance to copy from. """ assert isinstance(other, type(self)) copied = [] for k in self._outputs_local: self_fname = self.output_file(k, return_disabled=True) other_fname = other.output_file(k, return_disabled=True) if (((not self_fname) or self_fname != other_fname or k in self._outputs or k not in other._outputs)): continue self._outputs[k] = other._outputs[k] copied.append(k) for k, v in self.external_tasks.items(): vother = other.external_tasks[k] v.copy_matching_outputs(vother)
[docs] def run_series(self, cls=None, **kwargs): r"""Run the process for a series of arguments. Args: cls (type, optional): Task class that should be run in iteration. Defaults to the type of the current task. **kwargs: Additional keyword arguments are passed to cls.run_series_class. Yields: object: Results from each step. """ if cls is None: cls = type(self) kwargs.setdefault('cached_outputs', self._cached_outputs) for x in cls.run_series_class(self.args, **kwargs): yield x
[docs] @classmethod def run_series_class(cls, args, over=None, per_iter=None, args_overwrite=None, **kwargs): r"""Run the process for a series of arguments. Args: args (argparse.Namespace): Parsed arguments. over (dict, optional): Mapping between argument names and values that should be iterated over. per_iter (dict, optional): Dictionary of argument values that should be added for each iteration. This can be updated between iterations. args_overwrite (dict, optional): Arguments to overwrite. **kwargs: Additional keyword arguments are passed to cls.run_iteration. Yields: object: Results from each step. """ if per_iter is None: per_iter = {} if args_overwrite is None: args_overwrite = {} if not over: yield cls.run_iteration_class( args, args_overwrite=dict(args_overwrite, **per_iter), **kwargs ) return keys = list(over.keys()) for k in keys: if not isinstance(over[k], list): over[k] = [over[k]] cls.log_class(f"STARTING LOOP OVER: {keys}") if not hasattr(args, '_iteration_param'): args._iteration_param = [] for x in itertools.product(*[over[k] for k in keys]): xparam = {k: v for k, v in zip(keys, x)} if not utils.DependentIterationParam.check_param(xparam): continue iargs_overwrite = dict(args_overwrite, **per_iter) iargs_overwrite[f'output_{cls._name}'] = True iargs_overwrite.update(**xparam) cls.log_class( f'ITERATION:\n{pprint.pformat(iargs_overwrite)}') try: args._iteration_param.append(iargs_overwrite) yield cls.run_iteration_class( args, args_overwrite=iargs_overwrite, **kwargs ) finally: args._iteration_param.pop()
def __del__(self): if self.has_cached_property('figure'): import matplotlib.pyplot as plt plt.close(self.figure) self.clear_cached_properties( include=['figure', 'axes'], ) @cached_property def figure(self): r"""Matplotlib figure.""" import matplotlib.pyplot as plt import matplotlib.dates as mdates import matplotlib.units as munits converter = mdates.ConciseDateConverter() munits.registry[datetime.datetime] = converter return plt.figure() @cached_property def axes(self): r"""Matplotlib axes.""" ax = self.figure.add_subplot(111) return ax @property def raw_figure_data(self): r"""np.ndarray: Raw pixel data for the current figure.""" fig = self.figure fig.canvas.draw() data_buffer = fig.canvas.get_renderer().buffer_rgba() data = np.asarray(data_buffer) return data
[docs] class IterationTaskBase(TaskBase): r"""Base class for iterating over a task.""" _step_task = None _step_vary = None _step_args_preserve = None def __init__(self, *args, **kwargs): self._step_results = [] super(IterationTaskBase, self).__init__(*args, **kwargs) @staticmethod def _on_registration(cls): if cls._step_task is not None: step_prop = cls._external_tasks.get(cls._step_task, {}) # Force the step parameters to be first in the suffix step_prop.setdefault('increment_suffix_index', -2) if cls._step_vary is not None: step_prop.setdefault('exclude', []) if cls._step_vary not in step_prop['exclude']: step_prop['exclude'].append(cls._step_vary) cls._external_tasks = copy.deepcopy(cls._external_tasks) cls._external_tasks[cls._step_task] = step_prop cls._output_info.setdefault(cls._name, {}) cls._output_info[cls._name].setdefault( 'base_output', cls._step_task._name) TaskBase._on_registration(cls)
[docs] def overwrite_outputs(self, downstream=None, wildcards=None, skipped=None): r"""Remove existing files that should be overwritten. Args: downstream (list, optional): Existing list that output names should be added to that downstream files need to be removed for. wildcards (list, optional): List of arguments that wildcards should be used for in the generated output file names. skipped (list, optional): List of arguments that should be skipped in the generated output file name. """ if wildcards is None: wildcards = [] if ((self._step_vary is not None and self._step_vary not in wildcards)): wildcards = wildcards + [self._step_vary] # for k in self._step_task._outputs_total: # if not getattr(self.args, f'overwrite_{k}', False): # continue # step_task = self._get_output_task(self, k) # args_base = self.argument_names() # args_step = step_task.argument_names() # wildcards = list(set(args_step) - set(args_base)) # print("OVERWRITE", k, wildcards) # pdb.set_trace() # if self._step_vary is not None: # assert self._step_vary in wildcards # if vary is not None: # wildcards += [x for x in vary if x not in wildcards] # self.remove_output(k, wildcards=wildcards) super(IterationTaskBase, self).overwrite_outputs( downstream=downstream, wildcards=wildcards, skipped=skipped, )
[docs] def step_args_full(self): r"""Yield the full updates that will be made to the arguments for each step including class level updates. Yields: dict: Step arguments. """ for args_overwrite in self.step_args(): iargs_overwrite = copy.deepcopy(args_overwrite) for k, v in self.args_overwrite.items(): iargs_overwrite.setdefault(k, v) yield iargs_overwrite
[docs] def step_args(self): r"""Yield the updates that should be made to the arguments for each step. Yields: dict: Step arguments. """ raise NotImplementedError
[docs] def finalize_step(self, x): r"""Finalize the output from a step. Args: x (object): Result of step. Returns: object: Finalized step result. """ return x
[docs] def join_steps(self, xlist): r"""Join the output form all of the steps. Args: xlist (list): Result of all steps. Returns: object: Joined output from all steps. """ return xlist
@cached_property def args_overwrite(self): r"""dict: Arguments to overwrite for each step.""" # if getattr(self.args, f'dont_write_{self._name}'): # return {f'dont_write_{k}': True # for k in self._step_task._outputs_total} return {}
[docs] def run_steps(self, output_name='instance'): r"""Run the steps. Args: output_name (str, optional): Step output that should be passed to finalize_step for each step. Returns: list: Output from each step. """ self._step_results = [] x_prev = None for iargs_overwrite in self.step_args_full(): self.log(f'STEP:\n{pprint.pformat(iargs_overwrite)}') while True: try: x = self.run_iteration( cls=self._step_task, args_overwrite=iargs_overwrite, output_name=output_name, copy_outputs_from=x_prev, args_preserve=self._step_args_preserve, ) self._step_results.append(self.finalize_step(x)) if isinstance(x, TaskBase): del x_prev x_prev = x gc.collect() break except RepeatIterationError as e: iargs_overwrite.update(e.args_overwrite) return self._step_results
def _generate_output(self, name): r"""Generate the specified output value. Args: name (str): Name of the output to generate. Returns: object: Generated output. """ if name != self._name: return super(IterationTaskBase, self)._generate_output(name) if self._step_task is None: raise NotImplementedError if hasattr(self._step_task, 'adjust_args_step'): self._step_task.adjust_args_step(self.args, self._step_vary) return self.join_steps(self.run_steps())
[docs] class OptimizationTaskBase(IterationTaskBase): r"""Base class for tasks that iterate to achieve a result.""" _final_outputs = [] _arguments = [ (('--goal', ), { 'help': 'Goal of the optimization.', 'suffix_param': {}, }), (('--vary', ), { 'type': str, 'help': 'Argument that should be varied.', 'suffix_param': {'prefix': 'vs_'}, }), (('--method', ), { 'type': str, 'choices': ['nelder-mead', 'powell'], 'default': 'nelder-mead', 'help': ( 'Method that should be used to minimize the objective' ), 'suffix_param': {'noteq': 'nelder-mead'}, }), (('--tolerance', ), { 'type': float, 'default': 1e-5, 'help': 'Tolerance for achieving result', 'suffix_param': { 'prefix': 'tol', 'noteq': 1e-5, }, }), (('--initial-value', ), { 'type': str, 'help': 'Initial value for the parameter being varied', }), ] @staticmethod def _on_registration(cls): if cls._step_task is not None: cls._output_info = copy.deepcopy(cls._output_info) cls._output_info.setdefault(cls._name, {}) for k in cls._final_outputs: kcls = cls.get_output_task(k) cls._output_info.setdefault( k, { 'base_output': cls._name, 'ext': kcls._output_info[k]['ext'], 'description': kcls._output_info[k]['description'], 'optional': True, 'parts_generators': { 'suffix': cls._delayed_suffix_generator, }, 'exclude_dynamic_suffix_param': 'vary', } ) cls._output_info[cls._name].setdefault('ext', '.json') cls._output_info[cls._name].setdefault( 'exclude_dynamic_suffix_param', 'vary' ) IterationTaskBase._on_registration(cls) @classmethod def _delayed_suffix_generator(cls, *args, **kwargs): return cls._arguments.suffix_generator(*args, **kwargs) @cached_property def goal(self): r"""units.Quantity: Value that should be achieved.""" if self.args.goal in ['minimize', 'maximize']: return self.args.goal # TODO: Fix units return parse_quantity(self.args.goal) @cached_property def goal_units(self): r"""str: Goal units.""" if isinstance(self.goal, units.Quantity): return str(self.goal.units) return None @cached_property def vary_units(self): r"""str: Units of argument to vary.""" x = getattr(self.args, self.args.vary) if isinstance(x, units.Quantity): return str(x.units) return None @cached_property def args_overwrite(self): r"""dict: Arguments to overwrite for each step.""" # TODO: Does this work? return {f'dont_write_{k}': True for k in self._step_task._outputs_total}
[docs] def objective(self, x): r"""Objective function for use with scipy.optimize.minimize. Args: x (np.ndarray): Input arguments. Returns: float: Result. """ assert len(x) == 1 iargs_overwrite = { self.args.vary: parse_quantity(x[0], self.vary_units) } for k, v in self.args_overwrite.items(): iargs_overwrite.setdefault(k, v) repeat = True out = None while repeat: try: self.log(f'OBJECTIVE:\n{pprint.pformat(iargs_overwrite)}', force=True) x = self.run_iteration( cls=self._step_task, args_overwrite=iargs_overwrite, copy_outputs_from=self._prev_instance, ) result = self.finalize_step(x) if self.goal == 'minimize': out = result elif self.goal == 'maximize': out = -result else: out = np.abs(float( (result - self.goal) / self.goal)) self.log(f'OBJECTIVE RESULT: {out} ({result})', force=True) self._prev_instance = x repeat = False except RepeatIterationError as e: iargs_overwrite.update(e.args_overwrite) return out
[docs] def step_args(self): r"""Yield the updates that should be made to the arguments for each step. Yields: dict: Step arguments. """ return []
[docs] def run_steps(self, output_name='instance'): r"""Run the steps. Args: output_name (str, optional): Step output that should be passed to finalize_step for each step. Returns: list: Output from each step. """ from scipy.optimize import minimize self.goal # Initialize if self.args.initial_value is not None: v0 = self.args.initial_value else: v0 = getattr(self.args, self.args.vary) x0 = np.array([v0]) self._prev_instance = None res = minimize( self.objective, x0, method=self.args.method, tol=self.args.tolerance, # options={ # 'xatol': self.args.tolerance, # } ) assert res.success out = { self.args.vary: res.x[0], } return out
[docs] def final_output_args(self, name): r"""Get the arguments that should be used generate the final output. Args: name (str): Name of the final output to generate. Returns: dict: Arguments to use. """ return {}
def _generate_output(self, name, output_name=None, args_overwrite=None): r"""Generate the specified output value. Args: name (str): Name of the output to generate. Returns: object: Generated output. """ if name in self._final_outputs: if output_name is None: output_name = name cls = self.get_output_task(name) if args_overwrite is None: args_overwrite = dict( self.get_output(self._name), **self.final_output_args(name) ) for k in cls._outputs_local: args_overwrite.setdefault(f'dont_write_{k}', True) return self.run_iteration( cls=cls, args_overwrite=args_overwrite, output_name=output_name, ) return super(OptimizationTaskBase, self)._generate_output(name)
[docs] class TemporalTaskBase(IterationTaskBase): r"""Base class for tasks that iterate over another class.""" _step_task = None _step_vary = 'time' _arguments = [ arguments.CompositeArgumentDescription( 'start_time', 'time', description=' to start at', defaults={'hour': 'sunrise'}, suffix_param={}, ), arguments.CompositeArgumentDescription( 'stop_time', 'time', description=' to stop at', defaults={'hour': 'sunset'}, name_base='start_time', suffix_param={}, ), (('--step-count', ), { 'type': int, 'help': ('The number of time steps that should be taken ' 'between the start and end time. If not provided, ' 'the number of time steps will be determined from ' '\"step_interval\"'), 'suffix_param': {}, }), (('--duration', ), { 'units': 'hours', 'help': 'The time that the animation should last', }), (('--step-interval', ), { 'units': 'hours', 'help': ('The interval (in hours) that should be used ' 'between time steps. If not provided, ' '\"step_count\" will be used to calculate the ' 'step interval. If \"step_count\" is not ' 'provided, a step interval of 1 hour will be ' 'used.'), }), (('--dont-age', ), { 'action': 'store_true', 'help': ( 'Don\' age the generated scene. Currently this is set ' 'to true if the start and end date are the same, but ' 'this may change in the future.' ) }), ]
[docs] @classmethod def adjust_args_internal(cls, args, **kwargs): r"""Adjust the parsed arguments including setting defaults that depend on other provided arguments. Args: args (argparse.Namespace): Parsed arguments. **kwargs: Additional keyword arguments are passed to the parent class's method. """ cls._arguments.getnested('location').adjust_args(args) cls._arguments['duration'].adjust_args(args) if args.duration is None and not (args.step_interval is None or args.step_count is None): args.duration = args.step_interval * args.step_count start_set = cls._arguments['start_time'].any_arguments_set(args) stop_set = cls._arguments['stop_time'].any_arguments_set(args) if not (start_set or stop_set): start_set = True # Force default for start if args.duration is not None: if start_set and not stop_set: cls._arguments['start_time'].adjust_args(args) args.stop_time = ( args.start_time.time + utils.quantity2timedelta(args.duration) ) elif stop_set and not start_set: cls._arguments['stop_time'].adjust_args(args) args.start_time = ( args.stop_time.time - utils.quantity2timedelta(args.duration) ) elif not stop_set: cls._arguments['start_time'].adjust_args(args) if args.start_time.crop_age_string == 'planting': assert not args.dont_age args.stop_age = 'maturity' args.planting_date = args.start_time.date args.stop_doy = None args.stop_date = None if args.start_time.solar_time_string: args.stop_hour = ( args.start_time.solar_time_string ) elif args.start_time.solar_time_string == 'sunrise': args.stop_hour = 'sunset' args.stop_date = args.start_time.date if args.start_time.crop_age_string: args.stop_age = ( args.start_time.crop_age_string ) else: args.duration = units.Quantity(24.0, 'hours') if start_set: # Ensure that planting date is set cls._arguments['start_time'].adjust_args(args) if args.start_time.crop_age_string == 'planting': args.planting_date = args.start_time.date super(TemporalTaskBase, cls).adjust_args_internal(args, **kwargs) if args.stop_time.time == args.start_time.time: args.stop_time = args.stop_time.time.replace( hour=0, minute=0, microsecond=0) cls._arguments['stop_time'].adjust_args(args, overwrite=True) assert args.stop_time.time > args.start_time.time duration = args.stop_time.time - args.start_time.time duration = utils.timedelta2quantity(duration) if not args.step_count: if not args.step_interval: ndays = int(duration.to('days')) if ndays > 2: args.step_interval = units.Quantity(1.0, 'days') else: args.step_interval = units.Quantity(1.0, 'hours') args.step_count = int(duration / args.step_interval) elif not args.step_interval: args.step_interval = duration / args.step_count args.start_time.update_args(args, name='time') if args.start_time.date == args.stop_time.date: for k in ['crop_age_string', 'solar_date_string']: vstart = getattr(args.start_time, k) vstop = getattr(args.stop_time, k) if vstart == vstop: continue if vstart: assert vstop is None args.stop_time.setarg(k, vstart) else: assert vstart is None args.star_time.setarg(k, vstop) args.stop_time.ignore_date = True # TODO: Update this? args.dont_age = True if args.dont_age: args.start_time.ignore_age = True
[docs] def step_args(self): r"""Yield the updates that should be made to the arguments for each step. Yields: dict: Step arguments. """ dt = self.args.step_interval iargs = None for i in range(self.args.step_count): iargs = self.args.start_time.iteration_args( i * dt, dont_age=self.args.dont_age) yield iargs iargs.setdefault('location', self.args.location) time = TimeArgument.from_kwargs(iargs) if time.time < self.args.stop_time.time: yield self.args.stop_time.iteration_args()
################################################################# # CLI #################################################################
[docs] def parse(**kwargs): r"""Parse arguments provided via the command line or keyword arguments Args: **kwargs: If any keyword args are passed, they are parsed instead of the command line arguments. Returns: argparse.Namespace, dict: Argument namespace and keyword keyword arguments that were not parsed. """ parser = InstrumentedParser("Generate/analyze a 3D canopy model") arguments.ClassSubparserArgumentDescription('task').add_to_parser( parser) if kwargs: arglist = [kwargs.get('task', 'generate')] args = parser.parse_args(arglist) else: args = parser.parse_args() for k in list(kwargs.keys()): if hasattr(args, k): setattr(args, k, kwargs.pop(k)) if kwargs: raise AssertionError(f'Unparsed kwargs: {pprint.pformat(kwargs)}') args.SUBPARSER_CLASS = parser.subparser_class('task', args) return args
[docs] def main(**kwargs): r"""Parse arguments provided via the command line or keyword arguments and run the parsed task. Args: **kwargs: If any keyword args are passed, they are parsed instead of the command line arguments. Returns: argparse.Namespace, dict: Argument namespace and keyword keyword arguments that were not parsed. """ args = parse(**kwargs) inst = args.SUBPARSER_CLASS(args) return inst.run(output_name='instance')
if __name__ == "__main__": main()