Source code for cli

# -*- coding: utf-8 -*-
#    Back In Time
#    Copyright (C) 2012-2022 Germar Reitze
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along
#    with this program; if not, write to the Free Software Foundation, Inc.,
#    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import os
import sys

import tools
import snapshots
import bcolors

[docs] def restore(cfg, snapshot_id = None, what = None, where = None, **kwargs): if what is None: what = input('File to restore: ') what = tools.preparePath(os.path.abspath(os.path.expanduser(what))) if where is None: where = input('Restore to (empty for original path): ') if where: where = tools.preparePath(os.path.abspath(os.path.expanduser(where))) snapshotsList = snapshots.listSnapshots(cfg) sid = selectSnapshot(snapshotsList, cfg, snapshot_id, 'SnapshotID to restore') print('') RestoreDialog(cfg, sid, what, where, **kwargs).run()
[docs] def remove(cfg, snapshot_ids = None, force = None): snapshotsList = snapshots.listSnapshots(cfg) if not snapshot_ids: snapshot_ids = (None,) sids = [selectSnapshot(snapshotsList, cfg, sid, 'SnapshotID to remove') for sid in snapshot_ids] if not force: print('Do you really want to remove these snapshots?') [print(sid.displayName) for sid in sids] if not 'yes' == input('(no/yes): '): return s = snapshots.Snapshots(cfg) [s.remove(sid) for sid in sids]
[docs] def checkConfig(cfg, crontab = True): import mount from exceptions import MountException def announceTest(): print() print(frame(test)) def failed(): print(test + ': ' + bcolors.FAIL + 'failed' + bcolors.ENDC) def okay(): print(test + ': ' + bcolors.OKGREEN + 'done' + bcolors.ENDC) def errorHandler(msg): print(bcolors.WARNING + 'WARNING: ' + bcolors.ENDC + msg) cfg.setErrorHandler(errorHandler) mode = cfg.snapshotsMode() if cfg.SNAPSHOT_MODES[mode][0] is not None: #preMountCheck test = 'Run mount tests' announceTest() mnt = mount.Mount(cfg = cfg, tmp_mount = True) try: mnt.preMountCheck(mode = mode, first_run = True) except MountException as ex: failed() print(str(ex)) return False okay() #okay, lets try to mount test = 'Mount' announceTest() try: hash_id = mnt.mount(mode = mode, check = False) except MountException as ex: failed() print(str(ex)) return False okay() test = 'Check/prepare snapshot path' announceTest() snapshots_path = cfg.snapshotsPath(mode = mode, tmp_mount = True) if not cfg.setSnapshotsPath(snapshots_path, mode = mode): failed() return False okay() #umount if not cfg.SNAPSHOT_MODES[mode][0] is None: test = 'Unmount' announceTest() try: mnt.umount(hash_id = hash_id) except MountException as ex: failed() print(str(ex)) return False okay() test = 'Check config' announceTest() if not cfg.checkConfig(): failed() return False okay() if crontab: test = 'Install crontab' announceTest() if not cfg.setupCron(): failed() return False okay() return True
[docs] def selectSnapshot(snapshotsList, cfg, snapshot_id = None, msg = 'SnapshotID'): """ check if given snapshot is valid. If not print a list of all snapshots and ask to choose one """ len_snapshots = len(snapshotsList) if not snapshot_id is None: try: sid = snapshots.SID(snapshot_id, cfg) if sid in snapshotsList: return sid else: print('SnapshotID %s not found.' % snapshot_id) except ValueError: try: index = int(snapshot_id) return snapshotsList[index] except (ValueError, IndexError): print('Invalid SnaphotID index: %s' % snapshot_id) snapshot_id = None columns = (terminalSize()[1] - 25) // 26 + 1 rows = len_snapshots // columns if len_snapshots % columns > 0: rows += 1 print('SnapshotID\'s:') for row in range(rows): line = [] for column in range(columns): index = row + column * rows if index > len_snapshots - 1: continue line.append('{i:>4}: {s}'.format(i = index, s = snapshotsList[index])) print(' '.join(line)) print('') while snapshot_id is None: try: index = int(input(msg + ' (0 - %d): ' % (len_snapshots - 1))) snapshot_id = snapshotsList[index] except (ValueError, IndexError): print('Invalid Input') continue return snapshot_id
[docs] def terminalSize(): """ get terminal size """ for fd in (sys.stdin, sys.stdout, sys.stderr): try: import fcntl, termios, struct return [int(x) for x in struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))] except: pass return [24, 80]
[docs] def frame(msg, size = 32): ret = ' +' + '-' * size + '+\n' ret += ' |' + msg.center(size) + '|\n' ret += ' +' + '-' * size + '+' return ret
[docs] class RestoreDialog(object): def __init__(self, cfg, sid, what, where, **kwargs): self.config = cfg self.sid = sid self.what = what self.where = where self.kwargs = kwargs self.logFile = self.config.restoreLogFile() if os.path.exists(self.logFile): os.remove(self.logFile)
[docs] def callback(self, line, *params): if not line: return print(line) with open(self.logFile, 'a') as log: log.write(line + '\n')
[docs] def run(self): s = snapshots.Snapshots(self.config) s.restore(self.sid, self.what, self.callback, self.where, **self.kwargs) print('\nLog saved to %s' % self.logFile)
[docs] class BackupJobDaemon(tools.Daemon): def __init__(self, func, args): super(BackupJobDaemon, self).__init__() self.func = func self.args = args
[docs] def run(self): self.func(self.args, False)