# 2006-09-08 13:02:27 +02:00
import logging
import validate
import os
import CryptoBoxExceptions
## 'configobj' is an external dependency - turn a missing module into a
## CryptoBox-specific error that tells the user how to install it
try:
    import configobj  ## needed for reading and writing of the config file
except ImportError:
    ## only a failed import is handled here - any other error propagates unchanged
    raise CryptoBoxExceptions.CBEnvironmentError(" couldn ' t import ' configobj ' ! Try ' apt-get install python-configobj ' . ")
class CryptoBoxSettings:
    """Locate, load and validate the CryptoBox configuration file.

    The constructor finds the config file, parses it against
    'validation_spec', configures the "CryptoBox" logger, warns about
    settings that are not part of the specification and opens the name
    database. Afterwards the parsed settings are available through
    item access (settings["Section"]["Key"]) and the name database
    through the 'nameDB' attribute.
    """

    ## default config files - the first existing one is used
    CONF_LOCATIONS = [
        "./cryptobox.conf",
        "~/.cryptobox.conf",
        "/etc/cryptobox/cryptobox.conf"]

    def __init__(self, config_file=None):
        """Load the configuration.

        config_file: path of the config file, or None to search
        CONF_LOCATIONS.

        The helpers called here raise CryptoBoxExceptions errors
        (CBConfigUnavailableError, CBConfigError, CBConfigUndefinedError,
        CBConfigInvalidValueError, CBEnvironmentError) if the
        configuration cannot be found, parsed or validated.
        """
        self.log = logging.getLogger(" CryptoBox ")
        config_file = self.__getConfigFileName(config_file)
        self.log.info(" loading config file: %s " % config_file)
        self.prefs = self.__getPreferences(config_file)
        self.__validateConfig()
        self.__configureLogHandler()
        self.__checkUnknownPreferences()
        self.nameDB = self.__getNameDatabase()

    def __getitem__(self, key):
        """Redirect all item requests to the 'prefs' attribute."""
        return self.prefs[key]

    def __getPreferences(self, config_file):
        """Parse 'config_file' with 'validation_spec' attached as configspec.

        Returns the resulting ConfigObj. Raises CBConfigUnavailableError
        if the file cannot be opened or yields no settings at all.
        """
        import StringIO
        config_rules = StringIO.StringIO(self.validation_spec)
        try:
            prefs = configobj.ConfigObj(config_file, configspec=config_rules)
        except IOError:
            raise CryptoBoxExceptions.CBConfigUnavailableError(
                " unable to open the config file: %s " % config_file)
        if not prefs:
            ## an empty result is treated like a missing file
            raise CryptoBoxExceptions.CBConfigUnavailableError(
                " failed to load the config file: %s " % config_file)
        self.log.info(" found config: %s " % prefs.items())
        return prefs

    def __validateConfig(self):
        """Validate 'prefs' against its configspec.

        Raises CBConfigError with one line per missing or invalid
        setting; returns silently if there is nothing to report.
        """
        result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
        error_list = configobj.flatten_errors(self.prefs, result)
        if not error_list:
            return
        errorMsgs = []
        for sections, key, text in error_list:
            section_name = " -> ".join(sections)
            if not text:
                ## no error text: the value is missing
                errorMsg = " undefined configuration value ( %s ) in section ' %s ' " % (key, section_name)
            else:
                errorMsg = " invalid configuration value ( %s ) in section ' %s ' : %s " % (key, section_name, text)
            errorMsgs.append(errorMsg)
        ## call form instead of the 'raise Class, value' statement form
        raise CryptoBoxExceptions.CBConfigError(" \n ".join(errorMsgs))

    def __checkUnknownPreferences(self):
        """Log a warning for every setting that 'validation_spec' does not define."""
        import StringIO
        ## 'list_values = False' keeps the check strings of the spec unparsed
        config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec), list_values=False)
        self.__recursiveConfigSectionCheck(" ", self.prefs, config_rules)

    def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
        """should be called by '__checkUnknownPreferences' for every section
        sends a warning message to the logger for every undefined (see validation_spec)
        configuration setting

        section_path: prefix used to build the setting's name in messages
        section_config: the (sub)section of the loaded configuration
        section_rules: the matching (sub)section of the parsed specification
        """
        for e in section_config.keys():
            element_path = section_path + e
            if e not in section_rules:
                self.log.warning(" unknown configuration setting: %s " % element_path)
                continue
            config_is_section = isinstance(section_config[e], configobj.Section)
            rule_is_section = isinstance(section_rules[e], configobj.Section)
            if config_is_section and rule_is_section:
                self.__recursiveConfigSectionCheck(element_path + " -> ", section_config[e], section_rules[e])
            elif config_is_section:
                self.log.warning(" configuration setting should be a value instead of a section name: %s " % element_path)
            elif rule_is_section:
                self.log.warning(" configuration setting should be a section name instead of a value: %s " % element_path)
            ## otherwise: good - the setting is valid

    def __getNameDatabase(self):
        """Open the name database configured as [Locations] NameDatabase.

        The file is created empty if it does not exist yet. Returns the
        ConfigObj of the database. Raises CBConfigUndefinedError or
        CBConfigInvalidValueError if the setting cannot be read, and
        CBEnvironmentError if the file does not exist afterwards.
        """
        ## pre-bind the name: the SyntaxError handler below reports it even
        ## though the lookup failed before assigning a value
        nameDB_file = None
        try:
            nameDB_file = self.prefs["Locations"]["NameDatabase"]
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
        except SyntaxError:
            raise CryptoBoxExceptions.CBConfigInvalidValueError(
                "Locations", "NameDatabase", nameDB_file,
                " failed to interprete the filename of the name database correctly ")
        ## create nameDB if necessary
        if os.path.exists(nameDB_file):
            nameDB = configobj.ConfigObj(nameDB_file)
        else:
            nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
        ## check if nameDB file was created successfully
        if not os.path.exists(nameDB_file):
            raise CryptoBoxExceptions.CBEnvironmentError(
                " failed to create name database ( %s ) " % nameDB_file)
        return nameDB

    def __getConfigFileName(self, config_file):
        """Return the path of the config file to use.

        config_file: explicit path (e.g. from the command line) or None.
        With None the first existing entry of CONF_LOCATIONS (after
        '~' expansion) is returned. Raises CBConfigUnavailableError if
        no file is found, the given value is not a string or the given
        file does not exist.
        """
        import types
        if config_file is None:
            ## no config file was specified - look for it in the usual locations
            for location in self.CONF_LOCATIONS:
                candidate = os.path.expanduser(location)
                if os.path.exists(candidate):
                    return candidate
            ## no possible config file found in the usual locations
            raise CryptoBoxExceptions.CBConfigUnavailableError()
        ## a config file was specified (e.g. via command line)
        ## 'StringTypes' accepts unicode filenames as well as plain strings
        if not isinstance(config_file, types.StringTypes):
            raise CryptoBoxExceptions.CBConfigUnavailableError(
                " invalid config file specified: %s " % config_file)
        if not os.path.exists(config_file):
            raise CryptoBoxExceptions.CBConfigUnavailableError(
                " could not find the specified configuration file ( %s ) " % config_file)
        return config_file

    def __configureLogHandler(self):
        """Attach a file handler to the "CryptoBox" logger.

        Reads [Log] Level and [Log] Details (the log file name) from
        'prefs', replaces all existing handlers of the logger by one
        FileHandler, disables propagation to parent loggers and sets
        the level. Raises CBConfigUndefinedError for a missing setting,
        CBConfigInvalidValueError for an unknown level and
        CBEnvironmentError if the log file cannot be opened.
        """
        ## these names must match the level constants of the logging module
        ## kept as a list: it is used as the single '%s' argument below
        log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
        try:
            log_level = self.prefs["Log"]["Level"].upper()
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
        if log_level not in log_level_avail:
            raise CryptoBoxExceptions.CBConfigInvalidValueError(
                "Log", "Level", log_level,
                " invalid log level: only %s are allowed " % log_level_avail)
        try:
            log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
        except KeyError:
            raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
        except IOError:
            raise CryptoBoxExceptions.CBEnvironmentError(
                " could not create the log file ( %s ) " % self.prefs["Log"]["Details"])
        log_handler.setFormatter(logging.Formatter(' %(asctime)s %(module)s %(levelname)s : %(message)s '))
        cbox_log = logging.getLogger(" CryptoBox ")
        ## remove previous handlers
        cbox_log.handlers = []
        ## add new one
        cbox_log.addHandler(log_handler)
        ## do not call parent's handlers
        cbox_log.propagate = False
        ## 'log_level' is a string -> use 'getattr'
        cbox_log.setLevel(getattr(logging, log_level))
        ## the logger named "CryptoBox" is configured now

    ## configspec for the config file: section names, setting names and the
    ## check applied to each value ('directoryExists', 'fileExecutable' and
    ## 'fileWriteable' are provided by CryptoBoxSettingsValidator)
    validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
ConfigVolumeLabel = string(min=1, default="cbox_config")

[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")

[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)

[WebSettings]
Stylesheet = string(min=1)
Language = string(min=1, default="en")
DocLanguage = string(min=1, default="en")

[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
blkid = fileExecutable(default="/sbin/blkid")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
"""
class CryptoBoxSettingsValidator(validate.Validator):
    """Validator that adds the file system checks used by the configspec.

    Registers three additional check functions: 'directoryExists',
    'fileExecutable' and 'fileWriteable'. Each check returns the
    absolute path of the given value or raises validate.VdtValueError.
    """

    def __init__(self):
        validate.Validator.__init__(self)
        ## the keys are the check names as they are written in the configspec
        self.functions["directoryExists"] = self.check_directoryExists
        self.functions["fileExecutable"] = self.check_fileExecutable
        self.functions["fileWriteable"] = self.check_fileWriteable

    def check_directoryExists(self, value):
        """Accept 'value' if it is a directory with execute (search) permission."""
        dir_path = os.path.abspath(value)
        if not os.path.isdir(dir_path):
            raise validate.VdtValueError(" %s (not found) " % value)
        if not os.access(dir_path, os.X_OK):
            raise validate.VdtValueError(" %s (access denied) " % value)
        return dir_path

    def check_fileExecutable(self, value):
        """Accept 'value' if it is a regular file with execute permission."""
        file_path = os.path.abspath(value)
        if not os.path.isfile(file_path):
            raise validate.VdtValueError(" %s (not found) " % value)
        if not os.access(file_path, os.X_OK):
            raise validate.VdtValueError(" %s (access denied) " % value)
        return file_path

    def check_fileWriteable(self, value):
        """Accept 'value' if it is a writeable file or can be created.

        An existing file must be writeable. If no such file exists, its
        parent directory must exist and be writeable.
        """
        file_path = os.path.abspath(value)
        if os.path.isfile(file_path):
            if not os.access(file_path, os.W_OK):
                ## the file exists - the problem is the missing write permission
                raise validate.VdtValueError(" %s (access denied) " % value)
            return file_path
        parent_dir = os.path.dirname(file_path)
        if not os.path.isdir(parent_dir):
            raise validate.VdtValueError(" %s (directory does not exist) " % value)
        if not os.access(parent_dir, os.W_OK):
            ## the directory exists, but a new file cannot be created in it
            raise validate.VdtValueError(" %s (access denied) " % value)
        return file_path