Source code for fret.cli

import argparse
import collections
import logging
import shutil
import sys
import os

from .common import command, argspec, commands, configurables, \
    NoAppError, NotConfiguredError
from .workspace import Workspace
from .util import colored, ColoredFormatter, Configuration


def main(args=None):
    logger = logging.getLogger('fret')
    logger.setLevel(logging.INFO)
    formatter = ColoredFormatter(
        '%(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    try:
        from . import app
        argument_style = app.config._get('argument_style') or 'java'
    except NoAppError:
        app = None
        argument_style = 'java'

    main_parser = _ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        prog='fret',
        description='fret: Framework for Reproducible ExperimenTs')

    main_parser.add_argument('-q', action='store_true', help='quiet')
    main_parser.add_argument('-v', action='store_true', help='verbose')
    main_parser.add_argument('-w', '--workspace', help='workspace dir')

    if args is None:
        args = sys.argv[1:]

    with_help = False
    if '-h' in args:
        args.remove('-h')
        with_help = True
    if '--help' in args:
        args.remove('--help')
        with_help = True

    args, remaining = main_parser.parse_known_args(args)

    if with_help:
        remaining.append('-h')

    # configure logging level
    if args.q:
        logger.setLevel(logging.WARNING)
    elif args.v:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    if args.workspace is None:
        cwd = os.getcwd()
        if app is not None and not os.path.samefile(cwd, app.root):
            ws_path = cwd
            os.chdir(app.root)
        else:
            ws_path = 'ws/_default'
    else:
        ws_path = args.workspace

    if os.path.exists(ws_path):
        ws = Workspace(ws_path)
    else:
        ws = None
    main = None

    subparsers = main_parser.add_subparsers(title='supported commands',
                                            dest='command')
    subparsers.required = True

    for cmd, f in commands.items():
        if f.__functype__ == 'method':
            if ws is None:
                continue
            cls_name, func_name = cmd.split('.')
            if main is None:
                main = ws.build()
            if cls_name != main.__class__.__name__:
                # not applicable
                continue
            cmd = func_name

        sub = subparsers.add_parser(
            cmd,
            help=getattr(f, '__help__', 'command ' + cmd),
            description=getattr(f, '__desc__', None),
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)

        with ParserBuilder(sub, argument_style) as builder:
            for arg in f.__funcspec__.pos[int(not f.__static__):]:
                builder.add_opt(arg, argspec())
            for k, v in f.__funcspec__.kw:
                builder.add_opt(k, v)

        if f.__functype__ == 'method':
            sub.set_defaults(func=_default_func(f, main))
        else:
            sub.set_defaults(func=_default_func(f, Workspace(ws_path)))

    if app is not None:
        config_sub = subparsers.add_parser(
            'config', help='configure module for workspace',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        _add_config_sub(config_sub, argument_style)
        config_sub.set_defaults(func=_config_default_func)
    else:
        config_sub = None

    args = main_parser.parse_args(remaining)
    del args.q
    del args.v
    args.workspace = ws_path

    logger = logging.getLogger('fret.' + args.command)

    try:
        return args.func(args)
    except KeyboardInterrupt:
        # print traceback info to screen only
        import traceback
        sys.stderr.write(traceback.format_exc())
        logger.warning('cancelled by user')
    except NotConfiguredError as e:
        print('error:', e)
        if config_sub is not None:
            config_sub.print_usage()
        sys.exit(1)
    except Exception as e:  # pylint: disable=broad-except
        # print traceback info to screen only
        import traceback
        sys.stderr.write(traceback.format_exc())
        logger.error('exception occurred: %s: %s',
                     e.__class__.__name__, e)


[docs]class _ArgumentParser(argparse.ArgumentParser):
[docs] def error(self, message): # customize error message self.print_usage(sys.stderr) err = colored('error:', 'r', style='b') self.exit(2, '%s %s\n' % (err, message))
[docs]class ParserBuilder: """Utility to generate CLI arguments in different styles.""" def __init__(self, parser, style='java'): self._parser = parser self._style = style self._names = [] self._spec = []
[docs] def add_opt(self, name, spec): """Add option with specification. Args: name (str) : option name spec (argspec): argument specification""" if spec.default() is True: # change name for better bool support spec._kwargs['dest'] = name # pylint: disable=protected-access name = 'no_' + name self._names.append(name) self._spec.append(spec)
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): prefix = '-' if self._style == 'java' else '--' seen = set(self._names) for name, spec in zip(self._names, self._spec): if name.startswith('_'): continue args, kwargs = spec.spec() if not args: args = [prefix + name] short = ''.join(seg[0] for seg in name.split('_')) if short not in seen: args.append('-' + short) seen.add(short) elif args[0].startswith('-'): kwargs['dest'] = name if 'help' not in kwargs: kwargs['help'] = 'parameter ' + name self._parser.add_argument(*args, **kwargs)
def _default_func(f, obj): def run(args): del args.command, args.func, args.workspace args = {name: value for (name, value) in args._get_kwargs()} args = Configuration(args) if f.__static__: return f(**args._dict()) else: return f(obj, **args._dict()) return run def _add_config_sub(parser, argument_style): parser.add_argument('name', default='main', nargs='?', help='module name') if sys.version_info < (3, 7): subs = parser.add_subparsers(title='modules available', dest='module') else: subs = parser.add_subparsers(title='modules available', dest='module', required=False) group_options = collections.defaultdict(set) for module, module_cls in configurables.items(): _parser_formatter = argparse.ArgumentDefaultsHelpFormatter sub = subs.add_parser(module, help=module_cls.help, formatter_class=_parser_formatter) group = sub.add_argument_group('config') with ParserBuilder(group, argument_style) as builder: mro = [] for base_cls in module_cls.__mro__: mro.append(base_cls) if ( not hasattr(base_cls, '__funcspec__') or not base_cls.__funcspec__.varkw ): break for base_cls in reversed(mro): if hasattr(base_cls, '__funcspec__'): for name, opt in base_cls.__funcspec__.kw: builder.add_opt(name, opt) for submodule in module_cls.submodules: builder.add_opt(submodule, argspec( default=submodule, help='submodule ' + submodule )) for action in group._group_actions: group_options[module].add(action.dest) def save(args): with Workspace(args.workspace) as ws: m = args.module cfg = [(name, value) for (name, value) in args._get_kwargs() if name in group_options[m]] cfg = Configuration(cfg) msg = '[%s] configured "%s" as "%s"' % \ (ws, args.name, m) if cfg._config: msg += ' with: ' + str(cfg) print(msg, file=sys.stderr) ws.register(args.name, configurables[m], **cfg._dict()) sub.set_defaults(func=save) def _config_default_func(args): ws = Workspace(args.workspace) cfg = ws.config_path if cfg.exists(): cfg = cfg.open().read().strip() return cfg else: raise NotConfiguredError('no configuration in this workspace')
[docs]@command(help='fork workspace, possibly with modifications') def fork(ws, path, mods=([], 'modifications (in format: NAME.ARG=VAL)')): """Command ``fork``, Fork from existing workspace and change some of the arguments. Example: .. code-block:: bash $ fret fork ws/test main.foo=6 In [ws/test]: as in [ws/_default], with modification(s): main.foo=6 """ conf = ws.config_dict() for mod in mods: k, v = mod.split('=') d = conf if '.' not in k: k = 'main.' + k fields = k.split('.') try: for field in fields[:-1]: d = d[field] d[fields[-1]] # try get last field except KeyError as e: print('{}: no such key to modify'.format(e.args[0])) return d[fields[-1]] = v ws_ = Workspace(path, config_dict=conf) ws_.write()
[docs]@command( help='clean workspace', description='Remove all snapshots in specific workspace by default. ' + 'If `--all` is specified, clean the entire workspace' ) def clean(ws, config=(False, 'remove workspace configuration'), log=(False, 'clear workspace logs'), snapshot=(False, 'clear snapshots'), everything=argspec( '-a', action='store_true', help='clear everything except for configuration' ), all=argspec('--all', action='store_true', help='clean the entire workspace'), force=(False, 'do without confirmation')): """Command ``clean``. Remove all snapshots in specific workspace. If ``--all`` is specified, clean the entire workspace """ if all: if not force: try: c = input('[{}] clean the entire workspace? [y/N] '.format(ws)) except KeyboardInterrupt: return 1 if c.lower() != 'y': return 1 shutil.rmtree(str(ws)) else: if not force: if everything: todo = ['snapshots', 'logs'] if config: todo.append('config') else: todo = [] if snapshot: todo.append('snapshots') if log: todo.append('logs') if config: todo.append('config') if len(todo) == 0: todo.append('snapshots') msg = '[{}] clean {}? [y/N] '.format(ws, ', '.join(todo)) try: c = input(msg) except KeyboardInterrupt: return 1 if c.lower() != 'y': return 1 if (not config and not log) or snapshot or everything: # fret clean or fret clean -s ... or fret clean -a ... shutil.rmtree(str(ws.snapshot())) if log or everything: shutil.rmtree(str(ws.log())) if config: try: (ws.path / 'config.toml').unlink() except FileNotFoundError: pass