cryptonas-branches/pythonrewrite/bin/CryptoBoxSettings.py
lars 0fe6d426ed added mounting and unmounting of config partition
moved config partition handling to CryptoBoxSettings
implemented environment checks (writeable config, https (off for now))
chown mounted directory after mount to the cryptobox user
2006-11-03 14:27:19 +00:00

474 lines
16 KiB
Python

## standard library
import logging
import os
import subprocess

## project modules - must be imported before the guarded imports below,
## because their error handlers raise a CryptoBoxExceptions exception
import CryptoBoxExceptions

## third-party modules: turn a missing package into a helpful environment error
try:
    import validate
except ImportError:
    raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'validate'! Try 'apt-get install python-formencode'.")
try:
    import configobj	## needed for reading and writing of the config file
except ImportError:
    raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'configobj'! Try 'apt-get install python-configobj'.")
class CryptoBoxSettings:
    """Access to the CryptoBox configuration and its local setting files.

    The constructor locates and loads the main config file, validates it
    against 'validation_spec', configures the "CryptoBox" logger and then
    opens the name database, the plugin configuration, the user database
    and the files of the "misc" subdirectory below the settings directory.

    Item access (settings["Section"]) is redirected to the loaded preferences.
    """

    ## locations that are searched (in this order) if no config file was given
    CONF_LOCATIONS = [
        "./cryptobox.conf",
        "~/.cryptobox.conf",
        "/etc/cryptobox/cryptobox.conf"]

    ## names of the setting files below prefs["Locations"]["SettingsDir"]
    NAMEDB_FILE = "cryptobox_names.db"
    PLUGINCONF_FILE = "cryptobox_plugins.conf"
    USERDB_FILE = "cryptobox_users.db"


    def __init__(self, config_file=None):
        """Load and validate all settings.

        config_file: path of the config file or None (search CONF_LOCATIONS)
        """
        self.log = logging.getLogger("CryptoBox")
        config_file = self.__getConfigFileName(config_file)
        self.log.info("loading config file: %s" % config_file)
        self.prefs = self.__getPreferences(config_file)
        self.__validateConfig()
        self.__configureLogHandler()
        self.__checkUnknownPreferences()
        self.nameDB = self.__getNameDatabase()
        self.pluginConf = self.__getPluginConfig()
        self.userDB = self.__getUserDB()
        self.misc_files = self.__getMiscFiles()


    def write(self):
        """
        write all local setting files including the content of the "misc" subdirectory

        Every file is attempted, even if an earlier one failed.
        Returns True if everything was saved, otherwise False.
        """
        ok = True
        for conf_obj, description in (
                (self.nameDB, "the name database"),
                (self.pluginConf, "the plugin configuration"),
                (self.userDB, "the user database")):
            try:
                conf_obj.write()
            except IOError:
                self.log.warning("could not save %s" % description)
                ok = False
        for misc_file in self.misc_files:
            if not misc_file.save():
                self.log.warning("could not save a misc setting file (%s)" % misc_file.filename)
                ok = False
        return ok


    def isWriteable(self):
        """check if the current process may write to the settings directory"""
        return os.access(self.prefs["Locations"]["SettingsDir"], os.W_OK)


    def getActivePartition(self):
        """Return the device mounted at the settings directory.

        Returns the first field of the first /proc/mounts entry whose mount
        directory is the same file as the settings directory. Returns None if
        the settings directory is not a mount point or no entry matches.
        """
        settings_dir = self.prefs["Locations"]["SettingsDir"]
        if not os.path.ismount(settings_dir):
            return None
        mounts = open("/proc/mounts")
        try:
            for line in mounts:
                fields = line.split(" ")
                ## ignore malformed lines instead of failing with an IndexError
                if len(fields) < 2:
                    continue
                try:
                    if os.path.samefile(fields[1], settings_dir):
                        return fields[0]
                except OSError:
                    ## the mount directory is not accessible - it cannot be ours
                    pass
        finally:
            mounts.close()
        ## no matching entry found
        return None


    def mountPartition(self):
        """Mount the first available config partition at the settings directory.

        Returns True on success. Returns False if a config partition is
        already mounted, if none is available or if mounting failed.
        """
        if self.isWriteable():
            self.log.warning("mountConfigPartition: configuration is already writeable - mounting anyway")
        if self.getActivePartition():
            self.log.warning("mountConfigPartition: configuration partition already mounted - not mounting again")
            return False
        confPartitions = self.getAvailablePartitions()
        if not confPartitions:
            self.log.error("no configuration partitions found - you have to create it first")
            return False
        partition = confPartitions[0]
        (returncode, stderr) = self.__runRootAction(
            "mount", partition, self.prefs["Locations"]["SettingsDir"])
        if returncode != 0:
            self.log.error("failed to mount the configuration partition: %s" % partition)
            self.log.error("output of mount: %s" % (stderr,))
            return False
        self.log.info("configuration partition mounted: %s" % partition)
        return True


    def umountPartition(self):
        """Unmount the config partition from the settings directory.

        Returns True on success. Returns False if no config partition is
        mounted or if unmounting failed.
        """
        if not self.getActivePartition():
            self.log.warning("umountConfigPartition: no configuration partition mounted")
            return False
        (returncode, stderr) = self.__runRootAction(
            "umount", self.prefs["Locations"]["SettingsDir"])
        if returncode != 0:
            self.log.error("failed to unmount the configuration partition")
            self.log.error("output of umount: %s" % (stderr,))
            return False
        self.log.info("configuration partition unmounted")
        return True


    def getAvailablePartitions(self):
        """returns a sequence of found config partitions

        The configured 'blkid' program is asked for all devices labeled with
        prefs["Main"]["ConfigVolumeLabel"]. The part before the first colon of
        every output line is returned.
        """
        proc = subprocess.Popen(
            shell = False,
            stdout = subprocess.PIPE,
            args = [
                self.prefs["Programs"]["blkid"],
                "-c", os.path.devnull,
                "-t", "LABEL=%s" % self.prefs["Main"]["ConfigVolumeLabel"] ])
        (output, error) = proc.communicate()
        if not output:
            return []
        return [e.strip().split(":", 1)[0] for e in output.splitlines()]


    def __getitem__(self, key):
        """redirect all requests to the 'prefs' attribute"""
        return self.prefs[key]


    def __runRootAction(self, *action_args):
        """Run the configured root action program via 'super'.

        action_args: arguments passed to the CryptoBoxRootActions program
        Returns a tuple (returncode, stderr output).
        """
        proc = subprocess.Popen(
            shell = False,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            args = [
                self.prefs["Programs"]["super"],
                self.prefs["Programs"]["CryptoBoxRootActions"]] + list(action_args))
        (stdout, stderr) = proc.communicate()
        return (proc.returncode, stderr)


    def __getPreferences(self, config_file):
        """Load the config file together with the validation rules.

        Raises CBConfigUnavailableError if the file cannot be opened or
        if it contains no settings.
        """
        import StringIO
        config_rules = StringIO.StringIO(self.validation_spec)
        try:
            prefs = configobj.ConfigObj(config_file, configspec=config_rules)
            if prefs:
                self.log.info("found config: %s" % prefs.items())
            else:
                raise CryptoBoxExceptions.CBConfigUnavailableError("failed to load the config file: %s" % config_file)
        except IOError:
            raise CryptoBoxExceptions.CBConfigUnavailableError("unable to open the config file: %s" % config_file)
        return prefs


    def __validateConfig(self):
        """Validate the preferences against the validation rules.

        Raises CBConfigError with one line per missing or invalid setting.
        """
        result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
        error_list = configobj.flatten_errors(self.prefs, result)
        if not error_list:
            return
        errorMsgs = []
        for sections, key, text in error_list:
            section_name = "->".join(sections)
            if not text:
                errorMsg = "undefined configuration value (%s) in section '%s'" % (key, section_name)
            else:
                errorMsg = "invalid configuration value (%s) in section '%s': %s" % (key, section_name, text)
            errorMsgs.append(errorMsg)
        raise CryptoBoxExceptions.CBConfigError("\n".join(errorMsgs))


    def __checkUnknownPreferences(self):
        """log a warning for every setting that is not part of 'validation_spec'"""
        import StringIO
        config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec), list_values=False)
        self.__recursiveConfigSectionCheck("", self.prefs, config_rules)


    def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
        """should be called by '__checkUnknownPreferences' for every section
        sends a warning message to the logger for every undefined (see validation_spec)
        configuration setting
        """
        for e in section_config.keys():
            element_path = section_path + e
            if e not in section_rules.keys():
                self.log.warning("unknown configuration setting: %s" % element_path)
                continue
            config_is_section = isinstance(section_config[e], configobj.Section)
            rule_is_section = isinstance(section_rules[e], configobj.Section)
            if config_is_section and rule_is_section:
                self.__recursiveConfigSectionCheck(element_path + "->", section_config[e], section_rules[e])
            elif config_is_section:
                self.log.warning("configuration setting should be a value instead of a section name: %s" % element_path)
            elif rule_is_section:
                self.log.warning("configuration setting should be a section name instead of a value: %s" % element_path)
            ## otherwise: good - the setting is valid


    def __getSettingsFilePath(self, file_name, description):
        """Build the path of a file below the settings directory.

        file_name: name of the file inside of the settings directory
        description: human readable name of the file (used for error messages)
        Raises CBConfigUndefinedError if the settings directory is not
        configured and CBConfigInvalidValueError if no path can be built
        from its value.
        """
        try:
            settings_dir = self.prefs["Locations"]["SettingsDir"]
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "SettingsDir")
        try:
            return os.path.join(settings_dir, file_name)
        except (SyntaxError, TypeError, AttributeError):
            ## report the configured directory - the joined filename does not exist here
            raise CryptoBoxExceptions.CBConfigInvalidValueError("Locations", "SettingsDir", settings_dir, "failed to interprete the filename of the %s correctly (%s)" % (description, settings_dir))


    def __getNameDatabase(self):
        """Open the name database (it is created if it does not exist).

        Raises CBEnvironmentError if the file could not be created.
        """
        nameDB_file = self.__getSettingsFilePath(self.NAMEDB_FILE, "name database")
        ## create nameDB if necessary
        if os.path.exists(nameDB_file):
            nameDB = configobj.ConfigObj(nameDB_file)
        else:
            nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
        ## check if nameDB file was created successfully?
        if not os.path.exists(nameDB_file):
            raise CryptoBoxExceptions.CBEnvironmentError("failed to create name database (%s)" % nameDB_file)
        return nameDB


    def __getPluginConfig(self):
        """Open the plugin configuration (it is created if it does not exist).

        Raises CBEnvironmentError if the file could not be created.
        """
        import StringIO
        plugin_rules = StringIO.StringIO(self.pluginValidationSpec)
        pluginConf_file = self.__getSettingsFilePath(self.PLUGINCONF_FILE, "plugin config file")
        ## create pluginConf_file if necessary
        if os.path.exists(pluginConf_file):
            pluginConf = configobj.ConfigObj(pluginConf_file, configspec=plugin_rules)
        else:
            pluginConf = configobj.ConfigObj(pluginConf_file, configspec=plugin_rules, create_empty=True)
        ## validate and convert values according to the spec
        pluginConf.validate(validate.Validator())
        ## check if pluginConf_file file was created successfully?
        if not os.path.exists(pluginConf_file):
            raise CryptoBoxExceptions.CBEnvironmentError("failed to create plugin configuration file (%s)" % pluginConf_file)
        return pluginConf


    def __getUserDB(self):
        """Open the user database (it is created if it does not exist).

        The returned object gets an additional 'getDigest' function that
        turns a password into its hex digest.
        Raises CBEnvironmentError if the file could not be created.
        """
        import StringIO
        ## prefer 'hashlib' - the 'sha' module is only a fallback for old interpreters
        try:
            import hashlib
            sha1 = hashlib.sha1
        except ImportError:
            import sha
            sha1 = sha.new
        userDB_rules = StringIO.StringIO(self.userDatabaseSpec)
        userDB_file = self.__getSettingsFilePath(self.USERDB_FILE, "users database file")
        ## create userDB_file if necessary
        if os.path.exists(userDB_file):
            userDB = configobj.ConfigObj(userDB_file, configspec=userDB_rules)
        else:
            userDB = configobj.ConfigObj(userDB_file, configspec=userDB_rules, create_empty=True)
        ## validate and set default value for "admin" user
        userDB.validate(validate.Validator())
        ## check if userDB file was created successfully?
        if not os.path.exists(userDB_file):
            raise CryptoBoxExceptions.CBEnvironmentError("failed to create user database file (%s)" % userDB_file)
        ## define password hash function - never use "sha" directly - SPOT
        ## NOTE(review): an unsalted SHA-1 digest is weak for password storage;
        ## it is kept, because another digest would invalidate the stored hashes
        userDB.getDigest = lambda password: sha1(password).hexdigest()
        return userDB


    def __getMiscFiles(self):
        """Load all regular files of the "misc" subdirectory of the settings directory.

        Returns a list of MiscConfigFile objects. The list is empty if the
        directory does not exist or may not be entered.
        """
        misc_dir = os.path.join(self.prefs["Locations"]["SettingsDir"], "misc")
        if (not os.path.isdir(misc_dir)) or (not os.access(misc_dir, os.X_OK)):
            return []
        return [MiscConfigFile(os.path.join(misc_dir, f), self.log)
                for f in os.listdir(misc_dir)
                if os.path.isfile(os.path.join(misc_dir, f))]


    def __getConfigFileName(self, config_file):
        """Determine the config file to be used.

        config_file: a filename or None (use the first existing file of CONF_LOCATIONS)
        Raises CBConfigUnavailableError if no usable config file is found.
        """
        # search for the configuration file
        import types
        if config_file is None:
            # no config file was specified - we will look for it in the usual locations
            candidates = [os.path.expanduser(f) for f in self.CONF_LOCATIONS]
            conf_file_list = [f for f in candidates if os.path.exists(f)]
            if not conf_file_list:
                # no possible config file found in the usual locations
                raise CryptoBoxExceptions.CBConfigUnavailableError()
            return conf_file_list[0]
        # a config file was specified (e.g. via command line)
        if not isinstance(config_file, types.StringTypes):
            # wrap the value in a tuple: a tuple argument would break the formatting
            raise CryptoBoxExceptions.CBConfigUnavailableError("invalid config file specified: %s" % (config_file,))
        if not os.path.exists(config_file):
            raise CryptoBoxExceptions.CBConfigUnavailableError("could not find the specified configuration file (%s)" % config_file)
        return config_file


    def __configureLogHandler(self):
        """Attach a file handler to the "CryptoBox" logger.

        Log level and log file are taken from the [Log] section.
        Raises CBConfigUndefinedError for a missing setting,
        CBConfigInvalidValueError for an unknown log level and
        CBEnvironmentError if the log file cannot be opened.
        """
        log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
        try:
            log_level = self.prefs["Log"]["Level"]
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
        log_level = log_level.upper()
        if log_level not in log_level_avail:
            raise CryptoBoxExceptions.CBConfigInvalidValueError("Log", "Level", log_level, "invalid log level: only %s are allowed" % log_level_avail)
        try:
            log_file = self.prefs["Log"]["Details"]
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
        try:
            log_handler = logging.FileHandler(log_file)
        except IOError:
            raise CryptoBoxExceptions.CBEnvironmentError("could not create the log file (%s)" % log_file)
        log_handler.setFormatter(logging.Formatter('%(asctime)s CryptoBox %(levelname)s: %(message)s'))
        cbox_log = logging.getLogger("CryptoBox")
        ## remove previous handlers
        cbox_log.handlers = []
        ## add new one
        cbox_log.addHandler(log_handler)
        ## do not call parent's handlers
        cbox_log.propagate = False
        ## 'log_level' is a string -> use 'getattr'
        cbox_log.setLevel(getattr(logging, log_level))
        ## the logger named "CryptoBox" is configured now


    ## rules for the main config file (see '__getPreferences' and '__validateConfig')
    validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
ConfigVolumeLabel = string(min=1,default="cbox_config")
[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
SettingsDir = directoryExists(default="/var/cache/cryptobox/settings")
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")
[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)
[WebSettings]
Stylesheet = string(min=1)
Language = string(min=1, default="en")
[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
blkid = fileExecutable(default="/sbin/blkid")
blockdev = fileExecutable(default="/sbin/blockdev")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
"""

    ## rules for the plugin configuration file (see '__getPluginConfig')
    pluginValidationSpec = """
[__many__]
enabled = boolean(default=None)
requestAuth = boolean(default=None)
rank = integer(default=None)
"""

    ## rules for the user database file (see '__getUserDB')
    userDatabaseSpec = """
[admins]
admin = string(default=d033e22ae348aeb5660fc2140aec35850c4da997)
"""
class CryptoBoxSettingsValidator(validate.Validator):
    """Validator with additional checks for directories and files.

    The checks are registered as "directoryExists", "fileExecutable" and
    "fileWriteable". Each one returns the absolute path of the checked value
    or raises validate.VdtValueError.
    """

    def __init__(self):
        validate.Validator.__init__(self)
        self.functions["directoryExists"] = self.check_directoryExists
        self.functions["fileExecutable"] = self.check_fileExecutable
        self.functions["fileWriteable"] = self.check_fileWriteable


    def check_directoryExists(self, value):
        """accept an existing directory that may be entered"""
        dir_path = os.path.abspath(value)
        if not os.path.isdir(dir_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(dir_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return dir_path


    def check_fileExecutable(self, value):
        """accept an existing file that may be executed"""
        file_path = os.path.abspath(value)
        if not os.path.isfile(file_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(file_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path


    def check_fileWriteable(self, value):
        """accept a writeable file or a missing file in a writeable directory"""
        file_path = os.path.abspath(value)
        if os.path.isfile(file_path):
            ## the file exists - the only possible problem is the missing write permission
            if not os.access(file_path, os.W_OK):
                raise validate.VdtValueError("%s (access denied)" % value)
            return file_path
        ## the file does not exist (yet) - it has to be creatable in its directory
        parent_dir = os.path.dirname(file_path)
        if not os.path.isdir(parent_dir):
            raise validate.VdtValueError("%s (directory does not exist)" % value)
        if not os.access(parent_dir, os.W_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path
class MiscConfigFile:
    """In-memory copy of one file of the "misc" settings directory.

    The content is read when the object is created and can be written
    back to the same filename with 'save'.
    """

    ## maximum number of bytes that are read from a file
    maxSize = 20480

    def __init__(self, filename, logger):
        """
        filename: path of the file to be loaded
        logger: object with a 'warning' method (e.g. a logging.Logger)
        """
        self.filename = filename
        self.log = logger
        self.load()


    def load(self):
        """Read at most 'maxSize' bytes of the file into 'self.content'.

        A warning is logged if the file contains more data than that.
        Errors of 'open' or 'read' are passed to the caller.
        """
        fd = open(self.filename, "rb")
        try:
            ## limit the maximum size
            self.content = fd.read(self.maxSize)
            ## one more byte tells a file of exactly 'maxSize' from a bigger one
            if fd.read(1):
                self.log.warning("file in misc settings directory (%s) is bigger than allowed (%s)" % (self.filename, self.maxSize))
        finally:
            fd.close()


    def save(self):
        """Write 'self.content' to the file.

        The parent directory is created if it is missing (one level only).
        Returns True on success and False if anything failed.
        """
        save_dir = os.path.dirname(self.filename)
        ## create the directory, if necessary
        if not os.path.isdir(save_dir):
            try:
                os.mkdir(save_dir)
            except (IOError, OSError):
                ## 'os.mkdir' reports failures as OSError
                return False
        ## save the content of the file
        try:
            fd = open(self.filename, "wb")
        except (IOError, OSError):
            return False
        try:
            try:
                fd.write(self.content)
            finally:
                fd.close()
        except (IOError, OSError):
            return False
        return True