unittests for CryptoBoxTools
mount/umount moved to separate volume plugins volume name change moved to 'volume_props' plugin webinterface for password change (luks) added attribute "pluginCapabilities" added to plugins attribute "requestAuth" added to plugins http authentication implemented (for now: static user database)
This commit is contained in:
parent
ca13aebdc8
commit
9321677078
9 changed files with 226 additions and 216 deletions
|
@ -124,6 +124,23 @@ class CryptoBoxProps(CryptoBox):
|
||||||
self.containers.sort(cmp = lambda x,y: x.getName() < y.getName() and -1 or 1)
|
self.containers.sort(cmp = lambda x,y: x.getName() < y.getName() and -1 or 1)
|
||||||
|
|
||||||
|
|
||||||
|
def getConfigPartitions(self):
|
||||||
|
"""returns a sequence of found config partitions"""
|
||||||
|
import subprocess
|
||||||
|
proc = subprocess.Popen(
|
||||||
|
shell = False,
|
||||||
|
stdout = subprocess.PIPE,
|
||||||
|
args = [
|
||||||
|
self.prefs["Programs"]["blkid"],
|
||||||
|
"-c", os.path.devnull,
|
||||||
|
"-t", "LABEL=%s" % self.prefs["Main"]["ConfigVolumeLabel"] ])
|
||||||
|
(output, error) = proc.communicate()
|
||||||
|
if output:
|
||||||
|
return [e.strip().split(":",1)[0] for e in output.splitlines()]
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
def isConfigPartition(self, device):
|
def isConfigPartition(self, device):
|
||||||
import subprocess
|
import subprocess
|
||||||
proc = subprocess.Popen(
|
proc = subprocess.Popen(
|
||||||
|
@ -222,6 +239,8 @@ class CryptoBoxProps(CryptoBox):
|
||||||
old_index = self.containers.index(e)
|
old_index = self.containers.index(e)
|
||||||
self.containers.remove(e)
|
self.containers.remove(e)
|
||||||
self.containers.insert(old_index, CryptoBoxContainer.CryptoBoxContainer(dev,self))
|
self.containers.insert(old_index, CryptoBoxContainer.CryptoBoxContainer(dev,self))
|
||||||
|
## there should be no reason for any failure
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def getNameForUUID(self, uuid):
|
def getNameForUUID(self, uuid):
|
||||||
|
|
|
@ -65,7 +65,8 @@ class CryptoBoxContainer:
|
||||||
for a in prev_name_owner:
|
for a in prev_name_owner:
|
||||||
if a.isMounted():
|
if a.isMounted():
|
||||||
raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
|
raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
|
||||||
self.cbox.setNameForUUID(self.uuid, new_name)
|
if not self.cbox.setNameForUUID(self.uuid, new_name):
|
||||||
|
raise CBContainerError("failed to change the volume name for unknown reasons")
|
||||||
self.name = new_name
|
self.name = new_name
|
||||||
|
|
||||||
|
|
||||||
|
@ -374,7 +375,7 @@ class CryptoBoxContainer:
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
||||||
self.log.warn(errorMsg)
|
self.log.warn(errorMsg)
|
||||||
raise CBMountError(errorMsg)
|
raise CBUmountError(errorMsg)
|
||||||
if os.path.exists(os.path.join(self.__dmDir, self.name)):
|
if os.path.exists(os.path.join(self.__dmDir, self.name)):
|
||||||
proc = subprocess.Popen(
|
proc = subprocess.Popen(
|
||||||
shell = False,
|
shell = False,
|
||||||
|
@ -392,7 +393,7 @@ class CryptoBoxContainer:
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
|
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
|
||||||
self.log.warn(errorMsg)
|
self.log.warn(errorMsg)
|
||||||
raise CBMountError(errorMsg)
|
raise CBUmountError(errorMsg)
|
||||||
devnull.close()
|
devnull.close()
|
||||||
|
|
||||||
|
|
||||||
|
@ -452,7 +453,7 @@ class CryptoBoxContainer:
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
||||||
self.log.warn(errorMsg)
|
self.log.warn(errorMsg)
|
||||||
raise CBMountError(errorMsg)
|
raise CBUmountError(errorMsg)
|
||||||
devnull.close()
|
devnull.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -102,3 +102,6 @@ class CBChangePasswordError(CBContainerError):
|
||||||
class CBMountError(CBContainerError):
|
class CBMountError(CBContainerError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class CBUmountError(CBContainerError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,12 @@ import os
|
||||||
|
|
||||||
class CryptoBoxPlugin:
|
class CryptoBoxPlugin:
|
||||||
|
|
||||||
|
## default capability is "system" - the other supported capability is: "volume"
|
||||||
|
pluginCapabilities = [ "system" ]
|
||||||
|
|
||||||
|
## does this plugin require admin authentification?
|
||||||
|
requestAuth = False
|
||||||
|
|
||||||
def __init__(self, cbox, pluginDir):
|
def __init__(self, cbox, pluginDir):
|
||||||
self.cbox = cbox
|
self.cbox = cbox
|
||||||
self.hdf = {}
|
self.hdf = {}
|
||||||
|
|
|
@ -63,8 +63,13 @@ def getAbsoluteDeviceName(shortname):
|
||||||
|
|
||||||
|
|
||||||
def findMajorMinorOfDevice(device):
|
def findMajorMinorOfDevice(device):
|
||||||
"return the major/minor numbers of a block device by querying /sys/block/?/dev"
|
"return the major/minor numbers of a block device"
|
||||||
if not os.path.exists(os.path.join(os.path.sep,"sys","block",device)): return None
|
if re.match("/", device) or not os.path.exists(os.path.join(os.path.sep,"sys","block",device)):
|
||||||
|
## maybe it is an absolute device name
|
||||||
|
if not os.path.exists(device): return None
|
||||||
|
## okay - it seems to to a device node
|
||||||
|
rdev = os.stat(device).st_rdev
|
||||||
|
return (os.major(rdev), os.minor(rdev))
|
||||||
blockdev_info_file = os.path.join(os.path.join(os.path.sep,"sys","block", device), "dev")
|
blockdev_info_file = os.path.join(os.path.join(os.path.sep,"sys","block", device), "dev")
|
||||||
try:
|
try:
|
||||||
f_blockdev_info = open(blockdev_info_file, "r")
|
f_blockdev_info = open(blockdev_info_file, "r")
|
||||||
|
@ -81,6 +86,7 @@ def findMajorMinorOfDevice(device):
|
||||||
return None
|
return None
|
||||||
except IOError:
|
except IOError:
|
||||||
pass
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def findMajorMinorDeviceName(dir, major, minor):
|
def findMajorMinorDeviceName(dir, major, minor):
|
||||||
|
@ -116,3 +122,38 @@ def getParentBlockDevices():
|
||||||
devs.append(p_device)
|
devs.append(p_device)
|
||||||
return map(getAbsoluteDeviceName, devs)
|
return map(getAbsoluteDeviceName, devs)
|
||||||
|
|
||||||
|
|
||||||
|
def isPartOfBlockDevice(parent, subdevice):
|
||||||
|
"""check if the given block device is a parent of 'subdevice'
|
||||||
|
e.g. for checking if a partition belongs to a block device"""
|
||||||
|
try:
|
||||||
|
(par_major, par_minor) = findMajorMinorOfDevice(parent)
|
||||||
|
(sub_major, sub_minor) = findMajorMinorOfDevice(subdevice)
|
||||||
|
except TypeError:
|
||||||
|
## at least one of these devices did not return a valid major/minor combination
|
||||||
|
return False
|
||||||
|
## search the entry below '/sys/block' belonging to the parent
|
||||||
|
root = os.path.join(os.path.sep, 'sys', 'block')
|
||||||
|
for bldev in os.listdir(root):
|
||||||
|
blpath = os.path.join(root, bldev, 'dev')
|
||||||
|
if os.access(blpath, os.R_OK):
|
||||||
|
try:
|
||||||
|
if (str(par_major), str(par_minor)) == tuple([e for e in file(blpath)][0].strip().split(":",1)):
|
||||||
|
parent_path = os.path.join(root, bldev)
|
||||||
|
break
|
||||||
|
except IndexError, OSError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
## no block device with this major/minor combination found below '/sys/block'
|
||||||
|
return False
|
||||||
|
for subbldev in os.listdir(parent_path):
|
||||||
|
subblpath = os.path.join(parent_path, subbldev, "dev")
|
||||||
|
if os.access(subblpath, os.R_OK):
|
||||||
|
try:
|
||||||
|
if (str(sub_major), str(sub_minor)) == tuple([e for e in file(subblpath)][0].strip().split(":",1)):
|
||||||
|
## the name of the subdevice node is not important - we found it!
|
||||||
|
return True
|
||||||
|
except IndexError, OSError:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
|
@ -87,5 +87,7 @@ class WebInterfaceDataset(dict):
|
||||||
self[entryName] = p.getName()
|
self[entryName] = p.getName()
|
||||||
self[entryName + ".Rank"] = lang_data.getValue("Rank", "100")
|
self[entryName + ".Rank"] = lang_data.getValue("Rank", "100")
|
||||||
self[entryName + ".Link"] = lang_data.getValue("Link", p.getName())
|
self[entryName + ".Link"] = lang_data.getValue("Link", p.getName())
|
||||||
|
for a in p.pluginCapabilities:
|
||||||
|
self[entryName + ".Types." + a] = "1"
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,10 @@ import WebInterfaceDataset
|
||||||
import re
|
import re
|
||||||
import Plugins
|
import Plugins
|
||||||
from CryptoBoxExceptions import *
|
from CryptoBoxExceptions import *
|
||||||
|
import cherrypy
|
||||||
|
|
||||||
|
# TODO: for now the admin access is defined statically
|
||||||
|
authDict = {"test": "tester"}
|
||||||
|
|
||||||
|
|
||||||
class WebInterfacePlugins:
|
class WebInterfacePlugins:
|
||||||
|
@ -57,10 +61,48 @@ class WebInterfaceSites:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
## this is a function decorator to check authentication
|
||||||
|
## it has to be defined before any page definition requiring authentification
|
||||||
|
def __requestAuth(self, authDict):
|
||||||
|
def check_credentials(site):
|
||||||
|
def _inner_wrapper(self, *args, **kargs):
|
||||||
|
import base64
|
||||||
|
## define a "non-allowed" function
|
||||||
|
user, password = None, None
|
||||||
|
try:
|
||||||
|
resp = cherrypy.request.headers["Authorization"][6:] # ignore "Basic "
|
||||||
|
(user, password) = base64.b64decode(resp).split(":",1)
|
||||||
|
except KeyError:
|
||||||
|
## no "authorization" header was sent
|
||||||
|
pass
|
||||||
|
except TypeError:
|
||||||
|
## invalid base64 string
|
||||||
|
pass
|
||||||
|
except AttributeError:
|
||||||
|
## no cherrypy response header defined
|
||||||
|
pass
|
||||||
|
if user in authDict.keys():
|
||||||
|
if password == authDict[user]:
|
||||||
|
## ok: return the choosen page
|
||||||
|
self.cbox.log.info("access granted for: %s" % user)
|
||||||
|
return site(self, *args, **kargs)
|
||||||
|
else:
|
||||||
|
self.cbox.log.info("wrong password supplied for: %s" % user)
|
||||||
|
else:
|
||||||
|
self.cbox.log.info("unknown user: %s" % str(user))
|
||||||
|
## wrong credentials: return "access denied"
|
||||||
|
cherrypy.response.headers["WWW-Authenticate"] = '''Basic realm="Test-Arena"'''
|
||||||
|
cherrypy.response.status = 401
|
||||||
|
return self.__render("access_denied")
|
||||||
|
return _inner_wrapper
|
||||||
|
return check_credentials
|
||||||
|
|
||||||
|
|
||||||
######################################################################
|
######################################################################
|
||||||
## put real sites down here and don't forget to expose them at the end
|
## put real sites down here and don't forget to expose them at the end
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def status(self, weblang=""):
|
def status(self, weblang=""):
|
||||||
'''shows the current status of the box
|
'''shows the current status of the box
|
||||||
'''
|
'''
|
||||||
|
@ -79,6 +121,7 @@ class WebInterfaceSites:
|
||||||
return self.__render("show_status")
|
return self.__render("show_status")
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def doc(self,page="",weblang=""):
|
def doc(self,page="",weblang=""):
|
||||||
'''prints the offline wikipage
|
'''prints the offline wikipage
|
||||||
'''
|
'''
|
||||||
|
@ -96,18 +139,21 @@ class WebInterfaceSites:
|
||||||
return self.__render("show_doc")
|
return self.__render("show_doc")
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def system(self, weblang=""):
|
def system(self, weblang=""):
|
||||||
self.__resetDataset()
|
self.__resetDataset()
|
||||||
self.__setWebLang(weblang)
|
self.__setWebLang(weblang)
|
||||||
return self.__render("form_system")
|
return self.__render("form_system")
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def index(self, weblang=""):
|
def index(self, weblang=""):
|
||||||
self.__resetDataset()
|
self.__resetDataset()
|
||||||
self.__setWebLang(weblang)
|
self.__setWebLang(weblang)
|
||||||
return self.__render("show_status")
|
return self.__render("show_status")
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def show_volume(self, device="", weblang=""):
|
def show_volume(self, device="", weblang=""):
|
||||||
self.__resetDataset()
|
self.__resetDataset()
|
||||||
self.__setWebLang(weblang)
|
self.__setWebLang(weblang)
|
||||||
|
@ -120,202 +166,73 @@ class WebInterfaceSites:
|
||||||
return self.__render("show_status")
|
return self.__render("show_status")
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
def show_volumes(self, weblang=""):
|
def show_volumes(self, weblang=""):
|
||||||
self.__resetDataset()
|
self.__resetDataset()
|
||||||
self.__setWebLang(weblang)
|
self.__setWebLang(weblang)
|
||||||
return self.__render("show_volumes")
|
return self.__render("show_volumes")
|
||||||
|
|
||||||
|
|
||||||
def volume_name_set(self, device="", volume_name="", weblang=""):
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
if self.__setDevice(device):
|
|
||||||
volume_name = volume_name.strip()
|
|
||||||
if self.__checkVolumeName(volume_name):
|
|
||||||
container = self.cbox.getContainer(device)
|
|
||||||
try:
|
|
||||||
container.setName(volume_name)
|
|
||||||
except CBContainerError, errMsg:
|
|
||||||
self.log.warn("failed to rename the volume '%s' to '%s: %s'" % (device, volume_name, errMsg))
|
|
||||||
self.dataset["Data.Warning"] = "SetVolumeNameFailed"
|
|
||||||
else:
|
|
||||||
self.log.info("successfully renamed volume '%s' to '%s'" % (device, volume_name))
|
|
||||||
self.dataset.setCurrentDiskState(device)
|
|
||||||
else:
|
|
||||||
self.dataset["Data.Warning"] = "InvalidVolumeName"
|
|
||||||
return self.__render("show_volume")
|
|
||||||
else:
|
|
||||||
if self.cbox.getContainerList():
|
|
||||||
return self.__render("show_volumes")
|
|
||||||
else:
|
|
||||||
return self.__render("show_status")
|
|
||||||
|
|
||||||
|
|
||||||
def mount_do(self, device, crypto_password=None, weblang=""):
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
if self.__setDevice(device):
|
|
||||||
container = self.cbox.getContainer(device)
|
|
||||||
if container.isMounted():
|
|
||||||
self.dataset["Data.Warning"] = "IsMounted"
|
|
||||||
self.log.warn("the device (%s) is already mounted" % device)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if container.getType() == container.Types["luks"]:
|
|
||||||
## encrypted luks container
|
|
||||||
if not crypto_password:
|
|
||||||
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
|
|
||||||
self.log.warn("no password was supplied for mounting of device '%s'" % device)
|
|
||||||
return self.__render("show_volume")
|
|
||||||
else:
|
|
||||||
container.mount(crypto_password)
|
|
||||||
elif container.getType() == container.Types["plain"]:
|
|
||||||
## plain container
|
|
||||||
container.mount()
|
|
||||||
else:
|
|
||||||
## mounting is not possible
|
|
||||||
self.dataset["Data.Warning"] = "InvalidType"
|
|
||||||
self.log.warn("this type of container (%s) cannot be mounted - sorry!" % device)
|
|
||||||
except (Exception, "MountError"):
|
|
||||||
self.dataset["Data.Warning"] = "MountFailed"
|
|
||||||
self.log.warn("failed to mount the device (%s)" % device)
|
|
||||||
else:
|
|
||||||
self.log.info("successfully mounted the container (%s)" % device)
|
|
||||||
self.dataset.setCurrentDiskState(device)
|
|
||||||
else:
|
|
||||||
if self.cbox.getContainerList():
|
|
||||||
return self.__render("show_volumes")
|
|
||||||
else:
|
|
||||||
return self.__render("show_status")
|
|
||||||
return self.__render("show_volume")
|
|
||||||
|
|
||||||
|
|
||||||
def volume_init_ask(self, device, encryption=None, weblang=""):
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
if self.__setDevice(device):
|
|
||||||
container = self.cbox.getContainer(device)
|
|
||||||
if container.isMounted():
|
|
||||||
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
|
|
||||||
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
|
|
||||||
return self.__render("show_volume")
|
|
||||||
else:
|
|
||||||
if encryption is None:
|
|
||||||
self.dataset["Data.Init.isCrypto"] = 0
|
|
||||||
else:
|
|
||||||
self.dataset["Data.Init.isCrypto"] = 1
|
|
||||||
return self.__render("form_init")
|
|
||||||
else:
|
|
||||||
if self.cbox.getContainerList():
|
|
||||||
return self.__render("show_volumes")
|
|
||||||
else:
|
|
||||||
return self.__render("show_status")
|
|
||||||
|
|
||||||
|
|
||||||
def init_do(self, device, confirm="", crypto_password=None, crypto_password2=None, encryption=None, weblang=""):
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
if self.__setDevice(device):
|
|
||||||
container = self.cbox.getContainer(device)
|
|
||||||
## set 'Data.Init.isCrypto' - just in case, we have to show the same form again
|
|
||||||
if encryption is None:
|
|
||||||
self.dataset["Data.Init.isCrypto"] = 0
|
|
||||||
else:
|
|
||||||
self.dataset["Data.Init.isCrypto"] = 1
|
|
||||||
if container.isMounted():
|
|
||||||
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
|
|
||||||
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
|
|
||||||
return self.__render("form_init")
|
|
||||||
else:
|
|
||||||
if confirm != self.__getLanguageValue("Text.ConfirmInit"):
|
|
||||||
self.dataset["Data.Warning"] = "InitNotConfirmed"
|
|
||||||
self.log.warn("the confirmation sentence for initialization of the device '%s' was wrong" % device)
|
|
||||||
return self.__render("form_init")
|
|
||||||
try:
|
|
||||||
if not encryption is None:
|
|
||||||
if not crypto_password:
|
|
||||||
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
|
|
||||||
self.log.warn("no crypto password was supplied for initialization of device '%s'" % device)
|
|
||||||
return self.__render("form_init")
|
|
||||||
if crypto_password != crypto_password2:
|
|
||||||
self.dataset["Data.Warning"] = "DifferentCryptoPasswords"
|
|
||||||
self.log.warn("the crypto password was not repeated correctly for initialization of device '%s'" % device)
|
|
||||||
return self.__render("form_init")
|
|
||||||
container.create(container.Types["luks"], crypto_password)
|
|
||||||
else:
|
|
||||||
container.create(container.Types["plain"])
|
|
||||||
except CBContainerError, errMsg:
|
|
||||||
self.dataset["Data.Warning"] = "CreateFailed"
|
|
||||||
self.log.warn("initialization of device '%s' failed" % device)
|
|
||||||
self.log.warn("reason: %s" % errMsg)
|
|
||||||
return self.__render("form_init")
|
|
||||||
else:
|
|
||||||
self.log.info("successfully initialized device '%s'" % device)
|
|
||||||
# reread the dataset
|
|
||||||
self.dataset.setCurrentDiskState(device)
|
|
||||||
return self.__render("show_volume")
|
|
||||||
else:
|
|
||||||
if self.cbox.getContainerList():
|
|
||||||
return self.__render("show_volumes")
|
|
||||||
else:
|
|
||||||
return self.__render("show_status")
|
|
||||||
|
|
||||||
|
|
||||||
def test(self, weblang=""):
|
|
||||||
import cherrypy
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
return "test passed"
|
|
||||||
|
|
||||||
|
|
||||||
def umount_do(self, device, weblang=""):
|
|
||||||
self.__resetDataset()
|
|
||||||
self.__setWebLang(weblang)
|
|
||||||
if self.__setDevice(device):
|
|
||||||
container = self.cbox.getContainer(device)
|
|
||||||
if not container.isMounted():
|
|
||||||
self.dataset["Data.Warning"] = "NotMounted"
|
|
||||||
self.log.warn("the device (%s) is currently not mounted" % device)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if container.getType() == container.Types["luks"]:
|
|
||||||
## encrypted luks container
|
|
||||||
container.umount()
|
|
||||||
elif container.getType() == container.Types["plain"]:
|
|
||||||
## plain container
|
|
||||||
container.umount()
|
|
||||||
else:
|
|
||||||
## umounting is not possible
|
|
||||||
self.dataset["Data.Warning"] = "InvalidType"
|
|
||||||
self.log.warn("this type of container (%s) cannot be umounted - sorry!" % device)
|
|
||||||
except (Exception, "UmountError"):
|
|
||||||
self.dataset["Data.Warning"] = "UmountFailed"
|
|
||||||
self.log.warn("failed to unmount the device (%s)" % device)
|
|
||||||
else:
|
|
||||||
self.log.info("successfully unmounted the container (%s)" % device)
|
|
||||||
# reread the dataset
|
|
||||||
self.dataset.setCurrentDiskState(device)
|
|
||||||
else:
|
|
||||||
if self.cbox.getContainerList():
|
|
||||||
return self.__render("show_volumes")
|
|
||||||
else:
|
|
||||||
return self.__render("show_status")
|
|
||||||
return self.__render("show_volume")
|
|
||||||
|
|
||||||
|
|
||||||
def return_plugin_action(self, plugin):
|
def return_plugin_action(self, plugin):
|
||||||
def handler(**args):
|
def handler(self, **args):
|
||||||
self.__resetDataset()
|
self.__resetDataset()
|
||||||
try:
|
try:
|
||||||
self.__setWebLang(args["weblang"])
|
self.__setWebLang(args["weblang"])
|
||||||
del args["weblang"]
|
del args["weblang"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
## check the device argument of volume plugins
|
||||||
|
if "volume" in plugin.pluginCapabilities:
|
||||||
|
try:
|
||||||
|
## initialize the dataset of the selected device if necessary
|
||||||
|
if self.__setDevice(args["device"]):
|
||||||
|
plugin.device = args["device"]
|
||||||
|
self.dataset.setCurrentDiskState(plugin.device)
|
||||||
|
else:
|
||||||
|
return self.__render("show_status")
|
||||||
|
except KeyError:
|
||||||
|
return self.__render("show_status")
|
||||||
|
else:
|
||||||
|
## the parameter 'device' exists - we have to remove it
|
||||||
|
del args["device"]
|
||||||
|
## call the plugin handler
|
||||||
nextTemplate = plugin.doAction(**args)
|
nextTemplate = plugin.doAction(**args)
|
||||||
## set the default template
|
## for 'volume' plugins: reread the dataset of the current disk
|
||||||
if not nextTemplate: nextTemplate = "form_system"
|
## additionally: set the default template for plugins
|
||||||
|
if "volume" in plugin.pluginCapabilities:
|
||||||
|
self.dataset.setCurrentDiskState(plugin.device)
|
||||||
|
if not nextTemplate: nextTemplate = "show_volume"
|
||||||
|
else:
|
||||||
|
if not nextTemplate: nextTemplate = "form_system"
|
||||||
|
## save the currently active plugin name
|
||||||
|
self.dataset["Data.ActivePlugin"] = plugin.getName()
|
||||||
return self.__render(nextTemplate, plugin)
|
return self.__render(nextTemplate, plugin)
|
||||||
return handler
|
## apply authentication?
|
||||||
|
if plugin.requestAuth:
|
||||||
|
return lambda **args: self.__requestAuth(authDict)(handler)(self, **args)
|
||||||
|
else:
|
||||||
|
return lambda **args: handler(self, **args)
|
||||||
|
|
||||||
|
|
||||||
|
## test authentication
|
||||||
|
@cherrypy.expose
|
||||||
|
@__requestAuth(None, authDict)
|
||||||
|
def test(self, weblang=""):
|
||||||
|
self.__resetDataset()
|
||||||
|
self.__setWebLang(weblang)
|
||||||
|
return "test passed"
|
||||||
|
|
||||||
|
|
||||||
|
@cherrypy.expose
|
||||||
|
def test_stream(self):
|
||||||
|
"""just for testing purposes - to check if the "stream_response" feature
|
||||||
|
actually works - for now (September 02006) it does not seem to be ok"""
|
||||||
|
import time
|
||||||
|
yield "<html><head><title>neu</title></head><body><p><ul>"
|
||||||
|
for a in range(10):
|
||||||
|
yield "<li>yes: %d - %s</li>" % (a, str(time.time()))
|
||||||
|
time.sleep(1)
|
||||||
|
yield "</ul></p></html>"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -453,6 +370,7 @@ class WebInterfaceSites:
|
||||||
plugin_cs_file = plugin.getTemplateFileName(template)
|
plugin_cs_file = plugin.getTemplateFileName(template)
|
||||||
if plugin_cs_file:
|
if plugin_cs_file:
|
||||||
self.dataset["Settings.TemplateFile"] = plugin_cs_file
|
self.dataset["Settings.TemplateFile"] = plugin_cs_file
|
||||||
|
|
||||||
## add the current state of the plugins to the hdf dataset
|
## add the current state of the plugins to the hdf dataset
|
||||||
self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus()
|
self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus()
|
||||||
## load the language data
|
## load the language data
|
||||||
|
@ -494,31 +412,3 @@ class WebInterfaceSites:
|
||||||
yield line + "\n"
|
yield line + "\n"
|
||||||
|
|
||||||
|
|
||||||
def test_stream(self):
|
|
||||||
"""just for testing purposes - to check if the "stream_response" feature
|
|
||||||
actually works - for now (September 02006) it does not seem to be ok"""
|
|
||||||
import time
|
|
||||||
yield "<html><head><title>neu</title></head><body><p><ul>"
|
|
||||||
for a in range(10):
|
|
||||||
yield "<li>yes: %d - %s</li>" % (a, str(time.time()))
|
|
||||||
time.sleep(1)
|
|
||||||
yield "</ul></p></html>"
|
|
||||||
|
|
||||||
|
|
||||||
############################################################################
|
|
||||||
## to make the sites visible through the webserver they must be exposed here
|
|
||||||
index.exposed = True
|
|
||||||
doc.exposed = True
|
|
||||||
system.exposed = True
|
|
||||||
status.exposed = True
|
|
||||||
show_volume.exposed = True
|
|
||||||
volume_name_set.exposed = True
|
|
||||||
mount_do.exposed = True
|
|
||||||
volume_init_ask.exposed = True
|
|
||||||
init_do.exposed = True
|
|
||||||
umount_do.exposed = True
|
|
||||||
show_volumes.exposed = True
|
|
||||||
test.exposed = True
|
|
||||||
test_stream.exposed = True
|
|
||||||
|
|
||||||
|
|
||||||
|
|
48
pythonrewrite/bin/unittests.CryptoBoxTools.py
Executable file
48
pythonrewrite/bin/unittests.CryptoBoxTools.py
Executable file
|
@ -0,0 +1,48 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import CryptoBoxTools
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
class CryptoBoxToolsTests(unittest.TestCase):
|
||||||
|
|
||||||
|
def testGetAbsoluteDeviceName(self):
|
||||||
|
func = CryptoBoxTools.getAbsoluteDeviceName
|
||||||
|
self.assertTrue(func("hda") == "/dev/hda")
|
||||||
|
self.assertTrue(func("loop0") == "/dev/loop0")
|
||||||
|
self.assertTrue(func(os.path.devnull) == os.path.devnull)
|
||||||
|
|
||||||
|
|
||||||
|
def testFindMajorMinorOfDevice(self):
|
||||||
|
func = CryptoBoxTools.findMajorMinorOfDevice
|
||||||
|
self.assertTrue(func("/dev/hda") == (3,0))
|
||||||
|
self.assertTrue(func("/dev/hda1") == (3,1))
|
||||||
|
self.assertTrue(func(os.path.devnull) == (1,3))
|
||||||
|
self.assertTrue(func("/dev/nothere") is None)
|
||||||
|
|
||||||
|
|
||||||
|
def testFindMajorMinorDeviceName(self):
|
||||||
|
func = CryptoBoxTools.findMajorMinorDeviceName
|
||||||
|
dir = os.path.join(os.path.sep, "dev")
|
||||||
|
self.assertTrue(os.path.join(dir,"hda") in func(dir,3,0))
|
||||||
|
self.assertTrue(os.path.devnull in func(dir,1,3))
|
||||||
|
self.assertFalse(os.path.devnull in func(dir,2,3))
|
||||||
|
|
||||||
|
|
||||||
|
def testIsPartOfBlockDevice(self):
|
||||||
|
func = CryptoBoxTools.isPartOfBlockDevice
|
||||||
|
self.assertTrue(func("/dev/hda", "/dev/hda1"))
|
||||||
|
self.assertFalse(func("/dev/hda", "/dev/hda"))
|
||||||
|
self.assertFalse(func("/dev/hda1", "/dev/hda"))
|
||||||
|
self.assertFalse(func("/dev/hda1", "/dev/hda1"))
|
||||||
|
self.assertFalse(func("/dev/hda", "/dev/hdb1"))
|
||||||
|
self.assertFalse(func(None, "/dev/hdb1"))
|
||||||
|
self.assertFalse(func("/dev/hda", None))
|
||||||
|
self.assertFalse(func(None, ""))
|
||||||
|
self.assertFalse(func("loop0", "loop1"))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
|
|
0
pythonrewrite/bin/unittests.CryptoBoxWebserver.py
Normal file → Executable file
0
pythonrewrite/bin/unittests.CryptoBoxWebserver.py
Normal file → Executable file
Loading…
Reference in a new issue