From 9321677078a62536168738b0130c55c67d98b88c Mon Sep 17 00:00:00 2001
From: lars
Date: Mon, 9 Oct 2006 16:44:35 +0000
Subject: [PATCH] unittests for CryptoBoxTools mount/umount moved to separate
volume plugins volume name change moved to 'volume_props' plugin webinterface
for password change (luks) added attribute "pluginCapabilities" added to
plugins attribute "requestAuth" added to plugins http authentication
implemented (for now: static user database)
---
pythonrewrite/bin/CryptoBox.py | 19 ++
pythonrewrite/bin/CryptoBoxContainer.py | 9 +-
pythonrewrite/bin/CryptoBoxExceptions.py | 3 +
pythonrewrite/bin/CryptoBoxPlugin.py | 6 +
pythonrewrite/bin/CryptoBoxTools.py | 45 ++-
pythonrewrite/bin/WebInterfaceDataset.py | 2 +
pythonrewrite/bin/WebInterfaceSites.py | 310 ++++++------------
pythonrewrite/bin/unittests.CryptoBoxTools.py | 48 +++
.../bin/unittests.CryptoBoxWebserver.py | 0
9 files changed, 226 insertions(+), 216 deletions(-)
create mode 100755 pythonrewrite/bin/unittests.CryptoBoxTools.py
mode change 100644 => 100755 pythonrewrite/bin/unittests.CryptoBoxWebserver.py
diff --git a/pythonrewrite/bin/CryptoBox.py b/pythonrewrite/bin/CryptoBox.py
index 171d546..3c9532b 100755
--- a/pythonrewrite/bin/CryptoBox.py
+++ b/pythonrewrite/bin/CryptoBox.py
@@ -122,6 +122,23 @@ class CryptoBoxProps(CryptoBox):
self.containers.append(CryptoBoxContainer.CryptoBoxContainer(device, self))
## sort by container name
self.containers.sort(cmp = lambda x,y: x.getName() < y.getName() and -1 or 1)
+
+
+ def getConfigPartitions(self):
+ """returns a sequence of found config partitions"""
+ import subprocess
+ proc = subprocess.Popen(
+ shell = False,
+ stdout = subprocess.PIPE,
+ args = [
+ self.prefs["Programs"]["blkid"],
+ "-c", os.path.devnull,
+ "-t", "LABEL=%s" % self.prefs["Main"]["ConfigVolumeLabel"] ])
+ (output, error) = proc.communicate()
+ if output:
+ return [e.strip().split(":",1)[0] for e in output.splitlines()]
+ else:
+ return []
def isConfigPartition(self, device):
@@ -222,6 +239,8 @@ class CryptoBoxProps(CryptoBox):
old_index = self.containers.index(e)
self.containers.remove(e)
self.containers.insert(old_index, CryptoBoxContainer.CryptoBoxContainer(dev,self))
+ ## there should be no reason for any failure
+ return True
def getNameForUUID(self, uuid):
diff --git a/pythonrewrite/bin/CryptoBoxContainer.py b/pythonrewrite/bin/CryptoBoxContainer.py
index 088311b..e0c7c41 100755
--- a/pythonrewrite/bin/CryptoBoxContainer.py
+++ b/pythonrewrite/bin/CryptoBoxContainer.py
@@ -65,7 +65,8 @@ class CryptoBoxContainer:
for a in prev_name_owner:
if a.isMounted():
raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
- self.cbox.setNameForUUID(self.uuid, new_name)
+ if not self.cbox.setNameForUUID(self.uuid, new_name):
+ raise CBContainerError("failed to change the volume name for unknown reasons")
self.name = new_name
@@ -374,7 +375,7 @@ class CryptoBoxContainer:
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
- raise CBMountError(errorMsg)
+ raise CBUmountError(errorMsg)
if os.path.exists(os.path.join(self.__dmDir, self.name)):
proc = subprocess.Popen(
shell = False,
@@ -392,7 +393,7 @@ class CryptoBoxContainer:
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
- raise CBMountError(errorMsg)
+ raise CBUmountError(errorMsg)
devnull.close()
@@ -452,7 +453,7 @@ class CryptoBoxContainer:
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
- raise CBMountError(errorMsg)
+ raise CBUmountError(errorMsg)
devnull.close()
diff --git a/pythonrewrite/bin/CryptoBoxExceptions.py b/pythonrewrite/bin/CryptoBoxExceptions.py
index 2ab16ed..743bfcd 100644
--- a/pythonrewrite/bin/CryptoBoxExceptions.py
+++ b/pythonrewrite/bin/CryptoBoxExceptions.py
@@ -102,3 +102,6 @@ class CBChangePasswordError(CBContainerError):
class CBMountError(CBContainerError):
pass
+class CBUmountError(CBContainerError):
+ pass
+
diff --git a/pythonrewrite/bin/CryptoBoxPlugin.py b/pythonrewrite/bin/CryptoBoxPlugin.py
index 13e690e..7f83b32 100644
--- a/pythonrewrite/bin/CryptoBoxPlugin.py
+++ b/pythonrewrite/bin/CryptoBoxPlugin.py
@@ -8,6 +8,12 @@ import os
class CryptoBoxPlugin:
+ ## default capability is "system" - the other supported capability is: "volume"
+ pluginCapabilities = [ "system" ]
+
+ ## does this plugin require admin authentification?
+ requestAuth = False
+
def __init__(self, cbox, pluginDir):
self.cbox = cbox
self.hdf = {}
diff --git a/pythonrewrite/bin/CryptoBoxTools.py b/pythonrewrite/bin/CryptoBoxTools.py
index 5a10ecd..eef0e2f 100644
--- a/pythonrewrite/bin/CryptoBoxTools.py
+++ b/pythonrewrite/bin/CryptoBoxTools.py
@@ -63,8 +63,13 @@ def getAbsoluteDeviceName(shortname):
def findMajorMinorOfDevice(device):
- "return the major/minor numbers of a block device by querying /sys/block/?/dev"
- if not os.path.exists(os.path.join(os.path.sep,"sys","block",device)): return None
+ "return the major/minor numbers of a block device"
+ if re.match("/", device) or not os.path.exists(os.path.join(os.path.sep,"sys","block",device)):
+ ## maybe it is an absolute device name
+ if not os.path.exists(device): return None
+ ## okay - it seems to to a device node
+ rdev = os.stat(device).st_rdev
+ return (os.major(rdev), os.minor(rdev))
blockdev_info_file = os.path.join(os.path.join(os.path.sep,"sys","block", device), "dev")
try:
f_blockdev_info = open(blockdev_info_file, "r")
@@ -81,6 +86,7 @@ def findMajorMinorOfDevice(device):
return None
except IOError:
pass
+ return None
def findMajorMinorDeviceName(dir, major, minor):
@@ -116,3 +122,38 @@ def getParentBlockDevices():
devs.append(p_device)
return map(getAbsoluteDeviceName, devs)
+
+def isPartOfBlockDevice(parent, subdevice):
+ """check if the given block device is a parent of 'subdevice'
+ e.g. for checking if a partition belongs to a block device"""
+ try:
+ (par_major, par_minor) = findMajorMinorOfDevice(parent)
+ (sub_major, sub_minor) = findMajorMinorOfDevice(subdevice)
+ except TypeError:
+ ## at least one of these devices did not return a valid major/minor combination
+ return False
+ ## search the entry below '/sys/block' belonging to the parent
+ root = os.path.join(os.path.sep, 'sys', 'block')
+ for bldev in os.listdir(root):
+ blpath = os.path.join(root, bldev, 'dev')
+ if os.access(blpath, os.R_OK):
+ try:
+ if (str(par_major), str(par_minor)) == tuple([e for e in file(blpath)][0].strip().split(":",1)):
+ parent_path = os.path.join(root, bldev)
+ break
+ except IndexError, OSError:
+ pass
+ else:
+ ## no block device with this major/minor combination found below '/sys/block'
+ return False
+ for subbldev in os.listdir(parent_path):
+ subblpath = os.path.join(parent_path, subbldev, "dev")
+ if os.access(subblpath, os.R_OK):
+ try:
+ if (str(sub_major), str(sub_minor)) == tuple([e for e in file(subblpath)][0].strip().split(":",1)):
+ ## the name of the subdevice node is not important - we found it!
+ return True
+ except IndexError, OSError:
+ pass
+ return False
+
diff --git a/pythonrewrite/bin/WebInterfaceDataset.py b/pythonrewrite/bin/WebInterfaceDataset.py
index f43f9bf..f60319e 100644
--- a/pythonrewrite/bin/WebInterfaceDataset.py
+++ b/pythonrewrite/bin/WebInterfaceDataset.py
@@ -87,5 +87,7 @@ class WebInterfaceDataset(dict):
self[entryName] = p.getName()
self[entryName + ".Rank"] = lang_data.getValue("Rank", "100")
self[entryName + ".Link"] = lang_data.getValue("Link", p.getName())
+ for a in p.pluginCapabilities:
+ self[entryName + ".Types." + a] = "1"
diff --git a/pythonrewrite/bin/WebInterfaceSites.py b/pythonrewrite/bin/WebInterfaceSites.py
index 074f54c..9242b18 100755
--- a/pythonrewrite/bin/WebInterfaceSites.py
+++ b/pythonrewrite/bin/WebInterfaceSites.py
@@ -3,6 +3,10 @@ import WebInterfaceDataset
import re
import Plugins
from CryptoBoxExceptions import *
+import cherrypy
+
+# TODO: for now the admin access is defined statically
+authDict = {"test": "tester"}
class WebInterfacePlugins:
@@ -57,10 +61,48 @@ class WebInterfaceSites:
return False
+ ## this is a function decorator to check authentication
+ ## it has to be defined before any page definition requiring authentification
+ def __requestAuth(self, authDict):
+ def check_credentials(site):
+ def _inner_wrapper(self, *args, **kargs):
+ import base64
+ ## define a "non-allowed" function
+ user, password = None, None
+ try:
+ resp = cherrypy.request.headers["Authorization"][6:] # ignore "Basic "
+ (user, password) = base64.b64decode(resp).split(":",1)
+ except KeyError:
+ ## no "authorization" header was sent
+ pass
+ except TypeError:
+ ## invalid base64 string
+ pass
+ except AttributeError:
+ ## no cherrypy response header defined
+ pass
+ if user in authDict.keys():
+ if password == authDict[user]:
+ ## ok: return the choosen page
+ self.cbox.log.info("access granted for: %s" % user)
+ return site(self, *args, **kargs)
+ else:
+ self.cbox.log.info("wrong password supplied for: %s" % user)
+ else:
+ self.cbox.log.info("unknown user: %s" % str(user))
+ ## wrong credentials: return "access denied"
+ cherrypy.response.headers["WWW-Authenticate"] = '''Basic realm="Test-Arena"'''
+ cherrypy.response.status = 401
+ return self.__render("access_denied")
+ return _inner_wrapper
+ return check_credentials
+
+
######################################################################
## put real sites down here and don't forget to expose them at the end
+ @cherrypy.expose
def status(self, weblang=""):
'''shows the current status of the box
'''
@@ -79,6 +121,7 @@ class WebInterfaceSites:
return self.__render("show_status")
+ @cherrypy.expose
def doc(self,page="",weblang=""):
'''prints the offline wikipage
'''
@@ -96,18 +139,21 @@ class WebInterfaceSites:
return self.__render("show_doc")
+ @cherrypy.expose
def system(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("form_system")
+
-
+ @cherrypy.expose
def index(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_status")
+ @cherrypy.expose
def show_volume(self, device="", weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
@@ -120,204 +166,75 @@ class WebInterfaceSites:
return self.__render("show_status")
+ @cherrypy.expose
def show_volumes(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_volumes")
- def volume_name_set(self, device="", volume_name="", weblang=""):
- self.__resetDataset()
- self.__setWebLang(weblang)
- if self.__setDevice(device):
- volume_name = volume_name.strip()
- if self.__checkVolumeName(volume_name):
- container = self.cbox.getContainer(device)
- try:
- container.setName(volume_name)
- except CBContainerError, errMsg:
- self.log.warn("failed to rename the volume '%s' to '%s: %s'" % (device, volume_name, errMsg))
- self.dataset["Data.Warning"] = "SetVolumeNameFailed"
- else:
- self.log.info("successfully renamed volume '%s' to '%s'" % (device, volume_name))
- self.dataset.setCurrentDiskState(device)
- else:
- self.dataset["Data.Warning"] = "InvalidVolumeName"
- return self.__render("show_volume")
- else:
- if self.cbox.getContainerList():
- return self.__render("show_volumes")
- else:
- return self.__render("show_status")
-
-
- def mount_do(self, device, crypto_password=None, weblang=""):
- self.__resetDataset()
- self.__setWebLang(weblang)
- if self.__setDevice(device):
- container = self.cbox.getContainer(device)
- if container.isMounted():
- self.dataset["Data.Warning"] = "IsMounted"
- self.log.warn("the device (%s) is already mounted" % device)
- else:
- try:
- if container.getType() == container.Types["luks"]:
- ## encrypted luks container
- if not crypto_password:
- self.dataset["Data.Warning"] = "EmptyCryptoPassword"
- self.log.warn("no password was supplied for mounting of device '%s'" % device)
- return self.__render("show_volume")
- else:
- container.mount(crypto_password)
- elif container.getType() == container.Types["plain"]:
- ## plain container
- container.mount()
- else:
- ## mounting is not possible
- self.dataset["Data.Warning"] = "InvalidType"
- self.log.warn("this type of container (%s) cannot be mounted - sorry!" % device)
- except (Exception, "MountError"):
- self.dataset["Data.Warning"] = "MountFailed"
- self.log.warn("failed to mount the device (%s)" % device)
- else:
- self.log.info("successfully mounted the container (%s)" % device)
- self.dataset.setCurrentDiskState(device)
- else:
- if self.cbox.getContainerList():
- return self.__render("show_volumes")
- else:
- return self.__render("show_status")
- return self.__render("show_volume")
-
-
- def volume_init_ask(self, device, encryption=None, weblang=""):
- self.__resetDataset()
- self.__setWebLang(weblang)
- if self.__setDevice(device):
- container = self.cbox.getContainer(device)
- if container.isMounted():
- self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
- self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
- return self.__render("show_volume")
- else:
- if encryption is None:
- self.dataset["Data.Init.isCrypto"] = 0
- else:
- self.dataset["Data.Init.isCrypto"] = 1
- return self.__render("form_init")
- else:
- if self.cbox.getContainerList():
- return self.__render("show_volumes")
- else:
- return self.__render("show_status")
-
-
- def init_do(self, device, confirm="", crypto_password=None, crypto_password2=None, encryption=None, weblang=""):
- self.__resetDataset()
- self.__setWebLang(weblang)
- if self.__setDevice(device):
- container = self.cbox.getContainer(device)
- ## set 'Data.Init.isCrypto' - just in case, we have to show the same form again
- if encryption is None:
- self.dataset["Data.Init.isCrypto"] = 0
- else:
- self.dataset["Data.Init.isCrypto"] = 1
- if container.isMounted():
- self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
- self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
- return self.__render("form_init")
- else:
- if confirm != self.__getLanguageValue("Text.ConfirmInit"):
- self.dataset["Data.Warning"] = "InitNotConfirmed"
- self.log.warn("the confirmation sentence for initialization of the device '%s' was wrong" % device)
- return self.__render("form_init")
- try:
- if not encryption is None:
- if not crypto_password:
- self.dataset["Data.Warning"] = "EmptyCryptoPassword"
- self.log.warn("no crypto password was supplied for initialization of device '%s'" % device)
- return self.__render("form_init")
- if crypto_password != crypto_password2:
- self.dataset["Data.Warning"] = "DifferentCryptoPasswords"
- self.log.warn("the crypto password was not repeated correctly for initialization of device '%s'" % device)
- return self.__render("form_init")
- container.create(container.Types["luks"], crypto_password)
- else:
- container.create(container.Types["plain"])
- except CBContainerError, errMsg:
- self.dataset["Data.Warning"] = "CreateFailed"
- self.log.warn("initialization of device '%s' failed" % device)
- self.log.warn("reason: %s" % errMsg)
- return self.__render("form_init")
- else:
- self.log.info("successfully initialized device '%s'" % device)
- # reread the dataset
- self.dataset.setCurrentDiskState(device)
- return self.__render("show_volume")
- else:
- if self.cbox.getContainerList():
- return self.__render("show_volumes")
- else:
- return self.__render("show_status")
-
-
- def test(self, weblang=""):
- import cherrypy
- self.__resetDataset()
- self.__setWebLang(weblang)
- return "test passed"
-
-
- def umount_do(self, device, weblang=""):
- self.__resetDataset()
- self.__setWebLang(weblang)
- if self.__setDevice(device):
- container = self.cbox.getContainer(device)
- if not container.isMounted():
- self.dataset["Data.Warning"] = "NotMounted"
- self.log.warn("the device (%s) is currently not mounted" % device)
- else:
- try:
- if container.getType() == container.Types["luks"]:
- ## encrypted luks container
- container.umount()
- elif container.getType() == container.Types["plain"]:
- ## plain container
- container.umount()
- else:
- ## umounting is not possible
- self.dataset["Data.Warning"] = "InvalidType"
- self.log.warn("this type of container (%s) cannot be umounted - sorry!" % device)
- except (Exception, "UmountError"):
- self.dataset["Data.Warning"] = "UmountFailed"
- self.log.warn("failed to unmount the device (%s)" % device)
- else:
- self.log.info("successfully unmounted the container (%s)" % device)
- # reread the dataset
- self.dataset.setCurrentDiskState(device)
- else:
- if self.cbox.getContainerList():
- return self.__render("show_volumes")
- else:
- return self.__render("show_status")
- return self.__render("show_volume")
-
-
def return_plugin_action(self, plugin):
- def handler(**args):
+ def handler(self, **args):
self.__resetDataset()
try:
self.__setWebLang(args["weblang"])
del args["weblang"]
except KeyError:
pass
+ ## check the device argument of volume plugins
+ if "volume" in plugin.pluginCapabilities:
+ try:
+ ## initialize the dataset of the selected device if necessary
+ if self.__setDevice(args["device"]):
+ plugin.device = args["device"]
+ self.dataset.setCurrentDiskState(plugin.device)
+ else:
+ return self.__render("show_status")
+ except KeyError:
+ return self.__render("show_status")
+ else:
+ ## the parameter 'device' exists - we have to remove it
+ del args["device"]
+ ## call the plugin handler
nextTemplate = plugin.doAction(**args)
- ## set the default template
- if not nextTemplate: nextTemplate = "form_system"
+ ## for 'volume' plugins: reread the dataset of the current disk
+ ## additionally: set the default template for plugins
+ if "volume" in plugin.pluginCapabilities:
+ self.dataset.setCurrentDiskState(plugin.device)
+ if not nextTemplate: nextTemplate = "show_volume"
+ else:
+ if not nextTemplate: nextTemplate = "form_system"
+ ## save the currently active plugin name
+ self.dataset["Data.ActivePlugin"] = plugin.getName()
return self.__render(nextTemplate, plugin)
- return handler
+ ## apply authentication?
+ if plugin.requestAuth:
+ return lambda **args: self.__requestAuth(authDict)(handler)(self, **args)
+ else:
+ return lambda **args: handler(self, **args)
+ ## test authentication
+ @cherrypy.expose
+ @__requestAuth(None, authDict)
+ def test(self, weblang=""):
+ self.__resetDataset()
+ self.__setWebLang(weblang)
+ return "test passed"
+
+
+ @cherrypy.expose
+ def test_stream(self):
+ """just for testing purposes - to check if the "stream_response" feature
+ actually works - for now (September 02006) it does not seem to be ok"""
+ import time
+ yield "neu"
+ for a in range(10):
+ yield "- yes: %d - %s
" % (a, str(time.time()))
+ time.sleep(1)
+ yield "
"
+
+
##################### input checker ##########################
def __setWebLang(self, value):
@@ -453,6 +370,7 @@ class WebInterfaceSites:
plugin_cs_file = plugin.getTemplateFileName(template)
if plugin_cs_file:
self.dataset["Settings.TemplateFile"] = plugin_cs_file
+
## add the current state of the plugins to the hdf dataset
self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus()
## load the language data
@@ -494,31 +412,3 @@ class WebInterfaceSites:
yield line + "\n"
- def test_stream(self):
- """just for testing purposes - to check if the "stream_response" feature
- actually works - for now (September 02006) it does not seem to be ok"""
- import time
- yield "neu"
- for a in range(10):
- yield "- yes: %d - %s
" % (a, str(time.time()))
- time.sleep(1)
- yield "
"
-
-
- ############################################################################
- ## to make the sites visible through the webserver they must be exposed here
- index.exposed = True
- doc.exposed = True
- system.exposed = True
- status.exposed = True
- show_volume.exposed = True
- volume_name_set.exposed = True
- mount_do.exposed = True
- volume_init_ask.exposed = True
- init_do.exposed = True
- umount_do.exposed = True
- show_volumes.exposed = True
- test.exposed = True
- test_stream.exposed = True
-
-
diff --git a/pythonrewrite/bin/unittests.CryptoBoxTools.py b/pythonrewrite/bin/unittests.CryptoBoxTools.py
new file mode 100755
index 0000000..cb27257
--- /dev/null
+++ b/pythonrewrite/bin/unittests.CryptoBoxTools.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+import unittest
+import CryptoBoxTools
+import os
+
+
+class CryptoBoxToolsTests(unittest.TestCase):
+
+ def testGetAbsoluteDeviceName(self):
+ func = CryptoBoxTools.getAbsoluteDeviceName
+ self.assertTrue(func("hda") == "/dev/hda")
+ self.assertTrue(func("loop0") == "/dev/loop0")
+ self.assertTrue(func(os.path.devnull) == os.path.devnull)
+
+
+ def testFindMajorMinorOfDevice(self):
+ func = CryptoBoxTools.findMajorMinorOfDevice
+ self.assertTrue(func("/dev/hda") == (3,0))
+ self.assertTrue(func("/dev/hda1") == (3,1))
+ self.assertTrue(func(os.path.devnull) == (1,3))
+ self.assertTrue(func("/dev/nothere") is None)
+
+
+ def testFindMajorMinorDeviceName(self):
+ func = CryptoBoxTools.findMajorMinorDeviceName
+ dir = os.path.join(os.path.sep, "dev")
+ self.assertTrue(os.path.join(dir,"hda") in func(dir,3,0))
+ self.assertTrue(os.path.devnull in func(dir,1,3))
+ self.assertFalse(os.path.devnull in func(dir,2,3))
+
+
+ def testIsPartOfBlockDevice(self):
+ func = CryptoBoxTools.isPartOfBlockDevice
+ self.assertTrue(func("/dev/hda", "/dev/hda1"))
+ self.assertFalse(func("/dev/hda", "/dev/hda"))
+ self.assertFalse(func("/dev/hda1", "/dev/hda"))
+ self.assertFalse(func("/dev/hda1", "/dev/hda1"))
+ self.assertFalse(func("/dev/hda", "/dev/hdb1"))
+ self.assertFalse(func(None, "/dev/hdb1"))
+ self.assertFalse(func("/dev/hda", None))
+ self.assertFalse(func(None, ""))
+ self.assertFalse(func("loop0", "loop1"))
+
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/pythonrewrite/bin/unittests.CryptoBoxWebserver.py b/pythonrewrite/bin/unittests.CryptoBoxWebserver.py
old mode 100644
new mode 100755