cryptonas-branches/pythonrewrite/bin/CryptoBoxSettings.py

238 lines
8.6 KiB
Python

import logging
import validate
import os
import CryptoBoxExceptions
## 'configobj' is a hard requirement: turn a failed import into a CryptoBox
## environment error that tells the user how to install it.
try:
    import configobj ## needed for reading and writing of the config file
except ImportError:
    ## catch only ImportError - a bare 'except' would also swallow
    ## KeyboardInterrupt/SystemExit and mislabel unrelated failures
    raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'configobj'! Try 'apt-get install python-configobj'.")
class CryptoBoxSettings:
    """Load, validate and expose the CryptoBox configuration.

    Creating an instance locates the config file, parses it against
    'validation_spec', validates it, (re)configures the "CryptoBox" logger,
    warns about settings that are not part of the specification and finally
    opens the name database.

    Attributes set by '__init__':
        log    -- the logger named "CryptoBox"
        prefs  -- the parsed configuration (a configobj.ConfigObj)
        nameDB -- the name database (a configobj.ConfigObj)
    """

    ## candidate config files - the first existing one is used if no file
    ## was specified explicitly
    CONF_LOCATIONS = [
        "./cryptobox.conf",
        "~/.cryptobox.conf",
        "/etc/cryptobox/cryptobox.conf"]

    def __init__(self, config_file=None):
        """Read and check the configuration.

        config_file -- path of the config file; if None, the entries of
                       CONF_LOCATIONS are searched

        Raises the CryptoBoxExceptions errors of the helper methods below.
        """
        self.log = logging.getLogger("CryptoBox")
        config_file = self.__getConfigFileName(config_file)
        self.log.info("loading config file: %s", config_file)
        self.prefs = self.__getPreferences(config_file)
        self.__validateConfig()
        self.__configureLogHandler()
        self.__checkUnknownPreferences()
        self.nameDB = self.__getNameDatabase()

    def __getitem__(self, key):
        """redirect all requests to the 'prefs' attribute"""
        return self.prefs[key]

    def __openValidationSpec(self):
        """Return a fresh file-like object containing 'validation_spec'."""
        try:
            ## Python 2: this module accepts plain (byte) strings
            from StringIO import StringIO
        except ImportError:
            ## Python 3: the module was merged into 'io'
            from io import StringIO
        return StringIO(self.validation_spec)

    def __getPreferences(self, config_file):
        """Parse 'config_file' with 'validation_spec' as its configspec.

        Returns the resulting configobj.ConfigObj.
        Raises CBConfigUnavailableError if the file cannot be opened or if
        the parsed configuration is empty.
        """
        config_rules = self.__openValidationSpec()
        try:
            prefs = configobj.ConfigObj(config_file, configspec=config_rules)
            if prefs:
                self.log.info("found config: %s", prefs.items())
            else:
                raise CryptoBoxExceptions.CBConfigUnavailableError("failed to load the config file: %s" % config_file)
        except IOError:
            raise CryptoBoxExceptions.CBConfigUnavailableError("unable to open the config file: %s" % config_file)
        return prefs

    def __validateConfig(self):
        """Validate 'self.prefs' with CryptoBoxSettingsValidator.

        Raises CBConfigError with one line per problem if the list of
        flattened validation errors is not empty.
        """
        result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
        error_list = configobj.flatten_errors(self.prefs, result)
        if not error_list:
            return
        errorMsgs = []
        for sections, key, text in error_list:
            section_name = "->".join(sections)
            if not text:
                ## no error text available: report the value as undefined
                errorMsg = "undefined configuration value (%s) in section '%s'" % (key, section_name)
            else:
                errorMsg = "invalid configuration value (%s) in section '%s': %s" % (key, section_name, text)
            errorMsgs.append(errorMsg)
        ## call syntax instead of "raise Class, value": same meaning, but
        ## also valid for newer interpreters
        raise CryptoBoxExceptions.CBConfigError("\n".join(errorMsgs))

    def __checkUnknownPreferences(self):
        """Log a warning for every setting that 'validation_spec' does not define."""
        config_rules = configobj.ConfigObj(self.__openValidationSpec(), list_values=False)
        self.__recursiveConfigSectionCheck("", self.prefs, config_rules)

    def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
        """should be called by '__checkUnknownPreferences' for every section
        sends a warning message to the logger for every undefined (see validation_spec)
        configuration setting

        section_path   -- prefix ("Section->") used for the logged names
        section_config -- the (sub)section of the loaded configuration
        section_rules  -- the matching (sub)section of the specification
        """
        for e in section_config.keys():
            element_path = section_path + e
            if e not in section_rules:
                self.log.warning("unknown configuration setting: %s", element_path)
                continue
            is_section = isinstance(section_config[e], configobj.Section)
            should_be_section = isinstance(section_rules[e], configobj.Section)
            if is_section and should_be_section:
                ## both sides are sections: descend
                self.__recursiveConfigSectionCheck(element_path + "->", section_config[e], section_rules[e])
            elif is_section:
                self.log.warning("configuration setting should be a value instead of a section name: %s", element_path)
            elif should_be_section:
                self.log.warning("configuration setting should be a section name instead of a value: %s", element_path)
            ## otherwise: both are plain values - the setting is valid

    def __getNameDatabase(self):
        """Open the name database given by [Locations]->NameDatabase.

        A missing file is created empty. Returns the configobj.ConfigObj.
        Raises CBConfigUndefinedError if the setting is missing,
        CBConfigInvalidValueError on a SyntaxError during the lookup and
        CBEnvironmentError if the file does not exist afterwards.
        """
        ## pre-set the name, so the SyntaxError handler can never hit an
        ## unbound local variable
        nameDB_file = None
        try:
            try:
                nameDB_file = self.prefs["Locations"]["NameDatabase"]
            except KeyError:
                raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
        except SyntaxError:
            raise CryptoBoxExceptions.CBConfigInvalidValueError("Locations", "NameDatabase", nameDB_file, "failed to interprete the filename of the name database correctly")
        ## create the nameDB if necessary
        already_present = os.path.exists(nameDB_file)
        nameDB = configobj.ConfigObj(nameDB_file, create_empty=not already_present)
        ## check if the nameDB file was created successfully
        if not os.path.exists(nameDB_file):
            raise CryptoBoxExceptions.CBEnvironmentError("failed to create name database (%s)" % nameDB_file)
        return nameDB

    def __getConfigFileName(self, config_file):
        """Return the path of the config file to be used.

        config_file -- an explicit path or None (search CONF_LOCATIONS)

        Raises CBConfigUnavailableError if no usable file can be found or
        if 'config_file' is not a string.
        """
        if config_file is None:
            # no config file was specified - we will look for it in the usual locations
            conf_file_list = [os.path.expanduser(f)
                    for f in self.CONF_LOCATIONS
                    if os.path.exists(os.path.expanduser(f))]
            if not conf_file_list:
                # no possible config file found in the usual locations
                raise CryptoBoxExceptions.CBConfigUnavailableError()
            return conf_file_list[0]
        # a config file was specified (e.g. via command line)
        # ('str' is the type that 'types.StringType' referred to)
        if not isinstance(config_file, str):
            raise CryptoBoxExceptions.CBConfigUnavailableError("invalid config file specified: %s" % config_file)
        if not os.path.exists(config_file):
            raise CryptoBoxExceptions.CBConfigUnavailableError("could not find the specified configuration file (%s)" % config_file)
        return config_file

    def __configureLogHandler(self):
        """Attach a file handler to the "CryptoBox" logger.

        Reads [Log]->Level and [Log]->Details (the log file name) from
        'self.prefs'. Previously attached handlers are removed and closed.
        Raises CBConfigUndefinedError / CBConfigInvalidValueError for a
        missing / invalid setting and CBEnvironmentError if the log file
        cannot be opened.
        """
        ## both names are defined before the 'try', so the TypeError handler
        ## can always use them
        log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
        log_level = None
        try:
            log_level = self.prefs["Log"]["Level"].upper()
            if log_level not in log_level_avail:
                raise TypeError
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
        except TypeError:
            raise CryptoBoxExceptions.CBConfigInvalidValueError("Log", "Level", log_level, "invalid log level: only %s are allowed" % log_level_avail)
        try:
            try:
                log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
            except KeyError:
                raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
        except IOError:
            raise CryptoBoxExceptions.CBEnvironmentError("could not create the log file (%s)" % self.prefs["Log"]["Details"])
        log_handler.setFormatter(logging.Formatter('%(asctime)s %(module)s %(levelname)s: %(message)s'))
        cbox_log = logging.getLogger("CryptoBox")
        ## remove previous handlers - and close them, as a dropped
        ## FileHandler would keep its log file open
        for stale_handler in list(cbox_log.handlers):
            cbox_log.removeHandler(stale_handler)
            stale_handler.close()
        ## add new one
        cbox_log.addHandler(log_handler)
        ## do not call parent's handlers
        cbox_log.propagate = False
        ## 'log_level' is a string -> use 'getattr'
        cbox_log.setLevel(getattr(logging, log_level))
        ## the logger named "CryptoBox" is configured now

    ## the configspec used for parsing, validation and the check for
    ## unknown settings (the text must start in the first column)
    validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
ConfigVolumeLabel = string(min=1,default="cbox_config")
[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")
[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)
[WebSettings]
Stylesheet = string(min=1)
Language = string(min=1, default="en")
DocLanguage = string(min=1, default="en")
[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
blkid = fileExecutable(default="/sbin/blkid")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
"""
class CryptoBoxSettingsValidator(validate.Validator):
    """Validator with additional checks for directories and files.

    Registers the check functions "directoryExists", "fileExecutable" and
    "fileWriteable". Each check returns the absolute path of the given
    value or raises validate.VdtValueError.
    """

    def __init__(self):
        """Set up the base validator and register the custom checks."""
        validate.Validator.__init__(self)
        self.functions["directoryExists"] = self.check_directoryExists
        self.functions["fileExecutable"] = self.check_fileExecutable
        self.functions["fileWriteable"] = self.check_fileWriteable

    def check_directoryExists(self, value):
        """Accept an existing directory that may be entered (X_OK)."""
        dir_path = os.path.abspath(value)
        if not os.path.isdir(dir_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(dir_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return dir_path

    def check_fileExecutable(self, value):
        """Accept an existing regular file that may be executed (X_OK)."""
        file_path = os.path.abspath(value)
        if not os.path.isfile(file_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(file_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path

    def check_fileWriteable(self, value):
        """Accept a writeable file or a path inside a writeable directory.

        An existing regular file must be writeable (W_OK). Otherwise the
        parent directory must exist and be writeable.
        """
        file_path = os.path.abspath(value)
        if os.path.isfile(file_path):
            if not os.access(file_path, os.W_OK):
                ## the file was found - the missing permission is the problem
                raise validate.VdtValueError("%s (access denied)" % value)
            return file_path
        parent_dir = os.path.dirname(file_path)
        if not os.path.isdir(parent_dir):
            raise validate.VdtValueError("%s (directory does not exist)" % value)
        if not os.access(parent_dir, os.W_OK):
            ## the directory exists, but we are not allowed to write to it
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path