lars
90efd72b8b
plugins are now classes inherited from CryptoBoxPlugin language detection added (via request header "Accept-Language")
237 lines
8.6 KiB
Python
237 lines
8.6 KiB
Python
import logging
|
|
import validate
|
|
import os
|
|
import CryptoBoxExceptions
|
|
try:
|
|
import configobj ## needed for reading and writing of the config file
|
|
except:
|
|
raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'configobj'! Try 'apt-get install python-configobj'.")
|
|
|
|
|
|
class CryptoBoxSettings:
|
|
|
|
CONF_LOCATIONS = [
|
|
"./cryptobox.conf",
|
|
"~/.cryptobox.conf",
|
|
"/etc/cryptobox/cryptobox.conf"]
|
|
|
|
|
|
def __init__(self, config_file=None):
|
|
self.log = logging.getLogger("CryptoBox")
|
|
config_file = self.__getConfigFileName(config_file)
|
|
self.log.info("loading config file: %s" % config_file)
|
|
self.prefs = self.__getPreferences(config_file)
|
|
self.__validateConfig()
|
|
self.__configureLogHandler()
|
|
self.__checkUnknownPreferences()
|
|
self.nameDB = self.__getNameDatabase()
|
|
|
|
|
|
def __getitem__(self, key):
|
|
"""redirect all requests to the 'prefs' attribute"""
|
|
return self.prefs[key]
|
|
|
|
|
|
def __getPreferences(self, config_file):
|
|
import StringIO
|
|
config_rules = StringIO.StringIO(self.validation_spec)
|
|
try:
|
|
prefs = configobj.ConfigObj(config_file, configspec=config_rules)
|
|
if prefs:
|
|
self.log.info("found config: %s" % prefs.items())
|
|
else:
|
|
raise CryptoBoxExceptions.CBConfigUnavailableError("failed to load the config file: %s" % config_file)
|
|
except IOError:
|
|
raise CryptoBoxExceptions.CBConfigUnavailableError("unable to open the config file: %s" % config_file)
|
|
return prefs
|
|
|
|
|
|
def __validateConfig(self):
|
|
result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
|
|
error_list = configobj.flatten_errors(self.prefs, result)
|
|
if not error_list: return
|
|
errorMsgs = []
|
|
for sections, key, text in error_list:
|
|
section_name = "->".join(sections)
|
|
if not text:
|
|
errorMsg = "undefined configuration value (%s) in section '%s'" % (key, section_name)
|
|
else:
|
|
errorMsg = "invalid configuration value (%s) in section '%s': %s" % (key, section_name, text)
|
|
errorMsgs.append(errorMsg)
|
|
raise CryptoBoxExceptions.CBConfigError, "\n".join(errorMsgs)
|
|
|
|
|
|
def __checkUnknownPreferences(self):
|
|
import StringIO
|
|
config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec), list_values=False)
|
|
self.__recursiveConfigSectionCheck("", self.prefs, config_rules)
|
|
|
|
|
|
def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
|
|
"""should be called by '__checkUnknownPreferences' for every section
|
|
sends a warning message to the logger for every undefined (see validation_spec)
|
|
configuration setting
|
|
"""
|
|
for e in section_config.keys():
|
|
element_path = section_path + e
|
|
if e in section_rules.keys():
|
|
if isinstance(section_config[e], configobj.Section):
|
|
if isinstance(section_rules[e], configobj.Section):
|
|
self.__recursiveConfigSectionCheck(element_path + "->", section_config[e], section_rules[e])
|
|
else:
|
|
self.log.warn("configuration setting should be a value instead of a section name: %s" % element_path)
|
|
else:
|
|
if not isinstance(section_rules[e], configobj.Section):
|
|
pass # good - the setting is valid
|
|
else:
|
|
self.log.warn("configuration setting should be a section name instead of a value: %s" % element_path)
|
|
else:
|
|
self.log.warn("unknown configuration setting: %s" % element_path)
|
|
|
|
|
|
def __getNameDatabase(self):
|
|
try:
|
|
try:
|
|
nameDB_file = self.prefs["Locations"]["NameDatabase"]
|
|
except KeyError:
|
|
raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
|
|
except SyntaxError:
|
|
raise CryptoBoxExceptions.CBConfigInvalidValueError("Locations", "NameDatabase", nameDB_file, "failed to interprete the filename of the name database correctly")
|
|
## create nameDB is necessary
|
|
if os.path.exists(nameDB_file):
|
|
nameDB = configobj.ConfigObj(nameDB_file)
|
|
else:
|
|
nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
|
|
## check if nameDB file was created successfully?
|
|
if not os.path.exists(nameDB_file):
|
|
raise CryptoBoxExceptions.CBEnvironmentError("failed to create name database (%s)" % nameDB_file)
|
|
return nameDB
|
|
|
|
|
|
def __getConfigFileName(self, config_file):
|
|
# search for the configuration file
|
|
import types
|
|
if config_file is None:
|
|
# no config file was specified - we will look for it in the ususal locations
|
|
conf_file_list = [os.path.expanduser(f)
|
|
for f in self.CONF_LOCATIONS
|
|
if os.path.exists(os.path.expanduser(f))]
|
|
if not conf_file_list:
|
|
# no possible config file found in the usual locations
|
|
raise CryptoBoxExceptions.CBConfigUnavailableError()
|
|
config_file = conf_file_list[0]
|
|
else:
|
|
# a config file was specified (e.g. via command line)
|
|
if type(config_file) != types.StringType:
|
|
raise CryptoBoxExceptions.CBConfigUnavailableError("invalid config file specified: %s" % config_file)
|
|
if not os.path.exists(config_file):
|
|
raise CryptoBoxExceptions.CBConfigUnavailableError("could not find the specified configuration file (%s)" % config_file)
|
|
return config_file
|
|
|
|
|
|
def __configureLogHandler(self):
|
|
try:
|
|
log_level = self.prefs["Log"]["Level"].upper()
|
|
log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
|
|
if not log_level in log_level_avail:
|
|
raise TypeError
|
|
except KeyError:
|
|
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
|
|
except TypeError:
|
|
raise CryptoBoxExceptions.CBConfigInvalidValueError("Log", "Level", log_level, "invalid log level: only %s are allowed" % log_level_avail)
|
|
try:
|
|
try:
|
|
log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
|
|
except KeyError:
|
|
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
|
|
except IOError:
|
|
raise CryptoBoxExceptions.CBEnvironmentError("could not create the log file (%s)" % self.prefs["Log"]["Details"])
|
|
log_handler.setFormatter(logging.Formatter('%(asctime)s CryptoBox %(levelname)s: %(message)s'))
|
|
cbox_log = logging.getLogger("CryptoBox")
|
|
## remove previous handlers
|
|
cbox_log.handlers = []
|
|
## add new one
|
|
cbox_log.addHandler(log_handler)
|
|
## do not call parent's handlers
|
|
cbox_log.propagate = False
|
|
## 'log_level' is a string -> use 'getattr'
|
|
cbox_log.setLevel(getattr(logging,log_level))
|
|
## the logger named "CryptoBox" is configured now
|
|
|
|
|
|
validation_spec = """
|
|
[Main]
|
|
AllowedDevices = list(min=1)
|
|
DefaultVolumePrefix = string(min=1)
|
|
DefaultCipher = string(default="aes-cbc-essiv:sha256")
|
|
ConfigVolumeLabel = string(min=1,default="cbox_config")
|
|
|
|
[Locations]
|
|
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
|
|
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
|
|
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
|
|
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
|
|
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
|
|
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")
|
|
|
|
[Log]
|
|
Level = option("debug", "info", "warn", "error", default="warn")
|
|
Destination = option("file", default="file")
|
|
Details = string(min=1)
|
|
|
|
[WebSettings]
|
|
Stylesheet = string(min=1)
|
|
Language = string(min=1, default="en")
|
|
|
|
[Programs]
|
|
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
|
|
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
|
|
blkid = fileExecutable(default="/sbin/blkid")
|
|
mount = fileExecutable(default="/bin/mount")
|
|
umount = fileExecutable(default="/bin/umount")
|
|
super = fileExecutable(default="/usr/bin/super")
|
|
# this is the "program" name as defined in /etc/super.tab
|
|
CryptoBoxRootActions = string(min=1)
|
|
"""
|
|
|
|
|
|
|
|
class CryptoBoxSettingsValidator(validate.Validator):
|
|
|
|
def __init__(self):
|
|
validate.Validator.__init__(self)
|
|
self.functions["directoryExists"] = self.check_directoryExists
|
|
self.functions["fileExecutable"] = self.check_fileExecutable
|
|
self.functions["fileWriteable"] = self.check_fileWriteable
|
|
|
|
|
|
def check_directoryExists(self, value):
|
|
dir_path = os.path.abspath(value)
|
|
if not os.path.isdir(dir_path):
|
|
raise validate.VdtValueError("%s (not found)" % value)
|
|
if not os.access(dir_path, os.X_OK):
|
|
raise validate.VdtValueError("%s (access denied)" % value)
|
|
return dir_path
|
|
|
|
|
|
def check_fileExecutable(self, value):
|
|
file_path = os.path.abspath(value)
|
|
if not os.path.isfile(file_path):
|
|
raise validate.VdtValueError("%s (not found)" % value)
|
|
if not os.access(file_path, os.X_OK):
|
|
raise validate.VdtValueError("%s (access denied)" % value)
|
|
return file_path
|
|
|
|
|
|
def check_fileWriteable(self, value):
|
|
file_path = os.path.abspath(value)
|
|
if os.path.isfile(file_path):
|
|
if not os.access(file_path, os.W_OK):
|
|
raise validate.VdtValueError("%s (not found)" % value)
|
|
else:
|
|
parent_dir = os.path.dirname(file_path)
|
|
if os.path.isdir(parent_dir) and os.access(parent_dir, os.W_OK):
|
|
return file_path
|
|
raise validate.VdtValueError("%s (directory does not exist)" % value)
|
|
return file_path
|
|
|