223 lines
6.8 KiB
Python
223 lines
6.8 KiB
Python
'''
Deprecated: this class is no longer used and can be removed from svn.

Manages logging events of the CryptoBox.
'''
|
|
|
|
import sys
|
|
import os
|
|
import syslog
|
|
import unittest
|
|
|
|
class LoggerError(Exception):
    """Raised when the logger is misconfigured or cannot deliver a message.

    Takes the place of the former string exception "LoggerError": string
    exceptions can no longer be raised or caught by current Python versions.
    """


class CryptoBoxLogger:
    """Filter log messages by priority and send them to a logfile or syslog."""

    # symbolic priority names mapped to the syslog priorities
    # (a numerically lower value means a more severe message)
    DebugLevels = {
        "debug": syslog.LOG_DEBUG,
        "info": syslog.LOG_INFO,
        "notice": syslog.LOG_NOTICE,
        "warn": syslog.LOG_WARNING,
        "error": syslog.LOG_ERR,
        "crit": syslog.LOG_CRIT,
        "alert": syslog.LOG_ALERT,
        "emerg": syslog.LOG_EMERG,
    }
    # supported output channels and their numeric ids
    DebugDestinations = {"file": 0, "syslog": 1}

    def __init__(self, level, destination, args=None):
        """Create a CryptoBoxLogger and connect it to an output destination.

        level: string (debug/info/notice/warn/error/crit/alert/emerg) or the
            matching syslog priority; messages less severe than this are
            dropped by printMessage
        destination: the string "file" or "syslog" (or its numeric id)
        args: the name of the logfile (default: /var/log/cryptobox.log) or
            the syslog facility (default: syslog.LOG_USER)

        Raises LoggerError for an unknown level or destination and for a
        logfile that cannot be opened for appending. Every error is also
        reported on sys.stderr.
        """
        # the destination is checked before the level, so a call that gets
        # both wrong complains about the destination
        destination = self._resolve(
            destination, self.DebugDestinations,
            "Invalid debug destination: %s")
        self.debug_level = self._resolve(
            level, self.DebugLevels, "Invalid debug level: %s")
        if destination == self.DebugDestinations["file"]:
            self.logFunc = self.message2file
            if args is None:
                self.logFile = '/var/log/cryptobox.log'
            else:
                self.logFile = args
            # probe once, so that an unusable logfile is reported right away
            # instead of with the first message
            try:
                with open(self.logFile, "a"):
                    pass
            except IOError:
                raise self._error(
                    "Unable to open logfile (%s) for writing."
                    % (self.logFile,))
        elif destination == self.DebugDestinations["syslog"]:
            self.logFunc = self.message2syslog
            if args is None:
                syslog.openlog("CryptoBox", 0, syslog.LOG_USER)
            else:
                syslog.openlog("CryptoBox", 0, args)
        else:
            # only reachable if DebugDestinations gains an entry that has no
            # branch above (e.g. in a subclass)
            raise self._error(
                "Invalid logging facility: %d." % (destination,))

    @staticmethod
    def _error(message, stderr_text=None):
        """Report a problem on sys.stderr and return the LoggerError to raise.

        stderr_text overrides the line written to stderr; the exception
        always carries the plain message.
        """
        if stderr_text is None:
            stderr_text = message
        sys.stderr.write(stderr_text + "\n")
        return LoggerError(message)

    def _resolve(self, value, table, error_template):
        """Translate a symbolic name or a number into a value of table.

        Anything int() accepts is taken as the numeric code, anything else
        is looked up as a key of table. error_template receives the
        offending value through its single %s placeholder.
        """
        try:
            code = int(value)
        except (TypeError, ValueError, OverflowError):
            try:
                code = table[value]
            except (KeyError, TypeError):
                # TypeError: the value is not even hashable
                raise self._error(error_template % (value,))
        if code not in table.values():
            raise self._error(error_template % (code,))
        return code

    def printMessage(self, msg_level, text):
        """Log text if msg_level is at least as severe as the logger's level.

        msg_level: a key of DebugLevels, a syslog priority or None (which is
            treated as "debug")
        text: the message; None is rejected

        Raises LoggerError for an unknown level or a missing text.
        """
        if msg_level is None:
            msg_level = self.DebugLevels["debug"]
        # convert the level from string to int, if necessary
        msg_level = self._resolve(
            msg_level, self.DebugLevels, "Invalid debug level: %s")
        if text is None:
            raise self._error("Empty debug message - this is not allowed")
        # syslog priorities grow with decreasing severity
        if msg_level <= self.debug_level:
            self.logFunc(text, msg_level)

    def message2file(self, text, level):
        """Append one message line to the logfile.

        "level" gets ignored (but syslog needs it).
        Raises LoggerError if the logfile cannot be opened or written.
        """
        try:
            log_sock = open(self.logFile, "a")
        except IOError:
            message = ("Unable to open logfile (%s) for writing."
                       % (self.logFile,))
            raise self._error(message, "[CryptoBox] - %s" % (message,))
        try:
            # the context manager closes the file even if the write fails
            with log_sock:
                log_sock.write("[CryptoBox] - %s\n" % (text,))
        except IOError:
            raise self._error(
                "Unable to write messages to logfile (%s)."
                % (self.logFile,))

    def message2syslog(self, text, level):
        """Send one message with the given priority to syslog.

        Raises LoggerError if level is not one of the DebugLevels values.
        """
        if level not in self.DebugLevels.values():
            raise self._error("Invalid debug level: %s" % (level,))
        syslog.syslog(level, text)


# ********************* test class **********************

class CryptoBoxLoggerTest(unittest.TestCase):
    """Unit tests for CryptoBoxLogger."""

    logFile = "/tmp/cbox-test.log"

    def _removeLogFile(self):
        """Delete the test logfile if it exists."""
        if os.path.exists(self.logFile):
            os.remove(self.logFile)

    def setUp(self):
        self._removeLogFile()

    def tearDown(self):
        self._removeLogFile()

    def testInit(self):
        """Initialization should fail for invalid parameters"""
        # the default logfile is usually writable for root only - fall back
        # to the test logfile otherwise
        try:
            CryptoBoxLogger(syslog.LOG_ERR, 0)
        except LoggerError:
            CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        # the test logfile exists only if the fallback was needed
        self._removeLogFile()
        CryptoBoxLogger("info", "file", self.logFile)
        invalid_arguments = (
            ("invalid", 0, self.logFile),
            (syslog.LOG_ERR, "invalid", self.logFile),
            (3353, 0, self.logFile),
            (-1, 0, self.logFile),
            (syslog.LOG_INFO, self.logFile),
            (syslog.LOG_CRIT, -1, self.logFile),
            (None, 0, self.logFile),
            (syslog.LOG_WARNING, None, self.logFile),
            (syslog.LOG_EMERG, 0, "/no/existing/path"),
        )
        for arguments in invalid_arguments:
            self.assertRaises(LoggerError, CryptoBoxLogger, *arguments)

    def testOutputParams(self):
        """Output should fail for invalid parameters"""
        cb = CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        self.assertRaises(LoggerError, cb.printMessage, 3353, "Ausgabe")
        self.assertRaises(LoggerError, cb.printMessage, -1, "Ausgabe")
        self.assertRaises(LoggerError, cb.printMessage, "invalid", "Ausgabe")
        self.assertRaises(
            LoggerError, cb.printMessage, syslog.LOG_DEBUG, None)

    def testFile(self):
        """Do not write messages below specified priority to a file"""
        cb = CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        content1 = self.readFile()
        self.assertEqual(content1, "")
        cb.printMessage(syslog.LOG_ERR, "Ausgabe")
        content2 = self.readFile()
        self.assertNotEqual(content1, content2)
        cb.printMessage(syslog.LOG_DEBUG, "Ausgabe")
        self.assertEqual(content2, self.readFile())
        cb.printMessage(syslog.LOG_CRIT, "Ausgabe")
        self.assertNotEqual(content2, self.readFile())

    def testSyslog(self):
        """Check syslog output"""
        cb = CryptoBoxLogger(syslog.LOG_DEBUG, "syslog")
        cb.printMessage(syslog.LOG_DEBUG, "just a verification check")
        # sorry - we do not have a way to check, if something was written
        # somewhere, so we cannot do other checks beside initialization
        # and writing

    def readFile(self):
        """Return the content of the test logfile (None if unreadable)."""
        try:
            with open(self.logFile, "r") as fd:
                return fd.read()
        except IOError:
            return None
|
|
|
|
|
|
# *************** unit testing *********************
|
|
|
|
if __name__ == "__main__":
    # The logger reports every rejected parameter on sys.stderr, which would
    # clutter the test report - so stderr is silenced while the tests run.
    # The report itself has to go to the real stderr: current unittest
    # versions look up sys.stderr only when the runner is created, i.e.
    # after the redirection below.
    real_stderr = sys.stderr

    class _RealStderrRunner(unittest.TextTestRunner):
        """Text test runner that reports to the stderr saved above."""

        def __init__(self, *args, **kwargs):
            kwargs.setdefault("stream", real_stderr)
            unittest.TextTestRunner.__init__(self, *args, **kwargs)

    try:
        devnull = open(os.devnull, "w")
    except IOError:
        # best effort: without a null device the tests just run noisily
        devnull = None
    if devnull is not None:
        sys.stderr = devnull
    try:
        # unittest.main() leaves via SystemExit, so clean up in "finally"
        unittest.main(testRunner=_RealStderrRunner)
    finally:
        sys.stderr = real_stderr
        if devnull is not None:
            devnull.close()
|
|
|