# Copyright (C) 2012-2022 Germar Reitze
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import sys
import os
import time
import atexit
import signal
import subprocess
import re
import errno
import config
import configfile
import tools
import password_ipc
import logger
from exceptions import Timeout
[docs]
class Password_Cache(tools.Daemon):
"""
Password_Cache get started on User login. It provides passwords for
BIT cronjobs because keyring is not available when the User is not
logged in. Does not start if there is no password to cache
(e.g. no profile allows to cache).
"""
PW_CACHE_VERSION = 3
def __init__(self, cfg = None, *args, **kwargs):
self.config = cfg
if self.config is None:
self.config = config.Config()
cachePath = self.config.passwordCacheFolder()
if not tools.mkdir(cachePath, 0o700):
msg = 'Failed to create secure Password_Cache folder'
logger.error(msg, self)
raise PermissionError(msg)
pid = self.config.passwordCachePid()
super(Password_Cache, self).__init__(pid, umask = 0o077, *args, **kwargs)
self.dbKeyring = {}
self.dbUsr = {}
self.fifo = password_ipc.FIFO(self.config.passwordCacheFifo())
self.keyringSupported = tools.keyringSupported()
[docs]
def run(self):
"""
wait for password request on FIFO and answer with password
from self.db through FIFO.
"""
info = configfile.ConfigFile()
info.setIntValue('version', self.PW_CACHE_VERSION)
info.save(self.config.passwordCacheInfo())
os.chmod(self.config.passwordCacheInfo(), 0o600)
logger.debug('Keyring supported: %s' %self.keyringSupported, self)
tools.envSave(self.config.cronEnvFile())
if not self.collectPasswords():
logger.debug('Nothing to cache. Quit.', self)
sys.exit(0)
self.fifo.create()
atexit.register(self.fifo.delfifo)
signal.signal(signal.SIGHUP, self.reloadHandler)
logger.debug('Start loop', self)
while True:
try:
request = self.fifo.read()
request = request.split('\n')[0]
task, value = request.split(':', 1)
if task == 'get_pw':
key = value
if key in list(self.dbKeyring.keys()):
answer = 'pw:' + self.dbKeyring[key]
elif key in list(self.dbUsr.keys()):
answer = 'pw:' + self.dbUsr[key]
else:
answer = 'none:'
self.fifo.write(answer, 5)
elif task == 'set_pw':
key, value = value.split(':', 1)
self.dbUsr[key] = value
except IOError as e:
logger.error('Error in writing answer to FIFO: %s' % str(e), self)
except KeyboardInterrupt:
logger.debug('Quit.', self)
break
except Timeout:
logger.error('FIFO timeout', self)
except Exception as e:
logger.error('ERROR: %s' % str(e), self)
[docs]
def reloadHandler(self, signum, frame):
"""
reload passwords during runtime.
"""
time.sleep(2)
cfgPath = self.config._LOCAL_CONFIG_PATH
del(self.config)
self.config = config.Config(cfgPath)
del(self.dbKeyring)
self.dbKeyring = {}
self.collectPasswords()
[docs]
def collectPasswords(self):
"""
search all profiles in config and collect passwords from keyring.
"""
run_daemon = False
profiles = self.config.profiles()
for profile_id in profiles:
mode = self.config.snapshotsMode(profile_id)
for pw_id in (1, 2):
if self.config.modeNeedPassword(mode, pw_id):
if self.config.passwordUseCache(profile_id):
run_daemon = True
if self.config.passwordSave(profile_id) and self.keyringSupported:
service_name = self.config.keyringServiceName(profile_id, mode, pw_id)
user_name = self.config.keyringUserName(profile_id)
password = tools.password(service_name, user_name)
if password is None:
continue
self.dbKeyring['%s/%s' %(service_name, user_name)] = password
return run_daemon
[docs]
def checkVersion(self):
info = configfile.ConfigFile()
info.load(self.config.passwordCacheInfo())
if info.intValue('version') < self.PW_CACHE_VERSION:
return False
return True
[docs]
def cleanupHandler(self, signum, frame):
self.fifo.delfifo()
super(Password_Cache, self).cleanupHandler(signum, frame)
[docs]
class Password(object):
"""
provide passwords for BIT either from keyring, Password_Cache or
by asking User.
"""
def __init__(self, cfg = None):
self.config = cfg
if self.config is None:
self.config = config.Config()
self.cache = Password_Cache(self.config)
self.fifo = password_ipc.FIFO(self.config.passwordCacheFifo())
self.db = {}
self.keyringSupported = tools.keyringSupported()
[docs]
def password(self, parent, profile_id, mode, pw_id = 1, only_from_keyring = False):
"""
based on profile settings return password from keyring,
Password_Cache or by asking User.
"""
if not self.config.modeNeedPassword(mode, pw_id):
return ''
service_name = self.config.keyringServiceName(profile_id, mode, pw_id)
user_name = self.config.keyringUserName(profile_id)
try:
return self.db['%s/%s' %(service_name, user_name)]
except KeyError:
pass
password = ''
if self.config.passwordUseCache(profile_id) and not only_from_keyring:
#from cache
password = self.passwordFromCache(service_name, user_name)
if not password is None:
self.setPasswordDb(service_name, user_name, password)
return password
if self.config.passwordSave(profile_id):
#from keyring
password = self.passwordFromKeyring(service_name, user_name)
if not password is None:
self.setPasswordDb(service_name, user_name, password)
return password
if not only_from_keyring:
#ask user and write to cache
password = self.passwordFromUser(parent, profile_id, mode, pw_id)
if self.config.passwordUseCache(profile_id):
self.setPasswordCache(service_name, user_name, password)
self.setPasswordDb(service_name, user_name, password)
return password
return password
[docs]
def passwordFromKeyring(self, service_name, user_name):
"""
get password from system keyring (seahorse). The keyring is only
available if User is logged in.
"""
if self.keyringSupported:
try:
return tools.password(service_name, user_name)
except Exception:
logger.error('get password from Keyring failed', self)
return None
[docs]
def passwordFromCache(self, service_name, user_name):
"""
get password from Password_Cache
"""
if self.cache.status():
self.cache.checkVersion()
self.fifo.write('get_pw:%s/%s' %(service_name, user_name), timeout = 5)
answer = self.fifo.read(timeout = 5)
mode, pw = answer.split(':', 1)
if mode == 'none':
return None
return pw
else:
return None
[docs]
def passwordFromUser(self, parent, profile_id = None, mode = None, pw_id = 1, prompt = None):
"""
ask user for password. This does even work when run as cronjob
and user is logged in.
"""
if prompt is None:
"""
Profile {name}: Enter password for {mode}
"""
prompt = _("Profile '{profile}': Enter password for {mode}: ") \
.format(
profile=self.config.profileName(profile_id),
mode=self.config.SNAPSHOT_MODES[mode][pw_id+1])
tools.registerBackintimePath('qt')
x_server = tools.checkXServer()
import_successful = False
if x_server:
try:
import messagebox
import_successful = True
except ImportError:
pass
if not import_successful or not x_server:
import getpass
alarm = tools.Alarm()
alarm.start(300)
try:
password = getpass.getpass(prompt)
alarm.stop()
except Timeout:
password = ''
return password
password = messagebox.askPasswordDialog(
parent=parent,
title=self.config.APP_NAME,
prompt=prompt,
language_code=self.config.language(),
timeout=300)
return password
[docs]
def setPasswordDb(self, service_name, user_name, password):
"""
internal Password cache. Prevent to ask password several times
during runtime.
"""
self.db['%s/%s' %(service_name, user_name)] = password
[docs]
def setPassword(self, password, profile_id, mode, pw_id):
"""
store password to keyring and Password_Cache
"""
if self.config.modeNeedPassword(mode, pw_id):
service_name = self.config.keyringServiceName(profile_id, mode, pw_id)
user_name = self.config.keyringUserName(profile_id)
if self.config.passwordSave(profile_id):
self.setPasswordKeyring(service_name, user_name, password)
if self.config.passwordUseCache(profile_id):
self.setPasswordCache(service_name, user_name, password)
self.setPasswordDb(service_name, user_name, password)
[docs]
def setPasswordKeyring(self, service_name, user_name, password):
return tools.setPassword(service_name, user_name, password)
[docs]
def setPasswordCache(self, service_name, user_name, password):
if self.cache.status():
self.cache.checkVersion()
self.fifo.write('set_pw:%s/%s:%s' %(service_name, user_name, password), timeout = 5)