moved pythonrewrite branch to trunk

This commit is contained in:
lars 2006-11-06 16:05:00 +00:00
parent 214959e851
commit 414389951e
230 changed files with 56027 additions and 1620 deletions

View file

@ -532,7 +532,7 @@ if ( ! &check_ssl()) {
if ($device eq '') {
&debug_msg(DEBUG_INFO, "invalid device: " . $query->param('device'));
$pagedata->setValue('Data.Warning', 'InvalidDevice');
$pagedata->setValue('Data.Action', 'empty');
$pagedata->setValue('Data.Action', 'emptu');
} elsif ( ! &check_config()) {
$pagedata->setValue('Data.Warning', 'NotInitialized');
$pagedata->setValue('Data.Action', 'form_init');

276
bin/CryptoBox.py Executable file
View file

@ -0,0 +1,276 @@
#!/usr/bin/env python2.4
'''
This is the web interface for a fileserver managing encrypted filesystems.
It was originally written in bash/perl. Now a complete rewrite is in
progress. So things might be confusing here. Hopefully not for long.
:)
'''
# check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import CryptoBoxContainer
from CryptoBoxExceptions import *
import re
import os
import CryptoBoxTools
import subprocess
class CryptoBox:
'''this class rules them all!
put things like logging, conf and oter stuff in here,
that might be used by more classes, it will be passed on to them'''
VERSION = "0.3~1"
def __init__(self, config_file=None):
import CryptoBoxSettings
self.log = self.__getStartupLogger()
self.prefs = CryptoBoxSettings.CryptoBoxSettings(config_file)
self.__runTests()
def __getStartupLogger(self):
import logging
'''initialises the logging system
use it with: 'self.log.[debug|info|warning|error|critical](logmessage)'
all classes should get the logging instance during __init__:
self.log = logging.getLogger("CryptoBox")
first we output all warnings/errors to stderr
as soon as we opened the config file successfully, we redirect debug output
to the configured destination'''
## basicConfig(...) needs python >= 2.4
try:
log_handler = logging.getLogger("CryptoBox")
logging.basicConfig(
format='%(asctime)s CryptoBox %(levelname)s: %(message)s',
stderr=sys.stderr)
log_handler.setLevel(logging.ERROR)
log_handler.info("loggingsystem is up'n running")
## from now on everything can be logged via self.log...
except:
raise CBEnvironmentError("couldn't initialise the loggingsystem. I give up.")
return log_handler
# do some initial checks
def __runTests(self):
self.__runTestUID()
self.__runTestRootPriv()
def __runTestUID(self):
if os.geteuid() == 0:
raise CBEnvironmentError("you may not run the cryptobox as root")
def __runTestRootPriv(self):
"""try to run 'super' with 'CryptoBoxRootActions'"""
try:
devnull = open(os.devnull, "w")
except IOError:
raise CBEnvironmentError("could not open %s for writing!" % os.devnull)
try:
prog_super = self.prefs["Programs"]["super"]
except KeyError:
raise CBConfigUndefinedError("Programs", "super")
try:
prog_rootactions = self.prefs["Programs"]["CryptoBoxRootActions"]
except KeyError:
raise CBConfigUndefinedError("Programs", "CryptoBoxRootActions")
try:
proc = subprocess.Popen(
shell = False,
stdout = devnull,
stderr = devnull,
args = [prog_super, prog_rootactions, "check"])
except OSError:
raise CBEnvironmentError("failed to execute 'super' (%s)" % self.prefs["Programs"]["super"])
proc.wait()
if proc.returncode != 0:
raise CBEnvironmentError("failed to call CryptoBoxRootActions (%s) via 'super' - maybe you did not add the appropriate line to /etc/super.tab?" % prog_rootactions)
# this method just demonstrates inheritance effects - may be removed
def cbx_inheritance_test(self, string="you lucky widow"):
self.log.info(string)
# RFC: why should CryptoBoxProps inherit CryptoBox? [l]
# RFC: shouldn't we move all useful functions of CryptoBoxProps to CryptoBox? [l]
class CryptoBoxProps(CryptoBox):
'''Get and set the properties of a CryptoBox
This class contains all available devices that may be accessed.
All properties of the cryptobox can be accessed by this class.
'''
def __init__(self, config_file=None):
'''read config and fill class variables'''
CryptoBox.__init__(self, config_file)
self.reReadContainerList()
def reReadContainerList(self):
self.log.debug("rereading container list")
self.containers = []
for device in CryptoBoxTools.getAvailablePartitions():
if self.isDeviceAllowed(device) and not self.isConfigPartition(device):
self.containers.append(CryptoBoxContainer.CryptoBoxContainer(device, self))
## sort by container name
self.containers.sort(cmp = lambda x,y: x.getName() < y.getName() and -1 or 1)
def isConfigPartition(self, device):
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
args = [
self.prefs["Programs"]["blkid"],
"-c", os.path.devnull,
"-o", "value",
"-s", "LABEL",
device])
(output, error) = proc.communicate()
return output.strip() == self.prefs["Main"]["ConfigVolumeLabel"]
def isDeviceAllowed(self, devicename):
"check if a device is white-listed for being used as cryptobox containers"
import types
allowed = self.prefs["Main"]["AllowedDevices"]
if type(allowed) == types.StringType: allowed = [allowed]
for a_dev in allowed:
"remove double dots and so on ..."
real_device = os.path.realpath(devicename)
if a_dev and re.search('^' + a_dev, real_device): return True
return False
def getLogData(self, lines=None, maxSize=None):
"""get the most recent log entries of the cryptobox
the maximum number and size of these entries can be limited by 'lines' and 'maxSize'
"""
# return nothing if the currently selected log output is not a file
try:
if self.prefs["Log"]["Destination"].upper() != "FILE": return []
log_file = self.prefs["Log"]["Details"]
except KeyError:
self.log.error("could not evaluate one of the following config settings: [Log]->Destination or [Log]->Details")
return []
try:
fd = open(log_file, "r")
if maxSize: fd.seek(-maxSize, 2) # seek relative to the end of the file
content = fd.readlines()
fd.close()
except IOError:
self.log.warn("failed to read the log file (%s)" % log_file)
return []
if lines: content = content[-lines:]
content.reverse()
return content
def getContainerList(self, filterType=None, filterName=None):
"retrieve the list of all containers of this cryptobox"
try:
result = self.containers[:]
if filterType != None:
if filterType in range(len(CryptoBoxContainer.Types)):
return [e for e in self.containers if e.getType() == filterType]
else:
self.log.info("invalid filterType (%d)" % filterType)
result.clear()
if filterName != None:
result = [e for e in self.containers if e.getName() == filterName]
return result
except AttributeError:
return []
def getContainer(self, device):
"retrieve the container element for this device"
all = [e for e in self.getContainerList() if e.device == device]
if all:
return all[0]
else:
return None
def setNameForUUID(self, uuid, name):
"assign a name to a uuid in the ContainerNameDatabase"
used_uuid = self.getUUIDForName(name)
"first remove potential conflicting uuid/name combination"
if used_uuid:
## remember the container which name was overriden
for e in self.containers:
if e.getName() == name:
forcedRename = e
break
del self.prefs.nameDB[used_uuid]
self.prefs.nameDB[uuid] = name
self.prefs.nameDB.write()
## rename the container that lost its name (necessary while we use cherrypy)
if used_uuid:
## this is surely not the best way to regenerate the name
dev = e.getDevice()
old_index = self.containers.index(e)
self.containers.remove(e)
self.containers.insert(old_index, CryptoBoxContainer.CryptoBoxContainer(dev,self))
## there should be no reason for any failure
return True
def getNameForUUID(self, uuid):
"get the name belonging to a specified key (usually the UUID of a fs)"
try:
return self.prefs.nameDB[uuid]
except KeyError:
return None
def getUUIDForName(self, name):
""" get the key belonging to a value in the ContainerNameDatabase
this is the reverse action of 'getNameForUUID' """
for key in self.prefs.nameDB.keys():
if self.prefs.nameDB[key] == name: return key
"the uuid was not found"
return None
def removeUUID(self, uuid):
if uuid in self.prefs.nameDB.keys():
del self.prefs.nameDB[uuid]
return True
else:
return False
def getAvailableLanguages(self):
'''reads all files in path LangDir and returns a list of
basenames from existing hdf files, that should are all available
languages'''
languages = [ f.rstrip(".hdf")
for f in os.listdir(self.prefs["Locations"]["LangDir"])
if f.endswith(".hdf") ]
if len(languages) < 1:
self.log.error("No .hdf files found! The website won't render properly.")
return languages
if __name__ == "__main__":
cb = CryptoBoxProps()

607
bin/CryptoBoxContainer.py Executable file
View file

@ -0,0 +1,607 @@
#!/usr/bin/env python2.4
## check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import subprocess
import os
import re
import logging
from CryptoBoxExceptions import *
"""exceptions:
VolumeIsActive
NameActivelyUsed
InvalidName
InvalidPassword
InvalidType
CreateError
MountError
ChangePasswordError
"""
class CryptoBoxContainer:
Types = {
"unused":0,
"plain":1,
"luks":2,
"swap":3}
__fsTypes = {
"plain":["ext3", "ext2", "vfat", "reiser"],
"swap":["swap"]}
# TODO: more filesystem types? / check 'reiser'
__dmDir = "/dev/mapper"
def __init__(self, device, cbox):
self.device = device
self.cbox = cbox
self.log = logging.getLogger("CryptoBox")
self.resetObject()
def getName(self):
return self.name
def setName(self, new_name):
if new_name == self.name: return
if self.isMounted():
raise CBVolumeIsActive("the container must be inactive during renaming")
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise CBInvalidName("the supplied new name contains illegal characters")
"check for active partitions with the same name"
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
if prev_name_owner:
for a in prev_name_owner:
if a.isMounted():
raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
if not self.cbox.setNameForUUID(self.uuid, new_name):
raise CBContainerError("failed to change the volume name for unknown reasons")
self.name = new_name
def getDevice(self):
return self.device
def getType(self):
return self.type
def isMounted(self):
return os.path.ismount(self.__getMountPoint())
def getCapacity(self):
"""return the current capacity state of the volume
the volume may not be mounted
the result is a tuple of values in megabyte:
(size, available, used)
"""
info = os.statvfs(self.__getMountPoint())
return (
int(info.f_bsize*info.f_blocks/1024/1024),
int(info.f_bsize*info.f_bavail/1024/1024),
int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))
def getSize(self):
"""return the size of the block device (_not_ of the filesystem)
the result is a value in megabyte
an error is indicated by "-1"
"""
import CryptoBoxTools
return CryptoBoxTools.getBlockDeviceSize(self.device)
def resetObject(self):
""" recheck the information about this container
this is especially useful after changing the type via 'create' """
self.uuid = self.__getUUID()
self.type = self.__getTypeOfPartition()
self.name = self.__getNameOfContainer()
if self.type == self.Types["luks"]:
self.mount = self.__mountLuks
self.umount = self.__umountLuks
elif self.type == self.Types["plain"]:
self.mount = self.__mountPlain
self.umount = self.__umountPlain
def create(self, type, password=None):
old_name = self.getName()
if type == self.Types["luks"]:
self.__createLuks(password)
elif type == self.Types["plain"]:
self.__createPlain()
else:
raise CBInvalidType("invalid container type (%d) supplied" % (type, ))
## no exception was raised during creation -> we can continue
## reset the properties (encryption state, ...) of the device
self.resetObject()
## restore the old name (must be after resetObject)
self.setName(old_name)
def changePassword(self, oldpw, newpw):
if self.type != self.Types["luks"]:
raise CBInvalidType("changing of password is possible only for luks containers")
if not oldpw:
raise CBInvalidPassword("no old password supplied for password change")
if not newpw:
raise CBInvalidPassword("no new password supplied for password change")
"return if new and old passwords are the same"
if oldpw == newpw: return
if self.isMounted():
raise CBVolumeIsActive("this container is currently active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksAddKey",
self.device,
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
## retrieve the key slot we used for unlocking
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise CBChangePasswordError("could not get the old key slot")
"remove the old key"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"luksDelKey",
self.device,
"%d" % (keyslot, )])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
" ****************** internal stuff ********************* "
def __getNameOfContainer(self):
"retrieve the name of the container by querying the database"
def_name = self.cbox.getNameForUUID(self.uuid)
if def_name: return def_name
"there is no name defined for this uuid - we will propose a good one"
prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.getUUIDForName(guess):
counter += 1
else:
unused_found = True
self.cbox.setNameForUUID(self.uuid, guess)
return guess
def __getUUID(self):
if self.__getTypeOfPartition() == self.Types["luks"]:
guess = self.__getLuksUUID()
else:
guess = self.__getNonLuksUUID()
## did we get a valid value?
if guess:
return guess
else:
## emergency default value
return self.device.replace(os.path.sep, "_")
def __getLuksUUID(self):
"""get uuid for luks devices"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [self.cbox.prefs["Programs"]["cryptsetup"],
"luksUUID",
self.device])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.cbox.log.info("could not retrieve luks uuid (%s): %s", (self.device, stderr.strip()))
return None
return stdout.strip()
def __getNonLuksUUID(self):
"""return UUID for ext2/3 and vfat filesystems"""
try:
devnull = open(os.devnull, "w")
except IOError:
self.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "UUID",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
(stdout, stderr) = proc.communicate()
devnull.close()
## execution failed?
if proc.returncode != 0:
self.log.info("retrieving of partition type (%s) via 'blkid' failed: %s - maybe it is encrypted?" % (self.device, stderr.strip()))
return None
## return output of blkid
return stdout.strip()
def __getTypeOfPartition(self):
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
if self.__isLuksPartition(): return self.Types["luks"]
typeOfPartition = self.__getTypeIdOfPartition()
if typeOfPartition in self.__fsTypes["plain"]:
return self.Types["plain"]
if typeOfPartition in self.__fsTypes["swap"]:
return self.Types["swap"]
return self.Types["unused"]
def __getTypeIdOfPartition(self):
"returns the type of the partition (see 'man blkid')"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
return output
def __isLuksPartition(self):
"check if the given device is a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"isLuks",
self.device])
proc.wait()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)
def __mountLuks(self, password):
"mount a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for luksOpen")
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
self.__umountLuks()
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
def __umountLuks(self):
"umount a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
if os.path.exists(os.path.join(self.__dmDir, self.name)):
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksClose",
self.name,
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
self.device,
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
def __umountPlain(self):
"umount a plaintext partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
def __createPlain(self):
"make a plaintext partition"
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
self.device])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
devnull.close()
def __createLuks(self, password):
"make a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for new luks mapping")
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksFormat",
self.device,
"--batch-mode",
"--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
"--iter-time", "2000"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"open the luks container for mkfs"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"make the filesystem"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
"remove the luks mapping"
raise CBCreateError(errorMsg)
devnull.close()
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
os.rmdir(abs_dir)

107
bin/CryptoBoxExceptions.py Normal file
View file

@ -0,0 +1,107 @@
"""
exceptions of the cryptobox package
"""
class CryptoBoxError(Exception):
"""base class for exceptions of the cryptobox"""
pass
class CBConfigError(CryptoBoxError):
"""any kind of error related to the configuration of a cryptobox"""
pass
class CBConfigUnavailableError(CBConfigError):
"""config file/input was not available at all"""
def __init__(self, source=None):
self.source = source
def __str__(self):
if self.source:
return "failed to access the configuration of the cryptobox: %s" % self.source
else:
return "failed to access the configuration of the cryptobox"
class CBConfigUndefinedError(CBConfigError):
"""a specific configuration setting was not defined"""
def __init__(self, section, name=None):
self.section = section
self.name = name
def __str__(self):
# is it a settings or a section?
if self.name:
# setting
return "undefined configuration setting: [%s]->%s - please check your configuration file" % (self.section, self.name)
else:
# section
return "undefined configuration section: [%s] - please check your configuration file" % (self.section, )
class CBConfigInvalidValueError(CBConfigError):
"""a configuration setting was invalid somehow"""
def __init__(self, section, name, value, reason):
self.section = section
self.name = name
self.value = value
self.reason = reason
def __str__(self):
return "invalid configuration setting [%s]->%s (%s): %s" % (self.section, self.name, self.value, self.reason)
class CBEnvironmentError(CryptoBoxError):
"""some part of the environment of the cryptobox is broken
e.g. the wrong version of a required program
"""
def __init__(self, desc):
self.desc = desc
def __str__(self):
return "misconfiguration detected: %s" % self.desc
class CBContainerError(CryptoBoxError):
"""any error raised while manipulating a cryptobox container"""
def __init__(self, desc):
self.desc = desc
def __str__(self):
return self.desc
class CBCreateError(CBContainerError):
pass
class CBVolumeIsActive(CBContainerError):
pass
class CBInvalidName(CBContainerError):
pass
class CBNameActivelyUsed(CBContainerError):
pass
class CBInvalidType(CBContainerError):
pass
class CBInvalidPassword(CBContainerError):
pass
class CBChangePasswordError(CBContainerError):
pass
class CBMountError(CBContainerError):
pass
class CBUmountError(CBContainerError):
pass

165
bin/CryptoBoxPlugin.py Normal file
View file

@ -0,0 +1,165 @@
# $Id$
#
# parent class for all plugins of the CryptoBox
#
import os
import cherrypy
class CryptoBoxPlugin:
## default capability is "system" - the other supported capability is: "volume"
pluginCapabilities = [ "system" ]
## does this plugin require admin authentification?
requestAuth = False
## is this plugin enabled by default?
enabled = True
## default rank (0..100) of the plugin in listings (lower value means higher priority)
rank = 80
## default icon of this plugin (relative path)
defaultIconFileName = "plugin_icon.png"
def __init__(self, cbox, pluginDir):
self.cbox = cbox
self.hdf = {}
self.pluginDir = pluginDir
self.hdf_prefix = "Data.Plugins.%s." % self.getName()
def doAction(self, **args):
"""override doAction with your plugin code"""
raise Exception, "undefined action handler ('doAction') in plugin '%'" % self.getName()
def getStatus(self):
"""you should override this, to supply useful state information"""
raise Exception, "undefined state handler ('getStatus') in plugin '%'" % self.getName()
def getName(self):
"""the name of the python file (module) should be the name of the plugin"""
return self.__module__
@cherrypy.expose
def getIcon(self, image=None, **kargs):
"""return the image data of the icon of the plugin
the parameter 'image' may be used for alternative image locations (relative
to the directory of the plugin)
'**kargs' is necessary, as a 'weblang' attribute may be specified (and ignored)"""
import cherrypy, re
if (image is None): # or (re.search(u'[\w-\.]', image)):
plugin_icon_file = os.path.join(self.pluginDir, self.defaultIconFileName)
else:
plugin_icon_file = os.path.join(self.pluginDir, image)
if not os.access(plugin_icon_file, os.R_OK):
plugin_icon_file = os.path.join(self.cbox.prefs["Locations"]["PluginDir"], "plugin_icon_unknown.png")
return cherrypy.lib.cptools.serveFile(plugin_icon_file)
def getTemplateFileName(self, template_name):
"""return the filename of the template, if it is part of this plugin
use this function to check, if the plugin provides the specified template
"""
result_file = os.path.join(self.pluginDir, template_name + ".cs")
if os.access(result_file, os.R_OK) and os.path.isfile(result_file):
return result_file
else:
return None
def getLanguageData(self, lang="en"):
try:
import neo_cgi, neo_util
except:
raise CryptoBoxExceptions.CBEnvironmentError("couldn't import 'neo_*'! Try 'apt-get install python-clearsilver'.")
langdir = os.path.abspath(os.path.join(self.pluginDir, "lang"))
## first: the default language file (english)
langFiles = [os.path.join(langdir, "en.hdf")]
## maybe we have to load a translation afterwards
if lang != "en":
langFiles.append(os.path.join(langdir, lang + ".hdf"))
file_found = False
lang_hdf = neo_util.HDF()
for langFile in langFiles:
if os.access(langFile, os.R_OK):
lang_hdf.readFile(langFile)
file_found = True
if file_found:
return lang_hdf
else:
self.cbox.log.debug("Couldn't find a valid plugin language file (%s)" % str(langFiles))
return None
def loadDataSet(self, hdf):
for (key, value) in self.hdf.items():
hdf.setValue(key, str(value))
def isAuthRequired(self):
"""check if this plugin requires authentication
first step: check plugin configuration
second step: check default value of plugin"""
try:
if self.cbox.prefs.pluginConf[self.getName()]["requestAuth"] is None:
return self.requestAuth
if self.cbox.prefs.pluginConf[self.getName()]["requestAuth"]:
return True
else:
return False
except KeyError:
return self.requestAuth
def isEnabled(self):
"""check if this plugin is enabled
first step: check plugin configuration
second step: check default value of plugin"""
import types
try:
if self.cbox.prefs.pluginConf[self.getName()]["enabled"] is None:
return self.enabled
if self.cbox.prefs.pluginConf[self.getName()]["enabled"]:
return True
else:
return False
except KeyError:
return self.enabled
def getRank(self):
"""check the rank of this plugin
first step: check plugin configuration
second step: check default value of plugin"""
try:
if self.cbox.prefs.pluginConf[self.getName()]["rank"] is None:
return self.rank
return int(self.cbox.prefs.pluginConf[self.getName()]["rank"])
except KeyError, TypeError:
return self.rank
def getTestClass(self):
import imp
pl_file = os.path.join(self.pluginDir, "unittests.py")
if os.access(pl_file, os.R_OK) and os.path.isfile(pl_file):
try:
return getattr(imp.load_source("unittests_%s" % self.getName(), pl_file), "unittests")
except AttributeError:
pass
try:
self.cbox.log.info("could not load unittests for plugin: %s" % self.getName())
except AttributeError:
pass
return None

386
bin/CryptoBoxRootActions.py Executable file
View file

@ -0,0 +1,386 @@
#!/usr/bin/env python2.4
"""module for executing the programs, that need root privileges
Syntax:
- program
- device
- [action]
- [action args]
this script will always return with an exitcode 0 (true), if "check" is the only argument
"""
import os
import sys
import subprocess
import pwd
import grp
import types
allowedProgs = {
"sfdisk": "/sbin/sfdisk",
"cryptsetup": "/sbin/cryptsetup",
"mount": "/bin/mount",
"umount": "/bin/umount",
"blkid": "/sbin/blkid",
}
DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
def checkIfPluginIsSafe(plugin):
"""check if the plugin and its parents are only writeable for root"""
#FIXME: for now we may skip this test - but users will not like it this way :)
return True
props = os.stat(plugin)
## check if it is owned by non-root
if props.st_uid != 0: return False
## check group-write permission if gid is not zero
if (props.st_gid != 0) and (props.st_mode % 32 / 16 > 0): return False
## check if it is world-writeable
if props.st_mode % 4 / 2 > 0: return False
## are we at root-level (directory-wise)? If yes, then we are ok ...
if plugin == os.path.sep: return True
## check if the parent directory is ok - recursively :)
return checkIfPluginIsSafe(os.path.dirname(os.path.abspath(plugin)))
def checkIfPluginIsValid(plugin):
import imp
try:
x = imp.load_source("cbox_plugin",plugin)
except Exception:
return False
try:
if getattr(x, "PLUGIN_TYPE") == "cryptobox":
return True
else:
return False
except Exception:
return False
def call_plugin(args):
"""check if the plugin may be called - and do it finally ..."""
plugin = os.path.abspath(args[0])
del args[0]
## check existence and excutability
if not os.access(plugin, os.X_OK):
raise Exception, "could not find executable plugin (%s)" % plugin
## check if the plugin (and its parents) are only writeable for root
if not checkIfPluginIsSafe(plugin):
raise Exception, "the plugin (%s) was not safe - check its (and its parents') permissions" % plugin
## check if the plugin is a python program, that is marked as a cryptobox plugin
if not checkIfPluginIsValid(plugin):
raise Exception, "the plugin (%s) is not a correctly marked python script" % plugin
args.insert(0,plugin)
proc = subprocess.Popen(
shell = False,
args = args)
proc.wait()
return proc.returncode == 0
def isWriteable(device, force_dev_type=None):
"""check if the calling user (not root!) has write access to the device/file
the real (not the effictive) user id is used for the check
additionally the permissions of the default groups of the real uid are checked
this check works nicely together with "super", as it changes (by default) only
the effective uid (not the real uid)
"""
# first check, if the device/file exists
if not os.path.exists(device):
return False
# check the type of the device - if necessary
if not force_dev_type is None:
dev_type = os.stat(device).st_mode % 65536 / 4096
if dev_type != force_dev_type: return False
# retrieve the information for the real user id
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(os.getuid())
# set the default groups of the caller for the check (restore them later)
savedGroups = os.getgroups()
os.setgroups(groupsOfTrustUser)
# check permissions
result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
# reset the groups of this process
os.setgroups(savedGroups)
return result
def run_cryptsetup(args):
"""execute cryptsetup as root
@args: list of arguments - they will be treated accordingly to the first element
of this list (the action)"""
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try:
action = args[0]
del args[0]
device = None
cmd_args = []
if action == "luksFormat":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksUUID":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksOpen":
if len(args) < 2: raise "WrongArguments", "missing arguments"
device = args[0]; del args[0]
destination = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
cmd_args.append(destination)
elif action == "luksClose":
if len(args) < 1: raise "WrongArguments", "missing arguments"
destination = args[0]; del args[0]
# maybe add a check for the mapped device's permissions?
# dmsetup deps self.device
cmd_args.append(action)
cmd_args.append(destination)
elif action == "luksAddKey":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksDelKey":
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
device = args[0]; del args[0]
cmd_args.insert(-1, action)
cmd_args.insert(-1, device)
elif action == "isLuks":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
# check if a device was defined - and check it
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
cs_args = [allowedProgs["cryptsetup"]]
cs_args.extend(args)
cs_args.extend(cmd_args)
except (TypeError, IndexError):
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute cryptsetup with the given parameters
proc = subprocess.Popen(
shell = False,
args = cs_args)
proc.wait()
## chown the devmapper block device to the cryptobox user
if (proc.returncode == 0) and (action == "luksOpen"):
os.chown(os.path.join(os.path.sep, "dev", "mapper", destination), os.getuid(), os.getgid())
return proc.returncode == 0
def run_sfdisk(args):
"""execute sfdisk for partitioning
not implemented yet"""
print "ok - you are free to call sfdisk ..."
print " not yet implemented ..."
return True
def getFSType(device):
"""get the filesystem type of a device"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
args = [ allowedProgs["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
device])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
return None
return stdout.strip()
def run_mount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no destination for mount supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try:
device = args[0]
del args[0]
destination = args[0]
del args[0]
# check permissions for the device
if not isWriteable(device, DEV_TYPES["block"]):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
## check permissions for the mountpoint
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
# check for additional (not allowed) arguments
if len(args) != 0:
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
except TypeError:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute mount with the given parameters
# first overwrite the real uid, as 'mount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
## we have to change the permissions of the mounted directory - otherwise it will
## not be writeable for the cryptobox user
## for 'vfat' we have to do this during mount
## for ext2/3 we have to do it afterward
## first: get the user/group of the target
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID)
trustGID = groupsOfTrustUser[0]
fsType = getFSType(device)
## define arguments