"""Load, validate and expose the CryptoBox configuration.

The settings are read with 'configobj' and checked against the
specification stored in 'CryptoBoxSettings.validation_spec'.
"""

import logging
import validate
import os
import CryptoBoxExceptions
try:
    import configobj  ## needed for reading and writing of the config file
except ImportError:
    ## only a failed import should be reported as a missing package
    raise CryptoBoxExceptions.CBEnvironmentError(
        "couldn't import 'configobj'! Try 'apt-get install python-configobj'.")


class CryptoBoxSettings:
    """Container for the CryptoBox configuration.

    Creating an instance locates the config file, parses and validates it,
    configures the "CryptoBox" logger and opens the name database.
    The parsed settings are available via item access (settings["Log"]).
    """

    ## searched in this order if no config file was specified
    CONF_LOCATIONS = [
        "./cryptobox.conf",
        "~/.cryptobox.conf",
        "/etc/cryptobox/cryptobox.conf"]

    def __init__(self, config_file=None):
        """Load the configuration.

        config_file: path of the config file, or None to search the
            locations listed in CONF_LOCATIONS.
        """
        self.log = logging.getLogger("CryptoBox")
        config_file = self.__getConfigFileName(config_file)
        self.log.info("loading config file: %s", config_file)
        self.prefs = self.__getPreferences(config_file)
        self.__validateConfig()
        self.__configureLogHandler()
        self.__checkUnknownPreferences()
        self.nameDB = self.__getNameDatabase()

    def __getitem__(self, key):
        """redirect all requests to the 'prefs' attribute"""
        return self.prefs[key]

    def __getPreferences(self, config_file):
        """Parse 'config_file' using 'validation_spec' as its configspec.

        Raises CBConfigUnavailableError if the file cannot be opened or if
        the parsed configuration is empty.
        """
        import StringIO
        config_rules = StringIO.StringIO(self.validation_spec)
        try:
            prefs = configobj.ConfigObj(config_file, configspec=config_rules)
            if prefs:
                self.log.info("found config: %s", prefs.items())
            else:
                raise CryptoBoxExceptions.CBConfigUnavailableError(
                    "failed to load the config file: %s" % config_file)
        except IOError:
            raise CryptoBoxExceptions.CBConfigUnavailableError(
                "unable to open the config file: %s" % config_file)
        return prefs

    def __validateConfig(self):
        """Validate 'self.prefs' against its configspec.

        Raises CBConfigError with one line per problem if any setting is
        undefined or invalid.
        """
        result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
        error_list = configobj.flatten_errors(self.prefs, result)
        if not error_list:
            return
        errorMsgs = []
        for sections, key, text in error_list:
            section_name = "->".join(sections)
            if not text:
                ## no error text: reported as an undefined value
                errorMsg = "undefined configuration value (%s) in section '%s'" \
                    % (key, section_name)
            else:
                errorMsg = "invalid configuration value (%s) in section '%s': %s" \
                    % (key, section_name, text)
            errorMsgs.append(errorMsg)
        raise CryptoBoxExceptions.CBConfigError("\n".join(errorMsgs))

    def __checkUnknownPreferences(self):
        """Log a warning for every setting that is not part of 'validation_spec'."""
        import StringIO
        config_rules = configobj.ConfigObj(
            StringIO.StringIO(self.validation_spec), list_values=False)
        self.__recursiveConfigSectionCheck("", self.prefs, config_rules)

    def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
        """should be called by '__checkUnknownPreferences' for every section
        sends a warning message to the logger for every undefined (see validation_spec)
        configuration setting
        """
        for e in section_config.keys():
            element_path = section_path + e
            if e not in section_rules:
                self.log.warning("unknown configuration setting: %s", element_path)
                continue
            config_is_section = isinstance(section_config[e], configobj.Section)
            rule_is_section = isinstance(section_rules[e], configobj.Section)
            if config_is_section and rule_is_section:
                ## descend into the matching sub-section
                self.__recursiveConfigSectionCheck(
                    element_path + "->", section_config[e], section_rules[e])
            elif config_is_section:
                self.log.warning(
                    "configuration setting should be a value instead of a section name: %s",
                    element_path)
            elif rule_is_section:
                self.log.warning(
                    "configuration setting should be a section name instead of a value: %s",
                    element_path)
            ## otherwise: good - the setting is valid

    def __getNameDatabase(self):
        """Open the name database, creating an empty one if it does not exist.

        Raises CBConfigUndefinedError if the location is not configured and
        CBEnvironmentError if the file does not exist afterwards.
        """
        ## pre-initialised, so the SyntaxError handler can never hit an unbound name
        nameDB_file = None
        ## NOTE(review): a plain lookup is unlikely to raise SyntaxError - this
        ## handler was presumably meant to cover the parsing below; confirm
        try:
            try:
                nameDB_file = self.prefs["Locations"]["NameDatabase"]
            except KeyError:
                raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
        except SyntaxError:
            raise CryptoBoxExceptions.CBConfigInvalidValueError(
                "Locations", "NameDatabase", nameDB_file,
                "failed to interprete the filename of the name database correctly")
        ## create nameDB if necessary
        if os.path.exists(nameDB_file):
            nameDB = configobj.ConfigObj(nameDB_file)
        else:
            nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
        ## check if nameDB file was created successfully?
        if not os.path.exists(nameDB_file):
            raise CryptoBoxExceptions.CBEnvironmentError(
                "failed to create name database (%s)" % nameDB_file)
        return nameDB

    def __getConfigFileName(self, config_file):
        """Return the path of the config file that should be loaded.

        If 'config_file' is None, the first existing entry of CONF_LOCATIONS
        is used. Raises CBConfigUnavailableError if no usable file is found.
        """
        if config_file is None:
            # no config file was specified - we will look for it in the usual locations
            conf_file_list = [os.path.expanduser(f)
                    for f in self.CONF_LOCATIONS
                    if os.path.exists(os.path.expanduser(f))]
            if not conf_file_list:
                # no possible config file found in the usual locations
                raise CryptoBoxExceptions.CBConfigUnavailableError()
            config_file = conf_file_list[0]
        else:
            # a config file was specified (e.g. via command line)
            if not isinstance(config_file, str):
                raise CryptoBoxExceptions.CBConfigUnavailableError(
                    "invalid config file specified: %s" % config_file)
            if not os.path.exists(config_file):
                raise CryptoBoxExceptions.CBConfigUnavailableError(
                    "could not find the specified configuration file (%s)" % config_file)
        return config_file

    def __configureLogHandler(self):
        """Attach a file handler with the configured level to the "CryptoBox" logger.

        Raises CBConfigUndefinedError / CBConfigInvalidValueError for missing
        or invalid log settings and CBEnvironmentError if the log file cannot
        be opened.
        """
        ## defined before the 'try', so the error handler can always refer to it
        log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
        try:
            log_level = self.prefs["Log"]["Level"].upper()
            if log_level not in log_level_avail:
                raise TypeError
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
        except TypeError:
            raise CryptoBoxExceptions.CBConfigInvalidValueError(
                "Log", "Level", log_level,
                "invalid log level: only %s are allowed" % log_level_avail)
        try:
            try:
                log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
            except KeyError:
                raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
        except IOError:
            raise CryptoBoxExceptions.CBEnvironmentError(
                "could not create the log file (%s)" % self.prefs["Log"]["Details"])
        log_handler.setFormatter(
            logging.Formatter('%(asctime)s %(module)s %(levelname)s: %(message)s'))
        cbox_log = logging.getLogger("CryptoBox")
        ## remove previous handlers
        cbox_log.handlers = []
        ## add new one
        cbox_log.addHandler(log_handler)
        ## do not call parent's handlers
        cbox_log.propagate = False
        ## 'log_level' is a string -> use 'getattr'
        cbox_log.setLevel(getattr(logging, log_level))
        ## the logger named "CryptoBox" is configured now

    ## configspec used to validate the config file and to detect unknown settings
    validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")

[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")

[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)

[WebSettings]
Stylesheet = string(min=1)
Language = string(min=1, default="en")
DocLanguage = string(min=1, default="en")

[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
mkfs-config = fileExecutable(default="/sbin/mkfs.ext2")
blkid = fileExecutable(default="/sbin/blkid")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
        """


class CryptoBoxSettingsValidator(validate.Validator):
    """Validator extended by the path checks used in the CryptoBox configspec."""

    def __init__(self):
        validate.Validator.__init__(self)
        self.functions["directoryExists"] = self.check_directoryExists
        self.functions["fileExecutable"] = self.check_fileExecutable
        self.functions["fileWriteable"] = self.check_fileWriteable

    def check_directoryExists(self, value):
        """Return the absolute path of 'value' if it is an accessible directory.

        Raises validate.VdtValueError otherwise.
        """
        dir_path = os.path.abspath(value)
        if not os.path.isdir(dir_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(dir_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return dir_path

    def check_fileExecutable(self, value):
        """Return the absolute path of 'value' if it is an executable file.

        Raises validate.VdtValueError otherwise.
        """
        file_path = os.path.abspath(value)
        if not os.path.isfile(file_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(file_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path

    def check_fileWriteable(self, value):
        """Return the absolute path of 'value' if the file can be written.

        An existing file must be writable; a missing file is accepted if its
        parent directory exists and is writable.
        Raises validate.VdtValueError otherwise.
        """
        file_path = os.path.abspath(value)
        if os.path.isfile(file_path):
            if not os.access(file_path, os.W_OK):
                ## the file exists - the problem is the missing write permission
                raise validate.VdtValueError("%s (access denied)" % value)
            return file_path
        parent_dir = os.path.dirname(file_path)
        if not os.path.isdir(parent_dir):
            raise validate.VdtValueError("%s (directory does not exist)" % value)
        if not os.access(parent_dir, os.W_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path