added unittest for CryptoBoxLogger and CryptoBoxProps

fixed bug in devicename check
minor improvements
This commit is contained in:
lars 2006-08-18 14:00:49 +00:00
parent 07a63dbd9c
commit a0ce824eb2
3 changed files with 221 additions and 42 deletions

View file

@ -14,6 +14,7 @@ import re
import os
import sys
import types
import unittest
CONF_LOCATIONS = [
@ -32,9 +33,10 @@ class CryptoBoxProps:
def __init__(self, conf_file=None):
'''read config and fill class variables'''
if os.geteuid() != 0:
sys.stderr.write("You need to be root to run this program!\n")
sys.exit(1)
"enable it again - or remove the priv-dropping"
#if os.geteuid() != 0:
# sys.stderr.write("You need to be root to run this program!\n")
# sys.exit(1)
if conf_file == None:
for f in CONF_LOCATIONS:
if os.path.exists(os.path.expanduser(f)):
@ -53,18 +55,18 @@ class CryptoBoxProps:
self.cbxPrefs["Main"]["DataDir"],
self.cbxPrefs["Main"]["NameDatabase"])
if os.path.exists(nameDB_file):
self.nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
else:
self.nameDB = configobj.ConfigObj(nameDB_file)
else:
self.nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
except SyntaxError:
sys.stderr.write("Error during parsing of name database file (%s).\n" % (nameDB_file, ))
sys.exit(1)
self.__cboxUID = int(self.cbxPrefs["System"]["User"])
self.__cboxGID = int(self.cbxPrefs["System"]["Group"])
self.debug = CryptoBoxLogger.CryptoBoxLogger(
self.cbxPrefs["Log"]["Level"],
self.cbxPrefs["Log"]["Facility"],
self.cbxPrefs["Log"]["Destination"],
self.__cboxUID)
self.cbxPrefs["Log"]["Destination"])
self.dropPrivileges()
self.debugMessage = self.debug.printMessage
self.containers = []
@ -79,7 +81,9 @@ class CryptoBoxProps:
allowed = self.cbxPrefs["Main"]["AllowedDevices"]
if type(allowed) == types.StringType: allowed = [allowed]
for a_dev in allowed:
if a_dev and re.search('^' + a_dev, devicename): return True
"remove double dots and so on ..."
real_device = os.path.realpath(devicename)
if a_dev and re.search('^' + a_dev, real_device): return True
return False
@ -129,16 +133,20 @@ class CryptoBoxProps:
def dropPrivileges(self):
"change the effective uid to 'User' specified in section 'System'"
if os.getuid() != os.geteuid():
raise "PrivilegeManager", "we already dropped privileges"
"enable it again - or remove the priv-dropping"
#if os.getuid() != os.geteuid():
# raise "PrivilegeManager", "we already dropped privileges"
os.seteuid(self.__cboxUID)
os.setegid(self.__cboxGID)
def risePrivileges(self):
"regain superuser privileges temporarily - call dropPrivileges afterwards!"
if os.getuid() == os.geteuid():
raise "PrivilegeManager", "we already have superuser privileges"
"enable it again - or remove the priv-dropping"
#if os.getuid() == os.geteuid():
# raise "PrivilegeManager", "we already have superuser privileges"
os.seteuid(os.getuid())
os.setegid(os.getgid())
""" ************ internal stuff starts here *********** """
@ -238,3 +246,66 @@ class CryptoBoxProps:
return []
# *************** test class *********************
class CryptoBoxPropsTest(unittest.TestCase):
configFile = "/tmp/cbox-test.conf"
nameDBFile = "/tmp/cryptobox_names.db"
logFile = "/tmp/cryptobox.log"
configContent = """
[Main]
AllowedDevices = /dev/loop
DefaultVolumePrefix = "Data "
DataDir = /tmp
NameDatabase = cryptobox_names.db
[System]
User = 1000
Group = 1000
MountParentDir = /tmp/mnt
DefaultCipher = aes-cbc-essiv:sha256
[Log]
Level = debug
Facility = file
Destination = /tmp/cryptobox.log
"""
def setUp(self):
if not os.path.exists("/tmp/mnt"): os.mkdir("/tmp/mnt")
fd = open(self.configFile, "w")
fd.write(self.configContent)
fd.close()
def tearDown(self):
if os.path.exists("/tmp/mnt"): os.rmdir("/tmp/mnt")
if os.path.exists(self.configFile): os.remove(self.configFile)
if os.path.exists(self.logFile): os.remove(self.logFile)
if os.path.exists(self.nameDBFile): os.remove(self.nameDBFile)
def testConfigFile(self):
self.assertRaises(KeyError, CryptoBoxProps, "/not/existing/path")
CryptoBoxProps(self.configFile)
self.assertTrue(os.path.exists(self.nameDBFile))
self.assertTrue(os.path.exists(self.logFile))
def testDeviceCheck(self):
cb = CryptoBoxProps(self.configFile)
self.assertTrue(cb.isDeviceAllowed("/dev/loop"))
self.assertTrue(cb.isDeviceAllowed("/dev/loop1"))
self.assertTrue(cb.isDeviceAllowed("/dev/loop/urgd"))
self.assertFalse(cb.isDeviceAllowed("/dev/hda"))
self.assertFalse(cb.isDeviceAllowed("/dev/loopa/../hda"))
self.assertTrue(cb.isDeviceAllowed("/dev/usb/../loop1"))
self.assertFalse(cb.isDeviceAllowed("/"))
"a lot of tests are still missing - how can we provide a prepared environment?"
# ********************* run unittest ****************************
if __name__ == "__main__":
unittest.main()

View file

@ -3,48 +3,69 @@ manages logging events of the CryptoBox
'''
import sys
import os
import unittest
class CryptoBoxLogger:
'''
handles concrete logging events and prints them e.g. to a logfile
handles logging events and prints them e.g. to a logfile
'''
DebugLevels = {"debug":0, "info":3, "warn":6, "error":9}
DebugFacilities = {"file":0}
def __init__(self, level, facility, name=None, user=None):
def __init__(self, level, facility, destination=None):
"""create a CryptoBoxLogger object and connect it to an output facility
level: an integer (0..9) or string ('debug', 'info', 'warn' or 'error')
facility: for now only 'file'
destination: e.g. the name of the logfile or syslog facility
"""
try:
facility = int(facility)
except ValueError:
facility = self.DebugFacilities[facility]
try:
facility = int(facility)
except Exception:
try:
facility = self.DebugFacilities[facility]
except KeyError:
raise "LoggerError"
if (facility != 0): raise "LoggerError"
except "LoggerError":
errorMsg = "Invalid debug facility: %s" % facility
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
try:
level = int(level)
except ValueError:
level = self.DebugLevels[level]
try:
level = int(level)
except Exception:
try:
level = self.DebugLevels[level]
except KeyError:
raise "LoggerError"
if (level < 0) or (level > 9): raise "LoggerError"
except "LoggerError":
errorMsg = "Invalid debug level: %s" % level
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
self.debug_level = level
if facility == self.DebugFacilities["file"]:
self.logFunc = self.message2file
if name is not None:
self.logFile = name
if destination is not None:
self.logFile = destination
else:
self.logFile = '/var/log/cryptobox.log'
try:
fsock = open(self.logFile, "a")
fsock.close()
# TODO: check before, if this file was owned
# by someone else than the cryptobox user - in this case,
# we may not chown it - very baaaad!
"change ownership of log file"
if user != None:
os.chown(self.logFile, user, os.getegid())
return
except IOError:
sys.stderr.write("Unable to open logfile (%s) for writing.\n" % (self.logFile, ))
errorMsg ="Unable to open logfile (%s) for writing." % (self.logFile,)
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
else:
sys.stderr.write("Invalid logging facility: %d.\n" % (facility, ))
"we will only arrive here, if an error occoured"
sys.stderr.write("Sorry - bye, bye!\n")
sys.exit(1)
errorMsg = "Invalid logging facility: %d." % (facility, )
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
def printMessage(self, msg_level, text):
@ -53,7 +74,20 @@ class CryptoBoxLogger:
try:
msg_level = int(msg_level)
except ValueError:
msg_level = self.DebugLevels[msg_level]
try:
msg_level = self.DebugLevels[msg_level]
except KeyError:
errorMsg = "Invalid debug level: %s" % msg_level
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
if (msg_level < 0) or (msg_level > 9):
errorMsg = "Invalid debug level: %s" % msg_level
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
if text is None:
errorMsg = "Empty debug message - this is not allowed"
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
if msg_level >= self.debug_level:
self.logFunc("[CryptoBox] - %s\n" % (text, ))
@ -66,12 +100,85 @@ class CryptoBoxLogger:
log_sock.close()
return
except IOError:
sys.stderr.write(
"Unable to write messages to logfile (%s).\n" % (self.logFile, ))
errorMsg = "Unable to write messages to logfile (%s)." % (self.logFile, )
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
except IOError:
sys.stderr.write("Unable to open logfile (%s) for writing.\n" % (self.logFile, ))
"we will only arrive here, if an error occoured"
sys.stderr.write("Sorry - bye, bye!\n")
sys.exit(1)
errorMsg = "Unable to open logfile (%s) for writing." % (self.logFile, )
sys.stderr.write(errorMsg + "\n")
raise "LoggerError", errorMsg
# ********************* test class **********************
class CryptoBoxLoggerTest(unittest.TestCase):
logFile = "/tmp/cbox-test.log"
def setUp(self):
if os.path.exists(self.logFile): os.remove(self.logFile)
def tearDown(self):
if os.path.exists(self.logFile): os.remove(self.logFile)
def testInit(self):
try:
CryptoBoxLogger(0, 0)
except "LoggerError":
CryptoBoxLogger(0, 0, self.logFile)
os.remove(self.logFile)
CryptoBoxLogger("info", "file", self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, "invalid", 0, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 0, "invalid", self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 10, 0, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, -1, 0, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 0, 10, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 0, -1, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, None, 0, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 0, None, self.logFile)
self.assertRaises("LoggerError", CryptoBoxLogger, 0, 0, "/no/existing/path")
os.remove(self.logFile)
def testMessage(self):
cb = CryptoBoxLogger(3, 0, self.logFile)
content1 = self.readFile()
self.assertEquals(content1, "")
cb.printMessage(3, "Ausgabe")
content2 = self.readFile()
self.assertNotEqual(content1, content2)
self.assertRaises("LoggerError", cb.printMessage, 10, "Ausgabe")
self.assertEquals(content2, self.readFile())
self.assertRaises("LoggerError", cb.printMessage, -1, "Ausgabe")
self.assertEquals(content2, self.readFile())
self.assertRaises("LoggerError", cb.printMessage, "invalid", "Ausgabe")
self.assertEquals(content2, self.readFile())
self.assertRaises("LoggerError", cb.printMessage, 3, None)
self.assertEquals(content2, self.readFile())
cb.printMessage(2, "Ausgabe")
self.assertEquals(content2, self.readFile())
cb.printMessage(4, "Ausgabe")
self.assertNotEqual(content2, self.readFile())
os.remove(self.logFile)
def readFile(self):
fd = None
try:
fd = open(self.logFile, "r")
text = fd.read()
fd.close()
except IOError:
if fd is not None: fd.close()
text = None
return text
# *************** unit testing *********************
if __name__ == "__main__":
try:
devnull = open(os.devnull, "w")
sys.stderr = devnull
except IOError:
pass
unittest.main()

View file

@ -19,8 +19,9 @@ NameDatabase = cryptobox_names.db
[System]
# most actions of the cryptobox are not executed as root - choose a limited
# user here - for now only numeric ids are allowed
# user and group here - for now only numeric ids are allowed
User = 1000
Group = 1000
# where should we mount volumes?
# this directory must be writeable by the cryptobox user (see above)