#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#

"""Manage the configuration of a CryptoBox
"""

__revision__ = "$Id"

from cryptobox.core.exceptions import *
import logging
import subprocess
import os
import configobj
import validate


class CryptoBoxSettings:
    """Manage the various configuration files of the CryptoBox

    An instance loads and validates the main configuration file and then
    opens the volume database, the plugin configuration, the user database
    and the files of the "misc" subdirectory below the settings directory.
    Item access (settings["Section"]) is redirected to the main
    configuration.
    """

    ## locations that are searched (in this order) if no config file was given
    CONF_LOCATIONS = [
        "./cryptobox.conf",
        "~/.cryptobox.conf",
        "/etc/cryptobox-server/cryptobox.conf"]

    ## names of the files stored below prefs["Locations"]["SettingsDir"]
    VOLUMESDB_FILE = "cryptobox_volumes.db"
    PLUGINCONF_FILE = "cryptobox_plugins.conf"
    USERDB_FILE = "cryptobox_users.db"


    def __init__(self, config_file=None):
        """Load and validate all configuration files.

        config_file: name of the main configuration file; if it is None
            then the entries of CONF_LOCATIONS are searched.

        The order of the calls below matters: the preferences must be
        loaded and validated before the log handler is configured and
        before the settings directory (and thus the other files) is used.
        """
        self.log = logging.getLogger("CryptoBox")
        config_file = self.__get_config_filename(config_file)
        self.log.info("loading config file: %s" % config_file)
        self.prefs = self.__get_preferences(config_file)
        self.__validate_config()
        self.__configure_log_handler()
        self.__check_unknown_preferences()
        ## the config partition must be mounted before the files below it are read
        self.prepare_partition()
        self.volumes_db = self.__get_volumes_database()
        self.plugin_conf = self.__get_plugin_config()
        self.user_db = self.__get_user_db()
        self.misc_files = self.__get_misc_files()


    def write(self):
        """Write all local setting files including the content of the
        "misc" subdirectory.

        Every file is attempted even if a previous one failed.
        Returns True if everything was saved and False otherwise.
        """
        status = True
        try:
            self.volumes_db.write()
        except IOError:
            self.log.warn("could not save the volume database")
            status = False
        try:
            self.plugin_conf.write()
        except IOError:
            self.log.warn("could not save the plugin configuration")
            status = False
        try:
            self.user_db.write()
        except IOError:
            self.log.warn("could not save the user database")
            status = False
        for misc_file in self.misc_files:
            if not misc_file.save():
                self.log.warn("could not save a misc setting file (%s)"
                        % misc_file.filename)
                status = False
        return status


    def requires_partition(self):
        """Return True if the setting Main->UseConfigPartition is enabled.
        """
        return bool(self.prefs["Main"]["UseConfigPartition"])


    def get_active_partition(self):
        """Return the currently active configuration partition.

        Returns the device field of the /proc/mounts entry whose mount
        point is the settings directory or None if the settings directory
        is not a mount point (or no matching entry was found).
        """
        settings_dir = self.prefs["Locations"]["SettingsDir"]
        if not os.path.ismount(settings_dir):
            return None
        mounts = open("/proc/mounts")
        try:
            for line in mounts:
                fields = line.split(" ")
                ## ignore malformed lines instead of failing with an IndexError
                if len(fields) >= 2:
                    try:
                        if os.path.samefile(fields[1], settings_dir):
                            return fields[0]
                    except OSError:
                        ## the mount point is not accessible - try the next one
                        pass
        finally:
            mounts.close()
        ## no matching entry found
        return None


    def mount_partition(self):
        """Mount a config partition.

        The first partition returned by 'get_available_partitions' is
        mounted at the settings directory via the configured 'super'
        program.
        Returns True on success and False if a partition is already
        mounted, none is available or the mount program failed.
        """
        self.log.debug("trying to mount configuration partition")
        if not self.requires_partition():
            ## only a warning - the mount is attempted nevertheless
            self.log.warn("mountConfigPartition: configuration partition is "
                    "not required - mounting anyway")
        if self.get_active_partition():
            self.log.warn("mountConfigPartition: configuration partition already "
                    "mounted - not mounting again")
            return False
        conf_partitions = self.get_available_partitions()
        if not conf_partitions:
            self.log.error("no configuration partition found - you have to create "
                    "it first")
            return False
        partition = conf_partitions[0]
        proc = subprocess.Popen(
            shell = False,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            args = [
                self.prefs["Programs"]["super"],
                self.prefs["Programs"]["CryptoBoxRootActions"],
                "mount",
                partition,
                self.prefs["Locations"]["SettingsDir"]])
        (stdout, stderr) = proc.communicate()
        if proc.returncode != 0:
            self.log.error("failed to mount the configuration partition: %s"
                    % partition)
            self.log.error("output of mount: %s" % (stderr,))
            return False
        self.log.info("configuration partition mounted: %s" % partition)
        return True


    def umount_partition(self):
        """Umount the currently active configuration partition.

        Returns True on success and False if no partition is mounted or
        the umount program failed.
        """
        if not self.get_active_partition():
            self.log.warn("umountConfigPartition: no configuration partition mounted")
            return False
        proc = subprocess.Popen(
            shell = False,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            args = [
                self.prefs["Programs"]["super"],
                self.prefs["Programs"]["CryptoBoxRootActions"],
                "umount",
                self.prefs["Locations"]["SettingsDir"]])
        (stdout, stderr) = proc.communicate()
        if proc.returncode != 0:
            self.log.error("failed to unmount the configuration partition")
            self.log.error("output of umount: %s" % (stderr,))
            return False
        self.log.info("configuration partition unmounted")
        return True


    def get_available_partitions(self):
        """Return a sequence of found config partitions.

        'blkid' is asked for all devices labeled with the setting
        Main->ConfigVolumeLabel. The part before the first colon of every
        output line is returned.
        """
        proc = subprocess.Popen(
            shell = False,
            stdout = subprocess.PIPE,
            args = [
                self.prefs["Programs"]["blkid"],
                "-c", os.path.devnull,
                "-t", "LABEL=%s" % self.prefs["Main"]["ConfigVolumeLabel"]])
        (output, error) = proc.communicate()
        if not output:
            return []
        ## skip blank lines - they would turn into empty device names
        return [line.strip().split(":", 1)[0]
                for line in output.splitlines() if line.strip()]


    def prepare_partition(self):
        """Mount a config partition if necessary.

        Nothing happens if no config partition is required or if one is
        already mounted.
        """
        if self.requires_partition() and not self.get_active_partition():
            self.mount_partition()


    def __getitem__(self, key):
        """Redirect all requests to the 'prefs' attribute."""
        return self.prefs[key]


    def __get_preferences(self, config_file):
        """Load the CryptoBox configuration.

        Raises CBConfigUnavailableError if the file could not be opened
        or if the resulting configuration is empty.
        """
        import StringIO
        config_rules = StringIO.StringIO(self.validation_spec)
        try:
            prefs = configobj.ConfigObj(config_file, configspec=config_rules)
            if prefs:
                self.log.info("found config: %s" % prefs.items())
            else:
                raise CBConfigUnavailableError(
                    "failed to load the config file: %s" % config_file)
        except IOError:
            raise CBConfigUnavailableError(
                "unable to open the config file: %s" % config_file)
        return prefs


    def __validate_config(self):
        """Check the configuration settings and cast value types.

        Raises CBConfigError with one line of text for every missing or
        invalid setting.
        """
        result = self.prefs.validate(CryptoBoxSettingsValidator(),
                preserve_errors=True)
        error_list = configobj.flatten_errors(self.prefs, result)
        if not error_list:
            return
        error_msgs = []
        for sections, key, text in error_list:
            section_name = "->".join(sections)
            if not text:
                ## no error text was supplied for this entry
                error_msg = "undefined configuration value (%s) in section '%s'" % \
                        (key, section_name)
            else:
                error_msg = "invalid configuration value (%s) in section '%s': %s" % \
                        (key, section_name, text)
            error_msgs.append(error_msg)
        raise CBConfigError("\n".join(error_msgs))


    def __check_unknown_preferences(self):
        """Check the configuration file for unknown settings to avoid
        spelling mistakes.
        """
        import StringIO
        config_rules = configobj.ConfigObj(
                StringIO.StringIO(self.validation_spec), list_values=False)
        self.__recursive_section_check("", self.prefs, config_rules)


    def __recursive_section_check(self, section_path, section_config, section_rules):
        """Should be called by '__check_unknown_preferences' for every section.

        Sends a warning message to the logger for every undefined (see
        validation_spec) configuration setting and for every setting that
        is a section where a value is expected (or the other way around).

        section_path: prefix ("SectionA->SectionB->") used for the messages
        section_config: the section of the loaded configuration
        section_rules: the corresponding section of the validation rules
        """
        for section in section_config.keys():
            element_path = section_path + section
            if section not in section_rules.keys():
                self.log.warn("unknown configuration setting: %s" % element_path)
                continue
            config_is_section = isinstance(section_config[section],
                    configobj.Section)
            rule_is_section = isinstance(section_rules[section],
                    configobj.Section)
            if config_is_section and rule_is_section:
                self.__recursive_section_check(element_path + "->",
                        section_config[section], section_rules[section])
            elif config_is_section:
                self.log.warn("configuration setting should be a value "
                        "instead of a section name: %s" % element_path)
            elif rule_is_section:
                self.log.warn("configuration setting should be a section "
                        "name instead of a value: %s" % element_path)
            ## otherwise: good - the setting is a valid value


    def __get_settings_file_path(self, filename, description):
        """Return the path of a file below the settings directory.

        filename: name of the file relative to Locations->SettingsDir
        description: short name of the file (used for the error message)

        Raises CBConfigUndefinedError if the settings directory is not
        configured.
        """
        try:
            settings_dir = self.prefs["Locations"]["SettingsDir"]
        except KeyError:
            raise CBConfigUndefinedError("Locations", "SettingsDir")
        try:
            return os.path.join(settings_dir, filename)
        except SyntaxError:
            ## report the configured directory - the joined path does not
            ## exist at this point
            raise CBConfigInvalidValueError("Locations", "SettingsDir",
                    settings_dir,
                    "failed to interprete the filename of the %s correctly (%s)"
                    % (description, settings_dir))


    def __get_plugin_config(self):
        """Load the plugin configuration file if it exists.

        An empty file is created if it is missing. Raises
        CBEnvironmentError if the file does not exist afterwards.
        """
        import StringIO
        plugin_rules = StringIO.StringIO(self.pluginValidationSpec)
        plugin_conf_file = self.__get_settings_file_path(
                self.PLUGINCONF_FILE, "plugin config file")
        ## create plugin_conf_file if necessary
        if os.path.exists(plugin_conf_file):
            plugin_conf = configobj.ConfigObj(plugin_conf_file,
                    configspec=plugin_rules)
        else:
            plugin_conf = configobj.ConfigObj(plugin_conf_file,
                    configspec=plugin_rules, create_empty=True)
        ## validate and convert values according to the spec
        plugin_conf.validate(validate.Validator())
        ## check if plugin_conf_file file was created successfully?
        if not os.path.exists(plugin_conf_file):
            raise CBEnvironmentError(
                    "failed to create plugin configuration file (%s)"
                    % plugin_conf_file)
        return plugin_conf


    def __get_volumes_database(self):
        """Load the volume database file if it exists.

        An empty file is created if it is missing. Raises
        CBEnvironmentError if the file does not exist afterwards.
        """
        conf_file = self.__get_settings_file_path(
                self.VOLUMESDB_FILE, "volume database")
        ## create conf_file if necessary
        if os.path.exists(conf_file):
            conf = configobj.ConfigObj(conf_file)
        else:
            conf = configobj.ConfigObj(conf_file, create_empty=True)
        ## check if conf_file file was created successfully?
        if not os.path.exists(conf_file):
            raise CBEnvironmentError(
                    "failed to create volume database file (%s)" % conf_file)
        return conf


    def __get_user_db(self):
        """Load the user database file if it exists.

        An empty file is created if it is missing. Raises
        CBEnvironmentError if the file does not exist afterwards.
        The returned object carries an additional 'get_digest' function
        that turns a password into the hex digest stored in the database.
        """
        import StringIO
        try:
            import hashlib
            sha1 = hashlib.sha1
        except ImportError:
            ## python before 2.5 only ships the legacy "sha" module
            import sha
            sha1 = sha.new
        user_db_rules = StringIO.StringIO(self.userDatabaseSpec)
        user_db_file = self.__get_settings_file_path(
                self.USERDB_FILE, "users database file")
        ## create user_db_file if necessary
        if os.path.exists(user_db_file):
            user_db = configobj.ConfigObj(user_db_file,
                    configspec=user_db_rules)
        else:
            user_db = configobj.ConfigObj(user_db_file,
                    configspec=user_db_rules, create_empty=True)
        ## validate and set default value for "admin" user
        user_db.validate(validate.Validator())
        ## check if user_db file was created successfully?
        if not os.path.exists(user_db_file):
            raise CBEnvironmentError(
                    "failed to create user database file (%s)" % user_db_file)
        ## define password hash function - never use "sha" directly - SPOT
        ## NOTE(review): an unsalted SHA-1 digest is weak for password
        ## storage; changing it would invalidate the digests that are
        ## already stored in the user database
        user_db.get_digest = lambda password: sha1(password).hexdigest()
        return user_db


    def __get_misc_files(self):
        """Load miscellaneous configuration files.

        e.g.: an ssl certificate, ...

        Returns a list of MiscConfigFile objects - one for every regular
        file in the "misc" subdirectory of the settings directory. The
        list is empty if this directory is missing or not accessible.
        """
        misc_dir = os.path.join(self.prefs["Locations"]["SettingsDir"], "misc")
        if (not os.path.isdir(misc_dir)) or (not os.access(misc_dir, os.X_OK)):
            return []
        return [MiscConfigFile(os.path.join(misc_dir, f), self.log)
                for f in os.listdir(misc_dir)
                if os.path.isfile(os.path.join(misc_dir, f))]


    def __get_config_filename(self, config_file):
        """Search for the configuration file.

        config_file: the name of a configuration file or None

        Returns the given name or - if it is None - the first existing
        entry of CONF_LOCATIONS. Raises CBConfigUnavailableError if no
        file was found, if the given name is not a string or if the given
        file does not exist.
        """
        import types
        if config_file is None:
            # no config file was specified - we will look for it in the usual locations
            for location in self.CONF_LOCATIONS:
                candidate = os.path.expanduser(location)
                if os.path.exists(candidate):
                    return candidate
            # no possible config file found in the usual locations
            raise CBConfigUnavailableError()
        # a config file was specified (e.g. via command line)
        if not isinstance(config_file, types.StringTypes):
            # the one-element tuple keeps the formatting safe for tuple arguments
            raise CBConfigUnavailableError(
                    "invalid config file specified: %s" % (config_file,))
        if not os.path.exists(config_file):
            raise CBConfigUnavailableError(
                    "could not find the specified configuration file (%s)"
                    % config_file)
        return config_file


    def __configure_log_handler(self):
        """Configure the log handler of the CryptoBox according to the config.

        The logger named "CryptoBox" gets a single file handler (the file
        is taken from Log->Details) and the level given by Log->Level.
        Raises CBConfigUndefinedError for a missing setting,
        CBConfigInvalidValueError for an unknown log level and
        CBEnvironmentError if the log file could not be opened.
        """
        log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
        try:
            log_level = self.prefs["Log"]["Level"].upper()
        except KeyError:
            raise CBConfigUndefinedError("Log", "Level")
        if log_level not in log_level_avail:
            raise CBConfigInvalidValueError("Log", "Level", log_level,
                    "invalid log level: only %s are allowed" % log_level_avail)
        try:
            log_file = self.prefs["Log"]["Details"]
        except KeyError:
            raise CBConfigUndefinedError("Log", "Details")
        try:
            log_handler = logging.FileHandler(log_file)
        except IOError:
            raise CBEnvironmentError("could not write to log file (%s)" % \
                    log_file)
        log_handler.setFormatter(
            logging.Formatter('%(asctime)s CryptoBox %(levelname)s: %(message)s'))
        cbox_log = logging.getLogger("CryptoBox")
        ## remove previous handlers
        cbox_log.handlers = []
        ## add new one
        cbox_log.addHandler(log_handler)
        ## do not call parent's handlers
        cbox_log.propagate = False
        ## 'log_level' is a string -> use 'getattr'
        cbox_log.setLevel(getattr(logging, log_level))
        ## the logger named "CryptoBox" is configured now


    ## validation rules of the main configuration file
    validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
ConfigVolumeLabel = string(min=1, default="cbox_config")
UseConfigPartition = integer(min=0, max=1, default=0)
DisabledPlugins = list(default=list())

[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox-server/mnt")
SettingsDir = directoryExists(default="/var/cache/cryptobox-server/settings")
TemplateDir = directoryExists(default="/usr/share/cryptobox-server/template")
DocDir = directoryExists(default="/usr/share/doc/cryptobox-server/www-data")
PluginDir = listOfExistingDirectories(default=list("/usr/share/cryptobox-server/plugins"))
EventDir = string(default="/etc/cryptobox-server/events.d")

[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)

[WebSettings]
Stylesheet = string(min=1)
Languages = list(min=1,default=list("en"))

[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs = fileExecutable(default="/sbin/mkfs")
nice = fileExecutable(default="/usr/bin/nice")
blkid = fileExecutable(default="/sbin/blkid")
blockdev = fileExecutable(default="/sbin/blockdev")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
    """

    ## validation rules of the plugin configuration file
    pluginValidationSpec = """
[__many__]
visibility = boolean(default=None)
requestAuth = boolean(default=None)
rank = integer(default=None)
    """

    ## validation rules of the user database
    ## (the default is presumably the digest of the initial admin password
    ## as calculated by 'get_digest' - verify before relying on it)
    userDatabaseSpec = """
[admins]
admin = string(default=d033e22ae348aeb5660fc2140aec35850c4da997)
    """


class CryptoBoxSettingsValidator(validate.Validator):
    """Some custom configuration check functions.

    The checks are registered under the names that are used by
    'CryptoBoxSettings.validation_spec'. Every check returns the
    absolute path(s) or raises validate.VdtValueError.
    """

    def __init__(self):
        validate.Validator.__init__(self)
        self.functions["directoryExists"] = self.check_directory_exists
        self.functions["fileExecutable"] = self.check_file_executable
        self.functions["fileWriteable"] = self.check_file_writeable
        self.functions["listOfExistingDirectories"] = self.check_existing_directories


    def check_directory_exists(self, value):
        """Is the directory accessible?

        Returns the absolute path of the directory.
        """
        dir_path = os.path.abspath(value)
        if not os.path.isdir(dir_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(dir_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return dir_path


    def check_file_executable(self, value):
        """Is the file executable?

        Returns the absolute path of the file.
        """
        file_path = os.path.abspath(value)
        if not os.path.isfile(file_path):
            raise validate.VdtValueError("%s (not found)" % value)
        if not os.access(file_path, os.X_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path


    def check_file_writeable(self, value):
        """Is the file writeable?

        An existing file must be writeable. A missing file is accepted
        if its parent directory exists and is writeable.
        Returns the absolute path of the file.
        """
        file_path = os.path.abspath(value)
        if os.path.isfile(file_path):
            ## the file exists - a failure can only be a permission problem
            if not os.access(file_path, os.W_OK):
                raise validate.VdtValueError("%s (access denied)" % value)
            return file_path
        parent_dir = os.path.dirname(file_path)
        if not os.path.isdir(parent_dir):
            raise validate.VdtValueError("%s (directory does not exist)" % value)
        if not os.access(parent_dir, os.W_OK):
            raise validate.VdtValueError("%s (access denied)" % value)
        return file_path


    def check_existing_directories(self, value):
        """Are these directories accessible?

        value: a single directory name or a list of directory names
        Returns the list of the absolute paths.
        """
        if not value:
            raise validate.VdtValueError("no plugin directory specified")
        if not isinstance(value, list):
            value = [value]
        result = []
        for one_dir in value:
            dir_path = os.path.abspath(one_dir)
            if not os.path.isdir(dir_path):
                raise validate.VdtValueError(
                        "%s (plugin directory not found)" % one_dir)
            if not os.access(dir_path, os.X_OK):
                raise validate.VdtValueError(
                        "%s (access denied for plugin directory)" % one_dir)
            result.append(dir_path)
        return result


class MiscConfigFile:
    """All other config files (e.g. a ssl certificate) to be stored.

    The content of the file is kept in memory ('content' attribute) and
    can be written back to the same filename later.
    """

    ## maximum number of bytes that are read from a file
    maxSize = 20480

    def __init__(self, filename, logger):
        """Remember the filename and the logger and load the file."""
        self.filename = filename
        self.log = logger
        self.content = None
        self.load()


    def load(self):
        """Load a configuration file into memory.

        At most 'maxSize' bytes are read. A warning is logged if the file
        contains more data - the remaining part is ignored.
        """
        fdesc = open(self.filename, "rb")
        try:
            ## limit the maximum size
            self.content = fdesc.read(self.maxSize)
            ## any remaining byte means that the file exceeds the limit
            ## (a file of exactly 'maxSize' bytes is still allowed)
            if fdesc.read(1):
                self.log.warn("file in misc settings directory ("
                        + str(self.filename)
                        + ") is bigger than allowed ("
                        + str(self.maxSize) + ")")
        finally:
            fdesc.close()


    def save(self):
        """Save a configuration file to disk.

        The parent directory is created if it is missing.
        Returns True on success and False if the directory could not be
        created or if the file could not be written.
        """
        save_dir = os.path.dirname(self.filename)
        ## create the directory, if necessary
        if save_dir and not os.path.isdir(save_dir):
            try:
                os.mkdir(save_dir)
            except (IOError, OSError):
                ## 'os.mkdir' reports its failures as OSError
                return False
        ## save the content of the file
        try:
            fdesc = open(self.filename, "wb")
        except IOError:
            return False
        try:
            try:
                fdesc.write(self.content)
            finally:
                ## never leave the file open - even if writing failed
                fdesc.close()
        except IOError:
            return False
        return True