python directory renamed to former perl directory

comment on RFC
This commit is contained in:
lars 2006-09-13 11:10:38 +00:00
parent e80b8874ff
commit f99e155b8c
16 changed files with 1 additions and 1 deletions

346
pythonrewrite/bin/CryptoBox.py Executable file
View file

@ -0,0 +1,346 @@
#!/usr/bin/env python2.4
'''
This is the web interface for a fileserver managing encrypted filesystems.
It was originally written in bash/perl. Now a complete rewrite is in
progress. So things might be confusing here. Hopefully not for long.
:)
'''
# check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import CryptoBoxContainer
from CryptoBoxExceptions import *
import re
import os
class CryptoBox:
'''this class rules them all!
put things like logging, conf and oter stuff in here,
that might be used by more classes, it will be passed on to them'''
VERSION = "0.3~1"
def __init__(self, config_file=None):
import CryptoBoxSettings
self.log = self.__getStartupLogger()
self.prefs = CryptoBoxSettings.CryptoBoxSettings(config_file)
self.__runTests()
def __getStartupLogger(self):
import logging
'''initialises the logging system
use it with: 'self.log.[debug|info|warning|error|critical](logmessage)'
all classes should get the logging instance during __init__:
self.log = logging.getLogger("CryptoBox")
first we output all warnings/errors to stderr
as soon as we opened the config file successfully, we redirect debug output
to the configured destination'''
## basicConfig(...) needs python >= 2.4
try:
log_handler = logging.getLogger("CryptoBox")
logging.basicConfig(
format='%(asctime)s %(module)s %(levelname)s %(message)s',
stderr=sys.stderr)
log_handler.setLevel(logging.ERROR)
log_handler.info("loggingsystem is up'n running")
## from now on everything can be logged via self.log...
except:
raise CBEnvironmentError("couldn't initialise the loggingsystem. I give up.")
return log_handler
# do some initial checks
def __runTests(self):
self.__runTestRootPriv()
def __runTestRootPriv(self):
"""try to run 'super' with 'CryptoBoxRootActions'"""
import subprocess
try:
devnull = open(os.devnull, "w")
except IOError:
raise CBEnvironmentError("could not open %s for writing!" % os.devnull)
try:
prog_super = self.prefs["Programs"]["super"]
except KeyError:
raise CBConfigUndefinedError("Programs", "super")
try:
prog_rootactions = self.prefs["Programs"]["CryptoBoxRootActions"]
except KeyError:
raise CBConfigUndefinedError("Programs", "CryptoBoxRootActions")
try:
proc = subprocess.Popen(
shell = False,
stdout = devnull,
stderr = devnull,
args = [prog_super, prog_rootactions, "check"])
except OSError:
raise CBEnvironmentError("failed to execute 'super' (%s)" % self.prefs["Programs"]["super"])
proc.wait()
if proc.returncode != 0:
raise CBEnvironmentError("failed to call CryptoBoxRootActions (%s) via 'super' - maybe you did not add the appropriate line to /etc/super.tab?" % prog_rootactions)
# this method just demonstrates inheritance effects - may be removed
def cbx_inheritance_test(self, string="you lucky widow"):
self.log.info(string)
# RFC: why should CryptoBoxProps inherit CryptoBox? [l]
# RFC: shouldn't we move all useful functions of CryptoBoxProps to CryptoBox? [l]
class CryptoBoxProps(CryptoBox):
'''Get and set the properties of a CryptoBox
This class contains all available devices that may be accessed.
All properties of the cryptobox can be accessed by this class.
'''
def __init__(self, config_file=None):
'''read config and fill class variables'''
CryptoBox.__init__(self, config_file)
self.containers = []
for device in self.__getAvailablePartitions():
if self.isDeviceAllowed(device):
self.containers.append(CryptoBoxContainer.CryptoBoxContainer(device, self))
def isDeviceAllowed(self, devicename):
"check if a device is white-listed for being used as cryptobox containers"
import types
allowed = self.prefs["Main"]["AllowedDevices"]
if type(allowed) == types.StringType: allowed = [allowed]
for a_dev in allowed:
"remove double dots and so on ..."
real_device = os.path.realpath(devicename)
if a_dev and re.search('^' + a_dev, real_device): return True
return False
def getLogData(self, lines=None, maxSize=None):
"""get the most recent log entries of the cryptobox
the maximum number and size of these entries can be limited by 'lines' and 'maxSize'
"""
# return nothing if the currently selected log output is not a file
try:
if self.prefs["Log"]["Destination"].upper() != "FILE": return []
log_file = self.prefs["Log"]["Details"]
except KeyError:
self.log.error("could not evaluate one of the following config settings: [Log]->Destination or [Log]->Details")
return []
try:
fd = open(log_file, "r")
if maxSize: fd.seek(-maxSize, 2) # seek relative to the end of the file
content = fd.readlines()
fd.close()
except IOError:
self.log.warn("failed to read the log file (%s)" % log_file)
return []
if lines: content = content[-lines:]
content.reverse()
return content
def getContainerList(self, filterType=None, filterName=None):
"retrieve the list of all containers of this cryptobox"
try:
result = self.containers[:]
if filterType != None:
if filterType in range(len(CryptoBoxContainer.Types)):
return [e for e in self.containers if e.getType() == filterType]
else:
self.log.info("invalid filterType (%d)" % filterType)
result.clear()
if filterName != None:
result = [e for e in self.containers if e.getName() == filterName]
return result
except AttributeError:
return []
def getContainer(self, device):
"retrieve the container element for this device"
all = [e for e in self.getContainerList() if e.device == device]
if all:
return all[0]
else:
return None
def setNameForUUID(self, uuid, name):
"assign a name to a uuid in the ContainerNameDatabase"
used_uuid = self.getUUIDForName(name)
"first remove potential conflicting uuid/name combination"
if used_uuid: del self.prefs.nameDB[used_uuid]
self.prefs.nameDB[uuid] = name
self.prefs.nameDB.write()
def getNameForUUID(self, uuid):
"get the name belonging to a specified key (usually the UUID of a fs)"
try:
return self.prefs.nameDB[uuid]
except KeyError:
return None
def getUUIDForName(self, name):
""" get the key belonging to a value in the ContainerNameDatabase
this is the reverse action of 'getNameForUUID' """
for key in self.prefs.nameDB.keys():
if self.prefs.nameDB[key] == name: return key
"the uuid was not found"
return None
def getAvailableLanguages(self):
'''reads all files in path LangDir and returns a list of
basenames from existing hdf files, that should are all available
languages'''
languages = []
for file in os.listdir(self.prefs["Locations"]["LangDir"]):
if file.endswith(".hdf"): languages.append(file.rstrip(".hdf"))
if len(languages) < 1:
self.log.warn("No .hdf files found! The website won't render properly.")
return languages
def getAvailableDocLanguages(self):
'''reads all dirs in path DocDir and returns a list of
available documentation languages, this list might be empty.'''
doclangs = []
regpat = re.compile(r"^\w+$")
try:
doc_dir = self.prefs["Locations"]["DocDir"]
except KeyError:
self.log.error("Could not find a configuration setting: [Locations]->DocDir - please check the config file")
return []
if not os.path.exists(doc_dir):
self.log.error("The configured documentation directory (%s) does not exist" % (doc_dir, ))
return []
try:
for e in os.listdir(doc_dir):
if regpat.search(e) and os.path.isdir(os.path.join(doc_dir, e)):
doclangs.append(e)
if len(doclangs) < 1:
self.log.warn("Didn't find any documentation files.")
return doclangs
except OSError:
self.log.error("Could not access the documentations directory (%s)" % (doc_dir,))
return []
""" ************ internal stuff starts here *********** """
def __getAvailablePartitions(self):
"retrieve a list of all available containers"
ret_list = []
try:
"the following reads all lines of /proc/partitions and adds the mentioned devices"
fpart = open("/proc/partitions", "r")
try:
line = fpart.readline()
while line:
p_details = line.split()
if (len(p_details) == 4):
"the following code prevents double entries like /dev/hda and /dev/hda1"
(p_major, p_minor, p_size, p_device) = p_details
if re.search('^[0-9]*$', p_major) and re.search('^[0-9]*$', p_minor):
p_parent = re.sub('[1-9]?[0-9]$', '', p_device)
if p_parent == p_device:
if [e for e in ret_list if re.search('^' + p_parent + '[1-9]?[0-9]$', e)]:
"major partition - its children are already in the list"
pass
else:
"major partition - but there are no children for now"
ret_list.append(p_device)
else:
"minor partition - remove parent if necessary"
if p_parent in ret_list: ret_list.remove(p_parent)
ret_list.append(p_device)
line = fpart.readline()
finally:
fpart.close()
return [self.__getAbsoluteDeviceName(e) for e in ret_list]
except IOError:
self.log.warning("Could not read /proc/partitions")
return []
def __getAbsoluteDeviceName(self, shortname):
""" returns the absolute file name of a device (e.g.: "hda1" -> "/dev/hda1")
this does also work for device mapper devices
if the result is non-unique, one arbitrary value is returned"""
if re.search('^/', shortname): return shortname
default = os.path.join("/dev", shortname)
if os.path.exists(default): return default
result = self.__findMajorMinorOfDevice(shortname)
"if no valid major/minor was found -> exit"
if not result: return default
(major, minor) = result
"for device-mapper devices (major == 254) ..."
if major == 254:
result = self.__findMajorMinorDeviceName("/dev/mapper", major, minor)
if result: return result[0]
"now check all files in /dev"
result = self.__findMajorMinorDeviceName("/dev", major, minor)
if result: return result[0]
return default
def __findMajorMinorOfDevice(self, device):
"return the major/minor numbers of a block device by querying /sys/block/?/dev"
if not os.path.exists(os.path.join("/sys/block", device)): return None
blockdev_info_file = os.path.join(os.path.join("/sys/block", device), "dev")
try:
f_blockdev_info = open(blockdev_info_file, "r")
blockdev_info = f_blockdev_info.read()
f_blockdev_info.close()
(str_major, str_minor) = blockdev_info.split(":")
"numeric conversion"
try:
major = int(str_major)
minor = int(str_minor)
return (major, minor)
except ValueError:
"unknown device numbers -> stop guessing"
return None
except IOError:
pass
def __findMajorMinorDeviceName(self, dir, major, minor):
"returns the names of devices with the specified major and minor number"
collected = []
try:
subdirs = [os.path.join(dir, e) for e in os.listdir(dir) if (not os.path.islink(os.path.join(dir, e))) and os.path.isdir(os.path.join(dir, e))]
"do a recursive call to parse the directory tree"
for dirs in subdirs:
collected.extend(self.__findMajorMinorDeviceName(dirs, major, minor))
"filter all device inodes in this directory"
collected.extend([os.path.realpath(os.path.join(dir, e)) for e in os.listdir(dir) if (os.major(os.stat(os.path.join(dir, e)).st_rdev) == major) and (os.minor(os.stat(os.path.join(dir, e)).st_rdev) == minor)])
result = []
for e in collected:
if e not in result: result.append(e)
return collected
except OSError:
return []
if __name__ == "__main__":
cb = CryptoBox()

View file

@ -0,0 +1,568 @@
#!/usr/bin/env python2.4
## check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import subprocess
import os
import re
import logging
"""exceptions:
VolumeIsActive
NameActivelyUsed
InvalidName
InvalidPassword
InvalidType
CreateError
MountError
ChangePasswordError
"""
class CryptoBoxContainer:
Types = {
"unused":0,
"plain":1,
"luks":2,
"swap":3}
__fsTypes = {
"plain":["ext3", "ext2", "vfat", "reiser"],
"swap":["swap"]}
# TODO: more filesystem types? / check 'reiser'
__dmDir = "/dev/mapper"
def __init__(self, device, cbox):
self.device = device
self.cbox = cbox
self.log = logging.getLogger("CryptoBox")
self.Progs = self.cbox.prefs["Programs"]
self.__resetObject()
def getName(self):
return self.name
def setName(self, new_name):
if new_name == self.name: return
if self.isMounted():
raise "VolumeIsActive", "the container must be inactive during renaming"
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise "InvalidName", "the supplied new name contains illegal characters"
"check for active partitions with the same name"
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
if prev_name_owner:
for a in prev_name_owner:
if a.isMounted():
raise "NameActivelyUsed", "the supplied new name is already in use for an active partition"
self.cbox.setNameForUUID(self.uuid, new_name)
self.name = new_name
def getDevice(self):
return self.device
def getType(self):
return self.type
def isMounted(self):
return os.path.ismount(self.__getMountPoint())
def getCapacity(self):
"""return the current capacity state of the volume
the result is a tuple of values in megabyte:
(size, available, used)
"""
info = os.statvfs(self.__getMountPoint())
return (
int(info.f_bsize*info.f_blocks/1024/1024),
int(info.f_bsize*info.f_bavail/1024/1024),
int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))
def create(self, type, password=None):
if type == self.Types["luks"]:
self.__createLuks(password)
self.__resetObject()
return
if type == self.Types["plain"]:
self.__createPlain()
self.__resetObject()
return
raise "InvalidType", "invalid container type (%d) supplied" % (type, )
def changePassword(self, oldpw, newpw):
if self.type != self.Types["luks"]:
raise "InvalidType", \
"changing of password is possible only for luks containers"
if not oldpw:
raise "InvalidPassword", "no old password supplied for password change"
if not newpw:
raise "InvalidPassword", "no new password supplied for password change"
"return if new and old passwords are the same"
if oldpw == newpw: return
if self.isMounted():
raise "VolumeIsActive", "this container is currently active"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksAddKey",
self.device,
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.log.error(errorMsg)
raise "ChangePasswordError", errorMsg
## retrieve the key slot we used for unlocking
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise "ChangePasswordError", "could not get the old key slot"
"remove the old key"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"luksDelKey",
self.device,
"%d" % (keyslot, )])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise "ChangePasswordError", errorMsg
" ****************** internal stuff ********************* "
def __resetObject(self):
""" recheck the information about this container
this is especially useful after changing the type via 'create' """
self.uuid = self.__getUUID()
self.type = self.__getTypeOfPartition()
self.name = self.__getNameOfContainer()
if self.type == self.Types["luks"]:
self.mount = self.__mountLuks
self.umount = self.__umountLuks
if self.type == self.Types["plain"]:
self.mount = self.__mountPlain
self.umount = self.__umountPlain
def __getNameOfContainer(self):
"retrieve the name of the container by querying the database"
def_name = self.cbox.getNameForUUID(self.uuid)
if def_name: return def_name
"there is no name defined for this uuid - we will propose a good one"
prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.getUUIDForName(guess):
counter += 1
else:
unused_found = True
self.cbox.setNameForUUID(self.uuid, guess)
return guess
def __getUUID(self):
"""return UUID for luks partitions, ext2/3 and vfat filesystems"""
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=devnull,
args=[self.Progs["blkid"],
"-s", "UUID",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
result = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
if result: return result
return self.device.replace(os.path.sep, "_")
def __getTypeOfPartition(self):
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
if self.__isLuksPartition(): return self.Types["luks"]
typeOfPartition = self.__getTypeIdOfPartition()
if typeOfPartition in self.__fsTypes["plain"]:
return self.Types["plain"]
if typeOfPartition in self.__fsTypes["swap"]:
return self.Types["swap"]
return self.Types["unused"]
def __getTypeIdOfPartition(self):
"returns the type of the partition (see 'man blkid')"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.Progs["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
return output
def __isLuksPartition(self):
"check if the given device is a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"isLuks",
self.device])
proc.wait()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)
def __mountLuks(self, password):
"mount a luks partition"
if not password:
raise "InvalidPassword", "no password supplied for luksOpen"
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
self.__umountLuks()
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise "MountError", errorMsg
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"mount",
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
devnull.close()
def __umountLuks(self):
"umount a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
if os.path.exists(os.path.join(self.__dmDir, self.name)):
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksClose",
self.name,
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
devnull.close()
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise "MountError", errorMsg
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"mount",
self.device,
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
devnull.close()
def __umountPlain(self):
"umount a plaintext partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise "MountError", errorMsg
devnull.close()
def __createPlain(self):
"make a plaintext partition"
if self.isMounted():
raise "VolumeIsActive", \
"deactivate the partition before filesystem initialization"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mkfs-data"],
self.device])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise "CreateError", errorMsg
devnull.close()
def __createLuks(self, password):
"make a luks partition"
if not password:
raise "InvalidPassword", "no password supplied for new luks mapping"
if self.isMounted():
raise "VolumeIsActive", \
"deactivate the partition before filesystem initialization"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksFormat",
self.device,
"--batch-mode",
"--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
"--iter-time", "2000"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise "CreateError", errorMsg
"open the luks container for mkfs"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise "CreateError", errorMsg
"make the filesystem"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
"remove the luks mapping"
raise "CreateError", errorMsg
devnull.close()
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
os.rmdir(abs_dir)

View file

@ -0,0 +1,80 @@
"""
exceptions of the cryptobox package
"""
class CryptoBoxError(Exception):
"""base class for exceptions of the cryptobox"""
pass
class CBPluginError(CryptoBoxError):
"""should be raised for plugin specific problems"""
class CBPluginActionError(CBPluginError):
"""should be raised when a plugin action failed"""
pass
class CBConfigError(CryptoBoxError):
"""any kind of error related to the configuration of a cryptobox"""
pass
class CBConfigUnavailableError(CBConfigError):
"""config file/input was not available at all"""
def __init__(self, source=None):
self.source = source
def __str__(self):
if self.source:
return "failed to access the configuration of the cryptobox: %s" % self.source
else:
return "failed to access the configuration of the cryptobox"
class CBConfigUndefinedError(CBConfigError):
"""a specific configuration setting was not defined"""
def __init__(self, section, name=None):
self.section = section
self.name = name
def __str__(self):
# is it a settings or a section?
if self.name:
# setting
return "undefined configuration setting: [%s]->%s - please check your configuration file" % (self.section, self.name)
else:
# section
return "undefined configuration section: [%s] - please check your configuration file" % (self.section, )
class CBConfigInvalidValueError(CBConfigError):
"""a configuration setting was invalid somehow"""
def __init__(self, section, name, value, reason):
self.section = section
self.name = name
self.value = value
self.reason = reason
def __str__(self):
return "invalid configuration setting [%s]->%s (%s): %s" % (self.section, self.name, self.value, self.reason)
class CBEnvironmentError(CryptoBoxError):
"""some part of the environment of the cryptobox is broken
e.g. the wrong version of a required program
"""
def __init__(self, desc):
self.desc = desc
def __str__(self):
return "misconfiguration detected: %s" % self.desc

View file

@ -0,0 +1,330 @@
#!/usr/bin/env python2.4
"""module for executing the programs, that need root privileges
Syntax:
- program
- device
- [action]
- [action args]
this script will always return with an exitcode 0 (true), if "check" is the only argument
"""
import os
import sys
import subprocess
import pwd
import grp
import types
allowedProgs = {
"sfdisk": "/sbin/sfdisk",
"cryptsetup": "/sbin/cryptsetup",
"mount": "/bin/mount",
"umount": "/bin/umount",
}
DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
def checkIfPluginIsSafe(plugin):
"""check if the plugin and its parents are only writeable for root"""
#FIXME: for now we may skip this test - but users will not like it this way :)
return True
props = os.stat(plugin)
## check if it is owned by non-root
if props.st_uid != 0: return False
## check group-write permission if gid is not zero
if (props.st_gid != 0) and (props.st_mode % 32 / 16 > 0): return False
## check if it is world-writeable
if props.st_mode % 4 / 2 > 0: return False
## are we at root-level (directory-wise)? If yes, then we are ok ...
if plugin == os.path.sep: return True
## check if the parent directory is ok - recursively :)
return checkIfPluginIsSafe(os.path.dirname(os.path.abspath(plugin)))
def checkIfPluginIsValid(plugin):
import imp
try:
x = imp.load_source("cbox_plugin",plugin)
except Exception:
return False
try:
if getattr(x, "PLUGIN_TYPE") == "cryptobox":
return True
else:
return False
except Exception:
return False
def call_plugin(args):
"""check if the plugin may be called - and do it finally ..."""
plugin = os.path.abspath(args[0])
del args[0]
## check existence and excutability
if not os.access(plugin, os.X_OK):
raise Exception, "could not find executable plugin (%s)" % plugin
## check if the plugin (and its parents) are only writeable for root
if not checkIfPluginIsSafe(plugin):
raise Exception, "the plugin (%s) was not safe - check its (and its parents') permissions" % plugin
## check if the plugin is a python program, that is marked as a cryptobox plugin
if not checkIfPluginIsValid(plugin):
raise Exception, "the plugin (%s) is not a correctly marked python script" % plugin
args.insert(0,plugin)
proc = subprocess.Popen(
shell = False,
args = args)
proc.communicate()
return proc.returncode == 0
def isWriteable(device, force_dev_type=None):
"""check if the calling user (not root!) has write access to the device/file
the real (not the effictive) user id is used for the check
additionally the permissions of the default groups of the real uid are checked
this check works nicely together with "super", as it changes (by default) only
the effective uid (not the real uid)
"""
# first check, if the device/file exists
if not os.path.exists(device):
return False
# check the type of the device - if necessary
if not force_dev_type is None:
dev_type = os.stat(device).st_mode % 65536 / 4096
if dev_type != force_dev_type: return False
# retrieve the information for the real user id
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(os.getuid())
# set the default groups of the caller for the check (restore them later)
savedGroups = os.getgroups()
os.setgroups(groupsOfTrustUser)
# check permissions
result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
# reset the groups of this process
os.setgroups(savedGroups)
return result
def run_cryptsetup(args):
"""execute cryptsetup as root
@args: list of arguments - they will be treated accordingly to the first element
of this list (the action)"""
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try:
action = args[0]
del args[0]
device = None
cmd_args = []
if action == "luksFormat":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksUUID":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksOpen":
if len(args) < 2: raise "WrongArguments", "missing arguments"
device = args[0]; del args[0]
destination = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
cmd_args.append(destination)
elif action == "luksClose":
if len(args) < 1: raise "WrongArguments", "missing arguments"
destination = args[0]; del args[0]
# maybe add a check for the mapped device's permissions?
# dmsetup deps self.device
cmd_args.append(action)
cmd_args.append(destination)
elif action == "luksAddKey":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksDelKey":
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
device = args[0]; del args[0]
cmd_args.insert(-1, action)
cmd_args.insert(-1, device)
elif action == "isLuks":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
# check if a device was defined - and check it
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
cs_args = [allowedProgs["cryptsetup"]]
cs_args.extend(args)
cs_args.extend(cmd_args)
except (TypeError, IndexError):
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute cryptsetup with the given parameters
proc = subprocess.Popen(
shell = False,
args = cs_args)
proc.communicate()
return proc.returncode == 0
def run_sfdisk(args):
"""execute sfdisk for partitioning
not implemented yet"""
print "ok - you are free to call sfdisk ..."
print " not yet implemented ..."
return True
def run_mount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no destination for mount supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try:
device = args[0]
del args[0]
destination = args[0]
del args[0]
# check permissions for the device
if not isWriteable(device, DEV_TYPES["block"]):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
# check permissions for the mountpoint
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
# check for additional (not allowed) arguments
if len(args) != 0:
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
except TypeError:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute mount with the given parameters
# first overwrite the real uid, as 'mount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
# execute mount
proc = subprocess.Popen(
shell = False,
args = [allowedProgs["mount"], device, destination])
proc.communicate()
# restore previous real uid
os.setuid(savedUID)
return proc.returncode == 0
def run_umount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no mountpoint for umount supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied"
try:
destination = args[0]
del args[0]
# check permissions for the destination
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments"
except TypeError:
raise "WrongArguments", "invalid arguments supplied"
# execute umount with the given parameters
# first overwrite the real uid, as 'umount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
# execute umount (with the parameter '-l' - lazy umount)
proc = subprocess.Popen(
shell = False,
args = [allowedProgs["umount"], "-l", destination])
proc.communicate()
# restore previous real uid
os.setuid(savedUID)
return proc.returncode == 0
def getUserInfo(user):
"""return information about the specified user
@user: (uid or name)
@return: tuple of (name, uid, (groups))
"""
if user is None: raise "KeyError", "no user supplied"
# first check, if 'user' contains an id - then check for a name
try:
userinfo = pwd.getpwuid(user)
except TypeError:
# if a KeyError is raised again, then the supplied user was invalid
userinfo = pwd.getpwnam(user)
u_groups =[one_group.gr_gid
for one_group in grp.getgrall()
if userinfo.pw_name in one_group.gr_mem]
if not userinfo.pw_gid in u_groups: u_groups.append(userinfo.pw_gid)
return (userinfo.pw_name, userinfo.pw_uid, u_groups)
# **************** main **********************
# prevent import
if __name__ == "__main__":
# do we have root privileges (effective uid is zero)?
if os.geteuid() != 0:
sys.stderr.write("the effective uid is not zero - you should use 'super' to call this script (%s)" % sys.argv[0])
sys.exit(100)
# remove program name
args = sys.argv[1:]
# do not allow to use root permissions (real uid may not be zero)
if os.getuid() == 0:
sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n")
sys.exit(100)
# did the user call the "check" action?
if (len(args) == 1) and (args[0].lower() == "check"):
# exit silently
sys.exit(0)
if args[0].lower() == "plugin":
del args[0]
try:
isOK = call_plugin(args)
except Exception, errMsg:
sys.stderr.write("Execution of plugin failed: %s\n" % errMsg)
sys.exit(100)
if isOK:
sys.exit(0)
else:
sys.exit(1)
# check parameters count
if len(args) < 2:
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(args))
sys.exit(100)
progRequest = args[0]
del args[0]
if not progRequest in allowedProgs.keys():
sys.stderr.write("Invalid program requested: %s\n" % progRequest)
sys.exit(100)
if progRequest == "cryptsetup": runner = run_cryptsetup
elif progRequest == "sfdisk": runner = run_sfdisk
elif progRequest == "mount": runner = run_mount
elif progRequest == "umount": runner = run_umount
else:
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest)
sys.exit(100)
try:
if runner(args):
sys.exit(0)
else:
sys.exit(1)
except "WrongArguments", errstr:
sys.stderr.write("Execution failed: %s\n" % errstr)
sys.exit(100)

View file

@ -0,0 +1,238 @@
import logging
import validate
import os
import CryptoBoxExceptions
try:
import configobj ## needed for reading and writing of the config file
except:
raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'configobj'! Try 'apt-get install python-configobj'.")
class CryptoBoxSettings:
CONF_LOCATIONS = [
"./cryptobox.conf",
"~/.cryptobox.conf",
"/etc/cryptobox/cryptobox.conf"]
def __init__(self, config_file=None):
self.log = logging.getLogger("CryptoBox")
config_file = self.__getConfigFileName(config_file)
self.log.info("loading config file: %s" % config_file)
self.prefs = self.__getPreferences(config_file)
self.__validateConfig()
self.__configureLogHandler()
self.__checkUnknownPreferences()
self.nameDB = self.__getNameDatabase()
def __getitem__(self, key):
"""redirect all requests to the 'prefs' attribute"""
return self.prefs[key]
def __getPreferences(self, config_file):
import StringIO
config_rules = StringIO.StringIO(self.validation_spec)
try:
prefs = configobj.ConfigObj(config_file, configspec=config_rules)
if prefs:
self.log.info("found config: %s" % prefs.items())
else:
raise CryptoBoxExceptions.CBConfigUnavailableError("failed to load the config file: %s" % config_file)
except IOError:
raise CryptoBoxExceptions.CBConfigUnavailableError("unable to open the config file: %s" % config_file)
return prefs
def __validateConfig(self):
result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
error_list = configobj.flatten_errors(self.prefs, result)
if not error_list: return
errorMsgs = []
for sections, key, text in error_list:
section_name = "->".join(sections)
if not text:
errorMsg = "undefined configuration value (%s) in section '%s'" % (key, section_name)
else:
errorMsg = "invalid configuration value (%s) in section '%s': %s" % (key, section_name, text)
errorMsgs.append(errorMsg)
raise CryptoBoxExceptions.CBConfigError, "\n".join(errorMsgs)
def __checkUnknownPreferences(self):
import StringIO
config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec), list_values=False)
self.__recursiveConfigSectionCheck("", self.prefs, config_rules)
def __recursiveConfigSectionCheck(self, section_path, section_config, section_rules):
"""should be called by '__checkUnknownPreferences' for every section
sends a warning message to the logger for every undefined (see validation_spec)
configuration setting
"""
for e in section_config.keys():
element_path = section_path + e
if e in section_rules.keys():
if isinstance(section_config[e], configobj.Section):
if isinstance(section_rules[e], configobj.Section):
self.__recursiveConfigSectionCheck(element_path + "->", section_config[e], section_rules[e])
else:
self.log.warn("configuration setting should be a value instead of a section name: %s" % element_path)
else:
if not isinstance(section_rules[e], configobj.Section):
pass # good - the setting is valid
else:
self.log.warn("configuration setting should be a section name instead of a value: %s" % element_path)
else:
self.log.warn("unknown configuration setting: %s" % element_path)
def __getNameDatabase(self):
try:
try:
nameDB_file = self.prefs["Locations"]["NameDatabase"]
except KeyError:
raise CryptoBoxExceptions.CBConfigUndefinedError("Locations", "NameDatabase")
except SyntaxError:
raise CryptoBoxExceptions.CBConfigInvalidValueError("Locations", "NameDatabase", nameDB_file, "failed to interprete the filename of the name database correctly")
## create nameDB is necessary
if os.path.exists(nameDB_file):
nameDB = configobj.ConfigObj(nameDB_file)
else:
nameDB = configobj.ConfigObj(nameDB_file, create_empty=True)
## check if nameDB file was created successfully?
if not os.path.exists(nameDB_file):
raise CryptoBoxExceptions.CBEnvironmentError("failed to create name database (%s)" % nameDB_file)
return nameDB
def __getConfigFileName(self, config_file):
# search for the configuration file
import types
if config_file is None:
# no config file was specified - we will look for it in the ususal locations
conf_file_list = [os.path.expanduser(f)
for f in self.CONF_LOCATIONS
if os.path.exists(os.path.expanduser(f))]
if not conf_file_list:
# no possible config file found in the usual locations
raise CryptoBoxExceptions.CBConfigUnavailableError()
config_file = conf_file_list[0]
else:
# a config file was specified (e.g. via command line)
if type(config_file) != types.StringType:
raise CryptoBoxExceptions.CBConfigUnavailableError("invalid config file specified: %s" % config_file)
if not os.path.exists(config_file):
raise CryptoBoxExceptions.CBConfigUnavailableError("could not find the specified configuration file (%s)" % config_file)
return config_file
def __configureLogHandler(self):
try:
log_level = self.prefs["Log"]["Level"].upper()
log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
if not log_level in log_level_avail:
raise TypeError
except KeyError:
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Level")
except TypeError:
raise CryptoBoxExceptions.CBConfigInvalidValueError("Log", "Level", log_level, "invalid log level: only %s are allowed" % log_level_avail)
try:
try:
log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
except KeyError:
raise CryptoBoxExceptions.CBConfigUndefinedError("Log", "Details")
except IOError:
raise CryptoBoxExceptions.CBEnvironmentError("could not create the log file (%s)" % self.prefs["Log"]["Details"])
log_handler.setFormatter(logging.Formatter('%(asctime)s %(module)s %(levelname)s: %(message)s'))
cbox_log = logging.getLogger("CryptoBox")
## remove previous handlers
cbox_log.handlers = []
## add new one
cbox_log.addHandler(log_handler)
## do not call parent's handlers
cbox_log.propagate = False
## 'log_level' is a string -> use 'getattr'
cbox_log.setLevel(getattr(logging,log_level))
## the logger named "CryptoBox" is configured now
validation_spec = """
[Main]
AllowedDevices = list(min=1)
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
[Locations]
MountParentDir = directoryExists(default="/var/cache/cryptobox/mnt")
NameDatabase = fileWriteable(default="/var/cache/cryptobox/volumen_names.db")
TemplateDir = directoryExists(default="/usr/share/cryptobox/template")
LangDir = directoryExists(default="/usr/share/cryptobox/lang")
DocDir = directoryExists(default="/usr/share/doc/cryptobox/html")
PluginDir = directoryExists(default="/usr/share/cryptobox/plugins")
[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", default="file")
Details = string(min=1)
[WebSettings]
Stylesheet = string(min=1)
Language = string(min=1, default="en")
DocLanguage = string(min=1, default="en")
[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs-data = fileExecutable(default="/sbin/mkfs.ext3")
mkfs-config = fileExecutable(default="/sbin/mkfs.ext2")
blkid = fileExecutable(default="/sbin/blkid")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
"""
class CryptoBoxSettingsValidator(validate.Validator):
def __init__(self):
validate.Validator.__init__(self)
self.functions["directoryExists"] = self.check_directoryExists
self.functions["fileExecutable"] = self.check_fileExecutable
self.functions["fileWriteable"] = self.check_fileWriteable
def check_directoryExists(self, value):
dir_path = os.path.abspath(value)
if not os.path.isdir(dir_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return dir_path
def check_fileExecutable(self, value):
file_path = os.path.abspath(value)
if not os.path.isfile(file_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(file_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return file_path
def check_fileWriteable(self, value):
file_path = os.path.abspath(value)
if os.path.isfile(file_path):
if not os.access(file_path, os.W_OK):
raise validate.VdtValueError("%s (not found)" % value)
else:
parent_dir = os.path.dirname(file_path)
if os.path.isdir(parent_dir) and os.access(parent_dir, os.W_OK):
return file_path
raise validate.VdtValueError("%s (directory does not exist)" % value)
return file_path

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python2.4
import os
import WebInterfaceSites
try:
import cherrypy
except:
print "Could not import the cherrypy module! Try 'apt-get install python-cherrypy'."
sys.exit(1)
class CryptoBoxWebserver:
'''this class starts the cherryp webserver and serves the single sites'''
def __init__(self):
cherrypy.root = WebInterfaceSites.WebInterfaceSites()
#expose static content:
#I currently have no idea how to cleanly extract the stylesheet path from
#the config object without an extra CryptoBox.CryptoBoxProps instance.
#perhaps put config handling into a seperate class in CryptoBox.py?
#
# the following manual mapping is necessary, as we may not use relative
# paths in the config file
cherrypy.config.configMap.update({
"/cryptobox-misc": {
"staticFilter.on" : True,
"staticFilter.dir": os.path.abspath("../www-data" )}
})
def start(self):
# just use this config, when we're startet directly
cherrypy.config.update(file = "cryptoboxwebserver.conf")
cherrypy.server.start()
if __name__ == "__main__":
cbw = CryptoBoxWebserver()
cbw.start()

View file

@ -0,0 +1,76 @@
# $Id$
import imp
import os
import logging
class PluginManager:
"""manage available plugins"""
def __init__(self, plugin_dirs=None):
self.log = logging.getLogger("CryptoBox")
if hasattr(plugin_dirs, "__iter__"):
self.plugin_dirs = [os.path.abspath(dir) for dir in plugin_dirs]
else:
self.plugin_dirs = [os.path.abspath(plugin_dirs)]
def allPlugins(self):
for plfile in self.__getPluginFiles():
yield os.path.basename(plfile)[:-3]
def getPlugin(self, name):
for plfile in self.__getPluginFiles():
if name == os.path.basename(plfile)[:-3]:
return imp.load_source(name, plfile)
else:
return None
def getTemplateFileName(self, plname, template_name):
"""return the name of the template file, if it is part of the mentioned plugin
"""
result = [os.path.join(os.path.dirname(cur_file), template_name + ".cs")
for cur_file in self.__getPluginFiles()
if plname == os.path.basename(cur_file)[:-3]]
for templ_file in result:
if os.access(templ_file, os.R_OK) and os.path.isfile(templ_file):
return templ_file
else:
return None
def loadLanguageData(self, hdf, lang="en"):
for plfile in self.__getPluginFiles():
langdir = os.path.join(os.path.dirname(plfile), "lang")
selected_langfile = os.path.join(langdir, lang + ".hdf")
default_langfile = os.path.join(langdir, "en.hdf")
for langfile in (selected_langfile, default_langfile):
if os.access(langfile, os.R_OK):
self.log.debug("Loading plugin language file: %s" % langfile)
hdf.readFile(langfile)
break
else:
self.log.debug("Couldn't find a plugin language file (%s)" % default_langfile)
def __getPluginFiles(self):
result = []
for dir in [os.path.abspath(e) for e in self.plugin_dirs if os.access(e, os.R_OK) and os.path.isdir(e)]:
for plname in [f for f in os.listdir(dir)]:
pldir = os.path.join(dir, plname)
plfile = os.path.join(pldir, plname + ".py")
if os.path.isfile(plfile) and os.access(plfile, os.R_OK):
result.append(plfile)
return result
if __name__ == "__main__":
x = PluginManager("../plugins")
for a in x.allPlugins():
print "Plugin: %s" % a
print x.getPlugin(a).getStatus()

View file

@ -0,0 +1,80 @@
import os
import CryptoBoxContainer
## useful constant for some functions
CONT_TYPES = CryptoBoxContainer.CryptoBoxContainer.Types
class WebInterfaceDataset(dict):
"""this class contains all data that should be available for the clearsilver
templates
"""
def __init__(self, cbox, prefs):
self.prefs = prefs
self.cbox = cbox
self.__setConfigValues()
self.__setCryptoBoxState()
def setPluginState(self, plugins):
for pl in plugins.allPlugins():
self["Data.Status.Modules." + pl] = plugins.getPlugin(pl).getStatus(self.cbox)
def setCurrentDiskState(self, device):
for container in self.cbox.getContainerList():
if container.getDevice() == device:
isEncrypted = (container.getType() == CONT_TYPES["luks"]) and 1 or 0
isPlain = (container.getType() == CONT_TYPES["plain"]) and 1 or 0
isMounted = container.isMounted() and 1 or 0
self["Data.CurrentDisk.device"] = container.getDevice()
self["Data.CurrentDisk.name"] = container.getName()
self["Data.CurrentDisk.encryption"] = isEncrypted
self["Data.CurrentDisk.plaintext"] = isPlain
self["Data.CurrentDisk.active"] = isMounted
if isMounted:
(size, avail, used) = container.getCapacity()
percent = used / size
self["Data.CurrentDisk.capacity.used"] = used
self["Data.CurrentDisk.capacity.free"] = avail
self["Data.CurrentDisk.capacity.size"] = size
self["Data.CurrentDisk.capacity.percent"] = percent
def __setConfigValues(self):
self["Settings.TemplateDir"] = os.path.abspath(self.prefs["Locations"]["TemplateDir"])
self["Settings.LanguageDir"] = os.path.abspath(self.prefs["Locations"]["LangDir"])
self["Settings.DocDir"] = os.path.abspath(self.prefs["Locations"]["DocDir"])
self["Settings.Stylesheet"] = self.prefs["WebSettings"]["Stylesheet"]
self["Settings.Language"] = self.prefs["WebSettings"]["Language"]
self["Settings.DocLang"] = self.prefs["WebSettings"]["DocLanguage"]
def __setCryptoBoxState(self):
self["Data.Version"] = self.cbox.VERSION
avail_counter = 0
active_counter = 0
for container in self.cbox.getContainerList():
isEncrypted = (container.getType() == CONT_TYPES["luks"]) and 1 or 0
isPlain = (container.getType() == CONT_TYPES["plain"]) and 1 or 0
isMounted = container.isMounted() and 1 or 0
self["Data.Disks.%d.device" % avail_counter] = container.getDevice()
self["Data.Disks.%d.name" % avail_counter] = container.getName()
self["Data.Disks.%d.encryption" % avail_counter] = isEncrypted
self["Data.Disks.%d.plaintext" % avail_counter] = isPlain
self["Data.Disks.%d.active" % avail_counter] = isMounted
if isMounted: active_counter += 1
avail_counter += 1
self["Data.activeDisksCount"] = active_counter
for lang in self.cbox.getAvailableLanguages():
self["Data.Languages." + lang] = self.__getLanguageName(lang)
## TODO: open issues: Data.Config.AdminPasswordIsSet
def __getLanguageName(self, lang):
import neo_cgi, neo_util, neo_cs
hdf_path = os.path.join(self.prefs["Locations"]["LangDir"], lang + ".hdf")
hdf = neo_util.HDF()
hdf.readFile(hdf_path)
return hdf.getValue("Lang.Name",lang)

View file

@ -0,0 +1,494 @@
import CryptoBox
import WebInterfaceDataset
import re
from Plugins import PluginManager
from CryptoBoxExceptions import *
class WebInterfaceSites:
'''
url2func = {'index':'show_status','doc':'show_doc','logs':'show_log'}
'''
def __init__(self):
import logging
self.cbox = CryptoBox.CryptoBoxProps()
self.log = logging.getLogger("CryptoBox")
self.prefs = self.cbox.prefs
self.__resetDataset()
self.plugins = PluginManager(self.prefs["Locations"]["PluginDir"])
self.__exposePlugins()
def __exposePlugins(self):
for plname in self.plugins.allPlugins():
self.log.info("Plugin '%s' loaded" % plname)
## this should be the "easiest" way to expose all modules as URLs
setattr(self, "module_" + plname, self.return_module_action(plname))
setattr(getattr(self, "module_" + plname), "exposed", True)
def __resetDataset(self):
self.dataset = WebInterfaceDataset.WebInterfaceDataset(self.cbox, self.prefs)
def __isHDAvailable(self):
#TODO: implement this
return True
def __check_config(self):
#TODO: from now on a cryptobox is always configured
return True
def __check_init_running(self):
#TODO: implement this check (is mkfs still running?)
return False
######################################################################
## put real sites down here and don't forget to expose them at the end
def logs(self, weblang=""):
'''displays a HTML version of the logfile
'''
self.__resetDataset()
self.__setWebLang(weblang)
self.dataset["Data.Log"] = "<br/>".join(self.cbox.getLogData(lines=30, maxSize=2000))
return self.__render("show_log")
def status(self, weblang=""):
'''shows the current status of the box
'''
self.__resetDataset()
self.__setWebLang(weblang)
if not self.__check_config():
self.dataset["Data.Warning"] = "NotInitialized"
return self.__render("form_init")
elif self.__check_init_running():
self.dataset["Data.Warning"] = "InitNotFinished"
self.dataset["Data.Redirect.Action"] = "form_config"
self.dataset["Data.Redirect.Delay"] = "30"
return self.__render("empty")
else:
self.dataset["Data.Redirect.Delay"] = "60"
return self.__render("show_status")
def doc(self,page="",weblang=""):
'''prints the offline wikipage
'''
import re
self.__resetDataset()
self.__setWebLang(weblang)
## check for invalid characters
if page and not re.search(u'\W', page):
self.dataset["Data.Doc.Page"] = page
else:
## display this page as default help page
self.dataset["Data.Doc.Page"] ="CryptoBoxUser"
return self.__render("show_doc")
def system(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("form_system")
def index(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_status")
def show_volume(self, device="", weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def show_volumes(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_volumes")
def volume_name_set(self, device="", volume_name="", weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
volume_name = volume_name.strip()
if self.__checkVolumeName(volume_name):
container = self.cbox.getContainer(device)
try:
container.setName(volume_name)
# TODO: specify the possible exceptions
except Exception:
self.log.warn("failed to rename the volume '%s' to '%s'" % (device, volume_name))
self.dataset["Data.Warning"] = "SetVolumeNameFailed"
else:
self.log.info("successfully renamed volume '%s' to '%s'" % (device, volume_name))
# reread the dataset
self.__resetDataset()
self.dataset.setCurrentDiskState(device)
else:
self.dataset["Data.Warning"] = "InvalidVolumeName"
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def mount_do(self, device, crypto_password=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if container.isMounted():
self.dataset["Data.Warning"] = "IsMounted"
self.log.warn("the device (%s) is already mounted" % device)
else:
try:
if container.getType() == container.Types["luks"]:
## encrypted luks container
if not crypto_password:
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
self.log.warn("no password was supplied for mounting of device '%s'" % device)
return self.__render("show_volume")
else:
container.mount(crypto_password)
elif container.getType() == container.Types["plain"]:
## plain container
container.mount()
else:
## mounting is not possible
# TODO: wrong warning message - replace it
self.dataset["Data.Warning"] = "MountFailed"
self.log.warn("this type of container (%s) cannot be mounted - sorry!" % device)
except (Exception, "MountError"):
self.dataset["Data.Warning"] = "MountFailed"
self.log.warn("failed to mount the device (%s)" % device)
else:
self.log.info("successfully mounted the container (%s)" % device)
# reread the dataset
self.__resetDataset()
self.dataset.setCurrentDiskState(device)
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
return self.__render("show_volume")
def volume_init_ask(self, device, encryption=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if container.isMounted():
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
return self.__render("show_volume")
else:
if encryption is None:
self.dataset["Data.Init.isCrypto"] = 0
else:
self.dataset["Data.Init.isCrypto"] = 1
return self.__render("form_init")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def init_do(self, device, confirm, crypto_password=None, crypto_password2=None, encryption=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
## set 'Data.Init.isCrypto' - just in case, we have to show the same form again
if encryption is None:
self.dataset["Data.Init.isCrypto"] = 0
else:
self.dataset["Data.Init.isCrypto"] = 1
if container.isMounted():
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
return self.__render("form_init")
else:
# TODO: we have to compare 'confirm' with the value in the language file - IMPORTANT!
if not confirm:
self.dataset["Data.Warning"] = "InitNotConfirmed"
self.log.warn("the confirmation sentence for initialization of the device '%s' was wrong" % device)
return self.__render("form_init")
try:
if not encryption is None:
if not crypto_password:
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
self.log.warn("no crypto password was supplied for initialization of device '%s'" % device)
return self.__render("form_init")
if crypto_password != crypto_password2:
self.dataset["Data.Warning"] = "DifferentCryptoPasswords"
self.log.warn("the crypto password was not repeated correctly for initialization of device '%s'" % device)
return self.__render("form_init")
container.create(container.Types["luks"], crypto_password)
else:
container.create(container.Types["plain"])
# TODO: specify the exception
except Exception, errMsg:
# TODO: wrong error/warning message - change it
self.dataset["Data.Error"] = "InitFailed"
self.log.warn("initialization of device '%s' failed" % device)
self.log.warn("reason: %s" % errMsg)
return self.__render("form_init")
else:
self.log.info("successfully initialized device '%s'" % device)
# reread the dataset
self.__resetDataset()
self.dataset.setCurrentDiskState(device)
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def test(self):
self.__resetDataset()
return "test passed"
def umount_do(self, device, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if not container.isMounted():
self.dataset["Data.Warning"] = "NotMounted"
self.log.warn("the device (%s) is currently not mounted" % device)
else:
try:
if container.getType() == container.Types["luks"]:
## encrypted luks container
container.umount()
elif container.getType() == container.Types["plain"]:
## plain container
container.umount()
else:
## mounting is not possible
# TODO: wrong warning message - replace it
self.dataset["Data.Warning"] = "UmountFailed"
self.log.warn("this type of container (%s) cannot be umounted - sorry!" % device)
except (Exception, "UmountError"):
self.dataset["Data.Warning"] = "UmountFailed"
self.log.warn("failed to unmount the device (%s)" % device)
else:
self.log.info("successfully unmounted the container (%s)" % device)
# reread the dataset
self.__resetDataset()
self.dataset.setCurrentDiskState(device)
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
return self.__render("show_volume")
def return_module_action(self, module):
def handler(**args):
self.__resetDataset()
try:
self.__setWebLang(args["weblang"])
del args["weblang"]
except KeyError:
pass
plugin = self.plugins.getPlugin(module)
try:
nextTemplate = plugin.doAction(self.cbox, **args)
except CBPluginActionError, errMsg:
self.log.debug(errMsg)
self.dataset["Data.Warning"] = errMsg
nextTemplate = "empty"
plugin.prepareForm(self.dataset, self.cbox)
return self.__render(nextTemplate, module)
return handler
'''
## DONE: these functions are pythonized
#################### show_log #######################
##################### doc ############################
##################### poweroff ######################
##################### reboot ########################
## but there are even more TODO
#-------------------------------------------------------#
# here you may define all cases that require a harddisk #
#-------------------------------------------------------#
################ umount_do #######################
elif action == "unmount_do":
if not device:
self.log.debug("invalid device chosen: %s" device
settings["Data.Warning"] = "InvalidDevice"
settings["Data.Action"] = "empty"
elif not True: #TODO: replace True with check_config()
settings["Data.Warning"] = "NotInitialized"
settings["Data.Action"] = "form_init"
elif True: #TODO: replace True with check_init_running()
settings["Data.Warning"] = "InitNotFinished"
settings["Data.Action"] = "empty"
settings["Data.Redirect.Action"] = "form_config"
settings["Data.Redirect.Delay"] = "30"
elif not True: #TODO: replace True with check_mounted(device)
settings["Data.Warning"] = "NotMounted"
settings["Data.Action"] = "show_volume"
else: #unmount
#TODO: replace this line with umount_vol(device)
if True: #TODO: replace True with check_mounted(device)
settings["Data.Warning"] = "UmountFailed"
settings["Data.Action"] = "show_volume"
else:
settings["Data.Action"] = "show_volume"
################ mount_do ########################
elif action == "mount_do":
if device:
pass #TODO: is_encrypted = check_device_encryption(device)
else:
self.log.debug("invalid device chosen: %s" device
settings["Data.Warning"] = "InvalidDevice"
settings["Data.Action"] = "empty"
elif not True: #TODO: replace True with check_config()
settings["Data.Warning"] = "NotInitialized"
settings["Data.Action"] = "form_init"
#at cryptobox.pl line 568
'''
##################### input checker ##########################
def __setWebLang(self, value):
## TODO: add some code to evaluate the language setting of the browser
guess = value
availLangs = self.cbox.getAvailableLanguages()
## TODO: add some warnings for an invalid choosen language
if not guess or \
not guess in availLangs or \
re.search(u'\W', guess):
guess = self.prefs["WebSettings"]["Language"]
## maybe the language is still not valid
if not guess in availLangs:
self.log.warn("the configured language is invalid: %s" % guess)
guess = availLangs[0]
self.dataset["Settings.Language"] = guess
self.dataset["Settings.DocLang"] = guess
self.dataset["Settings.LinkAttrs.weblang"] = guess
def __setDevice(self, device):
if device and re.match(u'[\w /\-]+$', device) and self.cbox.getContainer(device):
self.log.debug("select device: %s" % device)
self.dataset.setCurrentDiskState(device)
return True
else:
self.log.warn("invalid device: %s" % device)
self.dataset["Data.Warning"] = "InvalidDevice"
return False
def __checkVolumeName(self, name):
if name and re.match(u'[\w \-]+$', name):
return True
else:
return False
def __render(self, template, module=None):
'''renders from clearsilver templates and returns the resulting html
Gets a dictionary with all settings, nessessary for rendering.
In fact the dictionary is a copy of the CryptoBoxWerbserverSite
object, that calls this render method. This might be bloat but
this way the render method has always a complete, actual set of values.
'''
import os
try:
import neo_cgi, neo_util, neo_cs
except ImportError:
errorMsg = "Could not import clearsilver modules. Try 'apt-get install python-clearsilver'."
self.log.error(errorMsg)
sys.stderr.write(errorMsg)
raise ImportError, errorMsg
module_cs_file = False
if module:
module_cs_file = self.plugins.getTemplateFileName(module, template)
default_cs_file = os.path.join(self.prefs["Locations"]["TemplateDir"], template + ".cs")
self.dataset["Data.TemplateFile"] = module_cs_file or default_cs_file
self.log.info("rendering site: " + template)
cs_path = os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs")
if not os.access(cs_path, os.R_OK):
log.error("Couldn't read clearsilver file: %s" % cs_path)
return "Couldn't read clearsilver file: %s" % cs_path
# use the user selected language instead of the configured
hdf_path = os.path.join(self.prefs["Locations"]["LangDir"], self.dataset["Settings.Language"] + ".hdf")
if not os.access(hdf_path, os.R_OK):
log.error("Couldn't read language file: %s" % hdf_path)
return "Couldn't read language file: %s" % hdf_path
## add the current state of the plugins to the hdf dataset
self.dataset.setPluginState(self.plugins)
hdf = neo_util.HDF()
hdf.readFile(hdf_path)
self.log.debug(self.dataset)
for key in self.dataset.keys():
hdf.setValue(key,str(self.dataset[key]))
## load languaga data of plugins
self.plugins.loadLanguageData(hdf, lang=self.dataset["Settings.Language"])
cs = neo_cs.CS(hdf)
cs.parseFile(cs_path)
return cs.render()
############################################################################
## to make the sites visible through the webserver they must be exposed here
index.exposed = True
doc.exposed = True
logs.exposed = True
system.exposed = True
status.exposed = True
show_volume.exposed = True
volume_name_set.exposed = True
mount_do.exposed = True
volume_init_ask.exposed = True
init_do.exposed = True
umount_do.exposed = True
show_volumes.exposed = True
test.exposed = True
"""
## TODO: check this before anything else
if self.cbox.getAvailableDocLanguages():
self.dataset["Data.Error"] = "NoDocumentation"
return self.__render("show_status")
"""

View file

@ -0,0 +1,18 @@
Maybe we can add some notes here to get a consistent coding experience :)
-------------------------------------------------------------------------------
comments:
- should be usable for pydoc
- ''' or """ at the beginning of every class/method
- ## for longterm comments, that are useful for understanding
- #blabla for codelines, that are out for experimenting and might be used later again
error handling:
- unspecific error handling is evil (try: "grep -r except: .")
unit testing:
- first write a unittest and then write the relating code until the unittest stops failing :)
- 'unittests.ClassName.py' should contain all tests for 'ClassName.py'
- commits with broken unit tests are evil (fix or disable the code (not the test ;) ))

View file

@ -0,0 +1,80 @@
[Main]
# comma separated list of possible prefixes for accesible devices
# beware: .e.g "/dev/hd" grants access to _all_ harddisks
AllowedDevices = /dev/loop
# the default name prefix of not unnamed containers
DefaultVolumePrefix = "Data "
# which cipher should cryptsetup-luks use?
DefaultCipher = aes-cbc-essiv:sha256
[Locations]
# where should we mount volumes?
# this directory must be writeable by the cryptobox user (see above)
MountParentDir = /var/cache/cryptobox/mnt
# the name-database file - inside of DataDir
#NameDatabase = /var/cache/cryptobox/cryptobox_names.db
NameDatabase = cryptobox_names.db
# where are the clearsilver templates?
#TemplateDir = /usr/share/cryptobox/templates
TemplateDir = ../templates
# path to language files
#LangDir = /usr/share/cryptobox/lang
LangDir = ../lang
# path to documentation files
#DocDir = /usr/share/doc/cryptobox/html
DocDir = ../doc/html
# path to the plugin directory
#PluginDir = /usr/share/cryptobox/plugins
PluginDir = ../plugins
[Log]
# possible values are "debug", "info", "warn" and "error" or numbers from
# 0 (debug) to 7 (error)
Level = debug
# where to write the log messages to?
# possible values are: file
# syslog support will be added later
Destination = file
# depending on the choosen destination (see above) you may select
# details. Possible values for the different destinations are:
# file: $FILENAME
# syslog: $LOG_FACILITY
#Details = /var/log/cryptobox.log
Details = ./cryptobox.log
[WebSettings]
# URL of default stylesheet
Stylesheet = /cryptobox-misc/cryptobox.css
# default language
Language = de
# default language for documentation
DocLanguage = de
[Programs]
cryptsetup = /sbin/cryptsetup
mkfs-data = /sbin/mkfs.ext3
mkfs-config = /sbin/mkfs.ext2
blkid = /sbin/blkid
mount = /bin/mount
umount = /bin/umount
super = /usr/bin/super
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = CryptoBoxRootActions

View file

@ -0,0 +1,14 @@
[global]
server.socketPort = 8080
#server.environment = "production"
server.environment = "development"
server.logToScreen = True
server.threadPool = 1
server.reverseDNS = False
server.logFile = "cryptoboxwebserver.log"
[/favicon.ico]
static_filter.on = True
# TODO: for now, we do not have a cryptobox favicon
static_filter.file = "/usr/share/doc/python-cherrypy/cherrypy/favicon.ico"

View file

@ -0,0 +1,2 @@
# adapt the following line to your local setup and add it to /etc/super.tab
CryptoBoxRootActions /your/local/path/to/CryptoBoxRootActions.py yourUserName

View file

@ -0,0 +1,116 @@
#!/usr/bin/env python2.4
"""
BEWARE: this script may overwrite the data of one of your loop devices. You
should restrict the AllowedDevices directive in cryptobox.conf to exclude
your precious black devices from being used by this script.
the following script runs a number of tests for different parts
"""
from CryptoBox import CryptoBoxProps
from CryptoBoxContainer import CryptoBoxContainer
import sys
def main():
cb = CryptoBoxProps()
print "Confguration:"
print "\tConfig file:\t\t%s" % (cb.cbxPrefs.filename, )
print "\tAllowed devices:\t%s" % (cb.cbxPrefs["Main"]["AllowedDevices"], )
"""for e in cb.getContainerList(filterType=CryptoBoxContainer.Types["luks"]):"""
for e in cb.getContainerList():
print "\t\t%d\t\t%s - %s - %d" % (cb.getContainerList().index(e), e.getDevice(), e.getName(), e.getType())
if not cb.getContainerList():
print "no loop devices found for testing"
sys.exit(1)
if len(cb.getContainerList()) > 1:
print "I found more than one available loop device - I will stop now to avoid risking data loss."
print "Please change the 'AllowedDevices' setting in 'cryptobox.conf' to reduce the number of allowed devices to only one."
sys.exit(1)
testElement = cb.getContainerList()[0]
print "\nRunning some tests now ..."
if not plain_tests(testElement):
print "some previous tests failed - we should stop now"
sys.exit(1)
luks_tests(testElement)
" ***************** some functions ******************** "
def luks_tests(e):
# umount if necessary
try:
e.umount()
except "MountError":
pass
e.create(e.Types["luks"], "alt")
print "\tluks create:\tok"
e.changePassword("alt","neu")
print "\tluks changepw:\tok"
e.setName("lalla")
print "\tluks setName:\tok"
try:
e.mount("neu")
except "MountError":
pass
if e.isMounted(): print "\tluks mount:\tok"
else: print "\tluks mount:\tfailed"
print "\tCapacity (size, free, used) [MB]:\t%s" % (e.getCapacity(), )
try:
e.umount()
except "MountError":
pass
if e.isMounted(): print "\tluks umount:\tfailed"
else: print "\tluks umount:\tok"
if e.isMounted(): return False
else: return True
def plain_tests(e):
# umount if necessary
try:
e.umount()
except "MountError":
pass
e.create(e.Types["plain"])
print "\tplain create:\tok"
e.setName("plain-lili")
print "\tplain setName:\tok"
try:
e.mount()
except "MountError":
pass
if e.isMounted(): print "\tplain mount:\tok"
else: print "\tplain mount:\tfailed"
print "\tCapacity (size, free, used) [MB]:\t%s" % (e.getCapacity(), )
try:
e.umount()
except "MountError":
pass
if e.isMounted(): print "\tplain umount:\tfailed"
else: print "\tplain umount:\tok"
if e.isMounted(): return False
else: return True
# ************ main ****************
main()

View file

@ -0,0 +1,136 @@
#!/usr/bin/env python2.4
import unittest
import sys
from CryptoBox import *
from CryptoBoxExceptions import *
import CryptoBoxSettings
class CryptoBoxPropsDeviceTests(unittest.TestCase):
import CryptoBox
cb = CryptoBox.CryptoBoxProps()
def testAllowedDevices(self):
'''isDeviceAllowed should accept permitted devices'''
self.assertTrue(self.cb.isDeviceAllowed("/dev/loop"))
self.assertTrue(self.cb.isDeviceAllowed("/dev/loop1"))
self.assertTrue(self.cb.isDeviceAllowed("/dev/loop/urgd"))
self.assertTrue(self.cb.isDeviceAllowed("/dev/usb/../loop1"))
def testDeniedDevices(self):
'''isDeviceAllowed should fail with not explicitly allowed devices'''
self.assertFalse(self.cb.isDeviceAllowed("/dev/hda"))
self.assertFalse(self.cb.isDeviceAllowed("/dev/loopa/../hda"))
self.assertFalse(self.cb.isDeviceAllowed("/"))
class CryptoBoxPropsConfigTests(unittest.TestCase):
'''test here if everything with the config turns right'''
import os
import CryptoBox
files = {
"configFileOK" : "cbox-test_ok.conf",
"configFileBroken" : "cbox-test_broken.conf",
"nameDBFile" : "cryptobox_names.db",
"logFile" : "cryptobox.log",
"tmpdir" : "cryptobox-mnt" }
tmpdirname = ""
filenames = {}
configContentOK = """
[Main]
AllowedDevices = /dev/loop
DefaultVolumePrefix = "Data "
DefaultCipher = aes-cbc-essiv:sha256
[Locations]
NameDatabase = %s/cryptobox_names.db
MountParentDir = %s
TemplateDir = ../templates
LangDir = ../lang
DocDir = ../doc/html
PluginDir = ../plugins
[Log]
Level = debug
Destination = file
Details = %s/cryptobox.log
[WebSettings]
Stylesheet = /cryptobox-misc/cryptobox.css
[Programs]
blkid = /sbin/blkid
cryptsetup = /sbin/cryptsetup
super = /usr/bin/super
CryptoBoxRootActions = CryptoBoxRootActions
"""
def setUp(self):
'''generate all files in tmp and remember the names'''
import tempfile
os = self.os
self.tmpdirname = tempfile.mkdtemp(prefix="cbox-")
for file in self.files.keys():
self.filenames[file] = os.path.join(self.tmpdirname, self.files[file])
self.writeConfig()
def tearDown(self):
'''remove the created tmpfiles'''
os = self.os
# remove temp files
for file in self.filenames.values():
compl_name = os.path.join(self.tmpdirname, file)
if os.path.exists(compl_name):
os.remove(compl_name)
# remove temp dir
os.rmdir(self.tmpdirname)
def testConfigInit(self):
'''Check various branches of config file loading'''
import os
self.assertRaises(CBConfigUnavailableError, self.CryptoBox.CryptoBoxProps,"/invalid/path/to/config/file")
self.assertRaises(CBConfigUnavailableError, self.CryptoBox.CryptoBoxProps,"/etc/shadow")
""" check one of the following things:
1) are we successfully using an existing config file?
2) do we break, if no config file is there?
depending on the existence of a config file, only one of these conditions
can be checked - hints for more comprehensive tests are appreciated :) """
for a in CryptoBoxSettings.CryptoBoxSettings.CONF_LOCATIONS:
if os.path.exists(a):
self.CryptoBox.CryptoBoxProps()
break # this skips the 'else' clause
else: self.assertRaises(CBConfigUnavailableError, self.CryptoBox.CryptoBoxProps)
self.assertRaises(CBConfigUnavailableError, self.CryptoBox.CryptoBoxProps,[])
def testBrokenConfigs(self):
"""Check various broken configurations"""
self.writeConfig("NameDatabase", "#out", filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
self.writeConfig("Level", "Level = ho", filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
self.writeConfig("Details", "#out", filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
self.writeConfig("super", "super=/bin/invalid/no", filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
self.writeConfig("CryptoBoxRootActions", "#not here", filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
self.writeConfig("CryptoBoxRootActions", "CryptoBoxRootActions = /bin/false", filename=self.filenames["configFileBroken"])
self.assertRaises(CBEnvironmentError, self.CryptoBox.CryptoBoxProps,self.filenames["configFileBroken"])
def writeConfig(self, replace=None, newline=None, filename=None):
"""write a config file and (optional) replace a line in it"""
import re
if not filename: filename = self.filenames["configFileOK"]
content = self.configContentOK % (self.tmpdirname, self.tmpdirname, self.tmpdirname)
if replace:
pattern = re.compile('^' + replace + '\\s*=.*$', flags=re.M)
content = re.sub(pattern, newline, content)
cf = open(filename, "w")
cf.write(content)
cf.close()
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,109 @@
import unittest
import twill
import cherrypy
import WebInterfaceSites
from twill import get_browser
## this makes assertRaises shorter
from twill.errors import *
from mechanize import BrowserStateError, LinkNotFoundError
## we do the following, for easy surfing
## e.g. use: cbx.go(your_url)
## commands api: http://twill.idyll.org/commands.html
cbx = twill.commands
CBXHOST="localhost"
CBXPORT=8080
CBX="http://"+CBXHOST+":"+str(CBXPORT)
class WebserverTests(unittest.TestCase):
'''this class checks the webserver, using "twill"
the tests in this class are from the browsers point of view, so not
really unittests.
fetch twill from: http://twill.idyll.org
one way to manually run twill code is through the python
interpreter commandline e.g.:
import twill
twill.shell.main()
go http://localhost:8080
find "my very special html content"
help
'''
def setUp(self):
'''configures the cherrypy server that it works nice with twill
'''
cherrypy.config.update({
'server.logToScreen' : False,
'autoreload.on': False,
'server.threadPool': 1,
'server.environment': 'production',
})
cherrypy.root = WebInterfaceSites.WebInterfaceSites()
cherrypy.server.start(initOnly=True, serverClass=None)
from cherrypy._cpwsgi import wsgiApp
twill.add_wsgi_intercept(CBXHOST, CBXPORT, lambda: wsgiApp)
# grab the output of twill commands
self.output = open("/tmp/twill.log","a")
twill.set_output(self.output)
def tearDown(self):
'''clean up the room when leaving'''
# remove intercept.
twill.remove_wsgi_intercept(CBXHOST, CBXPORT)
# shut down the cherrypy server.
cherrypy.server.stop()
self.output.close()
########################################
######## the tests start here ##########
def test_is_server_running(self):
'''the server should run under given name and port'''
cbx.go(CBX)
# wrong port should fail
self.assertRaises(BrowserStateError, cbx.go, "http://"+CBXHOST+":"+str(CBXPORT+1))
# wrong hostname too
self.assertRaises(BrowserStateError, cbx.go, "http://localhorst:"+str(CBXPORT))
def test_helppages(self):
'''helpsites should be availbale in different languages'''
cbx.go(CBX)
cbx.go("doc")
cbx.find("Table of Contents")
cbx.go("doc?weblang=en")
cbx.find("Table of Contents")
cbx.find("Getting started")
self.assertRaises(TwillAssertionError, cbx.notfind, "Table of Contents")
cbx.go("doc?weblang=de")
cbx.find("Table of Contents")
cbx.find("Wie geht es los")
cbx.go("doc?weblang=si")
#TODO: if weblang=si no help is displayed at all
self.assertRaises(TwillAssertionError, cbx.notfind, "Table of Contents")
def test_goto_status(self):
cbx.go(CBX)
cbx.go("status")
cbx.find("Status")
def test_goto_system(self):
cbx.go(CBX)
cbx.go("system")
cbx.find("System")
cbx.notfind("Sstem")
self.assertRaises(TwillAssertionError, cbx.find, "Sstem")
self.assertRaises(TwillAssertionError, cbx.notfind, "System")
if __name__ == "__main__":
unittest.main()