moved configuration handling to CryptoBoxSettings
added configuration validation updated unittests.CryptoBox for new configuration validationmaster
parent
0aa1f9f74b
commit
88fc900cc5
@ -0,0 +1,237 @@
|
||||
import logging
|
||||
import validate
|
||||
import os
|
||||
import CryptoBoxExceptions
|
||||
try:
|
||||
import configobj ## needed for reading and writing of the config file
|
||||
except:
|
||||
raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'configobj'! Try 'apt-get install python-configobj'.")
|
||||
|
||||
|
||||
class CryptoBoxSettings:
|
||||
|
||||
CONF_LOCATIONS = [
|
||||
"./cryptobox.conf",
|
||||
"~/.cryptobox.conf",
|
||||
"/etc/cryptobox/cryptobox.conf"]
|
||||
|
||||
|
||||
def __init__(self, config_file=None):
|
||||
self.log = logging.getLogger("CryptoBox")
|
||||
config_file = self.__getConfigFileName(config_file)
|
||||
self.log.info("loading config file: %s" % config_file)
|
||||
self.prefs = self.__getPreferences(config_file)
|
||||
self.__validateConfig()
|
||||
self.__configureLogHandler()
|
||||
self.__checkUnknownPreferences()
|
||||
self.nameDB = self.__getNameDatabase()
|
||||
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""redirect all requests to the 'prefs' attribute"""
|
||||
return self.prefs[key]
|
||||
|
||||
|
||||
def __getPreferences(self, config_file):
|
||||
import StringIO
|
||||
config_rules = StringIO.StringIO(self.validation_spec)
|
||||
try:
|
||||
prefs = configobj.ConfigObj(config_file, configspec=config_rules)
|
||||
if prefs:
|
||||
self.log.info("found config: %s" % prefs.items())
|
||||
else:
|
||||
raise CryptoBoxExceptions.CBConfigUnavailableError("failed to load the config file: %s" % config_file)
|
||||
except IOError:
|
||||
raise CryptoBoxExceptions.CBConfigUnavailableError("unable to open the config file: %s" % config_file)
|
||||
return prefs
|
||||
|
||||
|
||||
def __validateConfig(self):
|
||||
result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
|
||||
error_list = configobj.flatten_errors(self.prefs, result)
|
||||
if not error_list: return
|
||||
errorMsgs = []
|
||||
for sections, key, text in error_list:
|
||||
section_name = "->".join(sections)
|
||||
if not text:
|
||||
errorMsg = "undefined configuration value (%s) in section '%s'" % (key, section_name)
|
||||
else:
|
||||
errorMsg = "invalid configuration value (%s) in section '%s': %s" % (key, section_name, text)
|
||||
errorMsgs.append(errorMsg)
|
||||
raise CryptoBoxExceptions.CBConfigError, "\n".join(errorMsgs)
|
||||
|
||||
|
||||
def __checkUnknownPreferences(self):
|
||||
import StringIO
|
||||
config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec), list_values=False)
|
||||
self.__recursiveConfigSectionCheck("", self.prefs, config_rules)
|
||||
|
||||
|
||||
def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
|
||||
"""should be called by '__checkUnknownPreferences' for every section
|
||||
sends a warning message to the logger for every undefined (see validation_spec)
|
||||
configuration setting
|
||||
"""
|
||||
for e in section_config.keys():
|
||||
element_path = section_path + e
|
||||
if e in section_rules.keys():
|
||||
if isinstance(section_config[e], configobj.Section):
|
||||
if isinstance(section_rules[e], configobj.Section):
|
||||
self.__recursiveConfigSectionCheck(element_path + "->", section_config[e], section_rules[e])
|
||||
else:
|
||||
self.log.warn("configuration setting should be a value instead of a section name: %s" % element_path)
|
||||
else:
|
||||
if not isinstance(section_rules[e], configobj.Section):
|
||||
pass # good - the setting is valid
|
||||
else:
|
||||
self.log.warn("configuration setting should be a section name instead of a value: %s" % element_path)
|
||||
else:
|
||||
self.log.warn("unknown configuration setting: %s" % element_path)
|
||||
|
||||
|
||||
def __getNameDatabase(self):
|
||||
try:
|
||||
try:
|
||||
nameDB_file = self.prefs["Locations"]["NameDatabase"]
|
||||
except KeyError:
|
||||
raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
|
||||
except SyntaxError:
|
||||
raise CryptoBoxExceptions.CBConfigInvalidValueError("Locations", "NameDatabase", nameDB_file, "failed to interprete the filename of the name database correctly")
|
||||
## create nameDB is necessary
|
||||
if os.path.exists(nameDB_file):
|
||||
nameDB = configobj.ConfigObj(nameDB_file)
|
||||
else:
|
||||
nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
|
||||
## check if nameDB file was created successfully?
|
||||
if not os.path.exists(nameDB_file):
|
||||
raise CryptoBoxExceptions.CBEnvironmentError("failed to create name database (%s)" % nameDB_file)
|
||||
return nameDB
|
||||
|
||||
|
||||
def __getConfigFileName(self, config_file):
|
||||
# search for the configuration file
|
||||
import types
|
||||
if config_file is None:
|
||||
# no config file was specified - we will look for it in the ususal locations
|
||||
conf_file_list = [os.path.expanduser(f)
|
||||
for f in self.CONF_LOCATIONS
|
||||
if os.path.exists(os.path.expanduser(f))]
|
||||
if not conf_file_list:
|
||||
# no possible config file found in the usual locations
|
||||
raise CryptoBoxExceptions.CBConfigUnavailableError()
|
||||
config_file = conf_file_list[0]
|
||||
else:
|
||||
# a config file was specified (e.g. via command line)
|
||||
if type(config_file) != types.StringType:
|
||||
raise CryptoBoxExceptions.CBConfigUnavailableError("invalid config file specified: %s" % config_file)
|
||||
if not os.path.exists(config_file):
|
||||
raise CryptoBoxExceptions.CBConfigUnavailableError("could not find the specified configuration file (%s)" % config_file)
|
||||
return config_file
|
||||
|
||||
|
||||
def __configureLogHandler(self):
|
||||
try:
|
||||
log_level = self.prefs["Log"]["Level"].upper()
|
||||
log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
|
||||
if not log_level in log_level_avail:
|
||||
raise TypeError
|
||||
except KeyError:
|
||||
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
|
||||
except TypeError:
|
||||
raise CryptoBoxExceptions.CBConfigInvalidValueError("Log", "Level", log_level, "invalid log level: only %s are allowed" % log_level_avail)
|
||||
try:
|
||||
try:
|
||||
log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
|
||||
except KeyError:
|
||||
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
|
||||
except IOError:
|
||||
raise CryptoBoxExceptions.CBEnvironmentError("could not create the log file (%s)" % self.prefs["Log"]["Details"])
|
||||
log_handler.setFormatter(logging.Formatter('%(asctime)s %(module)s %(levelname)s: %(message)s'))
|
||||
cbox_log = logging.getLogger("CryptoBox")
|
||||
## remove previous handlers
|
||||
cbox_log.handlers = []
|
||||
## add new one
|
||||
cbox_log.addHandler(log_handler)
|
||||
## do not call parent's handlers
|
||||
cbox_log.propagate = False
|
||||
## 'log_level' is a string -> use 'getattr'
|
||||
cbox_log.setLevel(getattr(logging,log_level))
|
||||
## the logger named "CryptoBox" is configured now
|
||||
|
||||
|
||||
validation_spec = """
|
||||
[Main]
|
||||
AllowedDevices = list(min=1)
|
||||
DefaultVolumePrefix = string(min=1)
|
||||
DefaultCipher = string(default="aes-cbc-essiv:sha256")
|
||||
|
||||
[Locations]
|
||||
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
|
||||
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
|
||||
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
|
||||
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
|
||||
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
|
||||
|
||||
[Log]
|
||||
Level = option("debug", "info", "warn", "error", default="warn")
|
||||
Destination = option("file", default="file")
|
||||
Details = string(min=1)
|
||||
|
||||
[WebSettings]
|
||||
Stylesheet = string(min=1)
|
||||
Language = string(min=1, default="en")
|
||||
DocLanguage = string(min=1, default="en")
|
||||
|
||||
[Programs]
|
||||
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
|
||||
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
|
||||
mkfs-config = fileExecutable(default="/sbin/mkfs.ext2")
|
||||
blkid = fileExecutable(default="/sbin/blkid")
|
||||
mount = fileExecutable(default="/bin/mount")
|
||||
umount = fileExecutable(default="/bin/umount")
|
||||
super = fileExecutable(default="/usr/bin/super")
|
||||
# this is the "program" name as defined in /etc/super.tab
|
||||
CryptoBoxRootActions = string(min=1)
|
||||
"""
|
||||
|
||||
|
||||
|
||||
class CryptoBoxSettingsValidator(validate.Validator):
|
||||
|
||||
def __init__(self):
|
||||
validate.Validator.__init__(self)
|
||||
self.functions["directoryExists"] = self.check_directoryExists
|
||||
self.functions["fileExecutable"] = self.check_fileExecutable
|
||||
self.functions["fileWriteable"] = self.check_fileWriteable
|
||||
|
||||
|
||||
def check_directoryExists(self, value):
|
||||
dir_path = os.path.abspath(value)
|
||||
if not os.path.isdir(dir_path):
|
||||
raise validate.VdtValueError("%s (not found)" % value)
|
||||
if not os.access(dir_path, os.X_OK):
|
||||
raise validate.VdtValueError("%s (access denied)" % value)
|
||||
return dir_path
|
||||
|
||||
|
||||
def check_fileExecutable(self, value):
|
||||
file_path = os.path.abspath(value)
|
||||
if not os.path.isfile(file_path):
|
||||
raise validate.VdtValueError("%s (not found)" % value)
|
||||
if not os.access(file_path, os.X_OK):
|
||||
raise validate.VdtValueError("%s (access denied)" % value)
|
||||
return file_path
|
||||
|
||||
|
||||
def check_fileWriteable(self, value):
|
||||
file_path = os.path.abspath(value)
|
||||
if os.path.isfile(file_path):
|
||||
if not os.access(file_path, os.W_OK):
|
||||
raise validate.VdtValueError("%s (not found)" % value)
|
||||
else:
|
||||
parent_dir = os.path.dirname(file_path)
|
||||
if os.path.isdir(parent_dir) and os.access(parent_dir, os.W_OK):
|
||||
return file_path
|
||||
raise validate.VdtValueError("%s (directory does not exist)" % value)
|
||||
return file_path
|
||||
|
Loading…
Reference in New Issue