# Source code for backintime

#    Back In Time
#    Copyright (C) 2008-2022 Oprea Dan, Bart de Koning, Richard Bailey, Germar Reitze
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along
#    with this program; if not, write to the Free Software Foundation, Inc.,
#    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import os
import sys
import argparse
import atexit
import subprocess
from datetime import datetime
from time import sleep
import json
import pathlib
import tools
# Workaround for situations where startApp() is not invoked.
# E.g. when using --diagnostics and other argparse.Action
tools.initiate_translation(None)

import config
import logger
import snapshots
import sshtools
import mount
import password
import encfstools
import cli
from diagnostics import collect_diagnostics, collect_minimal_diagnostics
from exceptions import MountException
from applicationinstance import ApplicationInstance
from version import __version__

# Exit codes passed to sys.exit() by the commands in this module.
# Success.
RETURN_OK = 0
# Failure (e.g. mount error, unknown profile, unsupported operation).
RETURN_ERR = 1
# The config (or Smart Remove) is not configured.
RETURN_NO_CFG = 2

# Registry of argument parsers, filled by createParsers(): the main parser is
# stored under 'main', each command parser under its command name.
parsers = {}

def takeSnapshotAsync(cfg, checksum = False):
    """
    Fork a new backintime process with 'backup' command which will
    take a new snapshot in background.

    Args:
        cfg (config.Config):    config that should be used
        checksum (bool):        if ``True`` the forked command gets the
                                ``--checksum`` argument
    """
    cmd = []
    if cfg.ioniceOnUser():
        cmd.extend(('ionice', '-c2', '-n7'))
    cmd.append('backintime')
    if '1' != cfg.currentProfile():
        cmd.extend(('--profile-id', str(cfg.currentProfile())))
    # Compare the paths by value. An identity check ('is not') only tells
    # whether both names point to the very same object, which is not a
    # reliable way to detect a non-default path.
    if cfg._LOCAL_CONFIG_PATH != cfg._DEFAULT_CONFIG_PATH:
        cmd.extend(('--config', cfg._LOCAL_CONFIG_PATH))
    if cfg._LOCAL_DATA_FOLDER != cfg._DEFAULT_LOCAL_DATA_FOLDER:
        cmd.extend(('--share-path', cfg.DATA_FOLDER_ROOT))
    if logger.DEBUG:
        cmd.append('--debug')
    if checksum:
        cmd.append('--checksum')
    cmd.append('backup')

    # child process need to start its own ssh-agent because otherwise
    # it would be lost without ssh-agent if parent will close
    env = os.environ.copy()
    for name in ('SSH_AUTH_SOCK', 'SSH_AGENT_PID'):
        # The variable may be absent; that is fine.
        env.pop(name, None)
    subprocess.Popen(cmd, env = env)
def takeSnapshot(cfg, force = True):
    """
    Load the cron environment file of ``cfg`` and take a new snapshot.

    Args:
        cfg (config.Config):    config that should be used
        force (bool):           take the snapshot even if it wouldn't need to
                                or would be prevented (e.g. running on battery)

    Returns:
        bool:                   ``True`` if there was an error
    """
    env_file = cfg.cronEnvFile()
    tools.envLoad(env_file)
    snapshotter = snapshots.Snapshots(cfg)
    return snapshotter.backup(force)
def _mount(cfg):
    """
    Mount external filesystems and remember the resulting hash id in ``cfg``.
    Exits with ``RETURN_ERR`` if mounting raises a ``MountException``.

    Args:
        cfg (config.Config):    config that should be used
    """
    try:
        mounted_id = mount.Mount(cfg = cfg).mount()
    except MountException as err:
        logger.error(str(err))
        sys.exit(RETURN_ERR)
    # Only reached when mounting succeeded (sys.exit() above raises).
    cfg.setCurrentHashId(mounted_id)
def _umount(cfg):
    """
    Unmount external filesystems. A ``MountException`` is logged as error
    and otherwise ignored.

    Args:
        cfg (config.Config):    config that should be used
    """
    try:
        mounter = mount.Mount(cfg = cfg)
        mounter.umount(cfg.current_hash_id)
    except MountException as err:
        logger.error(str(err))
def createParsers(app_name = 'backintime'):
    """
    Define parsers for commandline arguments.

    The main parser is stored in the module-level ``parsers`` dict under the
    key ``'main'``; every command parser is stored under its command name.

    Args:
        app_name (str):     string representing the current application
    """
    global parsers

    #define debug
    debugArgsParser = argparse.ArgumentParser(add_help = False)
    debugArgsParser.add_argument('--debug',
                                 action = 'store_true',
                                 help = 'Increase verbosity.')

    #define config argument
    configArgsParser = argparse.ArgumentParser(add_help = False)
    configArgsParser.add_argument('--config',
                                  metavar = 'PATH',
                                  type = str,
                                  action = 'store',
                                  help = 'Read config from %(metavar)s. ' +
                                  'Default = ~/.config/backintime/config')

    configArgsParser.add_argument('--share-path',
                                  metavar = 'PATH',
                                  type = str,
                                  action = 'store',
                                  help = 'Write runtime data (locks, messages, log and mountpoints) to %(metavar)s.')

    #define common arguments which are used for all commands
    commonArgsParser = argparse.ArgumentParser(add_help = False, parents = [configArgsParser, debugArgsParser])
    # --profile and --profile-id can not be combined
    profileGroup = commonArgsParser.add_mutually_exclusive_group()
    profileGroup.add_argument('--profile',
                              metavar = 'NAME',
                              type = str,
                              action = 'store',
                              help = 'Select profile by %(metavar)s.')

    profileGroup.add_argument('--profile-id',
                              metavar = 'ID',
                              type = int,
                              action = 'store',
                              help = 'Select profile by %(metavar)s.')

    commonArgsParser.add_argument('--quiet',
                                  action = 'store_true',
                                  help = 'Be quiet. Suppress messages on stdout.')

    #define arguments which are only used by snapshots-path, snapshots-list-path and last-snapshot-path
    snapshotPathParser = argparse.ArgumentParser(add_help = False)
    snapshotPathParser.add_argument('--keep-mount',
                                    action = 'store_true',
                                    help = "Don't unmount on exit.")

    #define arguments which are used by rsync commands (backup and restore)
    rsyncArgsParser = argparse.ArgumentParser(add_help = False)
    rsyncArgsParser.add_argument('--checksum',
                                 action = 'store_true',
                                 help = 'force to use checksum for checking if files have been changed.')

    #define arguments for snapshot remove
    removeArgsParser = argparse.ArgumentParser(add_help = False)
    removeArgsParser.add_argument('SNAPSHOT_ID',
                                  type = str,
                                  action = 'store',
                                  nargs = '*',
                                  help = 'ID of snapshots which should be removed.')

    #define main argument parser
    parser = argparse.ArgumentParser(prog = app_name,
                                     parents = [commonArgsParser],
                                     description = '%(app)s - a simple backup tool for Linux.' % {'app': config.Config.APP_NAME},
                                     epilog = "For backwards compatibility commands can also be used with trailing '--'. "
                                              "All listed arguments will work with all commands. Some commands have extra arguments. "
                                              "Run '%(app_name)s <COMMAND> -h' to see the extra arguments." % {'app_name': app_name})
    parsers['main'] = parser
    parser.add_argument('--version', '-v',
                        action = 'version',
                        version = '%(prog)s ' + __version__,
                        help = "show %(prog)s's version number.")
    # --license and --diagnostics are handled by custom actions which print
    # their output and exit.
    parser.add_argument('--license',
                        action = printLicense,
                        nargs = 0,
                        help = "show %(prog)s's license.")
    parser.add_argument('--diagnostics',
                        action = printDiagnostics,
                        nargs = 0,
                        help = "show helpful info for better support in case of issues (in JSON format)")

    #######################
    ### define commands ###
    #######################
    epilog = "Run '%(app_name)s -h' to get help for additional arguments. " %{'app_name': app_name}
    epilogCommon = epilog + 'Additional arguments: --config, --debug, --profile, --profile-id, --quiet'
    epilogConfig = epilog + 'Additional arguments: --config, --debug'

    subparsers = parser.add_subparsers(title = 'Commands', dest = 'command')

    # 'aliases' collects (name, nargs) pairs; at the end of this function
    # each of them is registered as a '--name' pseudo argument.
    command = 'backup'
    nargs = 0
    aliases = [(command, nargs), ('b', nargs)]
    description = 'Take a new snapshot. Ignore if the profile ' +\
                  'is not scheduled or if the machine is running on battery.'
    backupCP = subparsers.add_parser(command,
                                     parents = [rsyncArgsParser],
                                     epilog = epilogCommon,
                                     help = description,
                                     description = description)
    backupCP.set_defaults(func = backup)
    parsers[command] = backupCP

    command = 'backup-job'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Take a new snapshot in background only ' +\
                  'if the profile is scheduled and the machine ' +\
                  'is not on battery. This is used by cron jobs.'
    backupJobCP = subparsers.add_parser(command,
                                        parents = [rsyncArgsParser],
                                        epilog = epilogCommon,
                                        help = description,
                                        description = description)
    backupJobCP.set_defaults(func = backupJob)
    parsers[command] = backupJobCP

    command = 'benchmark-cipher'
    nargs = '?'
    aliases.append((command, nargs))
    description = 'Show a benchmark of all ciphers for ssh transfer.'
    benchmarkCipherCP = subparsers.add_parser(command,
                                              epilog = epilogCommon,
                                              help = description,
                                              description = description)
    benchmarkCipherCP.set_defaults(func = benchmarkCipher)
    parsers[command] = benchmarkCipherCP
    benchmarkCipherCP.add_argument('FILE_SIZE',
                                   type = int,
                                   action = 'store',
                                   default = 40,
                                   nargs = '?',
                                   help = 'File size used for benchmark.')

    # 'check-config' is not added to 'aliases', so it has no '--' form.
    command = 'check-config'
    description = 'Check the profiles configuration and install crontab entries.'
    checkConfigCP = subparsers.add_parser(command,
                                          epilog = epilogCommon,
                                          help = description,
                                          description = description)
    checkConfigCP.add_argument('--no-crontab',
                               action = 'store_true',
                               help = 'Do not install crontab entries.')
    checkConfigCP.set_defaults(func = checkConfig)
    parsers[command] = checkConfigCP

    command = 'decode'
    nargs = '*'
    aliases.append((command, nargs))
    description = "Decode paths with 'encfsctl decode'"
    decodeCP = subparsers.add_parser(command,
                                     epilog = epilogCommon,
                                     help = description,
                                     description = description)
    decodeCP.set_defaults(func = decode)
    parsers[command] = decodeCP
    decodeCP.add_argument('PATH',
                          type = str,
                          action = 'store',
                          nargs = '*',
                          help = 'Decode PATH. If no PATH is specified on command line ' +\
                          'a list of filenames will be read from stdin.')

    command = 'last-snapshot'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Show the ID of the last snapshot.'
    lastSnapshotCP = subparsers.add_parser(command,
                                           epilog = epilogCommon,
                                           help = description,
                                           description = description)
    lastSnapshotCP.set_defaults(func = lastSnapshot)
    parsers[command] = lastSnapshotCP

    command = 'last-snapshot-path'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Show the path of the last snapshot.'
    lastSnapshotsPathCP = subparsers.add_parser(command,
                                                parents = [snapshotPathParser],
                                                epilog = epilogCommon,
                                                help = description,
                                                description = description)
    lastSnapshotsPathCP.set_defaults(func = lastSnapshotPath)
    parsers[command] = lastSnapshotsPathCP

    command = 'pw-cache'
    nargs = '*'
    aliases.append((command, nargs))
    description = 'Control Password Cache for non-interactive cronjobs.'
    pwCacheCP = subparsers.add_parser(command,
                                      epilog = epilogConfig,
                                      help = description,
                                      description = description)
    pwCacheCP.set_defaults(func = pwCache)
    parsers[command] = pwCacheCP
    pwCacheCP.add_argument('ACTION',
                           action = 'store',
                           choices = ['start', 'stop', 'restart', 'reload', 'status'],
                           nargs = '?',
                           help = 'Command to send to Password Cache daemon.')

    command = 'remove'
    nargs = '*'
    aliases.append((command, nargs))
    description = 'Remove a snapshot.'
    removeCP = subparsers.add_parser(command,
                                     parents = [removeArgsParser],
                                     epilog = epilogCommon,
                                     help = description,
                                     description = description)
    removeCP.set_defaults(func = remove)
    parsers[command] = removeCP

    command = 'remove-and-do-not-ask-again'
    nargs = '*'
    aliases.append((command, nargs))
    description = "Remove snapshots and don't ask for confirmation before. Be careful!"
    removeDoNotAskCP = subparsers.add_parser(command,
                                             parents = [removeArgsParser],
                                             epilog = epilogCommon,
                                             help = description,
                                             description = description)
    removeDoNotAskCP.set_defaults(func = removeAndDoNotAskAgain)
    parsers[command] = removeDoNotAskCP

    command = 'restore'
    nargs = '*'
    aliases.append((command, nargs))
    description = 'Restore files.'
    restoreCP = subparsers.add_parser(command,
                                      parents = [rsyncArgsParser],
                                      epilog = epilogCommon,
                                      help = description,
                                      description = description)
    restoreCP.set_defaults(func = restore)
    parsers[command] = restoreCP
    # --local-backup and --no-local-backup can not be combined
    backupGroup = restoreCP.add_mutually_exclusive_group()
    restoreCP.add_argument('WHAT',
                           type = str,
                           action = 'store',
                           nargs = '?',
                           help = 'Restore file or folder WHAT.')

    restoreCP.add_argument('WHERE',
                           type = str,
                           action = 'store',
                           nargs = '?',
                           help = "Restore to WHERE. An empty argument '' will restore to original destination.")

    restoreCP.add_argument('SNAPSHOT_ID',
                           type = str,
                           action = 'store',
                           nargs = '?',
                           help = 'Which SNAPSHOT_ID should be used. This can be a snapshot ID or ' +\
                           'an integer starting with 0 for the last snapshot, 1 for the second to last, ... ' +\
                           'the very first snapshot is -1')

    restoreCP.add_argument('--delete',
                           action = 'store_true',
                           help = 'Restore and delete newer files which are not in the snapshot. ' +\
                           'WARNING: deleting files in filesystem root could break your whole system!!!')

    backupGroup.add_argument('--local-backup',
                             action = 'store_true',
                             help = 'Create backup files before changing local files.')

    backupGroup.add_argument('--no-local-backup',
                             action = 'store_true',
                             help = 'Temporarily disable creation of backup files before changing local files. ' +\
                             'This can be switched off permanently in Settings, too.')

    restoreCP.add_argument('--only-new',
                           action = 'store_true',
                           help = 'Only restore files which do not exist or are newer than ' +\
                           'those in destination. Using "rsync --update" option.')

    # 'shutdown' is not added to 'aliases'; 'nargs' is assigned but unused here.
    command = 'shutdown'
    nargs = 0
    description = 'Shut down the computer after the snapshot is done.'
    shutdownCP = subparsers.add_parser(command,
                                       epilog = epilogCommon,
                                       help = description,
                                       description = description)
    shutdownCP.set_defaults(func = shutdown)
    parsers[command] = shutdownCP

    # 'smart-remove' is not added to 'aliases'; 'nargs' is assigned but unused here.
    command = 'smart-remove'
    nargs = 0
    description = 'Remove snapshots based on "Smart Remove" pattern.'
    smartRemoveCP = subparsers.add_parser(command,
                                          epilog = epilogCommon,
                                          help = description,
                                          description = description)
    smartRemoveCP.set_defaults(func = smartRemove)
    parsers[command] = smartRemoveCP

    command = 'snapshots-list'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Show a list of snapshot IDs.'
    snapshotsListCP = subparsers.add_parser(command,
                                            parents = [snapshotPathParser],
                                            epilog = epilogCommon,
                                            help = description,
                                            description = description)
    snapshotsListCP.set_defaults(func = snapshotsList)
    parsers[command] = snapshotsListCP

    command = 'snapshots-list-path'
    nargs = 0
    aliases.append((command, nargs))
    description = "Show the paths to snapshots."
    snapshotsListPathCP = subparsers.add_parser(command,
                                                parents = [snapshotPathParser],
                                                epilog = epilogCommon,
                                                help = description,
                                                description = description)
    snapshotsListPathCP.set_defaults(func = snapshotsListPath)
    parsers[command] = snapshotsListPathCP

    command = 'snapshots-path'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Show the path where snapshots are stored.'
    snapshotsPathCP = subparsers.add_parser(command,
                                            parents = [snapshotPathParser],
                                            epilog = epilogCommon,
                                            help = description,
                                            description = description)
    snapshotsPathCP.set_defaults(func = snapshotsPath)
    parsers[command] = snapshotsPathCP

    command = 'unmount'
    nargs = 0
    aliases.append((command, nargs))
    description = 'Unmount the profile.'
    unmountCP = subparsers.add_parser(command,
                                      epilog = epilogCommon,
                                      help = description,
                                      description = description)
    unmountCP.set_defaults(func = unmount)
    parsers[command] = unmountCP

    #define aliases for all commands with trailing --
    group = parser.add_mutually_exclusive_group()
    for alias, nargs in aliases:
        # single letter aliases become a short option ('-b'),
        # all others a long option ('--backup')
        if len(alias) == 1:
            arg = '-%s' % alias
        else:
            arg = '--%s' % alias
        # hidden from the help output
        group.add_argument(arg,
                           nargs = nargs,
                           action = PseudoAliasAction,
                           help = argparse.SUPPRESS)
def startApp(app_name = 'backintime'):
    """
    Parse the commandline and run the requested command. If no command was
    given, print the header and return the config instead.

    Args:
        app_name (str): string representing the current application

    Returns:
        config.Config:  current config if no command was given in arguments
    """
    createParsers(app_name)
    logger.openlog()
    args = argParse(None)

    # Name, Version, As Root, OS
    diag = collect_minimal_diagnostics()
    app_info = diag["backintime"]
    os_info = list(diag["host-setup"]["OS"].values())
    logger.debug(f'{app_info} {os_info}')

    # Add source path to $PATH environ if running from source
    if tools.runningFromSource():
        tools.addSourceToPathEnviron()

    # Warn about sudo (only once, tracked by an environment variable)
    if tools.usingSudo():
        if os.getenv('BIT_SUDO_WARNING_PRINTED', 'false') == 'false':
            os.putenv('BIT_SUDO_WARNING_PRINTED', 'true')
            names = {'app_name': app_name, 'app': config.Config.APP_NAME}
            template = ("It looks like you're using 'sudo' to start %(app)s. "
                        "This will cause some trouble. Please use either 'sudo -i %(app_name)s' "
                        "or 'pkexec %(app_name)s'.")
            logger.warning(template % names)

    # Call commands
    if hasattr(args, 'func'):
        args.func(args)
        return None

    # No command given: hand the config back to the caller
    setQuiet(args)
    printHeader()
    return getConfig(args, False)
def argParse(args):
    """
    Parse arguments given on commandline.

    Args:
        args (list):            argument strings that should be parsed or
                                ``None`` to parse ``sys.argv``

    Returns:
        argparse.Namespace:     new parsed Namespace
    """
    def join(args, subArgs):
        """
        Add new arguments to existing Namespace.

        Args:
            args (argparse.Namespace):
                        main Namespace that should get new arguments
            subArgs (argparse.Namespace):
                        second Namespace which have new arguments
                        that should be merged into ``args``
        """
        for key, value in vars(subArgs).items():
            # Only add new values if it isn't set already or if there really IS
            # a value
            if getattr(args, key, None) is None or value:
                setattr(args, key, value)

    # First parse the main parser without subparsers
    # otherwise positional args in subparsers will be too greedy
    # but only if -h or --help is not involved because otherwise
    # help will not work for subcommands
    mainParser = parsers['main']
    sub = []
    if '-h' not in sys.argv and '--help' not in sys.argv:
        # Iterate over a copy: _remove_action() mutates mainParser._actions
        # and removing from a list while iterating it skips elements.
        for action in list(mainParser._actions):
            if isinstance(action, argparse._SubParsersAction):
                # Remove subparsers
                mainParser._remove_action(action)
                sub.append(action)

    try:
        args, unknownArgs = mainParser.parse_known_args(args)
    finally:
        # Add the subparsers again, even if parsing bailed out, so the
        # main parser is never left without its commands.
        for action in sub:
            mainParser._add_action(action)

    # Parse it again for unknown args
    if unknownArgs:
        subArgs, unknownArgs = mainParser.parse_known_args(unknownArgs)
        join(args, subArgs)

    # Finally parse only the command parser, otherwise we miss some arguments
    # from command
    if unknownArgs and 'command' in args and args.command in parsers:
        commandParser = parsers[args.command]
        subArgs, unknownArgs = commandParser.parse_known_args(unknownArgs)
        join(args, subArgs)

    try:
        logger.DEBUG = args.debug
    except AttributeError:
        pass

    # Only report arguments which really got a value
    used_args = {
        key: value
        for key, value in vars(args).items()
        if value is not None
    }

    logger.debug(f'Used argument(s): {used_args}')
    logger.debug(f'Unknown argument(s): {unknownArgs}')

    # Report unknown arguments but not if we run aliasParser next because we
    # will parse again in there.
    if unknownArgs and not ('func' in args and args.func is aliasParser):
        mainParser.error(f'Unknown argument(s): {unknownArgs}')

    return args
def printHeader():
    """
    Print application name, version and legal notes to stdout.
    """
    header_lines = (
        '',
        'Back In Time',
        'Version: ' + __version__,
        '',
        'Back In Time comes with ABSOLUTELY NO WARRANTY.',
        'This is free software, and you are welcome to redistribute it',
        "under certain conditions; type `backintime --license' for details.",
        '',
    )
    for line in header_lines:
        print(line)
class PseudoAliasAction(argparse.Action):
    """
    Translate '--COMMAND' into 'COMMAND' for backwards compatibility.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """
        Store ``aliasParser`` as ``func`` in the namespace together with the
        argument to replace (``replace``) and the command name to use
        instead (``alias``).

        Args:
            parser (argparse.ArgumentParser): NotImplemented
            namespace (argparse.Namespace): Namespace that should get modified
            values: NotImplemented
            option_string: NotImplemented
        """
        #TODO: find a more elegant way to solve this
        if self.dest == 'b':
            # short option '-b' stands for the 'backup' command
            replace, alias = '-b', 'backup'
        else:
            # argparse stores 'some-command' as dest 'some_command'
            alias = self.dest.replace('_', '-')
            replace = '--%s' % alias

        namespace.func = aliasParser
        namespace.replace = replace
        namespace.alias = alias
def aliasParser(args):
    """
    Call commands which were given with leading -- for backwards
    compatibility. The commandline is rewritten with the real command name
    and parsed again.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments
    """
    if not args.quiet:
        names = {'alias': args.alias, 'replace': args.replace}
        logger.info("Run command '%(alias)s' instead of argument '%(replace)s' due to backwards compatibility." % names)

    rewritten = []
    for word in sys.argv[1:]:
        rewritten.append(word.replace(args.replace, args.alias))

    newArgs = argParse(rewritten)
    if hasattr(newArgs, 'func'):
        newArgs.func(newArgs)
def getConfig(args, check = True):
    """
    Load config and change to profile selected on commandline.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments
        check (bool):   if ``True`` check if config is valid

    Returns:
        config.Config:  current config with requested profile selected

    Raises:
        SystemExit:     1 if ``profile`` or ``profile_id`` is no valid profile
                        2 if ``check`` is ``True`` and config is not configured
    """
    cfg = config.Config(config_path = args.config, data_path = args.share_path)

    logger.debug('config file: %s' % cfg._LOCAL_CONFIG_PATH)
    logger.debug('share path: %s' % cfg._LOCAL_DATA_FOLDER)
    profile_pairs = ['%s=%s' % (pid, cfg.profileName(pid)) for pid in cfg.profiles()]
    logger.debug('profiles: %s' % ', '.join(profile_pairs))

    # Select the profile by id ...
    wanted_id = getattr(args, 'profile_id', None)
    if wanted_id and not cfg.setCurrentProfile(wanted_id):
        logger.error('Profile-ID not found: %s' % wanted_id)
        sys.exit(RETURN_ERR)

    # ... or by name
    wanted_name = getattr(args, 'profile', None)
    if wanted_name and not cfg.setCurrentProfileByName(wanted_name):
        logger.error('Profile not found: %s' % wanted_name)
        sys.exit(RETURN_ERR)

    if check and not cfg.isConfigured():
        logger.error('%(app)s is not configured!' %{'app': cfg.APP_NAME})
        sys.exit(RETURN_NO_CFG)

    # Only commands with the rsync arguments have 'checksum'
    if hasattr(args, 'checksum'):
        cfg.forceUseChecksum = args.checksum
    return cfg
def setQuiet(args):
    """
    Redirect :py:data:`sys.stdout` to ``/dev/null`` if ``--quiet`` was set on
    commandline. Return the original :py:data:`sys.stdout` file object which
    can be used to print absolute necessary information.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Returns:
        sys.stdout:     default sys.stdout
    """
    original_stdout = sys.stdout
    if not args.quiet:
        return original_stdout

    # do not replace with subprocess.DEVNULL - will not work
    devnull = open(os.devnull, 'w')
    sys.stdout = devnull
    # Close both streams when the interpreter exits
    atexit.register(devnull.close)
    atexit.register(original_stdout.close)
    return original_stdout
class printLicense(argparse.Action):
    """
    Print the content of the 'LICENSE' file from the doc path and exit.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        license_file = pathlib.Path(tools.docPath(), 'LICENSE')
        license_text = license_file.read_text('utf-8')
        print(license_text)
        sys.exit(RETURN_OK)
class printDiagnostics(argparse.Action):
    """
    Print information that is helpful for the support team
    to narrow down problems and bugs.
    The info is printed using the machine- and human-readable JSON format
    and the application exits afterwards.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        report = json.dumps(collect_diagnostics(), indent=4)
        print(report)
        sys.exit(RETURN_OK)
def backup(args, force = True):
    """
    Command for force taking a new snapshot.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments
        force (bool):   take the snapshot even if it wouldn't need to or would
                        be prevented (e.g. running on battery)

    Raises:
        SystemExit:     0 if successful, 1 if not
    """
    setQuiet(args)
    printHeader()
    failed = takeSnapshot(getConfig(args), force)
    # takeSnapshot() reports errors as True which maps to exit code 1
    sys.exit(int(failed))
def backupJob(args):
    """
    Command for taking a new snapshot in background. Mainly used for cronjobs.
    This will run the snapshot inside a daemon and detach from it. It will
    return immediately back to commandline.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    daemon = cli.BackupJobDaemon(backup, args)
    daemon.start()
def shutdown(args):
    """
    Command for shutting down the computer after the current snapshot has
    finished.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if successful; 1 if it failed either because there is
                        no active snapshot for this profile or shutdown is not
                        supported.
    """
    setQuiet(args)
    printHeader()
    cfg = getConfig(args)

    power = tools.ShutDown()
    if not power.canShutdown():
        logger.warning('Shutdown is not supported.')
        sys.exit(RETURN_ERR)

    snapshot_instance = ApplicationInstance(cfg.takeSnapshotInstanceFile(), False)
    profile = '='.join([cfg.currentProfile(), cfg.profileName()])

    if not snapshot_instance.busy():
        logger.info('There is no active snapshot for profile %s. Skip shutdown.' %profile)
        sys.exit(RETURN_ERR)

    print('Shutdown is waiting for the snapshot in profile %s to end.\nPress CTRL+C to interrupt shutdown.\n' %profile)
    power.activate_shutdown = True
    try:
        # Poll until the snapshot is done
        while True:
            if not snapshot_instance.busy():
                break
            logger.debug('Snapshot is still active. Wait for shutdown.')
            sleep(5)
    except KeyboardInterrupt:
        print('Shutdown interrupted.')
    else:
        logger.info('Shutdown now.')
        power.shutdown()
    sys.exit(RETURN_OK)
def snapshotsPath(args):
    """
    Command for printing the full snapshot path of current profile.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    # The profile is only mounted if it should stay mounted afterwards
    if args.keep_mount:
        _mount(cfg)

    # With --quiet only the bare value is printed
    template = '{}' if args.quiet else 'SnapshotsPath: {}'
    print(template.format(cfg.snapshotsFullPath()), file=out)
    sys.exit(RETURN_OK)
def snapshotsList(args):
    """
    Command for printing a list of all snapshots in current profile.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    _mount(cfg)

    # With --quiet only the bare value is printed
    template = '{}' if args.quiet else 'SnapshotID: {}'

    printed = 0
    #use snapshots.listSnapshots instead of iterSnapshots because of sorting
    for sid in snapshots.listSnapshots(cfg, reverse = False):
        print(template.format(sid), file=out)
        printed += 1

    if printed == 0:
        logger.error("There are no snapshots in '%s'" % cfg.profileName())

    if not args.keep_mount:
        _umount(cfg)
    sys.exit(RETURN_OK)
def snapshotsListPath(args):
    """
    Command for printing a list of all snapshots paths in current profile.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    _mount(cfg)

    # With --quiet only the bare value is printed
    template = '{}' if args.quiet else 'SnapshotPath: {}'

    printed = 0
    #use snapshots.listSnapshots instead of iterSnapshots because of sorting
    for sid in snapshots.listSnapshots(cfg, reverse = False):
        print(template.format(sid.path()), file=out)
        printed += 1

    if printed == 0:
        logger.error("There are no snapshots in '%s'" % cfg.profileName())

    if not args.keep_mount:
        _umount(cfg)
    sys.exit(RETURN_OK)
def lastSnapshot(args):
    """
    Command for printing the very last snapshot in current profile.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    _mount(cfg)

    sid = snapshots.lastSnapshot(cfg)
    if not sid:
        logger.error("There are no snapshots in '%s'" % cfg.profileName())
    else:
        # With --quiet only the bare value is printed
        template = '{}' if args.quiet else 'SnapshotID: {}'
        print(template.format(sid), file=out)

    _umount(cfg)
    sys.exit(RETURN_OK)
def lastSnapshotPath(args):
    """
    Command for printing the path of the very last snapshot in
    current profile.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    _mount(cfg)

    sid = snapshots.lastSnapshot(cfg)
    if not sid:
        logger.error("There are no snapshots in '%s'" % cfg.profileName())
    else:
        # With --quiet only the bare value is printed
        template = '{}' if args.quiet else 'SnapshotPath: {}'
        print(template.format(sid.path()), file=out)

    if not args.keep_mount:
        _umount(cfg)
    sys.exit(RETURN_OK)
def unmount(args):
    """
    Command for unmounting all filesystems.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    setQuiet(args)
    profile_cfg = getConfig(args)

    # Mount first, then unmount again
    _mount(profile_cfg)
    _umount(profile_cfg)

    sys.exit(RETURN_OK)
def benchmarkCipher(args):
    """
    Command for transferring a file with scp to remote host with all
    available ciphers and print its speed and time.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if the benchmark ran, 1 if the profile does not
                        use an ssh mode
    """
    setQuiet(args)
    printHeader()
    cfg = getConfig(args)

    # Guard: the benchmark only makes sense for ssh based profiles
    if cfg.snapshotsMode() not in ('ssh', 'ssh_encfs'):
        logger.error("SSH is not configured for profile '%s'!" % cfg.profileName())
        sys.exit(RETURN_ERR)

    sshtools.SSH(cfg).benchmarkCipher(args.FILE_SIZE)
    sys.exit(RETURN_OK)
def pwCache(args):
    """
    Command for controlling the password cache daemon. Without an action
    the daemon is run directly.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if daemon is running, 1 if not
    """
    out = setQuiet(args)
    printHeader()
    cfg = getConfig(args)
    exit_code = RETURN_OK
    daemon = password.Password_Cache(cfg)
    action = args.ACTION

    if action == 'status':
        print('%(app)s Password Cache: ' % {'app': cfg.APP_NAME}, end=' ', file = out)
        if daemon.status():
            print(cli.bcolors.OKGREEN + 'running' + cli.bcolors.ENDC, file = out)
            exit_code = RETURN_OK
        else:
            print(cli.bcolors.FAIL + 'not running' + cli.bcolors.ENDC, file = out)
            exit_code = RETURN_ERR
    elif action:
        # The action name is also the name of the daemon method to call
        getattr(daemon, action)()
    else:
        daemon.run()
    sys.exit(exit_code)
def decode(args):
    """
    Command for decoding given paths with 'encfsctl'.
    Will listen on stdin if no path was given.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if okay, 1 if the profile is not encrypted
    """
    out = setQuiet(args)
    cfg = getConfig(args)
    if cfg.snapshotsMode() not in ('local_encfs', 'ssh_encfs'):
        logger.error("Profile '%s' is not encrypted." % cfg.profileName())
        sys.exit(RETURN_ERR)
    _mount(cfg)
    decoder = encfstools.Decode(cfg)

    if args.PATH:
        decoded = decoder.list(args.PATH)
        print('\n'.join(decoded), file = out)
    else:
        # Read paths line by line until EOF or an empty line
        while True:
            try:
                line = input()
            except EOFError:
                break
            if not line:
                break
            print(decoder.path(line), file = out)

    decoder.close()
    _umount(cfg)
    sys.exit(RETURN_OK)
def remove(args, force = False):
    """
    Command for removing snapshots.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments
        force (bool):   don't ask before removing (BE CAREFUL!)

    Raises:
        SystemExit:     0
    """
    setQuiet(args)
    printHeader()
    cfg = getConfig(args)

    _mount(cfg)
    snapshot_ids = args.SNAPSHOT_ID
    cli.remove(cfg, snapshot_ids, force)
    _umount(cfg)

    sys.exit(RETURN_OK)
def removeAndDoNotAskAgain(args):
    """
    Command for removing snapshots without asking before remove
    (BE CAREFUL!)

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    remove(args, force = True)
def smartRemove(args):
    """
    Command for running Smart-Remove from Terminal.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if okay
                        2 if Smart-Remove is not configured
    """
    setQuiet(args)
    printHeader()
    cfg = getConfig(args)
    sn = snapshots.Snapshots(cfg)

    enabled, keep_all, per_day, per_week, per_month = cfg.smartRemove()
    if not enabled:
        logger.error('Smart Remove is not configured.')
        sys.exit(RETURN_NO_CFG)

    _mount(cfg)
    doomed = sn.smartRemoveList(datetime.today(),
                                keep_all,
                                per_day,
                                per_week,
                                per_month)
    logger.info('Smart Remove will remove {} snapshots'.format(len(doomed)))
    sn.smartRemove(doomed, log = logger.info)
    _umount(cfg)
    sys.exit(RETURN_OK)
def restore(args):
    """
    Command for restoring files from snapshots.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0
    """
    setQuiet(args)
    printHeader()
    cfg = getConfig(args)
    _mount(cfg)

    # The config default wins unless it was disabled with --no-local-backup;
    # otherwise --local-backup decides.
    config_wants_backup = cfg.backupOnRestore() and not args.no_local_backup
    make_backup = True if config_wants_backup else args.local_backup

    cli.restore(cfg,
                args.SNAPSHOT_ID,
                args.WHAT,
                args.WHERE,
                delete = args.delete,
                backup = make_backup,
                only_new = args.only_new)

    _umount(cfg)
    sys.exit(RETURN_OK)
def checkConfig(args):
    """
    Command for checking the config file.

    Args:
        args (argparse.Namespace):
                        previously parsed arguments

    Raises:
        SystemExit:     0 if config is okay, 1 if not
    """
    out = setQuiet(args)
    printHeader()
    cfg = getConfig(args)

    config_ok = cli.checkConfig(cfg, crontab = not args.no_crontab)
    details = {'cfg': cfg._LOCAL_CONFIG_PATH, 'profile': cfg.profileName()}

    if config_ok:
        template = "\nConfig %(cfg)s profile '%(profile)s' is fine."
        exit_code = RETURN_OK
    else:
        template = "\nConfig %(cfg)s profile '%(profile)s' has errors."
        exit_code = RETURN_ERR

    print(template % details, file = out)
    sys.exit(exit_code)
# Run the command line interface when this file is executed as a script.
if __name__ == '__main__': startApp()