Source code for tools

"""Collection of helper functions not fitting to other modules.
"""
# Back In Time
# Copyright (C) 2008-2022 Oprea Dan, Bart de Koning, Richard Bailey,
# Germar Reitze, Taylor Raack
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import sys
import pathlib
import subprocess
import shlex
import signal
import re
import errno
import gzip
import tempfile
import gettext
try:
    from collections.abc import MutableSet
except ImportError:
    from collections import MutableSet
import hashlib
import ipaddress
import atexit
from datetime import datetime
from packaging.version import Version
from time import sleep

import logger

# Try to import keyring
is_keyring_available = False
try:
    # Jan 4, 2024 aryoda: The env var BIT_USE_KEYRING is neither documented
    #                     anywhere nor used at all in the code.
    #                     Via "git blame" I have found a commit message saying:
    #                     "block subsequent 'import keyring' if it failed once"
    #                     So I assume it is an internal temporary env var only.
    # Note: os.geteuid() is used instead of tools.isRoot() here
    #       because the latter is still not available here in the global
    #       module code.
    if os.getenv('BIT_USE_KEYRING', 'true') == 'true' and os.geteuid() != 0:
        import keyring
        from keyring import backend
        import keyring.util.platform_
        is_keyring_available = True
except Exception as e:
    is_keyring_available = False
    # block subsequent 'import keyring' if it failed once before
    os.putenv('BIT_USE_KEYRING', 'false')
    logger.warning(f"'import keyring' failed with: {repr(e)}")

# getting dbus imports to work in Travis CI is a huge pain
# use conditional dbus import
ON_TRAVIS = os.environ.get('TRAVIS', 'None').lower() == 'true'
ON_RTD = os.environ.get('READTHEDOCS', 'None').lower() == 'true'

try:
    import dbus
except ImportError:
    if ON_TRAVIS or ON_RTD:
        # python-dbus doesn't work on Travis yet.
        dbus = None
    else:
        raise

import configfile
import bcolors
from applicationinstance import ApplicationInstance
from exceptions import Timeout, InvalidChar, InvalidCmd, LimitExceeded, PermissionDeniedByPolicy
import languages

DISK_BY_UUID = '/dev/disk/by-uuid'

# |-----------------|
# | Handling paths  |
# |-----------------|

[docs] def sharePath(): """Get path where Back In Time is installed. This is similar to $XDG_DATA_DIRS (XDG Base Directory Specification). If running from source return default ``/usr/share``. Returns: str: share path like:: /usr/share /usr/local/share /opt/usr/share """ share = os.path.abspath( os.path.join(__file__, os.pardir, os.pardir, os.pardir) ) if os.path.basename(share) == 'share': return share return '/usr/share'
[docs] def backintimePath(*path): """ Get path inside 'backintime' install folder. Args: *path (str): paths that should be joined to 'backintime' Returns: str: 'backintime' child path like:: /usr/share/backintime/common /usr/share/backintime/qt """ return os.path.abspath(os.path.join(__file__, os.pardir, os.pardir, *path))
[docs] def docPath(): """Not sure what this path is about. """ path = pathlib.Path(sharePath()) / 'doc' / 'backintime-common' # Dev note (buhtz, aryoda, 2024-02): # This piece of code originally resisted in Config.__init__() and was # introduced by Dan in 2008. The reason for the existence of this "if" # is unclear. # Makefile (in common) does only install into share/doc/backintime-common # but never into the the backintime "binary" path so I guess the if is # a) either a distro-specific exception for a distro package that # (manually?) installs the LICENSE into another path # b) or a left-over from old code where the LICENSE was installed # differently... license_file = pathlib.Path(backintimePath()) / 'LICENSE' if license_file.exists(): path = backintimePath() return str(path)
# |---------------------------------------------------| # | Internationalization (i18n) & localization (L10n) | # |---------------------------------------------------| _GETTEXT_DOMAIN = 'backintime' _GETTEXT_LOCALE_DIR = pathlib.Path(sharePath()) / 'locale'
[docs] def _determine_current_used_language_code(translation, language_code): """Return the language code used by GNU gettext for real. Args: translation(gettext.NullTranslations): The translation installed. language_code(str): Configured language code. The used language code can differ from the one in Back In Times config file and from the current systems locale. It is necessary because of situations where the language is not explicit setup in Back In Time config file and GNU gettext do try to find and use a language file for the current systems locale. But even this can fail and the fallback (source language "en") is used or an alternative locale. """ try: # The field "language" is rooted in header of the po-file. current_used_language_code = translation.info()['language'] except KeyError: # Workaround: # BIT versions 1.3.3 or older don't have the "language" field in the # header of their po-files. # The approach is to extract the language code from the full filepath # of the currently used mo-file. # Get the filepath of the used mo-file mo_file_path = gettext.find( domain=_GETTEXT_DOMAIN, localedir=_GETTEXT_LOCALE_DIR, languages=[language_code, ] if language_code else None, ) # Extract the language code form that path if mo_file_path: mo_file_path = pathlib.Path(mo_file_path) # e.g /usr/share/locale/de/LC_MESSAGES/backintime.mo # ^^ current_used_language_code = mo_file_path.relative_to( _GETTEXT_LOCALE_DIR).parts[0] else: # Workaround: Happens when LC_ALL=C, which in BIT context mean # its source language in English. current_used_language_code = 'en' return current_used_language_code
[docs] def initiate_translation(language_code): """Initiate Class-based API of GNU gettext. Args: language_code(str): Language code to use (based on ISO-639). It installs the ``_()`` (and ``ngettext()`` for plural forms) in the ``builtins`` namespace and eliminates the need to ``import gettext`` and declare ``_()`` in each module. The systems current local is used if the language code is None. """ if language_code: logger.debug(f'Language code "{language_code}".') else: logger.debug('No language code. Use systems current locale.') translation = gettext.translation( domain=_GETTEXT_DOMAIN, localedir=_GETTEXT_LOCALE_DIR, languages=[language_code, ] if language_code else None, fallback=True ) translation.install(names=['ngettext']) return _determine_current_used_language_code(translation, language_code)
[docs] def get_available_language_codes(): """Return language codes available in the current installation. The filesystem is searched for ``backintime.mo`` files and the language code is extracted from the full path of that files. Return: List of language codes. """ # full path of one mo-file # e.g. /usr/share/locale/de/LC_MESSAGES/backintime.mo mo = gettext.find(domain=_GETTEXT_DOMAIN, localedir=_GETTEXT_LOCALE_DIR) if mo: mo = pathlib.Path(mo) else: # Workaround. This happens if LC_ALL=C and BIT don't use an explicit # language. Should be re-design. mo = _GETTEXT_LOCALE_DIR / 'xy' / 'LC_MESSAGES' / 'backintime.mo' # e.g. de/LC_MESSAGES/backintime.mo mo = mo.relative_to(_GETTEXT_LOCALE_DIR) # e.g. */LC_MESSAGES/backintime.mo mo = pathlib.Path('*') / pathlib.Path(*mo.parts[1:]) mofiles = _GETTEXT_LOCALE_DIR.rglob(str(mo)) return [p.relative_to(_GETTEXT_LOCALE_DIR).parts[0] for p in mofiles]
[docs] def get_language_names(language_code): """Return a list with language names in three different flavors. Language codes from `get_available_language_codes()` are combined with `languages.language_names` to prepare the list. Args: language_code (str): Usually the current language used by Back In Time. Returns: A dictionary indexed by language codes with 3-item tuples as values. Each tuple contain three representations of the same language: ``language_code`` (usually the current locales language), the language itself (native) and in English (the source language); e.g. ``ja`` (Japanese) for ``de`` (German) locale is ``('Japanisch', '日本語', 'Japanese')``. """ result = {} codes = ['en'] + get_available_language_codes() for c in codes: try: # A dict with one specific language and how its name is # represented in all other languages. # e.g. "Japanese" in "de" is "Japanisch" # e.g. "Deutsch" in "es" is "alemán" lang = languages.names[c] except KeyError: names = None else: names = ( # in currents locale language lang[language_code], # native lang['_native'], # in English (source language) lang['en'] ) result[c] = names return result
[docs] def get_native_language_and_completeness(language_code): """Return the language name in its native flavor and the completeness of its translation in percent. Args: language_code(str): The language code. Returns: A two-entry tuple with language name as string and a percent as integer. """ name = languages.names[language_code][language_code] completeness = languages.completeness[language_code] return (name, completeness)
# |------------------------------------| # | Miscellaneous, not categorized yet | # |------------------------------------|
[docs] def registerBackintimePath(*path): """ Add BackInTime path ``path`` to :py:data:`sys.path` so subsequent imports can discover them. Args: *path (str): paths that should be joined to 'backintime' Note: Duplicate in :py:func:`qt/qttools.py` because modules in qt folder would need this to actually import :py:mod:`tools`. """ path = backintimePath(*path) if not path in sys.path: sys.path.insert(0, path)
[docs] def runningFromSource(): """ Check if BackInTime is running from source (without installing). Returns: bool: ``True`` if BackInTime is running from source """ return os.path.isfile(backintimePath('common', 'backintime'))
[docs] def addSourceToPathEnviron(): """ Add 'backintime/common' path to 'PATH' environ variable. """ source = backintimePath('common') path = os.getenv('PATH') if path and source not in path.split(':'): os.environ['PATH'] = '%s:%s' %(source, path)
[docs] def get_git_repository_info(path=None, hash_length=None): """Return the current branch and last commit hash. About the length of a commit hash. There is no strict rule but it is common sense that 8 to 10 characters are enough to be unique. Credits: https://stackoverflow.com/a/51224861/4865723 Args: path (Path): Path with '.git' folder in (default is current working directory). cut_hash (int): Restrict length of commit hash. Returns: (dict): Dict with keys "branch" and "hash" if it is a git repo, otherwise an `None`. """ if not path: # Default is current working dir path = pathlib.Path.cwd() elif isinstance(path, str): # WORKAROUND until cmoplete migration to pathlib path = pathlib.Path(path) git_folder = path / '.git' if not git_folder.exists(): return None result = {} # branch name with (git_folder / 'HEAD').open('r') as handle: val = handle.read() if not val.startswith('ref: '): result['branch'] = '(detached HEAD)' result['hash'] = val else: result['branch'] = '/'.join(val.split('/')[2:]).strip() # commit hash with (git_folder / 'refs' / 'heads' / result['branch']) \ .open('r') as handle: result['hash'] = handle.read().strip() if hash_length: result['hash'] = result['hash'][:hash_length] return result
[docs] def readFile(path, default=None): """ Read the file in ``path`` or its '.gz' compressed variant and return its content or ``default`` if ``path`` does not exist. Args: path (str): full path to file that should be read. '.gz' will be added automatically if the file is compressed default (str): default if ``path`` does not exist Returns: str: content of file in ``path`` """ ret_val = default try: if os.path.exists(path): with open(path) as f: ret_val = f.read() elif os.path.exists(path + '.gz'): with gzip.open(path + '.gz', 'rt') as f: ret_val = f.read() except: pass return ret_val
[docs] def readFileLines(path, default = None): """ Read the file in ``path`` or its '.gz' compressed variant and return its content as a list of lines or ``default`` if ``path`` does not exist. Args: path (str): full path to file that should be read. '.gz' will be added automatically if the file is compressed default (list): default if ``path`` does not exist Returns: list: content of file in ``path`` split by lines. """ ret_val = default try: if os.path.exists(path): with open(path) as f: ret_val = [x.rstrip('\n') for x in f.readlines()] elif os.path.exists(path + '.gz'): with gzip.open(path + '.gz', 'rt') as f: ret_val = [x.rstrip('\n') for x in f.readlines()] except: pass return ret_val
[docs] def checkCommand(cmd): """ Check if command ``cmd`` is a file in 'PATH' environ. Args: cmd (str): command Returns: bool: ``True`` if command ``cmd`` is in 'PATH' environ """ cmd = cmd.strip() if not cmd: return False if os.path.isfile(cmd): return True return not which(cmd) is None
[docs] def which(cmd): """ Get the fullpath of executable command ``cmd``. Works like command-line 'which' command. Args: cmd (str): command Returns: str: fullpath of command ``cmd`` or ``None`` if command is not available """ pathenv = os.getenv('PATH', '') path = pathenv.split(":") common = backintimePath('common') if runningFromSource() and common not in path: path.insert(0, common) for directory in path: fullpath = os.path.join(directory, cmd) if os.path.isfile(fullpath) and os.access(fullpath, os.X_OK): return fullpath return None
[docs] def makeDirs(path): """ Create directories ``path`` recursive and return success. Args: path (str): fullpath to directories that should be created Returns: bool: ``True`` if successful """ path = path.rstrip(os.sep) if not path: return False if os.path.isdir(path): return True else: try: os.makedirs(path) except Exception as e: logger.error("Failed to make dirs '%s': %s" %(path, str(e)), traceDepth = 1) return os.path.isdir(path)
[docs] def mkdir(path, mode = 0o755, enforce_permissions = True): """ Create directory ``path``. Args: path (str): full path to directory that should be created mode (int): numeric permission mode Returns: bool: ``True`` if successful """ if os.path.isdir(path): try: if enforce_permissions: os.chmod(path, mode) except: return False return True else: os.mkdir(path, mode) if mode & 0o002 == 0o002: #make file world (other) writable was requested #debian and ubuntu won't set o+w with os.mkdir #this will fix it os.chmod(path, mode) return os.path.isdir(path)
[docs] def pids(): """ List all PIDs currently running on the system. Returns: list: PIDs as int """ return [int(x) for x in os.listdir('/proc') if x.isdigit()]
[docs] def processStat(pid): """ Get the stat's of the process with ``pid``. Args: pid (int): Process Indicator Returns: str: stat from /proc/PID/stat """ try: with open('/proc/{}/stat'.format(pid), 'rt') as f: return f.read() except OSError as e: logger.warning('Failed to read process stat from {}: [{}] {}' .format(e.filename, e.errno, e.strerror)) return ''
[docs] def processPaused(pid): """ Check if process ``pid`` is paused (got signal SIGSTOP). Args: pid (int): Process Indicator Returns: bool: True if process is paused """ m = re.match(r'\d+ \(.+\) T', processStat(pid)) return bool(m)
[docs] def processName(pid): """ Get the name of the process with ``pid``. Args: pid (int): Process Indicator Returns: str: name of the process """ m = re.match(r'.*\((.+)\).*', processStat(pid)) if m: return m.group(1)
[docs] def processCmdline(pid): """ Get the cmdline (command that spawnd this process) of the process with ``pid``. Args: pid (int): Process Indicator Returns: str: cmdline of the process """ try: with open('/proc/{}/cmdline'.format(pid), 'rt') as f: return f.read().strip('\n') except OSError as e: logger.warning('Failed to read process cmdline from {}: [{}] {}'.format(e.filename, e.errno, e.strerror)) return ''
[docs] def pidsWithName(name): """ Get all processes currently running with name ``name``. Args: name (str): name of a process like 'python3' or 'backintime' Returns: list: PIDs as int """ # /proc/###/stat stores just the first 16 chars of the process name return [x for x in pids() if processName(x) == name[:15]]
[docs] def processExists(name): """ Check if process ``name`` is currently running. Args: name (str): name of a process like 'python3' or 'backintime' Returns: bool: ``True`` if there is a process running with ``name`` """ return len(pidsWithName(name)) > 0
[docs] def processAlive(pid): """ Check if the process with PID ``pid`` is alive. Args: pid (int): Process Indicator Returns: bool: ``True`` if the process with PID ``pid`` is alive Raises: ValueError: If ``pid`` is 0 because 'kill(0, SIG)' would send SIG to all processes """ if pid < 0: return False elif pid == 0: raise ValueError('invalid PID 0') else: try: os.kill(pid, 0) #this will raise an exception if the pid is not valid except OSError as err: if err.errno == errno.ESRCH: # ESRCH == No such process return False elif err.errno == errno.EPERM: # EPERM clearly means there's a process to deny access to return True else: raise else: return True
[docs] def checkXServer(): """ Check if there is a X11 server running on this system. Use ``is_Qt_working`` instead if you want to be sure that Qt is working. Returns: bool: ``True`` if X11 server is running """ # Note: Return values of xdpyinfo <> 0 are not clearly documented. # xdpyinfo does indeed return 1 if it prints # xdypinfo: unable to open display "..." # This seems to be undocumented (at least not in the man pages) # and the source is not obvious here: # https://cgit.freedesktop.org/xorg/app/xdpyinfo/tree/xdpyinfo.c if checkCommand('xdpyinfo'): proc = subprocess.Popen(['xdpyinfo'], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) proc.communicate() return proc.returncode == 0 else: return False
[docs] def is_Qt_working(systray_required=False): """ Check if the Qt GUI library is working (installed and configured) This function is contained in BiT CLI (not BiT Qt) to allow Qt diagnostics output even if the BiT Qt GUI is not installed. This function does NOT add a hard Qt dependency (just "probing") so it is OK to be in BiT CLI. Args: systray_required: Set to ``True`` if the systray of the desktop environment must be available too to consider Qt as "working" Returns: bool: ``True`` Qt can create a GUI ``False`` Qt fails (or the systray is not available if ``systray_required`` is ``True``) """ # Spawns a new process since it may crash with a SIGABRT and we # don't want to crash BiT if this happens... try: path = os.path.join(backintimePath("common"), "qt_probing.py") cmd = [sys.executable, path] if logger.DEBUG: cmd.append('--debug') with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) as proc: std_output, error_output = proc.communicate(timeout=30) # to get the exit code # "timeout" fixes #1592 (qt_probing.py may hang as root): Kill after timeout logger.debug(f"Qt probing result: exit code {proc.returncode}") if proc.returncode != 2 or logger.DEBUG: # if some Qt parts are missing: Show details logger.debug(f"Qt probing stdout:\n{std_output}") logger.debug(f"Qt probing errout:\n{error_output}") return proc.returncode == 2 or (proc.returncode == 1 and systray_required is False) except FileNotFoundError: logger.error(f"Qt probing script not found: {cmd[0]}") raise # Fix for #1592 (qt_probing.py may hang as root): Kill after timeout except subprocess.TimeoutExpired: proc.kill() outs, errs = proc.communicate() logger.info("Qt probing sub process killed after timeout without response") logger.debug(f"Qt probing stdout:\n{outs}") logger.debug(f"Qt probing errout:\n{errs}") except Exception as e: logger.error(f"Error: {repr(e)}") raise
[docs] def preparePath(path): """ Removes trailing slash '/' from ``path``. Args: path (str): absolute path Returns: str: path ``path`` without trailing but with leading slash """ path = path.strip("/") path = os.sep + path return path
[docs] def powerStatusAvailable(): """ Check if org.freedesktop.UPower is available so that :py:func:`tools.onBattery` would return the correct power status. Returns: bool: ``True`` if :py:func:`tools.onBattery` can report power status """ if dbus: try: bus = dbus.SystemBus() proxy = bus.get_object('org.freedesktop.UPower', '/org/freedesktop/UPower') return 'OnBattery' in proxy.GetAll('org.freedesktop.UPower', dbus_interface = 'org.freedesktop.DBus.Properties') except dbus.exceptions.DBusException: pass return False
[docs] def onBattery(): """ Checks if the system is on battery power. Returns: bool: ``True`` if system is running on battery """ if dbus: try: bus = dbus.SystemBus() proxy = bus.get_object('org.freedesktop.UPower', '/org/freedesktop/UPower') return bool(proxy.Get('org.freedesktop.UPower', 'OnBattery', dbus_interface = 'org.freedesktop.DBus.Properties')) except dbus.exceptions.DBusException: pass return False
[docs] def rsyncCaps(data = None): """ Get capabilities of the installed rsync binary. This can be different from version to version and also on build arguments used when building rsync. Args: data (str): 'rsync --version' output. This is just for unittests. Returns: list: List of str with rsyncs capabilities """ if not data: proc = subprocess.Popen(['rsync', '--version'], stdout = subprocess.PIPE, universal_newlines = True) data = proc.communicate()[0] caps = [] #rsync >= 3.1 does provide --info=progress2 matchers = [r'rsync\s*version\s*(\d\.\d)', r'rsync\s*version\s*v(\d\.\d.\d)'] for matcher in matchers: m = re.match(matcher, data) if m and Version(m.group(1)) >= Version('3.1'): caps.append('progress2') break #all other capabilities are separated by ',' between #'Capabilities:' and '\n\n' m = re.match(r'.*Capabilities:(.+)\n\n.*', data, re.DOTALL) if not m: return caps for line in m.group(1).split('\n'): caps.extend([i.strip(' \n') for i in line.split(',') if i.strip(' \n')]) return caps
[docs] def rsyncPrefix(config, no_perms=True, use_mode=['ssh', 'ssh_encfs'], progress=True): """ Get rsync command and all args for creating a new snapshot. Args are based on current profile in ``config``. Args: config (config.Config): current config no_perms (bool): don't sync permissions (--no-p --no-g --no-o) if ``True``. :py:func:`config.Config.preserveAcl` == ``True`` or :py:func:`config.Config.preserveXattr` == ``True`` will overwrite this to ``False`` use_mode (list): if current mode is in this list add additional args for that mode progress (bool): add '--info=progress2' to show progress Returns: list: rsync command with all args but without --include, --exclude, source and destination """ caps = rsyncCaps() cmd = [] if config.nocacheOnLocal(): cmd.append('nocache') cmd.append('rsync') cmd.extend(( # recurse into directories '--recursive', # preserve modification times '--times', # preserve device files (super-user only) '--devices', # preserve special files '--specials', # preserve hard links '--hard-links', # numbers in a human-readable format '--human-readable', # use "new" argument protection '-s' )) if config.useChecksum() or config.forceUseChecksum: cmd.append('--checksum') if config.copyUnsafeLinks(): cmd.append('--copy-unsafe-links') if config.copyLinks(): cmd.append('--copy-links') else: cmd.append('--links') if config.oneFileSystem(): cmd.append('--one-file-system') if config.preserveAcl() and "ACLs" in caps: cmd.append('--acls') # preserve ACLs (implies --perms) no_perms = False if config.preserveXattr() and "xattrs" in caps: cmd.append('--xattrs') # preserve extended attributes no_perms = False if no_perms: cmd.extend(('--no-perms', '--no-group', '--no-owner')) else: cmd.extend(('--perms', # preserve permissions '--executability', # preserve executability '--group', # preserve group '--owner')) # preserve owner (super-user only) if progress and 'progress2' in caps: cmd.extend(('--info=progress2', '--no-inc-recursive')) if config.bwlimitEnabled(): cmd.append('--bwlimit=%d' % config.bwlimit()) if config.rsyncOptionsEnabled(): cmd.extend(shlex.split(config.rsyncOptions())) cmd.extend(rsyncSshArgs(config, use_mode)) return cmd
[docs] def rsyncSshArgs(config, use_mode=['ssh', 'ssh_encfs']): """ Get SSH args for rsync based on current profile in ``config``. Args: config (config.Config): Current config instance. use_mode (list): If the profiles current mode is in this list add additional args. Returns: list: List of rsync args related to SSH. """ cmd = [] mode = config.snapshotsMode() if mode in ['ssh', 'ssh_encfs'] and mode in use_mode: ssh = config.sshCommand(user_host=False, ionice=False, nice=False) cmd.append('--rsh=' + ' '.join(ssh)) if config.niceOnRemote() \ or config.ioniceOnRemote() \ or config.nocacheOnRemote(): rsync_path = '--rsync-path=' if config.niceOnRemote(): rsync_path += 'nice -n 19 ' if config.ioniceOnRemote(): rsync_path += 'ionice -c2 -n7 ' if config.nocacheOnRemote(): rsync_path += 'nocache ' rsync_path += 'rsync' cmd.append(rsync_path) return cmd
[docs] def rsyncRemove(config, run_local = True): """ Get rsync command and all args for removing snapshots with rsync. Args: config (config.Config): current config run_local (bool): if True and current mode is ``ssh`` or ``ssh_encfs`` this will add SSH options Returns: list: rsync command with all args """ cmd = ['rsync', '-a', '--delete', '-s'] if run_local: cmd.extend(rsyncSshArgs(config)) return cmd
#TODO: check if we really need this
[docs] def tempFailureRetry(func, *args, **kwargs): while True: try: return func(*args, **kwargs) except (os.error, IOError) as ex: if ex.errno == errno.EINTR: continue else: raise
[docs] def md5sum(path): """ Calculate md5sum for file in ``path``. Args: path (str): full path to file Returns: str: md5sum of file """ md5 = hashlib.md5() with open(path, 'rb') as f: while True: data = f.read(4096) if not data: break md5.update(data) return md5.hexdigest()
[docs] def checkCronPattern(s): """ Check if ``s`` is a valid cron pattern. Examples:: 0,10,13,15,17,20,23 */6 Args: s (str): pattern to check Returns: bool: ``True`` if ``s`` is a valid cron pattern """ if s.find(' ') >= 0: return False try: if s.startswith('*/'): if s[2:].isdigit() and int(s[2:]) <= 24: return True else: return False for i in s.split(','): if i.isdigit() and int(i) <= 24: continue else: return False return True except ValueError: return False
#TODO: check if this is still necessary
[docs] def checkHomeEncrypt(): """ Return ``True`` if users home is encrypted """ home = os.path.expanduser('~') if not os.path.ismount(home): return False if checkCommand('ecryptfs-verify'): try: subprocess.check_call(['ecryptfs-verify', '--home'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError: pass else: return True if checkCommand('encfs'): proc = subprocess.Popen(['mount'], stdout=subprocess.PIPE, universal_newlines = True) mount = proc.communicate()[0] r = re.compile('^encfs on %s type fuse' % home) for line in mount.split('\n'): if r.match(line): return True return False
[docs] def envLoad(f): """ Load environ variables from file ``f`` into current environ. Do not overwrite existing environ variables. Args: f (str): full path to file with environ variables """ env = os.environ.copy() env_file = configfile.ConfigFile() env_file.load(f, maxsplit = 1) for key in env_file.keys(): value = env_file.strValue(key) if not value: continue if not key in list(env.keys()): os.environ[key] = value del(env_file)
[docs] def envSave(f): """ Save environ variables to file that are needed by cron to connect to keyring. This will only work if the user is logged in. Args: f (str): full path to file for environ variables """ env = os.environ.copy() env_file = configfile.ConfigFile() for key in ('GNOME_KEYRING_CONTROL', 'DBUS_SESSION_BUS_ADDRESS', \ 'DBUS_SESSION_BUS_PID', 'DBUS_SESSION_BUS_WINDOWID', \ 'DISPLAY', 'XAUTHORITY', 'GNOME_DESKTOP_SESSION_ID', \ 'KDE_FULL_SESSION'): if key in env: env_file.setStrValue(key, env[key]) env_file.save(f)
[docs] def keyringSupported(): """ Checks if a keyring (supported by BiT) is available Returns: bool: ``True`` if a supported keyring could be loaded """ if not is_keyring_available: logger.debug('No keyring due to import error.') return False keyring_config_file_folder = "Unknown" try: keyring_config_file_folder = keyring.util.platform_.config_root() except: pass logger.debug(f"Keyring config file folder: {keyring_config_file_folder}") # Determine the currently active backend try: # get_keyring() internally calls keyring.core.init_backend() # which fixes non-available backends for the first call. # See related issue #1321: # https://github.com/bit-team/backintime/issues/1321 # The module name is used instead of the class name # to show only the keyring name (not the technical name) displayName = keyring.get_keyring().__module__ except: displayName = str(keyring.get_keyring()) # technical class name! logger.debug("Available keyring backends:") try: for b in backend.get_all_keyring(): logger.debug(b) except Exception as e: logger.debug("Available backends cannot be listed: " + repr(e)) available_backends = [] # Create a list of installed backends that BiT supports (white-listed). # This is done by trying to put the meta classes ("class definitions", # NOT instances of the class itself!) of the supported backends # into the "backends" list backends_to_check = [ (keyring.backends, ['SecretService', 'Keyring']), (keyring.backends, ['Gnome', 'Keyring']), (keyring.backends, ['kwallet', 'Keyring']), (keyring.backends, ['kwallet', 'DBusKeyring']), (keyring.backend, ['SecretServiceKeyring']), (keyring.backend, ['GnomeKeyring']), (keyring.backend, ['KDEWallet']), # See issue #1410: ChainerBackend is now supported to solve the # problem of configuring the used backend since it iterates over all # of them and is to be the default backend now. Please read the issue # details to understand the unwanted side-effects the chainer could # bring with it. # See also: # https://github.com/jaraco/keyring/blob/977ed03677bb0602b91f005461ef3dddf01a49f6/keyring/backends/chainer.py#L11 # noqa (keyring.backends, ('chainer', 'ChainerBackend')), ] for backend_package, backends in backends_to_check: result = backend_package # e.g. keyring.backends try: # Load the backend step-by-step. # e.g. When the target is "keyring.backends.Gnome.Keyring" then in # a first step "Gnome" part is loaded first and if successful the # "keyring" part. for b in backends: result = getattr(result, b) except AttributeError as err: # Debug message if backend is not available. logger.debug('Metaclass {}.{} not found: {}' .format(backend_package.__name__, '.'.join(backends), repr(err))) else: # Remember the backend class (not an instance) as available. available_backends.append(result) logger.debug("Available supported backends: " + repr(available_backends)) if available_backends and isinstance(keyring.get_keyring(), tuple(available_backends)): logger.debug("Found appropriate keyring '{}'".format(displayName)) return True logger.debug(f"No appropriate keyring found. '{displayName}' can't be " "used with BackInTime.") logger.debug("See https://github.com/bit-team/backintime on how to fix " "this by creating a keyring config file.") return False
[docs] def password(*args): if is_keyring_available: return keyring.get_password(*args) return None
[docs] def setPassword(*args): if is_keyring_available: return keyring.set_password(*args) return False
[docs] def mountpoint(path): """ Get the mountpoint of ``path``. If your HOME is on a separate partition mountpoint('/home/user/foo') would return '/home'. Args: path (str): full path Returns: str: mountpoint of the filesystem """ path = os.path.realpath(os.path.abspath(path)) while path != os.path.sep: if os.path.ismount(path): return path path = os.path.abspath(os.path.join(path, os.pardir)) return path
[docs] def decodeOctalEscape(s): """ Decode octal-escaped characters with its ASCII dependence. For example '\040' will be a space ' ' Args: s (str): string with or without octal-escaped characters Returns: str: human readable string """ def repl(m): return chr(int(m.group(1), 8)) return re.sub(r'\\(\d{3})', repl, s)
[docs] def mountArgs(path): """ Get all /etc/mtab args for the filesystem of ``path`` as a list. Example:: [DEVICE, MOUNTPOINT, FILESYSTEM_TYPE, OPTIONS, DUMP, PASS] ['/dev/sda3', '/', 'ext4', 'defaults', '0', '0'] ['/dev/sda1', '/boot', 'ext4', 'defaults', '0', '0'] Args: path (str): full path Returns: list: mount args """ mp = mountpoint(path) with open('/etc/mtab', 'r') as mounts: for line in mounts: args = line.strip('\n').split(' ') if len(args) >= 2: args[1] = decodeOctalEscape(args[1]) if args[1] == mp: return args return None
[docs] def device(path): """ Get the device for the filesystem of ``path``. Example:: /dev/sda1 /dev/mapper/vglinux proc Args: path (str): full path Returns: str: device """ args = mountArgs(path) if args: return args[0] return None
[docs] def filesystem(path): """ Get the filesystem type for the filesystem of ``path``. Args: path (str): full path Returns: str: filesystem """ args = mountArgs(path) if args and len(args) >= 3: return args[2] return None
[docs] def _uuidFromDev_via_filesystem(dev): """Get the UUID for the block device ``dev`` from ``/dev/disk/by-uuid`` in the filesystem. Args: dev (pathlib.Path): The block device path (e.g. ``/dev/sda1``). Returns: str: The UUID or ``None`` if nothing found. """ # /dev/disk/by-uuid path_DISK_BY_UUID = pathlib.Path(DISK_BY_UUID) if not path_DISK_BY_UUID.exists(): return None # Each known uuid for uuid_symlink in path_DISK_BY_UUID.glob('*'): # Resolve the symlink (get it's target) to get the real device name # and compare it with the device we are looking for if dev == uuid_symlink.resolve(): # e.g. 'c7aca0a7-89ed-43f0-a4f9-c744dfe673e0' return uuid_symlink.name # Nothing found return None
[docs] def _uuidFromDev_via_blkid_command(dev): """Get the UUID for the block device ``dev`` via the extern command ``blkid``. Hint: On most systems the ``blkid`` command is available only for the super-user (e.g. via ``sudo``). Args: dev (pathlib.Path): The block device path (e.g. ``/dev/sda1``). Returns: str: The UUID or ``None`` if nothing found. """ # Call "blkid" command try: # If device does not exist, blkid will exit with a non-zero code output = subprocess.check_output(['blkid', dev], stderr = subprocess.DEVNULL, universal_newlines=True) except (subprocess.CalledProcessError, FileNotFoundError): return None # Parse the commands output for a UUID try: return re.findall(r'.*\sUUID=\"([^\"]*)\".*', output)[0] except IndexError: # nothing found via the regex pattern pass return None
[docs] def _uuidFromDev_via_udevadm_command(dev): """Get the UUID for the block device ``dev`` via the extern command ``udevadm``. Args: dev (pathlib.Path): The block device path (e.g. ``/dev/sda1``). Returns: str: The UUID or ``None`` if nothing found. """ # Call "udevadm" command try: output = subprocess.check_output(['udevadm', 'info', f'--name={dev}'], stderr = subprocess.DEVNULL, universal_newlines=True) except (subprocess.CalledProcessError, FileNotFoundError): return None # Parse the commands output for a UUID try: return re.findall(r'.*?ID_FS_UUID=(\S+)', output)[0] except IndexError: # nothing found via the regex pattern pass return None
[docs] def uuidFromDev(dev): """ Get the UUID for the block device ``dev``. Args: dev (str, pathlib.Path): block device path Returns: str: UUID """ # handle Path objects only if not isinstance(dev, pathlib.Path): dev = pathlib.Path(dev) if dev.exists(): dev = dev.resolve() # when /dev/sda1 is a symlink # Look at /dev/disk/by-uuid/ uuid = _uuidFromDev_via_filesystem(dev) if uuid: return uuid # Try extern command "blkid" uuid = _uuidFromDev_via_blkid_command(dev) if uuid: return uuid # "dev" doesn't exist in the filesystem # Try "udevadm" command at the end return _uuidFromDev_via_udevadm_command(dev)
[docs] def uuidFromPath(path): """ Get the UUID for the for the filesystem of ``path``. Args: path (str): full path Returns: str: UUID """ return uuidFromDev(device(path))
[docs] def filesystemMountInfo(): """ Get a dict of mount point string -> dict of filesystem info for entire system. Returns: dict: {MOUNTPOINT: {'original_uuid': UUID}} """ # There may be multiple mount points inside of the root (/) mount, so # iterate over mtab to find all non-special mounts. with open('/etc/mtab', 'r') as mounts: return {items[1]: {'original_uuid': uuidFromDev(items[0])} for items in [mount_line.strip('\n').split(' ')[:2] for mount_line in mounts] if uuidFromDev(items[0]) != None}
[docs] def syncfs(): """ Sync any data buffered in memory to disk. Returns: bool: ``True`` if successful """ if checkCommand('sync'): return(Execute(['sync']).run() == 0)
[docs] def isRoot(): """ Check if we are root. Returns: bool: ``True`` if we are root """ # The EUID (Effective UID) may be different from the UID (user ID) # in case of SetUID or using "sudo" (where EUID is "root" and UID # is the original user who executed "sudo"). return os.geteuid() == 0
[docs] def usingSudo(): """ Check if 'sudo' was used to start this process. Returns: bool: ``True`` if the process was started with sudo """ return isRoot() and os.getenv('HOME', '/root') != '/root'
re_wildcard = re.compile(r'(?:\[|\]|\?)') re_asterisk = re.compile(r'\*') re_separate_asterisk = re.compile(r'(?:^\*+[^/\*]|[^/\*]\*+[^/\*]|[^/\*]\*+|\*+[^/\*]|[^/\*]\*+$)')
[docs] def patternHasNotEncryptableWildcard(pattern): """ Check if ``pattern`` has wildcards ``[ ] ? *``. but return ``False`` for ``foo/*``, ``foo/*/bar``, ``*/bar`` or ``**/bar`` Args: pattern (str): path or pattern to check Returns: bool: ``True`` if ``pattern`` has wildcards ``[ ] ? *`` but ``False`` if wildcard look like ``foo/*``, ``foo/*/bar``, ``*/bar`` or ``**/bar`` """ if not re_wildcard.search(pattern) is None: return True if not re_asterisk is None and not re_separate_asterisk.search(pattern) is None: return True return False
BIT_TIME_FORMAT = '%Y%m%d %H%M' ANACRON_TIME_FORMAT = '%Y%m%d'
[docs] def readTimeStamp(fname): """ Read date string from file ``fname`` and try to return datetime. Args: fname (str): full path to timestamp file Returns: datetime.datetime: date from timestamp file """ if not os.path.exists(fname): logger.debug("no timestamp in '%(file)s'" % {'file': fname}) return with open(fname, 'r') as f: s = f.read().strip('\n') for i in (ANACRON_TIME_FORMAT, BIT_TIME_FORMAT): try: stamp = datetime.strptime(s, i) logger.debug("read timestamp '%(time)s' from file '%(file)s'" % {'time': stamp, 'file': fname}) return stamp except ValueError: pass
[docs] def writeTimeStamp(fname): """ Write current date and time into file ``fname``. Args: fname (str): full path to timestamp file """ now = datetime.now().strftime(BIT_TIME_FORMAT) logger.debug("write timestamp '%(time)s' into file '%(file)s'" % {'time': now, 'file': fname}) makeDirs(os.path.dirname(fname)) with open(fname, 'w') as f: f.write(now)
INHIBIT_LOGGING_OUT = 1 INHIBIT_USER_SWITCHING = 2 INHIBIT_SUSPENDING = 4 INHIBIT_IDLE = 8 INHIBIT_DBUS = ( {'service': 'org.gnome.SessionManager', 'objectPath': '/org/gnome/SessionManager', 'methodSet': 'Inhibit', 'methodUnSet': 'Uninhibit', 'interface': 'org.gnome.SessionManager', 'arguments': (0, 1, 2, 3) }, {'service': 'org.mate.SessionManager', 'objectPath': '/org/mate/SessionManager', 'methodSet': 'Inhibit', 'methodUnSet': 'Uninhibit', 'interface': 'org.mate.SessionManager', 'arguments': (0, 1, 2, 3) }, {'service': 'org.freedesktop.PowerManagement', 'objectPath': '/org/freedesktop/PowerManagement/Inhibit', 'methodSet': 'Inhibit', 'methodUnSet': 'UnInhibit', 'interface': 'org.freedesktop.PowerManagement.Inhibit', 'arguments': (0, 2) })
[docs] def inhibitSuspend(app_id = sys.argv[0], toplevel_xid = None, reason = 'take snapshot', flags = INHIBIT_SUSPENDING | INHIBIT_IDLE): """ Prevent machine to go to suspend or hibernate. Returns the inhibit cookie which is used to end the inhibitor. """ if ON_TRAVIS or dbus is None: # no suspend on travis (no dbus either) return # Fixes #1592 (BiT hangs as root when trying to establish a dbus user session connection) # Side effect: In BiT <= 1.4.1 root still tried to connect to the dbus user session # and it may have worked sometimes (without logging we don't know) # so as root suspend can no longer inhibited. if isRoot(): logger.debug("Inhibit Suspend failed because BIT was started as root.") return if not app_id: app_id = 'backintime' try: if not toplevel_xid: toplevel_xid = 0 except IndexError: toplevel_xid = 0 for dbus_props in INHIBIT_DBUS: try: #connect directly to the socket instead of dbus.SessionBus because #the dbus.SessionBus was initiated before we loaded the environ #variables and might not work if 'DBUS_SESSION_BUS_ADDRESS' in os.environ: bus = dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS']) else: bus = dbus.SessionBus() # This code may hang forever (if BiT is run as root via cron job and no user is logged in). See #1592 interface = bus.get_object(dbus_props['service'], dbus_props['objectPath']) proxy = interface.get_dbus_method(dbus_props['methodSet'], dbus_props['interface']) cookie = proxy(*[(app_id, dbus.UInt32(toplevel_xid), reason, dbus.UInt32(flags))[i] for i in dbus_props['arguments']]) logger.debug('Inhibit Suspend started. Reason: {}'.format(reason)) return (cookie, bus, dbus_props) except dbus.exceptions.DBusException: pass logger.warning('Inhibit Suspend failed.')
[docs] def unInhibitSuspend(cookie, bus, dbus_props): """ Release inhibit. """ assert isinstance(cookie, int), 'cookie is not int type: %s' % cookie assert isinstance(bus, dbus.bus.BusConnection), 'bus is not dbus.bus.BusConnection type: %s' % bus assert isinstance(dbus_props, dict), 'dbus_props is not dict type: %s' % dbus_props try: interface = bus.get_object(dbus_props['service'], dbus_props['objectPath']) proxy = interface.get_dbus_method(dbus_props['methodUnSet'], dbus_props['interface']) proxy(cookie) logger.debug('Release inhibit Suspend') return None except dbus.exceptions.DBusException: logger.warning('Release inhibit Suspend failed.') return (cookie, bus, dbus_props)
[docs] def readCrontab(): """ Read users crontab. Returns: list: crontab lines """ cmd = ['crontab', '-l'] if not checkCommand(cmd[0]): logger.debug('crontab not found.') return [] else: proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True) out, err = proc.communicate() if proc.returncode or err: logger.error('Failed to get crontab lines: %s, %s' %(proc.returncode, err)) return [] else: crontab = [x.strip() for x in out.strip('\n').split('\n')] if crontab == ['']: # Fixes issue #1181 (line count of empty crontab was 1 instead of 0) crontab = [] logger.debug('Read %s lines from user crontab' %len(crontab)) return crontab
[docs] def writeCrontab(lines): """ Write to users crontab. Note: This will overwrite the whole crontab. So to keep the old crontab and only add new entries you need to read it first with :py:func:`tools.readCrontab`, append new entries to the list and write it back. Args: lines (:py:class:`list`, :py:class:`tuple`): lines that should be written to crontab Returns: bool: ``True`` if successful """ assert isinstance(lines, (list, tuple)), 'lines is not list or tuple type: %s' % lines with tempfile.NamedTemporaryFile(mode = 'wt') as f: f.write('\n'.join(lines)) f.write('\n\n') f.flush() cmd = ['crontab', f.name] proc = subprocess.Popen(cmd, stdout = subprocess.DEVNULL, stderr = subprocess.PIPE, universal_newlines = True) out, err = proc.communicate() if proc.returncode or err: logger.error('Failed to write lines to crontab: %s, %s' %(proc.returncode, err)) return False else: logger.debug('Wrote %s lines to user crontab' %len(lines)) return True
[docs] def splitCommands(cmds, head = '', tail = '', maxLength = 0): """ Split a list of commands ``cmds`` into multiple commands with each length lower than ``maxLength``. Args: cmds (list): commands head (str): command that need to run first on every iteration of ``cmds`` tail (str): command that need to run after every iteration of ``cmds`` maxLength (int): maximum length a command could be. Don't split if <= 0 Yields: str: new command with length < ``maxLength`` Example:: head cmds[0] cmds[n] tail """ while cmds: s = head while cmds and ((len(s + cmds[0] + tail) <= maxLength) or maxLength <= 0): s += cmds.pop(0) s += tail yield s
[docs] def isIPv6Address(address): """ Check if ``address`` is a valid IPv6 address. Args: address (str): address that should get tested Returns: bool: True if ``address`` is a valid IPv6 address """ try: return isinstance(ipaddress.IPv6Address(address), ipaddress.IPv6Address) except: return False
[docs] def escapeIPv6Address(address): """ Escape IPv6 Addresses with square brackets ``[]``. Args: address (str): address that should be escaped Returns: str: ``address`` in square brackets """ if isIPv6Address(address): return '[{}]'.format(address) else: return address
[docs] def camelCase(s): """ Remove underlines and make every first char uppercase. Args: s (str): string separated by underlines (foo_bar) Returns: str: string without underlines but uppercase chars (FooBar) """ return ''.join([x.capitalize() for x in s.split('_')])
[docs] def fdDup(old, new_fd, mode = 'w'): """ Duplicate file descriptor `old` to `new_fd` and closing the latter first. Used to redirect stdin, stdout and stderr from daemonized threads. Args: old (str): Path to the old file (e.g. /dev/stdout) new_fd (_io.TextIOWrapper): file object for the new file mode (str): mode in which the old file should be opened """ try: fd = open(old, mode) os.dup2(fd.fileno(), new_fd.fileno()) except OSError as e: logger.debug('Failed to redirect {}: {}'.format(old, str(e)))
[docs] class UniquenessSet: """ Check for uniqueness or equality of files. Args: dc (bool): if ``True`` use deep check which will compare files md5sums if they are of same size but no hardlinks (don't have the same inode). If ``False`` use files size and mtime follow_symlink (bool): if ``True`` check symlinks target instead of the link list_equal_to (str): full path to file. If not empty only return equal files to the given path instead of unique files. """ def __init__(self, dc = False, follow_symlink = False, list_equal_to = ''): self.deep_check = dc self.follow_sym = follow_symlink self._uniq_dict = {} # if not self._uniq_dict[size] -> size already checked with md5sum self._size_inode = set() # if (size,inode) in self._size_inode -> path is a hlink self.list_equal_to = list_equal_to if list_equal_to: st = os.stat(list_equal_to) if self.deep_check: self.reference = (st.st_size, md5sum(list_equal_to)) else: self.reference = (st.st_size, int(st.st_mtime))
[docs] def check(self, input_path): """ Check file ``input_path`` for either uniqueness or equality (depending on ``list_equal_to`` from constructor). Args: input_path (str): full path to file Returns: bool: ``True`` if file is unique and ``list_equal_to`` is empty. Or ``True`` if file is equal to file in ``list_equal_to`` """ # follow symlinks ? path = input_path if self.follow_sym and os.path.islink(input_path): path = os.readlink(input_path) if self.list_equal_to: return self.checkEqual(path) else: return self.checkUnique(path)
[docs] def checkUnique(self, path): """ Check file ``path`` for uniqueness and store a unique key for ``path``. Args: path (str): full path to file Returns: bool: ``True`` if file is unique """ # check if self.deep_check: dum = os.stat(path) size,inode = dum.st_size, dum.st_ino # is it a hlink ? if (size, inode) in self._size_inode: logger.debug("[deep test]: skip, it's a duplicate (size, inode)", self) return False self._size_inode.add((size,inode)) if size not in self._uniq_dict: # first item of that size unique_key = size logger.debug("[deep test]: store current size?", self) else: prev = self._uniq_dict[size] if prev: # store md5sum instead of previously stored size md5sum_prev = md5sum(prev) self._uniq_dict[size] = None self._uniq_dict[md5sum_prev] = prev logger.debug("[deep test]: size duplicate, remove the size, store prev md5sum", self) unique_key = md5sum(path) logger.debug("[deep test]: store current md5sum?", self) else: # store a tuple of (size, modification time) obj = os.stat(path) unique_key = (obj.st_size, int(obj.st_mtime)) # store if not already present, then return True if unique_key not in self._uniq_dict: logger.debug(" >> ok, store!", self) self._uniq_dict[unique_key] = path return True logger.debug(" >> skip (it's a duplicate)", self) return False
[docs] def checkEqual(self, path): """ Check if ``path`` is equal to the file in ``list_equal_to`` from constructor. Args: path (str): full path to file Returns: bool: ``True`` if file is equal """ st = os.stat(path) if self.deep_check: if self.reference[0] == st.st_size: return self.reference[1] == md5sum(path) return False else: return self.reference == (st.st_size, int(st.st_mtime))
[docs] class Alarm(object): """ Establish a callback function that is called after a timeout. The implementation uses a SIGALRM signal so do not call code in the callback that does not support multi-threading (reentrance) or you may cause non-deterministic "random" RTEs. """ def __init__(self, callback = None, overwrite = True): """ Create a new alarm instance Args: callback: Function to call when the timer ran down (ensure calling only reentrant code). Use ``None`` to throw a ``Timeout`` exception instead. overwrite: Is it allowed to (re)start the timer even though the current timer is still running ("ticking"): ``True`` cancels the current timer (if active) and restarts with the new timeout. ``False` silently ignores the start request if the current timer is still "ticking" """ self.callback = callback self.ticking = False self.overwrite = overwrite
[docs] def start(self, timeout): """ Start the timer (which calls the handler function when the timer ran down). The start is silently ignored if the current timer is still ticking and the the attribute ``overwrite`` is ``False``. Args: timeout: timer count down in seconds """ if self.ticking and not self.overwrite: return try: # Warning: This code may cause non-deterministic RTEs # if the handler function calls code that does # not support reentrance (see e.g. issue #1003). signal.signal(signal.SIGALRM, self.handler) signal.alarm(timeout) except ValueError: pass self.ticking = True
[docs] def stop(self): """ Stop timer before it comes to an end """ try: signal.alarm(0) self.ticking = False except: pass
[docs] def handler(self, signum, frame): """ This method is called after the timer ran down to zero and calls the callback function of the alarm instance. Raises: Timeout: If no callback function was set for the alarm instance """ self.ticking = False if self.callback is None: raise Timeout() else: self.callback()
[docs] class ShutDown(object): """ Shutdown the system after the current snapshot has finished. This should work for KDE, Gnome, Unity, Cinnamon, XFCE, Mate and E17. """ DBUS_SHUTDOWN ={'gnome': {'bus': 'sessionbus', 'service': 'org.gnome.SessionManager', 'objectPath': '/org/gnome/SessionManager', 'method': 'Shutdown', #methods Shutdown # Reboot # Logout 'interface': 'org.gnome.SessionManager', 'arguments': () #arg (only with Logout) # 0 normal # 1 no confirm # 2 force }, 'kde': {'bus': 'sessionbus', 'service': 'org.kde.ksmserver', 'objectPath': '/KSMServer', 'method': 'logout', 'interface': 'org.kde.KSMServerInterface', 'arguments': (-1, 2, -1) #1st arg -1 confirm # 0 no confirm #2nd arg -1 full dialog with default logout # 0 logout # 1 restart # 2 shutdown #3rd arg -1 wait 30sec # 2 immediately }, 'xfce': {'bus': 'sessionbus', 'service': 'org.xfce.SessionManager', 'objectPath': '/org/xfce/SessionManager', 'method': 'Shutdown', #methods Shutdown # Restart # Suspend (no args) # Hibernate (no args) # Logout (two args) 'interface': 'org.xfce.Session.Manager', 'arguments': (True,) #arg True allow saving # False don't allow saving #1st arg (only with Logout) # True show dialog # False don't show dialog #2nd arg (only with Logout) # True allow saving # False don't allow saving }, 'mate': {'bus': 'sessionbus', 'service': 'org.mate.SessionManager', 'objectPath': '/org/mate/SessionManager', 'method': 'Shutdown', #methods Shutdown # Logout 'interface': 'org.mate.SessionManager', 'arguments': () #arg (only with Logout) # 0 normal # 1 no confirm # 2 force }, 'e17': {'bus': 'sessionbus', 'service': 'org.enlightenment.Remote.service', 'objectPath': '/org/enlightenment/Remote/RemoteObject', 'method': 'Halt', #methods Halt -> Shutdown # Reboot # Logout # Suspend # Hibernate 'interface': 'org.enlightenment.Remote.Core', 'arguments': () }, 'e19': {'bus': 'sessionbus', 'service': 'org.enlightenment.wm.service', 'objectPath': '/org/enlightenment/wm/RemoteObject', 'method': 'Shutdown', #methods Shutdown # Restart 'interface': 'org.enlightenment.wm.Core', 'arguments': () }, 'z_freed': {'bus': 'systembus', 'service': 'org.freedesktop.login1', 'objectPath': '/org/freedesktop/login1', 'method': 'PowerOff', 'interface': 'org.freedesktop.login1.Manager', 'arguments': (True,) } } def __init__(self): self.is_root = isRoot() if self.is_root: self.proxy, self.args = None, None else: self.proxy, self.args = self._prepair() self.activate_shutdown = False self.started = False
[docs] def _prepair(self): """ Try to connect to the given dbus services. If successful it will return a callable dbus proxy and those arguments. """ try: if 'DBUS_SESSION_BUS_ADDRESS' in os.environ: sessionbus = dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS']) else: sessionbus = dbus.SessionBus() systembus = dbus.SystemBus() except: return((None, None)) des = list(self.DBUS_SHUTDOWN.keys()) des.sort() for de in des: if de == 'gnome' and self.unity7(): continue dbus_props = self.DBUS_SHUTDOWN[de] try: if dbus_props['bus'] == 'sessionbus': bus = sessionbus else: bus = systembus interface = bus.get_object(dbus_props['service'], dbus_props['objectPath']) proxy = interface.get_dbus_method(dbus_props['method'], dbus_props['interface']) return((proxy, dbus_props['arguments'])) except dbus.exceptions.DBusException: continue return((None, None))
[docs] def canShutdown(self): """ Indicate if a valid dbus service is available to shutdown system. """ return(not self.proxy is None or self.is_root)
[docs] def askBeforeQuit(self): """ Indicate if ShutDown is ready to fire and so the application shouldn't be closed. """ return(self.activate_shutdown and not self.started)
[docs] def shutdown(self): """ Run 'shutdown -h now' if we are root or call the dbus proxy to start the shutdown. """ if not self.activate_shutdown: return(False) if self.is_root: syncfs() self.started = True proc = subprocess.Popen(['shutdown', '-h', 'now']) proc.communicate() return proc.returncode if self.proxy is None: return(False) else: syncfs() self.started = True return(self.proxy(*self.args))
[docs] def unity7(self): """ Unity >= 7.0 doesn't shutdown automatically. It will only show shutdown dialog and wait for user input. """ if not checkCommand('unity'): return False proc = subprocess.Popen(['unity', '--version'], stdout = subprocess.PIPE, universal_newlines = True) unity_version = proc.communicate()[0] m = re.match(r'unity ([\d\.]+)', unity_version) return m and Version(m.group(1)) >= Version('7.0') and processExists('unity-panel-service')
[docs] class SetupUdev(object): """ Setup Udev rules for starting BackInTime when a drive get connected. This is done by serviceHelper.py script (included in backintime-qt) running as root though DBus. """ CONNECTION = 'net.launchpad.backintime.serviceHelper' OBJECT = '/UdevRules' INTERFACE = 'net.launchpad.backintime.serviceHelper.UdevRules' MEMBERS = ('addRule', 'save', 'delete') def __init__(self): if dbus is None: self.isReady = False return try: bus = dbus.SystemBus() conn = bus.get_object(SetupUdev.CONNECTION, SetupUdev.OBJECT) self.iface = dbus.Interface(conn, SetupUdev.INTERFACE) except dbus.exceptions.DBusException as e: # Only DBusExceptions are handled to do a "graceful recovery" # by working without a serviceHelper D-Bus connection... # All other exceptions are still raised causing BiT # to stop during startup. # if e._dbus_error_name in ('org.freedesktop.DBus.Error.NameHasNoOwner', # 'org.freedesktop.DBus.Error.ServiceUnknown', # 'org.freedesktop.DBus.Error.FileNotFound'): logger.warning("Failed to connect to Udev serviceHelper daemon via D-Bus: " + e.get_dbus_name()) logger.warning("D-Bus message: " + e.get_dbus_message()) logger.warning("Udev-based profiles cannot be changed or checked due to Udev serviceHelper connection failure") conn = None # else: # raise self.isReady = bool(conn)
[docs] def addRule(self, cmd, uuid): """ Prepare rules in serviceHelper.py """ if not self.isReady: return try: return self.iface.addRule(cmd, uuid) except dbus.exceptions.DBusException as e: if e._dbus_error_name == 'net.launchpad.backintime.InvalidChar': raise InvalidChar(str(e)) elif e._dbus_error_name == 'net.launchpad.backintime.InvalidCmd': raise InvalidCmd(str(e)) elif e._dbus_error_name == 'net.launchpad.backintime.LimitExceeded': raise LimitExceeded(str(e)) else: raise
[docs] def save(self): """ Save rules with serviceHelper.py after authentication If no rules where added before this will delete current rule. """ if not self.isReady: return try: return self.iface.save() except dbus.exceptions.DBusException as e: if e._dbus_error_name == 'com.ubuntu.DeviceDriver.PermissionDeniedByPolicy': raise PermissionDeniedByPolicy(str(e)) else: raise
[docs] def clean(self): """ Clean up remote cache """ if not self.isReady: return self.iface.clean()
[docs] class PathHistory(object): def __init__(self, path): self.history = [path,] self.index = 0
[docs] def append(self, path): #append path after the current index self.history = self.history[:self.index + 1] + [path,] self.index = len(self.history) - 1
[docs] def previous(self): if self.index == 0: return self.history[0] try: path = self.history[self.index - 1] except IndexError: return self.history[self.index] self.index -= 1 return path
[docs] def next(self): if self.index == len(self.history) - 1: return self.history[-1] try: path = self.history[self.index + 1] except IndexError: return self.history[self.index] self.index += 1 return path
[docs] def reset(self, path): self.history = [path,] self.index = 0
[docs] class OrderedSet(MutableSet): """ OrderedSet from Python recipe http://code.activestate.com/recipes/576694/ """ def __init__(self, iterable=None): self.end = end = [] end += [None, end, end] # sentinel node for doubly linked list self.map = {} # key --> [key, prev, next] if iterable is not None: self |= iterable def __len__(self): return len(self.map) def __contains__(self, key): return key in self.map
[docs] def add(self, key): if key not in self.map: end = self.end curr = end[1] curr[2] = end[1] = self.map[key] = [key, curr, end]
[docs] def discard(self, key): if key in self.map: key, prev, next = self.map.pop(key) prev[2] = next next[1] = prev
def __iter__(self): end = self.end curr = end[2] while curr is not end: yield curr[0] curr = curr[2] def __reversed__(self): end = self.end curr = end[1] while curr is not end: yield curr[0] curr = curr[1]
[docs] def pop(self, last=True): if not self: raise KeyError('set is empty') key = self.end[1][0] if last else self.end[2][0] self.discard(key) return key
def __repr__(self): if not self: return '%s()' % (self.__class__.__name__,) return '%s(%r)' % (self.__class__.__name__, list(self)) def __eq__(self, other): if isinstance(other, OrderedSet): return len(self) == len(other) and list(self) == list(other) return set(self) == set(other)
[docs] class Execute(object): """ Execute external commands and handle its output. Args: cmd (:py:class:`str` or :py:class:`list`): command with arguments that should be called. Depending on if this is :py:class:`str` or :py:class:`list` instance the command will be called by either :py:func:`os.system` (deprecated) or :py:class:`subprocess.Popen` callback (method): function which will handle output returned by command (e.g. to extract errors) user_data: extra arguments which will be forwarded to ``callback`` function (e.g. a tuple - which is passed by reference in Python - to "return" results of the callback function as side effect). filters (tuple): Tuple of functions used to filter messages before sending them to the ``callback`` function parent (instance): instance of the calling method used only to proper format log messages conv_str (bool): convert output to :py:class:`str` if True or keep it as :py:class:`bytes` if False join_stderr (bool): join stderr to stdout Note: Signals SIGTSTP ("keyboard stop") and SIGCONT send to Python main process will be forwarded to the command. SIGHUP will kill the process. """ def __init__(self, cmd, callback = None, user_data = None, filters = (), parent = None, conv_str = True, join_stderr = True): self.cmd = cmd self.callback = callback self.user_data = user_data self.filters = filters self.currentProc = None self.conv_str = conv_str self.join_stderr = join_stderr # we need to forward parent to have the correct class name in debug log if parent: self.parent = parent else: self.parent = self if isinstance(self.cmd, list): self.pausable = True self.printable_cmd = ' '.join(self.cmd) logger.debug('Call command "%s"' %self.printable_cmd, self.parent, 2) else: self.pausable = False self.printable_cmd = self.cmd logger.warning('Call command with old os.system method "%s"' %self.printable_cmd, self.parent, 2)
[docs] def run(self): """ Start the command. Returns: int: return code from the command """ ret_val = 0 out = '' # backwards compatibility with old os.system and os.popen calls # TODO Is this still required as the minimal Python version is 3.10++ now? # TODO Which Python versions are considered as "old" here? if isinstance(self.cmd, str): logger.deprecated(self) if self.callback is None: ret_val = os.system(self.cmd) else: pipe = os.popen(self.cmd, 'r') while True: line = tempFailureRetry(pipe.readline) if not line: break line = line.strip() for f in self.filters: line = f(line) if not line: continue self.callback(line, self.user_data) ret_val = pipe.close() if ret_val is None: ret_val = 0 # new and preferred method using subprocess.Popen # TODO Which minimal Python version is required to be considered as "new"? elif isinstance(self.cmd, (list, tuple)): try: # register signals for pause, resume and kill # Forward these signals (sent to the "backintime" process # normally) to the child process ("rsync" normally). # Note: SIGSTOP (unblockable stop) cannot be forwarded because # it cannot be caught in a signal handler! signal.signal(signal.SIGTSTP, self.pause) signal.signal(signal.SIGCONT, self.resume) signal.signal(signal.SIGHUP, self.kill) except ValueError: # signal only work in qt main thread # TODO What does this imply? pass if self.join_stderr: stderr = subprocess.STDOUT else: stderr = subprocess.DEVNULL logger.debug(f"Starting command '{self.printable_cmd[:min(16, len(self.printable_cmd))]}...'") self.currentProc = subprocess.Popen(self.cmd, stdout = subprocess.PIPE, stderr = stderr) # # TEST code for developers to simulate a killed rsync process # if self.printable_cmd.startswith("rsync --recursive"): # self.currentProc.terminate() # signal 15 (SIGTERM) like "killall" and "kill" do by default # # self.currentProc.send_signal(signal.SIGHUP) # signal 1 # # self.currentProc.kill() # signal 9 # logger.error("rsync killed for testing purposes during development") if self.callback: for line in self.currentProc.stdout: if self.conv_str: line = line.decode().rstrip('\n') else: line = line.rstrip(b'\n') for f in self.filters: line = f(line) if not line: continue self.callback(line, self.user_data) # We use communicate() instead of wait() to avoid a deadlock # when stdout=PIPE and/or stderr=PIPE and the child process # generates enough output to pipe that it blocks waiting for # free buffer. See also: # https://docs.python.org/3.10/library/subprocess.html#subprocess.Popen.wait out = self.currentProc.communicate()[0] # TODO Why is "out" empty instead of containing all stdout? # Most probably because Popen was called with a PIPE as stdout # to directly process each stdout line by calling the callback... ret_val = self.currentProc.returncode # TODO ret_val is sometimes 0 instead of e.g. 23 for rsync. Why? try: # reset signal handler to their default signal.signal(signal.SIGTSTP, signal.SIG_DFL) signal.signal(signal.SIGCONT, signal.SIG_DFL) signal.signal(signal.SIGHUP, signal.SIG_DFL) except ValueError: # signal only work in qt main thread # TODO What does this imply? pass if ret_val != 0: msg = 'Command "%s" returns %s%s%s' %(self.printable_cmd, bcolors.WARNING, ret_val, bcolors.ENDC) if out: msg += ' | %s' %out.decode().strip('\n') logger.warning(msg, self.parent, 2) else: msg = 'Command "%s..." returns %s' %(self.printable_cmd[:min(16, len(self.printable_cmd))], ret_val) if out: msg += ': %s' %out.decode().strip('\n') logger.debug(msg, self.parent, 2) return ret_val
[docs] def pause(self, signum, frame): """ Slot which will send ``SIGSTOP`` to the command. Is connected to signal ``SIGTSTP``. """ if self.pausable and self.currentProc: logger.info('Pause process "%s"' %self.printable_cmd, self.parent, 2) return self.currentProc.send_signal(signal.SIGSTOP)
[docs] def resume(self, signum, frame): """ Slot which will send ``SIGCONT`` to the command. Is connected to signal ``SIGCONT``. """ if self.pausable and self.currentProc: logger.info('Resume process "%s"' %self.printable_cmd, self.parent, 2) return self.currentProc.send_signal(signal.SIGCONT)
[docs] def kill(self, signum, frame): """ Slot which will kill the command. Is connected to signal ``SIGHUP``. """ if self.pausable and self.currentProc: logger.info('Kill process "%s"' %self.printable_cmd, self.parent, 2) return self.currentProc.kill()
[docs] class Daemon: """ A generic daemon class. Usage: subclass the Daemon class and override the run() method Daemon Copyright by Sander Marechal License CC BY-SA 3.0 http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/ """ def __init__(self, pidfile = None, stdin='/dev/null', stdout='/dev/stdout', stderr='/dev/null', umask = 0o022): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile self.umask = umask if pidfile: self.appInstance = ApplicationInstance(pidfile, autoExit = False, flock = False)
[docs] def daemonize(self): """ "Converts" the current process into a daemon (= process running in the background) and sends a SIGTERM signal to the current process. This is done via the UNIX double-fork magic, see Stevens' "Advanced Programming in the UNIX Environment" for details (ISBN 0201563177) and this explanation: https://stackoverflow.com/a/6011298 """ try: pid = os.fork() logger.debug('first fork pid: {}'.format(pid), self) if pid > 0: # exit first parent sys.exit(0) except OSError as e: logger.error("fork #1 failed: %d (%s)" % (e.errno, str(e)), self) sys.exit(1) # decouple from parent environment logger.debug('decouple from parent environment', self) os.chdir("/") os.setsid() os.umask(self.umask) # do second fork try: pid = os.fork() logger.debug('second fork pid: {}'.format(pid), self) if pid > 0: # exit from second parent sys.exit(0) except OSError as e: logger.error("fork #2 failed: %d (%s)" % (e.errno, str(e)), self) sys.exit(1) # redirect standard file descriptors logger.debug('redirect standard file descriptors', self) sys.stdout.flush() sys.stderr.flush() fdDup(self.stdin, sys.stdin, 'r') fdDup(self.stdout, sys.stdout, 'w') fdDup(self.stderr, sys.stderr, 'w') signal.signal(signal.SIGTERM, self.cleanupHandler) if self.pidfile: atexit.register(self.appInstance.exitApplication) # write pidfile logger.debug('write pidfile', self) self.appInstance.startApplication()
[docs] def cleanupHandler(self, signum, frame): if self.pidfile: self.appInstance.exitApplication() sys.exit(0)
[docs] def start(self): """ Start the daemon """ # Check for a pidfile to see if the daemon already runs if self.pidfile and not self.appInstance.check(): message = "pidfile %s already exists. Daemon already running?\n" logger.error(message % self.pidfile, self) sys.exit(1) # Start the daemon self.daemonize() self.run()
[docs] def stop(self): """ Stop the daemon """ if not self.pidfile: logger.debug("Unattended daemon can't be stopped. No PID file", self) return # Get the pid from the pidfile pid, procname = self.appInstance.readPidFile() if not pid: message = "pidfile %s does not exist. Daemon not running?\n" logger.error(message % self.pidfile, self) return # not an error in a restart # Try killing the daemon process try: while True: os.kill(pid, signal.SIGTERM) sleep(0.1) except OSError as err: if err.errno == errno.ESRCH: #no such process self.appInstance.exitApplication() else: logger.error(str(err), self) sys.exit(1)
[docs] def restart(self): """ Restart the daemon """ self.stop() self.start()
[docs] def reload(self): """ send SIGHUP signal to process """ if not self.pidfile: logger.debug("Unattended daemon can't be reloaded. No PID file", self) return # Get the pid from the pidfile pid, procname = self.appInstance.readPidFile() if not pid: message = "pidfile %s does not exist. Daemon not running?\n" logger.error(message % self.pidfile, self) return # Try killing the daemon process try: os.kill(pid, signal.SIGHUP) except OSError as err: if err.errno == errno.ESRCH: #no such process self.appInstance.exitApplication() else: sys.stderr.write(str(err)) sys.exit(1)
[docs] def status(self): """ return status """ if not self.pidfile: logger.debug("Unattended daemon can't be checked. No PID file", self) return return not self.appInstance.check()
[docs] def run(self): """ You should override this method when you subclass Daemon. It will be called after the process has been daemonized by start() or restart(). """ pass
# def __logKeyringWarning(): # # from time import sleep # sleep(0.1) # # TODO This function may not be thread-safe # logger.warning('import keyring failed') # # # # if is_keyring_available: # # # delay warning to give logger some time to import # # # Jan 4, 2024 aryoda: # # This is an assumed work-around for #820 (unhandled exception: NoneType) # # but does not seem to fix the problem. # # So I have refactored the possible name shadowing of "keyring" # # as described in # # https://github.com/bit-team/backintime/issues/820#issuecomment-1472971734 # # and left this code unchanged to wait for user feed-back if it works now. # # If the error still occurs I would move the log output call # # to the client of this module so that it is certain to assume it is # # correctly initialized. # # Maybe use backintime.py and app.py for logging... # # (don't call tools.keyringSupported() for that because # # it produces too much debug logging output whenever it is called # # but just query tools.is_keyring_available. # from threading import Thread # thread = Thread(target=__logKeyringWarning, args=()) # thread.start()