diff --git a/pythonrewrite/bin2/CryptoBox.py b/pythonrewrite/bin2/CryptoBox.py index 32ae799..fefe590 100644 --- a/pythonrewrite/bin2/CryptoBox.py +++ b/pythonrewrite/bin2/CryptoBox.py @@ -14,6 +14,7 @@ import re import os import sys import types +import unittest CONF_LOCATIONS = [ @@ -32,9 +33,10 @@ class CryptoBoxProps: def __init__(self, conf_file=None): '''read config and fill class variables''' - if os.geteuid() != 0: - sys.stderr.write("You need to be root to run this program!\n") - sys.exit(1) + "enable it again - or remove the priv-dropping" + #if os.geteuid() != 0: + # sys.stderr.write("You need to be root to run this program!\n") + # sys.exit(1) if conf_file == None: for f in CONF_LOCATIONS: if os.path.exists(os.path.expanduser(f)): @@ -53,18 +55,18 @@ class CryptoBoxProps: self.cbxPrefs["Main"]["DataDir"], self.cbxPrefs["Main"]["NameDatabase"]) if os.path.exists(nameDB_file): - self.nameDB = configobj.ConfigObj(nameDB_file, create_empty=True) - else: self.nameDB = configobj.ConfigObj(nameDB_file) + else: + self.nameDB = configobj.ConfigObj(nameDB_file, create_empty=True) except SyntaxError: sys.stderr.write("Error during parsing of name database file (%s).\n" % (nameDB_file, )) sys.exit(1) self.__cboxUID = int(self.cbxPrefs["System"]["User"]) + self.__cboxGID = int(self.cbxPrefs["System"]["Group"]) self.debug = CryptoBoxLogger.CryptoBoxLogger( self.cbxPrefs["Log"]["Level"], self.cbxPrefs["Log"]["Facility"], - self.cbxPrefs["Log"]["Destination"], - self.__cboxUID) + self.cbxPrefs["Log"]["Destination"]) self.dropPrivileges() self.debugMessage = self.debug.printMessage self.containers = [] @@ -79,7 +81,9 @@ class CryptoBoxProps: allowed = self.cbxPrefs["Main"]["AllowedDevices"] if type(allowed) == types.StringType: allowed = [allowed] for a_dev in allowed: - if a_dev and re.search('^' + a_dev, devicename): return True + "remove double dots and so on ..." + real_device = os.path.realpath(devicename) + if a_dev and re.search('^' + a_dev, real_device): return True return False @@ -129,16 +133,20 @@ class CryptoBoxProps: def dropPrivileges(self): "change the effective uid to 'User' specified in section 'System'" - if os.getuid() != os.geteuid(): - raise "PrivilegeManager", "we already dropped privileges" + "enable it again - or remove the priv-dropping" + #if os.getuid() != os.geteuid(): + # raise "PrivilegeManager", "we already dropped privileges" os.seteuid(self.__cboxUID) + os.setegid(self.__cboxGID) def risePrivileges(self): "regain superuser privileges temporarily - call dropPrivileges afterwards!" - if os.getuid() == os.geteuid(): - raise "PrivilegeManager", "we already have superuser privileges" + "enable it again - or remove the priv-dropping" + #if os.getuid() == os.geteuid(): + # raise "PrivilegeManager", "we already have superuser privileges" os.seteuid(os.getuid()) + os.setegid(os.getgid()) """ ************ internal stuff starts here *********** """ @@ -238,3 +246,66 @@ class CryptoBoxProps: return [] + +# *************** test class ********************* + +class CryptoBoxPropsTest(unittest.TestCase): + + configFile = "/tmp/cbox-test.conf" + nameDBFile = "/tmp/cryptobox_names.db" + logFile = "/tmp/cryptobox.log" + configContent = """ +[Main] +AllowedDevices = /dev/loop +DefaultVolumePrefix = "Data " +DataDir = /tmp +NameDatabase = cryptobox_names.db +[System] +User = 1000 +Group = 1000 +MountParentDir = /tmp/mnt +DefaultCipher = aes-cbc-essiv:sha256 +[Log] +Level = debug +Facility = file +Destination = /tmp/cryptobox.log +""" + + def setUp(self): + if not os.path.exists("/tmp/mnt"): os.mkdir("/tmp/mnt") + fd = open(self.configFile, "w") + fd.write(self.configContent) + fd.close() + + + def tearDown(self): + if os.path.exists("/tmp/mnt"): os.rmdir("/tmp/mnt") + if os.path.exists(self.configFile): os.remove(self.configFile) + if os.path.exists(self.logFile): os.remove(self.logFile) + if os.path.exists(self.nameDBFile): os.remove(self.nameDBFile) + + + def testConfigFile(self): + self.assertRaises(KeyError, CryptoBoxProps, "/not/existing/path") + CryptoBoxProps(self.configFile) + self.assertTrue(os.path.exists(self.nameDBFile)) + self.assertTrue(os.path.exists(self.logFile)) + + + def testDeviceCheck(self): + cb = CryptoBoxProps(self.configFile) + self.assertTrue(cb.isDeviceAllowed("/dev/loop")) + self.assertTrue(cb.isDeviceAllowed("/dev/loop1")) + self.assertTrue(cb.isDeviceAllowed("/dev/loop/urgd")) + self.assertFalse(cb.isDeviceAllowed("/dev/hda")) + self.assertFalse(cb.isDeviceAllowed("/dev/loopa/../hda")) + self.assertTrue(cb.isDeviceAllowed("/dev/usb/../loop1")) + self.assertFalse(cb.isDeviceAllowed("/")) + + "a lot of tests are still missing - how can we provide a prepared environment?" + +# ********************* run unittest **************************** + +if __name__ == "__main__": + unittest.main() + diff --git a/pythonrewrite/bin2/CryptoBoxLogger.py b/pythonrewrite/bin2/CryptoBoxLogger.py index 0e9481e..7f70a7e 100644 --- a/pythonrewrite/bin2/CryptoBoxLogger.py +++ b/pythonrewrite/bin2/CryptoBoxLogger.py @@ -3,48 +3,69 @@ manages logging events of the CryptoBox ''' import sys import os +import unittest class CryptoBoxLogger: ''' - handles concrete logging events and prints them e.g. to a logfile + handles logging events and prints them e.g. to a logfile ''' DebugLevels = {"debug":0, "info":3, "warn":6, "error":9} DebugFacilities = {"file":0} - def __init__(self, level, facility, name=None, user=None): + def __init__(self, level, facility, destination=None): + """create a CryptoBoxLogger object and connect it to an output facility + + level: an integer (0..9) or string ('debug', 'info', 'warn' or 'error') + facility: for now only 'file' + destination: e.g. the name of the logfile or syslog facility + + """ try: - facility = int(facility) - except ValueError: - facility = self.DebugFacilities[facility] + try: + facility = int(facility) + except Exception: + try: + facility = self.DebugFacilities[facility] + except KeyError: + raise "LoggerError" + if (facility != 0): raise "LoggerError" + except "LoggerError": + errorMsg = "Invalid debug facility: %s" % facility + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg try: - level = int(level) - except ValueError: - level = self.DebugLevels[level] + try: + level = int(level) + except Exception: + try: + level = self.DebugLevels[level] + except KeyError: + raise "LoggerError" + if (level < 0) or (level > 9): raise "LoggerError" + except "LoggerError": + errorMsg = "Invalid debug level: %s" % level + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg self.debug_level = level if facility == self.DebugFacilities["file"]: self.logFunc = self.message2file - if name is not None: - self.logFile = name + if destination is not None: + self.logFile = destination else: self.logFile = '/var/log/cryptobox.log' try: fsock = open(self.logFile, "a") fsock.close() - # TODO: check before, if this file was owned - # by someone else than the cryptobox user - in this case, - # we may not chown it - very baaaad! - "change ownership of log file" - if user != None: - os.chown(self.logFile, user, os.getegid()) - return except IOError: - sys.stderr.write("Unable to open logfile (%s) for writing.\n" % (self.logFile, )) + errorMsg ="Unable to open logfile (%s) for writing." % (self.logFile,) + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg + else: - sys.stderr.write("Invalid logging facility: %d.\n" % (facility, )) - "we will only arrive here, if an error occoured" - sys.stderr.write("Sorry - bye, bye!\n") - sys.exit(1) + errorMsg = "Invalid logging facility: %d." % (facility, ) + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg def printMessage(self, msg_level, text): @@ -53,7 +74,20 @@ class CryptoBoxLogger: try: msg_level = int(msg_level) except ValueError: - msg_level = self.DebugLevels[msg_level] + try: + msg_level = self.DebugLevels[msg_level] + except KeyError: + errorMsg = "Invalid debug level: %s" % msg_level + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg + if (msg_level < 0) or (msg_level > 9): + errorMsg = "Invalid debug level: %s" % msg_level + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg + if text is None: + errorMsg = "Empty debug message - this is not allowed" + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg if msg_level >= self.debug_level: self.logFunc("[CryptoBox] - %s\n" % (text, )) @@ -66,12 +100,85 @@ class CryptoBoxLogger: log_sock.close() return except IOError: - sys.stderr.write( - "Unable to write messages to logfile (%s).\n" % (self.logFile, )) + errorMsg = "Unable to write messages to logfile (%s)." % (self.logFile, ) + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg except IOError: - sys.stderr.write("Unable to open logfile (%s) for writing.\n" % (self.logFile, )) - "we will only arrive here, if an error occoured" - sys.stderr.write("Sorry - bye, bye!\n") - sys.exit(1) + errorMsg = "Unable to open logfile (%s) for writing." % (self.logFile, ) + sys.stderr.write(errorMsg + "\n") + raise "LoggerError", errorMsg +# ********************* test class ********************** + +class CryptoBoxLoggerTest(unittest.TestCase): + + logFile = "/tmp/cbox-test.log" + + def setUp(self): + if os.path.exists(self.logFile): os.remove(self.logFile) + + def tearDown(self): + if os.path.exists(self.logFile): os.remove(self.logFile) + + def testInit(self): + try: + CryptoBoxLogger(0, 0) + except "LoggerError": + CryptoBoxLogger(0, 0, self.logFile) + os.remove(self.logFile) + CryptoBoxLogger("info", "file", self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, "invalid", 0, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 0, "invalid", self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 10, 0, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, -1, 0, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 0, 10, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 0, -1, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, None, 0, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 0, None, self.logFile) + self.assertRaises("LoggerError", CryptoBoxLogger, 0, 0, "/no/existing/path") + os.remove(self.logFile) + + def testMessage(self): + cb = CryptoBoxLogger(3, 0, self.logFile) + content1 = self.readFile() + self.assertEquals(content1, "") + cb.printMessage(3, "Ausgabe") + content2 = self.readFile() + self.assertNotEqual(content1, content2) + self.assertRaises("LoggerError", cb.printMessage, 10, "Ausgabe") + self.assertEquals(content2, self.readFile()) + self.assertRaises("LoggerError", cb.printMessage, -1, "Ausgabe") + self.assertEquals(content2, self.readFile()) + self.assertRaises("LoggerError", cb.printMessage, "invalid", "Ausgabe") + self.assertEquals(content2, self.readFile()) + self.assertRaises("LoggerError", cb.printMessage, 3, None) + self.assertEquals(content2, self.readFile()) + cb.printMessage(2, "Ausgabe") + self.assertEquals(content2, self.readFile()) + cb.printMessage(4, "Ausgabe") + self.assertNotEqual(content2, self.readFile()) + os.remove(self.logFile) + + def readFile(self): + fd = None + try: + fd = open(self.logFile, "r") + text = fd.read() + fd.close() + except IOError: + if fd is not None: fd.close() + text = None + return text + + +# *************** unit testing ********************* + +if __name__ == "__main__": + try: + devnull = open(os.devnull, "w") + sys.stderr = devnull + except IOError: + pass + unittest.main() + diff --git a/pythonrewrite/bin2/cryptobox.conf b/pythonrewrite/bin2/cryptobox.conf index 87b4a9c..1be4dc6 100644 --- a/pythonrewrite/bin2/cryptobox.conf +++ b/pythonrewrite/bin2/cryptobox.conf @@ -19,8 +19,9 @@ NameDatabase = cryptobox_names.db [System] # most actions of the cryptobox are not executed as root - choose a limited -# user here - for now only numeric ids are allowed +# user and group here - for now only numeric ids are allowed User = 1000 +Group = 1000 # where should we mount volumes? # this directory must be writeable by the cryptobox user (see above)