lars
b951efdd9c
syslog support added; moved program locations to config file; removed obsolete CryptoBoxPreferences; added some requests for comments (RFC)
221 lines
6.8 KiB
Python
221 lines
6.8 KiB
Python
'''
|
|
manages logging events of the CryptoBox
|
|
'''
|
|
|
|
import sys
|
|
import os
|
|
import syslog
|
|
import unittest
|
|
|
|
class LoggerError(Exception):
    """Raised for invalid logger parameters and for failed log output.

    This class replaces the former string exception "LoggerError": string
    exceptions are rejected by Python 2.6+ and are a syntax error in
    Python 3, so callers have to catch this class instead.
    """


class CryptoBoxLogger:
    """Filter messages by priority and send them to a logfile or to syslog.

    Priorities are the numeric syslog levels, so a smaller number means a
    more urgent message. A message is emitted only if its level is less
    than or equal to the level configured for the logger.
    """

    # symbolic names accepted wherever a level is expected
    DebugLevels = {
        "debug": syslog.LOG_DEBUG,
        "info": syslog.LOG_INFO,
        "notice": syslog.LOG_NOTICE,
        "warn": syslog.LOG_WARNING,
        "error": syslog.LOG_ERR,
        "crit": syslog.LOG_CRIT,
        "alert": syslog.LOG_ALERT,
        "emerg": syslog.LOG_EMERG,
    }
    # symbolic names of the output destinations and their numeric codes
    DebugDestinations = {"file": 0, "syslog": 1}

    def __init__(self, level, destination, args=None):
        """Create a CryptoBoxLogger and connect it to an output destination.

        level: string (debug/info/notice/warn/error/crit/alert/emerg) or
            the matching numeric syslog level
        destination: the string "file" or "syslog" (or its numeric code)
        args: the name of the logfile (default: /var/log/cryptobox.log)
            or the syslog facility (default: syslog.LOG_USER)

        Raises LoggerError if level or destination is invalid or if the
        logfile cannot be opened for appending. Every error is also
        written to stderr.
        """
        # the destination is checked first, so it is reported first if
        # both parameters are invalid
        destination = self._resolve(
            destination, self.DebugDestinations, "Invalid debug destination: %s")
        self.debug_level = self._resolve(
            level, self.DebugLevels, "Invalid debug level: %s")
        if destination == self.DebugDestinations["file"]:
            self.logFunc = self.message2file
            if args is not None:
                self.logFile = args
            else:
                self.logFile = '/var/log/cryptobox.log'
            # open the file once (this also creates it), so a bad path is
            # reported now instead of at the first message
            try:
                fsock = open(self.logFile, "a")
                fsock.close()
            except IOError:
                self._fail(
                    "Unable to open logfile (%s) for writing." % (self.logFile,))
        elif destination == self.DebugDestinations["syslog"]:
            self.logFunc = self.message2syslog
            if args is None:
                syslog.openlog("CryptoBox", 0, syslog.LOG_USER)
            else:
                syslog.openlog("CryptoBox", 0, args)
        else:
            # only reachable if DebugDestinations gains an entry that has
            # no branch above
            self._fail("Invalid logging facility: %d." % (destination,))

    def _fail(message):
        """Write message to stderr and raise it as a LoggerError."""
        sys.stderr.write(message + "\n")
        raise LoggerError(message)
    _fail = staticmethod(_fail)

    def _resolve(self, value, table, error_template):
        """Return the numeric code for value, given as a code or as a name.

        value is first tried as a number and then as a key of table. A
        LoggerError built from error_template is raised if neither yields
        one of the codes stored in table.
        """
        try:
            code = int(value)
        except (TypeError, ValueError, OverflowError):
            try:
                code = table[value]
            except (KeyError, TypeError):
                # TypeError: value is unhashable, so it cannot be a name
                code = None
        if code is None:
            self._fail(error_template % (value,))
        if code not in table.values():
            self._fail(error_template % (code,))
        return code

    def printMessage(self, msg_level, text):
        """Emit text if msg_level is at least as urgent as the logger level.

        msg_level: level name, numeric syslog level or None (= "debug")
        text: the message; None is rejected

        Raises LoggerError for an invalid level, for a missing text and if
        the output destination cannot be written.
        """
        if msg_level is None:
            msg_level = self.DebugLevels["debug"]
        msg_level = self._resolve(
            msg_level, self.DebugLevels, "Invalid debug level: %s")
        if text is None:
            self._fail("Empty debug message - this is not allowed")
        # syslog levels: a smaller number is more urgent
        if msg_level <= self.debug_level:
            self.logFunc(text, msg_level)

    def message2file(self, text, level):
        """Append text as one line to the logfile.

        "level" gets ignored (but message2syslog needs it).
        Raises LoggerError if the logfile cannot be opened or written.
        """
        try:
            log_sock = open(self.logFile, "a")
        except IOError:
            errorMsg = "Unable to open logfile (%s) for writing." % (self.logFile,)
            sys.stderr.write("[CryptoBox] - %s\n" % (errorMsg,))
            raise LoggerError(errorMsg)
        try:
            try:
                log_sock.write("[CryptoBox] - %s\n" % (text,))
            finally:
                # release the handle even if the write failed
                log_sock.close()
        except IOError:
            self._fail(
                "Unable to write messages to logfile (%s)." % (self.logFile,))

    def message2syslog(self, text, level):
        """Send text to syslog with the numeric priority level.

        level is passed on unchecked; printMessage validates it before
        calling this method.
        """
        syslog.syslog(level, "%s" % (text,))


# ********************* test class **********************

class CryptoBoxLoggerTest(unittest.TestCase):
    """Unit tests for CryptoBoxLogger."""

    logFile = "/tmp/cbox-test.log"

    def setUp(self):
        """Start every test without a leftover logfile."""
        if os.path.exists(self.logFile):
            os.remove(self.logFile)

    def tearDown(self):
        """Remove the logfile created by the test."""
        if os.path.exists(self.logFile):
            os.remove(self.logFile)

    def testInit(self):
        """Initialization should fail for invalid parameters"""
        # the default logfile is usually writable for root only, so fall
        # back to the test logfile
        try:
            CryptoBoxLogger(syslog.LOG_ERR, 0)
        except LoggerError:
            CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        # the test logfile exists only if the fallback was taken
        if os.path.exists(self.logFile):
            os.remove(self.logFile)
        CryptoBoxLogger("info", "file", self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, "invalid", 0, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, syslog.LOG_ERR, "invalid", self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, 3353, 0, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, -1, 0, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, syslog.LOG_INFO, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, syslog.LOG_CRIT, -1, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, None, 0, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, syslog.LOG_WARNING, None, self.logFile)
        self.assertRaises(LoggerError, CryptoBoxLogger, syslog.LOG_EMERG, 0, "/no/existing/path")

    def testOutputParams(self):
        """Output should fail for invalid parameters"""
        cb = CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        self.assertRaises(LoggerError, cb.printMessage, 3353, "Ausgabe")
        self.assertRaises(LoggerError, cb.printMessage, -1, "Ausgabe")
        self.assertRaises(LoggerError, cb.printMessage, "invalid", "Ausgabe")
        self.assertRaises(LoggerError, cb.printMessage, syslog.LOG_DEBUG, None)

    def testFile(self):
        """Do not write messages below specified priority to a file"""
        cb = CryptoBoxLogger(syslog.LOG_ERR, 0, self.logFile)
        # creating the logger creates an empty logfile
        content1 = self.readFile()
        self.assertEqual(content1, "")
        cb.printMessage(syslog.LOG_ERR, "Ausgabe")
        content2 = self.readFile()
        self.assertNotEqual(content1, content2)
        # less urgent than LOG_ERR: must not be written
        cb.printMessage(syslog.LOG_DEBUG, "Ausgabe")
        self.assertEqual(content2, self.readFile())
        # more urgent than LOG_ERR: must be written
        cb.printMessage(syslog.LOG_CRIT, "Ausgabe")
        self.assertNotEqual(content2, self.readFile())

    def testSyslog(self):
        """Check syslog output"""
        cb = CryptoBoxLogger(syslog.LOG_DEBUG, "syslog")
        cb.printMessage(syslog.LOG_DEBUG, "just a verification check")
        # sorry - we do not have a way to check, if something was written
        # somewhere, so we cannot do other checks beside initialization
        # and writing

    def readFile(self):
        """Return the content of the test logfile or None if it is unreadable."""
        try:
            fd = open(self.logFile, "r")
        except IOError:
            return None
        try:
            try:
                return fd.read()
            except IOError:
                return None
        finally:
            fd.close()
|
|
|
|
|
|
# *************** unit testing *********************
|
|
|
|
if __name__ == "__main__":
    # The tests provoke many logger errors on purpose and each of them is
    # written to sys.stderr, so silence sys.stderr while the tests run.
    real_stderr = sys.stderr
    try:
        devnull = open(os.devnull, "w")
    except IOError:
        # best effort: run the tests with the noise if devnull is missing
        devnull = None
    else:
        sys.stderr = devnull

    class _RealStderrRunner(unittest.TextTestRunner):
        """Report to the real stderr even though sys.stderr is silenced.

        Newer unittest versions look up sys.stderr when the runner is
        created, so without an explicit stream the test report itself
        would vanish in devnull.
        """

        def __init__(self, *args, **kwargs):
            kwargs.setdefault("stream", real_stderr)
            unittest.TextTestRunner.__init__(self, *args, **kwargs)

    try:
        # unittest.main() leaves via SystemExit - clean up nevertheless
        unittest.main(testRunner=_RealStderrRunner)
    finally:
        sys.stderr = real_stderr
        if devnull is not None:
            devnull.close()
|
|
|