Branched r1161 trunk for 0.3.5 release

This commit is contained in:
frisco 2009-06-25 02:11:30 +00:00
parent d0ed91ffa8
commit 1d1139428b
802 changed files with 135155 additions and 0 deletions

View file

@ -0,0 +1,14 @@
"""CryptoBox package
The CryptoBox is a webserver. It enables you to control your encrypted
(cryptsetup-luks) and plaintext disks via an easy to use web interface.
The CryptoBox is especially suitable for non-desktop fileservers with
encrypted partitions.
"""
__all__ = ['core', 'web', 'plugins', 'tests']
__revision__ = "$Id$"
__version__ = "0.3.4"

View file

@ -0,0 +1,7 @@
"""Core management functions of the CryptoBox.
"""
__revision__ = "$Id$"
__all__ = [ 'main', 'container', 'exceptions', 'blockdevice', 'settings' ]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,817 @@
#
# Copyright 02006-02007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Manage a single container of the CryptoBox
"""
__revision__ = "$Id$"
import subprocess
import os
import re
import time
from cryptobox.core.exceptions import *
import cryptobox.core.blockdevice
CONTAINERTYPES = {
"unused":0,
"plain":1,
"luks":2,
"swap":3,
}
FSTYPES = {
"plain":["ext3", "ext2", "vfat", "reiserfs", "xfs", "hfs", "jfs", "minix", "ntfs"],
"swap":["swap"]}
## we use this marker to make sure, that we do not remove a non-cryptobox directory
## below the mount directory
MOUNT_DIR_MARKER = '_cryptobox_mount_dir_'
class CryptoBoxContainer:
"""Manage a container of the CryptoBox
"""
def __init__(self, device, cbox):
"""initialize the container
"device" is a cryptobox.core.blockdevice object
"""
self.device = device
self.cbox = cbox
self.uuid = None
self.name = None
self.cont_type = None
self.fs_type = None
self.mount = None
self.umount = None
self.attributes = None
self.reset_object(reset_device=False)
def get_name(self):
"""Return a human readable name for the container.
Available since: 0.3.0
"""
return self.name
def __set_attributes(self):
"""Define the default attributes of a container.
At least there should be a uuid.
Other attributes may be added by features (e.g. automount).
Available since: 0.3.0
"""
try:
## is there already an entry in the database?
self.attributes = self.cbox.prefs.volumes_db[self.get_name()]
self.attributes["uuid"] = self.uuid
except KeyError:
## set default values
self.attributes = { "uuid": self.uuid }
self.cbox.prefs.volumes_db[self.get_name()] = self.attributes
def set_name(self, new_name):
"""Define a human readable name of this container.
this also manages the name database
Available since: 0.3.0
"""
old_name = self.get_name()
if new_name == self.name:
return
## renaming is not possible, if the volume is active, as the mountpoint name
## is the same as the volume name
if self.is_mounted():
raise CBVolumeIsActive("the container must not be active during renaming")
if not re.match(r'[a-zA-Z0-9_\.\- ]+$', new_name):
raise CBInvalidName("the supplied new name contains illegal characters")
## check for another partition with the same name
if self.cbox.get_container_list(filter_name=new_name):
raise CBNameIsInUse("the supplied new name is already in use for another partition")
## maybe there a is an entry in the volumes database (but the partition is not active)
try:
## remove possibly existing inactive database item
del self.cbox.prefs.volumes_db[new_name]
except KeyError:
## no entry - so nothing happens
pass
## set new name
self.name = new_name
## remove old database entry
try:
del self.cbox.prefs.volumes_db[old_name]
except KeyError:
pass
## set new volumes database entry
self.cbox.prefs.volumes_db[new_name] = self.attributes
try:
self.cbox.prefs.volumes_db.write()
except IOError:
self.cbox.log.warn("Failed to store volumes database after set_name")
def is_writeable(self):
"""Return if the container is writeable
this only affects actions like formatting or partitioning
write access for the mounted content is not considered
Available since: 0.3.3
"""
## symlinks are followed automatically
return os.access(self.get_device(), os.W_OK)
def get_device(self):
"""Return the device name of the container
e.g.: /dev/hdc1
Available since: 0.3.0
"""
return self.device.devnodes[0]
def get_type(self):
"""Return the type (int) of this container.
Available since: 0.3.0
"""
return self.cont_type
def get_fs_type(self):
"""Return the filesystem type of this container.
Available since: 0.3.0
"""
return self.fs_type
def is_mounted(self):
"""Check if the container is currently mounted.
Available since: 0.3.0
"""
return os.path.ismount(self.__get_mount_point())
def get_capacity(self):
"""Return the current capacity state of the volume.
the volume may not be mounted
the result is a tuple of values in megabyte:
(size, available, used)
Available since: 0.3.0
"""
info = os.statvfs(self.__get_mount_point())
return (
int(info.f_bsize*info.f_blocks/1024/1024),
int(info.f_bsize*info.f_bavail/1024/1024),
int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))
def get_size(self):
"""return the size of the block device (_not_ of the filesystem)
the result is a value in megabyte
an error is indicated by "-1"
Available since: 0.3.0
"""
return self.device.size
def reset_object(self, reset_device=True):
"""recheck the information about this container
this is especially useful after changing the type via 'create'
the 'device' attribute does not need to be reset during __init__
Available since: 0.3.0
"""
if reset_device:
self.device.reset()
self.uuid = self.__get_uuid()
self.cont_type = self.__get_type_of_partition()
self.fs_type = self.__get_fs_type()
self.name = self.__get_name_of_container()
self.__set_attributes()
if self.cont_type == CONTAINERTYPES["luks"]:
self.mount = self.__mount_luks
self.umount = self.__umount_luks
elif self.cont_type == CONTAINERTYPES["plain"]:
self.mount = self.__mount_plain
self.umount = self.__umount_plain
def create(self, cont_type, password=None, fs_type="ext3"):
"""Format a container.
Also set a password for encrypted container.
Available since: 0.3.0
"""
if not fs_type in FSTYPES["plain"]:
raise CBInvalidType("invalid filesystem type supplied: %s" % str(fs_type))
old_name = self.get_name()
if cont_type == CONTAINERTYPES["luks"]:
self.__create_luks(password, fs_type)
elif cont_type == CONTAINERTYPES["plain"]:
self.__create_plain(fs_type)
else:
raise CBInvalidType("invalid container type (%d) supplied" % (cont_type, ))
## no exception was raised during creation -> we can continue
## reset the properties (encryption state, ...) of the device
self.reset_object()
## restore the old name, as the uuid was changed during 'create'
## must happen after reset_object
try:
self.set_name(old_name)
except CBNameIsInUse:
## failure is okay
pass
def change_password(self, oldpw, newpw):
"""Change the password of an encrypted container.
Raises an exception for plaintext container.
Available since: 0.3.0
"""
if self.cont_type != CONTAINERTYPES["luks"]:
raise CBInvalidType("changing of password is possible only for luks containers")
if not oldpw:
raise CBInvalidPassword("no old password supplied for password change")
if not newpw:
raise CBInvalidPassword("no new password supplied for password change")
## return if new and old passwords are the same
if oldpw == newpw:
return
#TODO: why can we do this only for non-mounted volumes?
if self.is_mounted():
raise CBVolumeIsActive("this container is currently active")
## remove any potential open luks mapping
self.__umount_luks()
## create the luks header
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksAddKey",
self.get_device(),
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate()
if proc.returncode != 0:
error_msg = "Could not add a new luks key: %s - %s" \
% (output.strip(), errout.strip(), )
self.cbox.log.error(error_msg)
raise CBChangePasswordError(error_msg)
## retrieve the key slot we used for unlocking
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise CBChangePasswordError("could not get the old key slot")
## remove the old key
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksDelKey",
self.get_device(),
"%d" % (keyslot, ),
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
error_msg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.cbox.log.error(error_msg)
raise CBChangePasswordError(error_msg)
def is_busy(self):
"""Return the current state of the busy flag of this device.
The busy flag is mainly used to indicate that the device may not be used
while it is being formatted or similar.
Available since: 0.3.1
"""
return self.cbox.get_device_busy_state(self.device.name)
def set_busy(self, new_state, timeout=300):
"""Set the busy state of this device.
Either set or remove this flag.
The timeout is optional and defaults to five minutes.
Available since: 0.3.1
"""
self.cbox.set_device_busy_state(self.device.name, new_state, timeout)
## ****************** internal stuff *********************
def __get_name_of_container(self):
"""retrieve the name of the container by querying the database
call this function only for the initial setup of the container object
"""
found_name = None
for key in self.cbox.prefs.volumes_db.keys():
if self.cbox.prefs.volumes_db[key]["uuid"] == self.uuid:
found_name = key
## the name may not be equal to the name of another existing device
## otherwise problems regarding the mount directory would arise
if found_name:
test_device = cryptobox.core.blockdevice.get_blockdevice(found_name)
if (test_device is None) or (test_device == self.device):
return found_name
## there is no name defined for this uuid - we will propose a good one
prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.prefs.volumes_db.has_key(guess):
counter += 1
else:
unused_found = True
return guess
def __get_uuid(self):
"""Retrieve the uuid of the container device.
"""
guess = self.device.uuid
## did we get a valid value?
if guess:
return guess
else:
## emergency default value
return self.device.name
def __get_type_of_partition(self):
"""Retrieve the type of the given partition.
see cryptobox.core.container.CONTAINERTYPES
"""
if self.device.is_luks():
return CONTAINERTYPES["luks"]
type_of_partition = self.__get_type_id_of_partition()
if type_of_partition in FSTYPES["plain"]:
return CONTAINERTYPES["plain"]
if type_of_partition in FSTYPES["swap"]:
return CONTAINERTYPES["swap"]
return CONTAINERTYPES["unused"]
def __get_type_id_of_partition(self):
"returns the type of the partition (see 'man blkid')"
return self.device.type_id
def __get_fs_type(self):
"""returns the filesystem used on a container
for luks devices: return the type of the encrypted container
return None for invalid device, for non-storage devices, ...
"""
if self.device.is_luks():
## luks devices need special care ...
if self.device.holders:
return cryptobox.core.blockdevice.get_blockdevice(
self.device.holders[0]).type_id
else:
## the encrypted container is not open
return None
else:
## common (non-luks) devices
return self.device.type_id
def __get_mount_point(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"],
self.name)
def __mount_luks(self, password):
"mount a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for luksOpen")
if self.is_mounted():
raise CBVolumeIsActive("this container is already active")
self.__umount_luks()
self.__clean_mount_dirs()
if not os.path.exists(self.__get_mount_point()):
self.__create_mount_directory(self.__get_mount_point())
if not os.path.exists(self.__get_mount_point()):
err_msg = "Could not create mountpoint (%s)" % \
(self.__get_mount_point(), )
self.cbox.log.error(err_msg)
raise CBMountError(err_msg)
self.cbox.send_event_notification("premount",
self.__get_event_args())
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksOpen",
self.get_device(),
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
err_msg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.cbox.log.warn(err_msg)
raise CBMountError(err_msg)
## reset device info (reread self.device.holders)
self.device.reset()
if self.device.holders:
## the decrypted blockdevice is available
plain_device = cryptobox.core.blockdevice.get_blockdevice(
self.device.holders[0]).devnodes[0]
else:
err_msg = "Could not find the plaintext container for " \
+ "'%s': %s" % (self.get_device(), "no hold devices found")
self.cbox.log.warn(err_msg)
raise CBMountError(err_msg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "mount",
plain_device,
self.__get_mount_point()])
proc.wait()
if proc.returncode != 0:
err_msg = "Could not mount the filesystem: %s" \
% (proc.stderr.read().strip(), )
self.cbox.log.warn(err_msg)
raise CBMountError(err_msg)
## chmod the mount directory to 0777
## this is the easy way to avoid problems
## it only works for ext2/3 - vfat silently ignore it
## we mounted vfat partitions with umask=0000
try:
os.chmod(self.__get_mount_point(), 0777)
except OSError:
self.cbox.log.warn("Failed to set write permission for the " \
+ "mount directory")
self.cbox.send_event_notification("postmount", self.__get_event_args())
def __umount_luks(self):
"umount a luks partition"
self.cbox.send_event_notification("preumount", self.__get_event_args())
if self.is_mounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "umount",
self.__get_mount_point()])
proc.wait()
if proc.returncode != 0:
err_msg = "Could not umount the filesystem: %s" % \
(proc.stderr.read().strip(), )
self.cbox.log.warn(err_msg)
raise CBUmountError(err_msg)
## reset device (reread self.device.holders)
self.device.reset()
## are there any dependent devices?
if self.device.holders:
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksClose",
self.name,
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
err_msg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.cbox.log.warn(err_msg)
raise CBUmountError(err_msg)
self.cbox.send_event_notification("postumount", self.__get_event_args())
def __mount_plain(self):
"mount a plaintext partition"
if self.is_mounted():
raise CBVolumeIsActive("this container is already active")
self.__clean_mount_dirs()
if not os.path.exists(self.__get_mount_point()):
self.__create_mount_directory(self.__get_mount_point())
if not os.path.exists(self.__get_mount_point()):
err_msg = "Could not create mountpoint (%s)" % (self.__get_mount_point(), )
self.cbox.log.error(err_msg)
raise CBMountError(err_msg)
self.cbox.send_event_notification("premount", self.__get_event_args())
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "mount",
self.get_device(),
self.__get_mount_point()])
proc.wait()
if proc.returncode != 0:
err_msg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.cbox.log.warn(err_msg)
raise CBMountError(err_msg)
## chmod the mount directory to 0777
## this is the easy way to avoid problems
## it only works for ext2/3 - vfat silently ignore it
## we mounted vfat partitions with umask=0000
try:
os.chmod(self.__get_mount_point(), 0777)
except OSError:
self.cbox.log.warn("Failed to set write permission for the mount directory")
self.cbox.send_event_notification("postmount", self.__get_event_args())
def __umount_plain(self):
"umount a plaintext partition"
if not self.is_mounted():
self.cbox.log.info("trying to umount while volume (%s) is not mounted" % \
self.get_device())
return
self.cbox.send_event_notification("preumount", self.__get_event_args())
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "umount",
self.__get_mount_point()])
proc.wait()
if proc.returncode != 0:
err_msg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.cbox.log.warn(err_msg)
raise CBUmountError(err_msg)
self.cbox.send_event_notification("postumount", self.__get_event_args())
def __create_plain(self, fs_type="ext3"):
"make a plaintext partition"
import threading
if self.is_mounted():
raise CBVolumeIsActive(
"deactivate the partition before filesystem initialization")
def format():
"""This function will get called as a seperate thread.
To avoid the non-sharing cpu distribution between the formatting thread
and the main interface, we fork and let the parent wait for the child.
This should be handled using the kernel's threading features.
"""
## create a local object - to store different values for each thread
loc_data = threading.local()
loc_data.old_name = self.get_name()
self.set_busy(True, 600)
## give the main thread a chance to continue
loc_data.child_pid = os.fork()
if loc_data.child_pid == 0:
loc_data.proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["nice"],
self.cbox.prefs["Programs"]["mkfs"],
"-t", fs_type, self.get_device()])
loc_data.proc.wait()
## wait to allow error detection
if loc_data.proc.returncode == 0:
time.sleep(5)
## skip cleanup stuff (as common for sys.exit)
os._exit(0)
else:
os.waitpid(loc_data.child_pid, 0)
try:
self.set_name(loc_data.old_name)
except CBNameIsInUse:
pass
self.set_busy(False)
bg_task = threading.Thread(target=format)
bg_task.setDaemon(True)
bg_task.start()
time.sleep(3)
## if the thread exited very fast, then it failed
if not bg_task.isAlive():
raise CBCreateError("formatting of device (%s) failed out " % \
self.get_device() + "of unknown reasons")
def __create_luks(self, password, fs_type="ext3"):
"""Create a luks partition.
"""
import threading
if not password:
raise CBInvalidPassword("no password supplied for new luks mapping")
if self.is_mounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
## remove any potential open luks mapping
self.__umount_luks()
## create the luks header
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksFormat",
self.get_device(),
"--batch-mode",
"--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
"--iter-time", "2000"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
err_msg = "Could not create the luks header: %s" % (errout.strip(), )
self.cbox.log.error(err_msg)
raise CBCreateError(err_msg)
## open the luks container for mkfs
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"program", "cryptsetup",
"luksOpen",
self.get_device(),
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
err_msg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.cbox.log.error(err_msg)
raise CBCreateError(err_msg)
## reset device info (reread self.device.holders)
self.device.reset()
if self.device.holders:
## the decrypted blockdevice is available
plain_device = cryptobox.core.blockdevice.get_blockdevice(
self.device.holders[0]).devnodes[0]
else:
err_msg = "Could not find the plaintext container for " \
+ "'%s': %s" % (self.get_device(), "no hold devices found")
self.cbox.log.warn(err_msg)
raise CBMountError(err_msg)
def format_luks():
"""This function will get called as a separate thread.
To avoid the non-sharing cpu distribution between the formatting thread
and the main interface, we fork and let the parent wait for the child.
This should be handled using the kernel's threading features.
"""
## create a local object - to store different values for each thread
loc_data = threading.local()
loc_data.old_name = self.get_name()
self.set_busy(True, 600)
loc_data.child_pid = os.fork()
if loc_data.child_pid == 0:
## make the filesystem
mkfs_args = [ self.cbox.prefs["Programs"]["nice"],
self.cbox.prefs["Programs"]["mkfs"],
"-t", fs_type,
plain_device ]
loc_data.proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = mkfs_args)
self.cbox.log.debug("Will format LUKS device using `%s'" % mkfs_args)
loc_data.proc.wait()
## wait to allow error detection
if loc_data.proc.returncode == 0:
time.sleep(5)
## skip cleanup stuff (as common for sys.exit)
os._exit(0)
else:
os.waitpid(loc_data.child_pid, 0)
self.set_name(loc_data.old_name)
self.set_busy(False)
## remove the mapping - for every exit status
self.__umount_luks()
bg_task = threading.Thread(target=format_luks)
bg_task.setDaemon(True)
bg_task.start()
time.sleep(3)
## if the thread exited very fast, then it failed
if not bg_task.isAlive():
raise CBCreateError("formatting of device (%s) failed out " \
% self.get_device() + "of unknown reasons")
def __clean_mount_dirs(self):
"""remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"])
for one_dir in subdirs:
abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], one_dir)
if (not os.path.islink(abs_dir)) \
and os.path.isdir(abs_dir) \
and (not os.path.ismount(abs_dir)) \
and (os.path.isfile(os.path.join(abs_dir,
MOUNT_DIR_MARKER))) \
and (len(os.listdir(abs_dir)) == 1):
try:
os.remove(os.path.join(abs_dir, MOUNT_DIR_MARKER))
os.rmdir(abs_dir)
except OSError, err_msg:
## we do not care too much about unclean cleaning ...
self.cbox.log.info("failed to clean a mountpoint (%s): %s" \
% (abs_dir, str(err_msg)))
def __create_mount_directory(self, dirname):
"""create and mark a mount directory
this marking helps to remove old mountdirs safely"""
os.mkdir(dirname)
try:
mark_file = file(os.path.join(dirname, MOUNT_DIR_MARKER), "w")
mark_file.close()
except OSError, err_msg:
## we do not care too much about the marking
self.cbox.log.info("failed to mark a mountpoint (%s): %s" % \
(dirname, str(err_msg)))
def __get_event_args(self):
"""Return an array of arguments for event scripts.
for now supported: pre/post-mount/umount events
"""
type_text = [e for e in CONTAINERTYPES.keys()
if CONTAINERTYPES[e] == self.get_type()][0]
return [self.get_device(), self.get_name(), type_text,
self.__get_mount_point()]

View file

@ -0,0 +1,168 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
exceptions of the cryptobox package
"""
__revision__ = "$Id$"
class CBError(Exception):
"""base class for exceptions of the cryptobox"""
prefix = "general cryptobox error"
def __init__(self, desc):
self.desc = desc
def __str__(self):
"""Return the error description.
"""
if self.desc:
return "%s: %s" % (self.prefix, self.desc)
else:
return self.prefix
# main exception classes
class CBConfigError(CBError):
"""any kind of error related to the configuration of a cryptobox"""
pass
class CBContainerError(CBError):
"""Any error raised while manipulating a cryptobox container.
"""
prefix = "cryptobox container error"
class CBEnvironmentError(CBError):
"""some part of the environment of the cryptobox is broken
e.g. the wrong version of a required program
"""
prefix = "cryptobox environment error"
class CBInternalError(CBError):
"""report any failing internal assertions
this is always a bug
"""
prefix = "internal error detected"
# specialized exceptions
class CBConfigUnavailableError(CBConfigError):
"""config file/input was not available at all"""
prefix = "failed to access the configuration of the cryptobox"
class CBConfigUndefinedError(CBConfigError):
"""a specific configuration setting was not defined"""
def __init__(self, section, name=None):
self.section = section
self.name = name
def __str__(self):
"""Output the appropriate string: for a setting or a section.
"""
if self.name:
# setting
return "undefined configuration setting: [" + str(self.section) \
+ "]->" + str(self.name) + " - please check your configuration file"
else:
# section
return "undefined configuration section: [" + str(self.section) \
+ "] - please check your configuration file"
class CBConfigInvalidValueError(CBConfigError):
"""a configuration setting was invalid somehow"""
def __init__(self, section, name, value, reason):
self.section = section
self.name = name
self.value = value
self.reason = reason
def __str__(self):
"""Return the error description.
"""
return "invalid configuration setting [%s]->%s (%s): %s" % \
(self.section, self.name, self.value, self.reason)
class CBCreateError(CBContainerError):
"""Raised if a container could not be created (formatted).
"""
pass
class CBVolumeIsActive(CBContainerError):
"""Raised if a container was active even if it may not for a specific action.
"""
pass
class CBInvalidName(CBContainerError):
"""Raised if someone tried to set an invalid container name.
"""
pass
class CBNameIsInUse(CBContainerError):
"""Raised if the new name of a container is already in use.
"""
pass
class CBInvalidType(CBContainerError):
"""Raised if a container is of an invalid type for a choosen action.
"""
pass
class CBInvalidPassword(CBContainerError):
"""Someone tried to open an ecnrypted container with the wrong password.
"""
pass
class CBChangePasswordError(CBContainerError):
"""Changing of the password of an encrypted container failed.
"""
pass
class CBMountError(CBContainerError):
"""Failed to mount a container.
"""
pass
class CBUmountError(CBContainerError):
"""Failed to umount a container.
"""
pass

View file

@ -0,0 +1,304 @@
#
# Copyright 02006-02007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
'''
This is the web interface for a fileserver managing encrypted filesystems.
'''
__revision__ = "$Id$"
import sys
import cryptobox.core.container as cbxContainer
from cryptobox.core.exceptions import CBEnvironmentError, CBConfigUndefinedError
import cryptobox.core.blockdevice as blockdevice
import os
import subprocess
import threading
class CryptoBox:
'''this class rules them all!
put things like logging, conf and other stuff in here,
that might be used by more classes, it will be passed on to them'''
def __init__(self, config_file=None):
import cryptobox.core.settings as cbxSettings
self.log = self.__get_startup_logger()
self.prefs = cbxSettings.CryptoBoxSettings(config_file)
self.__run_tests()
self.__containers = []
self.__busy_devices = {}
self.__busy_devices_sema = threading.BoundedSemaphore()
self.reread_container_list()
def setup(self):
"""Initialize the cryptobox.
"""
self.log.info("Starting up the CryptoBox ...")
def cleanup(self):
"""Umount all containers and shutdown everything safely.
"""
self.log.info("Shutting down the CryptoBox ...")
## umount all containers
self.log.info("Umounting all volumes ...")
self.reread_container_list()
for cont in self.get_container_list():
if cont.is_mounted():
cont.umount()
## save all settings
self.log.info("Storing local settings ...")
## problems with storing are logged automatically
self.prefs.write()
# TODO: improve the configuration partition handling [a]: how?
self.prefs.umount_partition()
## shutdown logging as the last step
try:
self.log.info("Turning off logging ...")
self.log.close()
except AttributeError:
## there should be 'close' action - but it may fail silently
pass
def __get_startup_logger(self):
"""Initialize the configured logging facility of the CryptoBox.
use it with: 'self.log.[debug|info|warning|error|critical](logmessage)'
all classes should get the logging instance during __init__:
self.log = logging.getLogger("CryptoNAS")
first we output all warnings/errors to stderr
as soon as we opened the config file successfully, we redirect debug output
to the configured destination
"""
import logging
## basicConfig(...) needs python >= 2.4
try:
log_handler = logging.getLogger("CryptoNAS")
logging.basicConfig(
format = '%(asctime)s CryptoNAS %(levelname)s: %(message)s',
stderr = sys.stderr)
log_handler.setLevel(logging.ERROR)
log_handler.info("loggingsystem is up'n running")
## from now on everything can be logged via self.log...
except:
raise CBEnvironmentError("couldn't initialise the loggingsystem. I give up.")
return log_handler
def __run_tests(self):
"""Do some initial tests.
"""
self.__run_test_root_priv()
def __run_test_root_priv(self):
"""Try to run 'super' with 'CryptoBoxRootActions'.
"""
try:
devnull = open(os.devnull, "w")
except IOError:
raise CBEnvironmentError("could not open %s for writing!" % os.devnull)
try:
prog_super = self.prefs["Programs"]["super"]
except KeyError:
raise CBConfigUndefinedError("Programs", "super")
try:
prog_rootactions = self.prefs["Programs"]["CryptoBoxRootActions"]
except KeyError:
raise CBConfigUndefinedError("Programs", "CryptoBoxRootActions")
try:
proc = subprocess.Popen(
shell = False,
stdout = devnull,
stderr = devnull,
args = [prog_super, prog_rootactions, "check"])
except OSError:
raise CBEnvironmentError(
"failed to execute 'super' (%s)" % self.prefs["Programs"]["super"])
proc.wait()
if proc.returncode != 0:
raise CBEnvironmentError("failed to call CryptoBoxRootActions ("
+ prog_rootactions + ") via 'super' - maybe you did not add the "
+ "appropriate line to '/etc/super.tab'?")
def reread_container_list(self):
"""Reinitialize the list of available containers.
This should be called whenever the available containers may have changed.
E.g.: after partitioning and after device addition/removal
"""
self.log.debug("rereading container list")
self.__containers = []
blockdevice.CACHE.reset()
for device in blockdevice.Blockdevices().get_storage_devices():
if self.is_device_allowed(device) and not self.is_config_partition(device):
self.__containers.append(cbxContainer.CryptoBoxContainer(device, self))
## sort by container name
self.__containers.sort(cmp = lambda x, y: x.get_name() < y.get_name() and -1 or 1)
def get_device_busy_state(self, device):
"""Return whether a device is currently marked as busy or not.
The busy flag can be turned off manually (recommended) or the timeout
can expire.
"""
import time
self.__busy_devices_sema.acquire()
## not marked as busy
if not self.__busy_devices.has_key(device):
self.__busy_devices_sema.release()
return False
## timer is expired
if time.time() > self.__busy_devices[device]:
del self.__busy_devices[device]
self.__busy_devices_sema.release()
return False
self.__busy_devices_sema.release()
return True
def set_device_busy_state(self, device, new_state, timeout=300):
"""Mark a device as busy.
This is especially useful during formatting, as this may take a long time.
"""
import time
self.__busy_devices_sema.acquire()
self.log.debug("Turn busy flag %s: %s" % (new_state and "on" or "off", device))
if new_state:
self.__busy_devices[device] = time.time() + timeout
else:
if self.__busy_devices.has_key(device):
del self.__busy_devices[device]
self.log.debug("Current busy flags: %s" % str(self.__busy_devices))
self.__busy_devices_sema.release()
def is_config_partition(self, device):
"""Check if a given partition contains configuration informations.
The check is done by comparing the label of the filesystem with a string.
"""
return device.label == self.prefs["Main"]["ConfigVolumeLabel"]
def is_device_allowed(self, device):
"""check if a device is white-listed for being used as cryptobox containers
also check, if the device is readable and writeable for the current user
"""
import types
## if "device" is a string, then turn it into a blockdevice object
if type(device) == types.StringType:
device = blockdevice.get_blockdevice(device)
if device is None:
return False
allowed = self.prefs["Main"]["AllowedDevices"]
if type(allowed) == types.StringType:
allowed = [allowed]
for devnode in device.devnodes:
if [ a_dev for a_dev in allowed if devnode.startswith(a_dev) ]:
if os.access(devnode, os.R_OK | os.W_OK):
self.log.debug("Adding valid device: %s" % devnode)
## move the device to the first position
device.devnodes.remove(devnode)
device.devnodes.insert(0, devnode)
return True
else:
self.log.debug("Skipping device without read and write " \
+ "permissions: %s" % device.name)
self.log.debug("Skipping unusable device: %s" % device.name)
return False
def get_container_list(self, filter_type=None, filter_name=None):
"retrieve the list of all containers of this cryptobox"
try:
result = self.__containers[:]
if filter_type != None:
if filter_type in range(len(cbxContainer.CONTAINERTYPES)):
return [e for e in self.__containers if e.get_type() == filter_type]
else:
self.log.info("invalid filter_type (%d)" % filter_type)
result.clear()
if filter_name != None:
result = [e for e in self.__containers if e.get_name() == filter_name]
return result
except AttributeError:
return []
def get_container(self, device):
"""retrieve the container element for a device
"device" can be a name (e.g. "dm-0") or a blockdevice object
"""
if isinstance(device, str):
devicename = device
else:
devicename = device.name
all = [e for e in self.get_container_list() if e.device.name == devicename]
if all:
return all[0]
else:
return None
def send_event_notification(self, event, event_infos):
"""call all available scripts in the event directory with some event information"""
event_dir = self.prefs["Locations"]["EventDir"]
try:
event_scripts = os.listdir(event_dir)
except OSError:
self.log.warn("event handler can't access dir: (%s)" % event_dir)
return
for fname in event_scripts:
real_fname = os.path.join(event_dir, fname)
if os.path.isfile(real_fname) and os.access(real_fname, os.X_OK):
cmd_args = [ self.prefs["Programs"]["super"],
self.prefs["Programs"]["CryptoBoxRootActions"],
"event", real_fname, event]
cmd_args.extend(event_infos)
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = cmd_args)
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.log.warn(
"an event script (%s) failed (exitcode=%d) to handle an event (%s): %s" %
(real_fname, proc.returncode, event, stderr.strip()))
else:
self.log.info("event handler (%s) finished successfully: %s" %
(real_fname, event))
if __name__ == "__main__":
CryptoBox()

View file

@ -0,0 +1,863 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Manage the configuration of a CryptoBox
"""
__revision__ = "$Id$"
from cryptobox.core.exceptions import *
import logging
import subprocess
import os
import configobj, validate
import syslog
CONF_LOCATIONS = [
"./cryptobox.conf",
"~/.cryptobox.conf",
"/etc/cryptobox-server/cryptobox.conf"]
VOLUMESDB_FILE = "cryptobox_volumes.db"
PLUGINCONF_FILE = "cryptobox_plugins.conf"
USERDB_FILE = "cryptobox_users.db"
## allow to retrieve the most recently created setting object
CURRENT_SETTING = []
def get_current_settings():
"""return the most recently created setting object
"""
if not CURRENT_SETTING:
return None
else:
return CURRENT_SETTING[0]
class CryptoBoxSettings:
"""Manage the various configuration files of the CryptoBox
"""
def __init__(self, config_file=None):
self.__is_initialized = False
self.log = logging.getLogger("CryptoNAS")
config_file = self.__get_config_filename(config_file)
self.log.info("loading config file: %s" % config_file)
self.prefs = self.__get_preferences(config_file)
if not "PluginSettings" in self.prefs:
self.prefs["PluginSettings"] = {}
self.__validate_config()
self.__configure_log_handler()
self.__check_unknown_preferences()
self.prepare_partition()
self.volumes_db = self.__get_volumes_database()
self.plugin_conf = self.__get_plugin_config()
self.user_db = self.__get_user_db()
self.misc_files = []
self.reload_misc_files()
self.__is_initialized = True
CURRENT_SETTING.insert(0, self)
def reload_misc_files(self):
"""Call this method after creating or removing a 'misc' configuration file
"""
self.misc_files = self.__get_misc_files()
def write(self):
"""
write all local setting files including the content of the "misc" subdirectory
"""
status = True
try:
self.volumes_db.write()
except IOError:
self.log.warn("Could not save the volume database")
status = False
try:
self.plugin_conf.write()
except IOError:
self.log.warn("Could not save the plugin configuration")
status = False
try:
self.user_db.write()
except IOError:
self.log.warn("Could not save the user database")
status = False
for misc_file in self.misc_files:
if not misc_file.save():
self.log.warn("Could not save a misc setting file (%s)" % misc_file.filename)
status = False
return status
def get_misc_config_filename(self, name):
"""Return an absolute filename for a given filename 'name'
'name' should not contain slashes (no directory part!)
"""
return os.path.join(self.prefs["Locations"]["SettingsDir"], "misc", name)
def create_misc_config_file(self, name, content):
"""Create a new configuration file in the 'settings' directory
"name" should be the basename (without a directory)
"content" will be directly written to the file
this method may throw an IOException
"""
misc_conf_file = self.get_misc_config_filename(name)
misc_conf_dir = os.path.dirname(misc_conf_file)
if not os.path.isdir(misc_conf_dir):
try:
os.mkdir(misc_conf_dir)
except OSError, err_msg:
## the caller expects only IOError
raise IOError, err_msg
cfile = open(misc_conf_file, "w")
try:
cfile.write(content)
except IOError:
cfile.close()
raise
cfile.close()
## reread all misc files automatically - this should be ok
self.reload_misc_files()
def requires_partition(self):
return bool(self.prefs["Main"]["UseConfigPartition"])
def get_active_partition(self):
"""Return the currently active cnfiguration partition.
"""
settings_dir = self.prefs["Locations"]["SettingsDir"]
if not os.path.ismount(settings_dir):
return None
for line in file("/proc/mounts"):
fields = line.split(" ")
mount_dir = fields[1]
fs_type = fields[2]
if fs_type == "tmpfs":
## skip ramdisks - these are not really "active partitions"
continue
try:
if os.path.samefile(mount_dir, settings_dir):
return fields[0]
except OSError:
pass
## no matching entry found
return None
def mount_partition(self):
"""Mount a config partition.
"""
self.log.debug("trying to mount configuration partition")
if not self.requires_partition():
self.log.warn("mountConfigPartition: configuration partition is "
+ "not required - mounting anyway")
if self.get_active_partition():
self.log.warn("mountConfigPartition: configuration partition already "
+ "mounted - not mounting again")
return False
conf_partitions = self.get_available_partitions()
mount_dir = self.prefs["Locations"]["SettingsDir"]
if not conf_partitions:
## return, if tmpfs is already mounted
if os.path.ismount(mount_dir):
self.log.info("A ramdisk seems to be already mounted as a config " \
+ "partition - doing nothing ...")
## return without any actions
return True
self.log.warn("no configuration partition found - you have to create "
+ "it first")
## mount tmpfs instead to provide a place for storing stuff
## "_tmpfs_" as parameter for mount is interpreted as a magic word
## by CryptoBoxRootActions
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.prefs["Programs"]["super"],
self.prefs["Programs"]["CryptoBoxRootActions"],
"program", "mount",
"_tmpfs_",
mount_dir ])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.log.error("Failed to mount a ramdisk for storing settings: %s" \
% stderr)
return False
self.log.info("Ramdisk (tmpfs) mounted as config partition ...")
else:
partition = conf_partitions[0]
## umount tmpfs in case it is active
if os.path.ismount(mount_dir):
self.umount_partition()
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.prefs["Programs"]["super"],
self.prefs["Programs"]["CryptoBoxRootActions"],
"program", "mount",
partition,
mount_dir ])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.log.error("Failed to mount the configuration partition (%s): %s" % \
(partition, stderr))
return False
self.log.info("configuration partition mounted: %s" % partition)
## write config files (not during first initialization of this object)
if self.__is_initialized:
self.write()
return True
def umount_partition(self):
"""Umount the currently active configuration partition.
"""
mount_dir = self.prefs["Locations"]["SettingsDir"]
if not os.path.ismount(mount_dir):
self.log.warn("umountConfigPartition: no configuration partition mounted")
return False
self.reload_misc_files()
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.prefs["Programs"]["super"],
self.prefs["Programs"]["CryptoBoxRootActions"],
"program", "umount",
mount_dir ])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.log.error("Failed to unmount the configuration partition: %s" % stderr)
return False
self.log.info("configuration partition unmounted")
return True
def get_available_partitions(self):
"""returns a sequence of found config partitions"""
self.log.debug("Retrieving available configuration partitions ...")
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.prefs["Programs"]["blkid"],
"-c", os.path.devnull,
"-t", "LABEL=%s" % self.prefs["Main"]["ConfigVolumeLabel"] ])
(output, error) = proc.communicate()
if proc.returncode == 2:
self.log.info("No configuration partitions found")
return []
elif proc.returncode == 4:
self.log.warn("Failed to call 'blkid' for unknown reasons.")
return []
elif proc.returncode == 0:
if output:
return [e.strip().split(":", 1)[0] for e in output.splitlines()]
else:
return []
else:
self.log.warn("Unknown exit code of 'blkid': %d - %s" \
% (proc.returncode, error))
def prepare_partition(self):
"""Mount a config partition if necessary.
"""
if self.requires_partition() and not self.get_active_partition():
self.mount_partition()
def __getitem__(self, key):
"""redirect all requests to the 'prefs' attribute"""
return self.prefs[key]
def __get_preferences(self, config_file):
"""Load the CryptoBox configuration.
"""
import StringIO
config_rules = StringIO.StringIO(self.validation_spec)
try:
prefs = configobj.ConfigObj(config_file, configspec=config_rules)
if prefs:
self.log.info("found config: %s" % prefs.items())
else:
raise CBConfigUnavailableError(
"failed to load the config file: %s" % config_file)
except IOError, err_msg:
raise CBConfigUnavailableError(
"unable to open the config file (%s): %s" % \
(config_file, err_msg))
except configobj.ConfigObjError, err_msg:
raise CBConfigError("failed to load config file (%s): %s" % \
(config_file, err_msg))
return prefs
def __validate_config(self):
"""Check the configuration settings and cast value types.
"""
result = self.prefs.validate(CryptoBoxSettingsValidator(), preserve_errors=True)
error_list = configobj.flatten_errors(self.prefs, result)
if not error_list:
return
error_msgs = []
for sections, key, text in error_list:
section_name = "->".join(sections)
if not text:
error_msg = "undefined configuration value (%s) in section '%s'" % \
(key, section_name)
else:
error_msg = "invalid configuration value (%s) in section '%s': %s" % \
(key, section_name, text)
error_msgs.append(error_msg)
raise CBConfigError, "\n".join(error_msgs)
def __check_unknown_preferences(self):
"""Check the configuration file for unknown settings to avoid spelling mistakes.
"""
import StringIO
config_rules = configobj.ConfigObj(StringIO.StringIO(self.validation_spec),
list_values=False)
self.__recursive_section_check("", self.prefs, config_rules)
def __recursive_section_check(self, section_path, section_config, section_rules):
"""should be called by '__check_unknown_preferences' for every section
sends a warning message to the logger for every undefined (see validation_spec)
configuration setting
"""
for section in section_config.keys():
element_path = section_path + section
if section in section_rules.keys():
if isinstance(section_config[section], configobj.Section):
if isinstance(section_rules[section], configobj.Section):
self.__recursive_section_check(element_path + "->",
section_config[section], section_rules[section])
else:
self.log.warn("configuration setting should be a value "
+ "instead of a section name: %s" % element_path)
else:
if not isinstance(section_rules[section], configobj.Section):
pass # good - the setting is valid
else:
self.log.warn("configuration setting should be a section "
+ "name instead of a value: %s" % element_path)
elif element_path.startswith("PluginSettings->"):
## ignore plugin settings
pass
else:
self.log.warn("unknown configuration setting: %s" % element_path)
def __get_plugin_config(self):
"""Load the plugin configuration file if it exists.
"""
import StringIO
plugin_rules = StringIO.StringIO(self.pluginValidationSpec)
try:
try:
plugin_conf_file = os.path.join(
self.prefs["Locations"]["SettingsDir"], PLUGINCONF_FILE)
except KeyError:
raise CBConfigUndefinedError("Locations", "SettingsDir")
except SyntaxError:
raise CBConfigInvalidValueError("Locations", "SettingsDir", plugin_conf_file,
"failed to interprete the filename of the plugin config file "
+ "correctly (%s)" % plugin_conf_file)
## create plugin_conf_file if necessary
if os.path.exists(plugin_conf_file):
plugin_conf = configobj.ConfigObj(plugin_conf_file, configspec=plugin_rules)
else:
try:
plugin_conf = configobj.ConfigObj(plugin_conf_file,
configspec=plugin_rules, create_empty=True)
except IOError:
plugin_conf = configobj.ConfigObj(configspec=plugin_rules)
plugin_conf.filename = plugin_conf_file
## validate and convert values according to the spec
plugin_conf.validate(validate.Validator())
return plugin_conf
def __get_volumes_database(self):
"""Load the volume database file if it exists.
"""
#TODO: add configuration specification and validation [a]: -v
try:
try:
conf_file = os.path.join(
self.prefs["Locations"]["SettingsDir"], VOLUMESDB_FILE)
except KeyError:
raise CBConfigUndefinedError("Locations", "SettingsDir")
except SyntaxError:
raise CBConfigInvalidValueError("Locations", "SettingsDir", conf_file,
"failed to interprete the filename of the volume database "
+ "correctly (%s)" % conf_file)
## create conf_file if necessary
if os.path.exists(conf_file):
conf = configobj.ConfigObj(conf_file)
else:
try:
conf = configobj.ConfigObj(conf_file, create_empty=True)
except IOError:
conf = configobj.ConfigObj()
conf.filename = conf_file
return conf
def __get_user_db(self):
"""Load the user database file if it exists.
"""
import StringIO, sha
user_db_rules = StringIO.StringIO(self.userDatabaseSpec)
try:
try:
user_db_file = os.path.join(
self.prefs["Locations"]["SettingsDir"], USERDB_FILE)
except KeyError:
raise CBConfigUndefinedError("Locations", "SettingsDir")
except SyntaxError:
raise CBConfigInvalidValueError("Locations", "SettingsDir", user_db_file,
"failed to interprete the filename of the users database file "
+ "correctly (%s)" % user_db_file)
## create user_db_file if necessary
if os.path.exists(user_db_file):
user_db = configobj.ConfigObj(user_db_file, configspec=user_db_rules)
else:
try:
user_db = configobj.ConfigObj(user_db_file,
configspec=user_db_rules, create_empty=True)
except IOError:
user_db = configobj.ConfigObj(configspec=user_db_rules)
user_db.filename = user_db_file
## validate and set default value for "admin" user
user_db.validate(validate.Validator())
## define password hash function - never use "sha" directly - SPOT
user_db.get_digest = lambda password: sha.new(password).hexdigest()
return user_db
def __get_misc_files(self):
"""Load miscelleanous configuration files.
e.g.: an ssl certificate, ...
"""
misc_dir = os.path.join(self.prefs["Locations"]["SettingsDir"], "misc")
if (not os.path.isdir(misc_dir)) or (not os.access(misc_dir, os.X_OK)):
return []
misc_files = []
for root, dirs, files in os.walk(misc_dir):
misc_files.extend([os.path.join(root, e) for e in files])
return [MiscConfigFile(os.path.join(misc_dir, f), self.log) for f in misc_files]
def __get_config_filename(self, config_file):
"""Search for the configuration file.
"""
import types
if config_file is None:
# no config file was specified - we will look for it in the ususal locations
conf_file_list = [os.path.expanduser(f)
for f in CONF_LOCATIONS
if os.path.exists(os.path.expanduser(f))]
if not conf_file_list:
# no possible config file found in the usual locations
raise CBConfigUnavailableError()
config_file = conf_file_list[0]
else:
# a config file was specified (e.g. via command line)
if type(config_file) != types.StringType:
raise CBConfigUnavailableError(
"invalid config file specified: %s" % config_file)
if not os.path.exists(config_file):
raise CBConfigUnavailableError(
"could not find the specified configuration file (%s)" % config_file)
return config_file
def __configure_log_handler(self):
"""Configure the log handler of the CryptoBox according to the config.
"""
log_level = self.prefs["Log"]["Level"].upper()
log_level_avail = ["DEBUG", "INFO", "WARN", "ERROR"]
if not log_level in log_level_avail:
raise CBConfigInvalidValueError("Log", "Level", log_level,
"invalid log level: only %s are allowed" % str(log_level_avail))
log_destination = self.prefs["Log"]["Destination"].lower()
## keep this in sync with the spec and the log_destination branches below
log_dest_avail = ['file', 'syslog']
if not log_destination in log_dest_avail:
raise CBConfigInvalidValueError("Log", "Destination", log_destination,
"invalid log destination: only %s are allowed" % str(log_dest_avail))
if log_destination == 'file':
try:
log_handler = logging.FileHandler(self.prefs["Log"]["Details"])
except IOError:
raise CBEnvironmentError("could not write to log file (%s)" % \
self.prefs["Log"]["Details"])
log_handler.setFormatter(
logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
elif log_destination == 'syslog':
log_facility = self.prefs["Log"]["Details"].upper()
log_facil_avail = ['KERN', 'USER', 'MAIL', 'DAEMON', 'AUTH', 'SYSLOG',
'LPR', 'NEWS', 'UUCP', 'CRON', 'AUTHPRIV', 'LOCAL0', 'LOCAL1',
'LOCAL2', 'LOCAL3', 'LOCAL4', 'LOCAL5', 'LOCAL6', 'LOCAL7']
if not log_facility in log_facil_avail:
raise CBConfigInvalidValueError("Log", "Details", log_facility,
"invalid log details for 'syslog': only %s are allowed" % \
str(log_facil_avail))
## retrive the log priority from the syslog module
log_handler = LocalSysLogHandler("CryptoNAS",
getattr(syslog, 'LOG_%s' % log_facility))
log_handler.setFormatter(
logging.Formatter('%(asctime)s CryptoNAS %(levelname)s: %(message)s'))
else:
## this should never happen - we just have it in case someone forgets
## to update the spec, the 'log_dest_avail' or the above branches
raise CBConfigInvalidValueError("Log", "Destination", log_destination,
"invalid log destination: only %s are allowed" % str(log_dest_avail))
cbox_log = logging.getLogger("CryptoNAS")
## remove previous handlers (from 'basicConfig')
cbox_log.handlers = []
## add new one
cbox_log.addHandler(log_handler)
## do not call parent's handlers
cbox_log.propagate = False
## 'log_level' is a string -> use 'getattr'
cbox_log.setLevel(getattr(logging, log_level))
## the logger named "CryptoNAS" is configured now
# We can't use real default values for the "directory_exists" tests below.
# Otherwise configobj complains about the "invalid" default value (if the
# directory does not exist) - even if the default value is not used.
# Up to configobj version 4.3.2 this workaround was not necessary.
validation_spec = """
[Main]
AllowedDevices = listOfDevices(default="/dev/invalid")
DefaultVolumePrefix = string(min=1)
DefaultCipher = string(default="aes-cbc-essiv:sha256")
ConfigVolumeLabel = string(min=1, default="cbox_config")
UseConfigPartition = integer(min=0, max=1, default=0)
DisabledPlugins = listOfPlugins(default=list())
[Locations]
MountParentDir = directoryMountExists(default=None)
SettingsDir = directorySettingsExists(default=None)
TemplateDir = directoryTemplateExists(default=None)
DocDir = directoryDocExists(default=None)
PluginDir = listOfExistingPluginDirectories(default=None)
EventDir = string(default="/etc/cryptobox-server/events.d")
[Log]
Level = option("debug", "info", "warn", "error", default="warn")
Destination = option("file", "syslog", default="file")
Details = string(min=1, default="/var/log/cryptobox-server/cryptobox.log")
[WebSettings]
Stylesheet = string(min=1)
Languages = listOfLanguages(default="en")
[Programs]
cryptsetup = fileExecutable(default="/sbin/cryptsetup")
mkfs = fileExecutable(default="/sbin/mkfs")
nice = fileExecutable(default="/usr/bin/nice")
blkid = fileExecutable(default="/sbin/blkid")
blockdev = fileExecutable(default="/sbin/blockdev")
mount = fileExecutable(default="/bin/mount")
umount = fileExecutable(default="/bin/umount")
super = fileExecutable(default="/usr/bin/super")
# this is the "program" name as defined in /etc/super.tab
CryptoBoxRootActions = string(min=1)
[PluginSettings]
[[__many__]]
"""
pluginValidationSpec = """
[__many__]
visibility = boolean(default=None)
requestAuth = boolean(default=None)
rank = integer(default=None)
"""
userDatabaseSpec = """
[admins]
admin = string(default=d033e22ae348aeb5660fc2140aec35850c4da997)
"""
class CryptoBoxSettingsValidator(validate.Validator):
"""Some custom configuration check functions.
"""
def __init__(self):
validate.Validator.__init__(self)
self.functions["directoryMountExists"] = \
self.check_mount_directory_exists
self.functions["directorySettingsExists"] = \
self.check_settings_directory_exists
self.functions["directoryTemplateExists"] = \
self.check_template_directory_exists
self.functions["directoryDocExists"] = \
self.check_doc_directory_exists
self.functions["fileExecutable"] = self.check_file_executable
self.functions["fileWriteable"] = self.check_file_writeable
self.functions["listOfExistingPluginDirectories"] \
= self.check_existing_plugin_directories
self.functions["listOfLanguages"] = self.list_languages
self.functions["listOfDevices"] = self.list_devices
self.functions["listOfPlugins"] = self.list_plugins
def check_mount_directory_exists(self, value):
"""Is the mount directory accessible?
"""
# use the default path, if the setting is missing
if value is None:
value = "/var/cache/cryptobox-server/mnt"
dir_path = os.path.abspath(value)
if not os.path.isdir(dir_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return dir_path
def check_settings_directory_exists(self, value):
"""Is the settings directory accessible?
"""
# use the default path, if the setting is missing
if value is None:
value = "/var/cache/cryptobox-server/settings"
dir_path = os.path.abspath(value)
if not os.path.isdir(dir_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return dir_path
def check_template_directory_exists(self, value):
"""Is the template directory accessible?
"""
# use the default path, if the setting is missing
if value is None:
value = "/usr/share/cryptobox-server/templates"
dir_path = os.path.abspath(value)
if not os.path.isdir(dir_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return dir_path
def check_doc_directory_exists(self, value):
"""Is the documentation directory accessible?
"""
# use the default path, if the setting is missing
if value is None:
value = "/usr/share/doc/cryptobox-server/html"
dir_path = os.path.abspath(value)
if not os.path.isdir(dir_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return dir_path
def check_file_executable(self, value):
"""Is the file executable?
"""
file_path = os.path.abspath(value)
if not os.path.isfile(file_path):
raise validate.VdtValueError("%s (not found)" % value)
if not os.access(file_path, os.X_OK):
raise validate.VdtValueError("%s (access denied)" % value)
return file_path
def check_file_writeable(self, value):
"""Is the file writeable?
"""
file_path = os.path.abspath(value)
if os.path.isfile(file_path):
if not os.access(file_path, os.W_OK):
raise validate.VdtValueError("%s (not found)" % value)
else:
parent_dir = os.path.dirname(file_path)
if os.path.isdir(parent_dir) and os.access(parent_dir, os.W_OK):
return file_path
raise validate.VdtValueError("%s (directory does not exist)" % value)
return file_path
def check_existing_plugin_directories(self, value):
"""Are these directories accessible?
"""
# return the default value, if the settings is missing
if value is None:
value = ["/usr/share/cryptobox-server/plugins"]
if not value:
raise validate.VdtValueError("no plugin directory specified")
if not isinstance(value, list):
value = [value]
result = []
for one_dir in value:
dir_path = os.path.abspath(one_dir)
if not os.path.isdir(dir_path):
raise validate.VdtValueError(
"%s (plugin directory not found)" % one_dir)
if not os.access(dir_path, os.X_OK):
raise validate.VdtValueError(
"%s (access denied for plugin directory)" % one_dir)
result.append(dir_path)
return result
def list_languages(self, langs):
"""Return languages as a list.
"""
if not langs:
raise validate.VdtValueError("no language specified")
if not isinstance(langs, list):
langs = [langs]
return langs
def list_devices(self, devices):
"""Return devices as a list.
"""
if not devices:
raise validate.VdtValueError("no device specified")
if not isinstance(devices, list):
devices = [devices]
return devices
def list_plugins(self, plugins):
"""Return plugin names as a list.
"""
if not plugins:
plugins = []
if isinstance(plugins, basestring):
plugins = [plugins]
elif not isinstance(plugins, list):
raise validate.VdtValueError("invalid list of disabled plugins")
return plugins
class MiscConfigFile:
"""all other config files (e.g. a ssl certificate) to be stored"""
maxSize = 20480
def __init__(self, filename, logger):
self.filename = filename
self.log = logger
self.content = None
self.load()
def load(self):
"""Load a configuration file into memory.
"""
fdesc = open(self.filename, "rb")
## limit the maximum size
self.content = fdesc.read(self.maxSize)
if fdesc.tell() == self.maxSize:
self.log.warn("file in misc settings directory (" + str(self.filename) \
+ ") is bigger than allowed (" + str(self.maxSize) + ")")
fdesc.close()
def save(self):
"""Save a configuration file to disk.
"""
## overriding of ro-files is not necessary (e.g. samba-include.conf)
if os.path.exists(self.filename) and not os.access(self.filename, os.W_OK):
return True
save_dir = os.path.dirname(self.filename)
## create the directory, if necessary
if not os.path.isdir(save_dir):
try:
os.mkdir(save_dir)
except IOError:
return False
## save the content of the file
try:
fdesc = open(self.filename, "wb")
except IOError:
return False
try:
fdesc.write(self.content)
fdesc.close()
return True
except IOError:
fdesc.close()
return False
class LocalSysLogHandler(logging.Handler):
"""Pass logging messages to a local syslog server without unix sockets.
derived from: logging.SysLogHandler
"""
def __init__(self, prepend='CryptoBox', facility=syslog.LOG_USER):
logging.Handler.__init__(self)
self.formatter = None
self.facility = facility
syslog.openlog(prepend, 0, facility)
def close(self):
"""close the syslog connection
"""
syslog.closelog()
logging.Handler.close(self)
def emit(self, record):
"""format and send the log message
"""
msg = "%s: %s" % (record.levelname, record.getMessage())
try:
syslog.syslog(record.levelno, msg)
except Exception:
self.handleError(record)

View file

@ -0,0 +1,7 @@
"""Features may be easily added to the CryptoBox.
"""
__revision__ = "$Id$"
__all__ = [ 'base', 'manage' ]

View file

@ -0,0 +1,313 @@
# $Id$
#
# parent class for all plugins of the CryptoBox
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""All features should inherit from this class.
"""
__revision__ = "$Id$"
import os
import cherrypy
import imp
class CryptoBoxPlugin:
"""The base class of all features.
"""
## default capability is "system" - the other supported capability is: "volume"
plugin_capabilities = [ "system" ]
## where should the plugin be visible by default?
plugin_visibility = [ "preferences" ]
## does this plugin require admin authentification?
request_auth = False
## default rank (0..100) of the plugin in listings (lower value means higher priority)
rank = 80
## default icon of this plugin (relative path)
default_icon_filename = "plugin_icon"
## fallback icon file (in the common plugin directory)
fallback_icon_filename = "plugin_icon_unknown"
def __init__(self, cbox, plugin_dir, site_class=None):
if cbox:
self.cbox = cbox
else:
## define empty dummy class as a replacement for a cbox instance
class CBoxPrefs(dict):
plugin_conf = {}
class CBoxLogger:
def debug(self, text):
pass
info = debug
warn = debug
error = debug
class CBoxMinimal:
prefs = CBoxPrefs()
prefs["PluginSettings"] = {}
log = CBoxLogger()
self.cbox = CBoxMinimal()
self.hdf = {}
self.plugin_dir = plugin_dir
self.hdf_prefix = "Data.Plugins.%s." % self.get_name()
self.site = site_class
if not self.get_name() in self.cbox.prefs.plugin_conf:
## initialize plugin configuration
self.cbox.prefs.plugin_conf[self.get_name()] = {}
self.prefs = self.cbox.prefs.plugin_conf[self.get_name()]
self.cbox.log.debug("Plugin '%s': settings " % self.get_name() + \
"loaded from plugin configuration file: %s" % str(self.prefs))
if self.get_name() in self.cbox.prefs["PluginSettings"]:
self.defaults = self.cbox.prefs["PluginSettings"][self.get_name()]
else:
self.defaults = {}
self.cbox.log.debug("Plugin '%s': configuration " % self.get_name() + \
"settings imported from global config file: %s" % str(self.defaults))
## load a possibly existing "root_action.py" scripts as self.root_action
if os.path.isfile(os.path.join(self.plugin_dir, "root_action.py")):
self.root_action = imp.load_source("root_action",
os.path.join(self.plugin_dir, "root_action.py"))
else:
self.root_action = None
def do_action(self, **args):
"""Override do_action with your plugin code
"""
raise Exception, \
"undefined action handler ('do_action') in plugin '%s'" % self.get_name()
def get_status(self):
"""you should override this, to supply useful state information
"""
raise Exception, \
"undefined state handler ('get_status') in plugin '%s'" % self.get_name()
def is_useful(self, device):
"""Return if this plugin is useful for a specific device.
This should only be used for volume plugins. Nice for output filtering.
"""
return True
def get_name(self):
"""the name of the python file (module) should be the name of the plugin
"""
return self.__module__
def handle_event(self, event_name, event_info=None):
"""Any plugin that wants to define event actions may override this.
currently only the following events are defined:
- "bootup" (the cryptobox server is starting)
- "shutdown" (the cryptobox server is stopping)
"""
pass
def get_warnings(self):
"""Return a priority and a warning, if the plugin detects a misconfiguration
valid prioritie ranges are:
- 80..99 loss of data is possible
- 60..79 the cryptobox will probably not work at all
- 40..59 important features will propably not work
- 20..39 heavy security risk OR broken recommended features
- 00..19 possible mild security risk OR broken/missing optional features
"""
return []
@cherrypy.expose
def download(self, **kargs):
"""Deliver a downloadable file - by default return nothing
"""
return ""
@cherrypy.expose
def get_icon(self, image=None, **kargs):
"""return the image data of the icon of the plugin
the parameter 'image' may be used for alternative image locations (relative
to the directory of the plugin)
'**kargs' is necessary, as a 'weblang' attribute may be specified (and ignored)
"""
import re
icon_ext = self.__get_default_icon_extension()
if (image is None) or (not re.match(r'[\w\-\.]*$', image)):
plugin_icon_file = os.path.join(self.plugin_dir,
"%s.%s" % (self.default_icon_filename, icon_ext))
else:
plugin_icon_file = os.path.join(self.plugin_dir, image)
## check if we can find the fallback plugin icon in one of the
## plugin directories
if not os.access(plugin_icon_file, os.R_OK):
for ppath in self.cbox.prefs["Locations"]["PluginDir"]:
plugin_icon_file = os.path.join(ppath,
"%s.%s" % (self.fallback_icon_filename, icon_ext))
if plugin_icon_file:
break
return cherrypy.lib.cptools.serveFile(plugin_icon_file)
def __get_default_icon_extension(self):
"""Return 'png' or 'gif' depending on the 'User-Agent' request header
This is useful, as IE 5.5/6.0 does not render transparent png graphics properly
Internet Explorer 5.5/6.0: return 'gif'
everything else: return 'png'
"""
if ("User-Agent" in cherrypy.request.headers) and \
((cherrypy.request.headers["User-Agent"].find("MSIE 5.5;") != -1) or \
(cherrypy.request.headers["User-Agent"].find("MSIE 6.0;") != -1)):
return "gif"
else:
return "png"
def get_template_filename(self, template_name):
"""return the filename of the template, if it is part of this plugin
use this function to check, if the plugin provides the specified template
"""
result_file = os.path.join(self.plugin_dir, template_name + ".cs")
if os.access(result_file, os.R_OK) and os.path.isfile(result_file):
return result_file
else:
return None
def get_language_data(self):
"""Retrieve the language data of the feature.
Typically this is the content of the language.hdf file as a HDF object.
"""
import neo_cgi, neo_util
lang_hdf = neo_util.HDF()
lang_file = os.path.join(self.plugin_dir, 'language.hdf')
try:
lang_hdf.readFile(lang_file)
except (neo_util.Error, neo_util.ParseError):
self.cbox.log.error("failed to load language file (%s) of plugin (%s):" % \
(lang_file, self.get_name()))
return lang_hdf
def load_dataset(self, hdf):
"""Add the local values of the feature to the hdf dataset.
"""
for (key, value) in self.hdf.items():
hdf.setValue(key, str(value))
## add the stylesheet file if it exists
css_file = os.path.join(self.plugin_dir, self.get_name() + ".css")
if os.path.exists(css_file):
hdf.setValue("Data.StylesheetFiles.%s" % self.get_name(), css_file)
def is_auth_required(self):
"""check if this plugin requires authentication
first step: check plugin configuration
second step: check default value of plugin
"""
if ("requestAuth" in self.prefs) and (not self.prefs["requestAuth"] is None):
return bool(self.prefs["requestAuth"])
else:
return self.request_auth
def is_enabled(self):
"""check if this plugin is enabled
first step: check plugin configuration
second step: check default value of plugin
"""
if ("visibility" in self.prefs) and (not self.prefs["visibility"] is None):
return bool(self.prefs["visibility"])
else:
return bool(self.plugin_visibility)
def get_rank(self):
"""check the rank of this plugin
first step: check plugin configuration
second step: check default value of plugin
"""
if ("rank" in self.prefs) and (not self.prefs["rank"] is None):
return int(self.prefs["rank"])
else:
return int(self.rank)
def set_rank(self, rank):
"""change the current rank of the plugin in plugin_conf
'rank' should be an integer
"""
self.prefs["rank"] = rank
def get_visibility(self):
"""Check which visibility flags of the plugin are set.
"""
try:
if self.prefs["visibility"] is None:
return self.plugin_visibility[:]
return self.prefs["visibility"]
except KeyError:
return self.plugin_visibility
def reset(self):
"""Reinitialize the plugin.
This function should be called before every run
"""
self.hdf = {}
def get_test_class(self):
"""Return the unittest class of the feature.
"""
import imp
pl_file = os.path.join(self.plugin_dir, "unittests.py")
if os.access(pl_file, os.R_OK) and os.path.isfile(pl_file):
try:
return imp.load_source("unittests_%s" % self.get_name(), pl_file).unittests
except AttributeError:
pass
try:
self.cbox.log.info("could not load unittests for plugin: %s" % \
self.get_name())
except AttributeError:
pass
return None

View file

@ -0,0 +1,107 @@
# $Id$
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Manages the pluggable features of the CryptoBox.
"""
__revision__ = "$Id$"
import os
class PluginManager:
"""manage available plugins"""
def __init__(self, cbox, plugin_dirs=".", site_class=None):
self.cbox = cbox
self.site = site_class
if hasattr(plugin_dirs, "__iter__"):
self.plugin_dirs = [os.path.abspath(d) for d in plugin_dirs]
else:
self.plugin_dirs = [os.path.abspath(plugin_dirs)]
self.plugin_list = self.__get_all_plugins()
def get_plugins(self):
"""Return a list of all feature instances.
"""
return self.plugin_list[:]
def get_plugin(self, name):
"""Return the specified feature as an instance.
"""
for plugin in self.plugin_list[:]:
if plugin.get_name() == name:
return plugin
return None
def __get_all_plugins(self):
"""Return all available features as instances.
"""
plist = []
for plfile in self.__get_plugin_files():
plist.append(self.__get_plugin_class(plfile))
return plist
def __get_plugin_class(self, plfile):
"""Return a instance object of the give feature.
"""
import imp
name = os.path.basename(plfile)[:-3]
try:
pl_class = getattr(imp.load_source(name, plfile), name)
except AttributeError:
return None
return pl_class(self.cbox, os.path.dirname(plfile), self.site)
def __get_plugin_files(self):
"""Retrieve all python files that may potentially be a feature.
"""
result = []
if self.cbox and self.cbox.prefs["Main"]["DisabledPlugins"]:
disabled = self.cbox.prefs["Main"]["DisabledPlugins"]
else:
disabled = []
for pdir in [os.path.abspath(e) for e in self.plugin_dirs
if os.access(e, os.R_OK) and os.path.isdir(e)]:
for plname in [f for f in os.listdir(pdir)]:
if plname in disabled:
if self.cbox:
self.cbox.log.info(
"Skipping plugin '%s' (disabled via config)" % plname)
continue
pldir = os.path.join(pdir, plname)
plfile = os.path.join(pldir, plname + ".py")
if os.path.isfile(plfile) and os.access(plfile, os.R_OK):
result.append(plfile)
return result
if __name__ == "__main__":
MANAGER = PluginManager(None, "../plugins")
for one_plugin in MANAGER.get_plugins():
if not one_plugin is None:
print "Plugin: %s" % one_plugin.get_name()

View file

@ -0,0 +1,8 @@
"""Some unittests for the CryptoBox.
"""
__revision__ = "$Id$"
__all__ = [ 'test.cryptobox', 'test.cryptoboxtools', 'test.plugins', 'test.websites',
'base', 'tools' ]

View file

@ -0,0 +1,161 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
this module contains all super classes for different tests
just inherit one of its classes and add some test functions
All testclasses based on the classes of this module may assume the following:
- there is one valid parent blockdevice (self.blockdevice)
- the blockdevice contains exactly two partitions:
- part1: vfat, 50MB, formatted (devicename: self.device)
- part2: ext3, 50MB, formatted
- self.blockdevice_html and self.device_html are url-escaped strings
- all databases (pluginconf, volume names, users) are empty
Additional hints:
- if the current state of self.device is important, then you should umount
it before any of these tests: cryptobox.tests.tools.umount(self.device)
"""
__revision__ = "$Id$"
import unittest
import twill
import cherrypy
import cryptobox.web.sites
import cryptobox.tests.base
## commands api: http://twill.idyll.org/commands.html
CBXHOST = "localhost"
CBXPORT = 8081
CBX_URL = "http://%s:%d/" % (CBXHOST, CBXPORT)
LOG_FILE = "/tmp/cryptobox-twill.log"
WEBLOG_FILE = "/tmp/cryptobox-cherrypy.log"
CONF_FILE = 'cryptobox-unittests.conf'
class CommonTestClass(unittest.TestCase):
"""Super class of all tests of the CryptoBox
prepare environment, set some values ...
"""
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName)
import cryptobox.core.settings as cbox_settings
import cryptobox.tests.tools as testtools
import os
## search for a usable block device
## use /dev/ubd? if possible - otherwise /dev/hd?
## so it will be possible to use these tests inside of a uml
self.DRIVE_TO_CLOBBER = os.environ.get("CNAS_UTEST_CLOBBER")
self.blockdevice = testtools.find_test_device(self.DRIVE_TO_CLOBBER)
## umount the partitions of this device (just to be sure)
for num in range(12):
testtools.umount("%s%d" % (self.blockdevice, num))
## format device and partition block device if necessary
testtools.prepare_partition(self.blockdevice)
self.device = self.blockdevice + "1"
self.blockdevice_html = self.blockdevice.replace("/", "%2F")
## the assumption is not always valid - but usually it is ok
self.device_html = os.path.basename(self.device)
## remove configuration files
## first: retrieve the settings directory
settings_dir = cbox_settings.CryptoBoxSettings(CONF_FILE)\
["Locations"]["SettingsDir"]
for filename in [
cbox_settings.VOLUMESDB_FILE,
cbox_settings.PLUGINCONF_FILE,
cbox_settings.USERDB_FILE]:
try:
os.unlink(os.path.join(settings_dir, filename))
except OSError:
pass
class WebInterfaceTestClass(CommonTestClass):
'''this class checks the webserver, using "twill"
the tests in this class are from the browsers point of view, so not
really unittests.
fetch twill from: http://twill.idyll.org
'''
def __init__(self, methodName='runTest'):
CommonTestClass.__init__(self, methodName)
def setUp(self):
'''configures the cherrypy server that it works nice with twill
'''
CommonTestClass.setUp(self)
cherrypy.config.update({
'server.logToScreen' : False,
'autoreload.on': False,
'server.threadPool': 1,
'server.environment': 'development',
'server.log_tracebacks': True,
'server.log_file': WEBLOG_FILE,
})
cherrypy.root = cryptobox.web.sites.WebInterfaceSites(CONF_FILE)
cherrypy.server.start(initOnly=True, serverClass=None)
from cherrypy._cpwsgi import wsgiApp
twill.add_wsgi_intercept(CBXHOST, CBXPORT, lambda: wsgiApp)
# grab the output of twill commands
self.output = open(LOG_FILE,"a")
twill.set_output(self.output)
self.cmd = twill.commands
self.url = CBX_URL
self.cbox = cherrypy.root.cbox
self.globals, self.locals = twill.namespaces.get_twill_glocals()
def tearDown(self):
'''clean up the room when leaving'''
## remove intercept.
twill.remove_wsgi_intercept(CBXHOST, CBXPORT)
## stop the cryptobox
cherrypy.root.cleanup()
## shut down the cherrypy server.
cherrypy.server.stop()
self.output.close()
## inform the parent
CommonTestClass.tearDown(self)
def __get_soup():
browser = twill.commands.get_browser()
soup = BeautifulSoup(browser.get_html())
return soup
def register_auth(self, url, user="admin", password="admin"):
self.cmd.add_auth("CryptoBox", url, user, password)

View file

@ -0,0 +1,82 @@
#!/usr/bin/env python
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Unittests for cryptobox.core.tools
"""
__revision__ = "$Id$"
import cryptobox.core.blockdevice as blockdevice
from cryptobox.tests.base import CommonTestClass
from cryptobox.core.exceptions import CBInternalError
import os
# different commonly available block devices - used for testing major/minor
COMMON_DEV_INFOS = [
('/sys/block/hda/hda1', '/dev/hda1', '/dev/hda', (3,1)),
('/sys/block/sda/sda1', '/dev/sda1', '/dev/sda', (8,1)),
('/sys/block/ubda/ubda', '/dev/ubda1', '/dev/ubda', (98,1)),
('/sys/block/ubdb/ubdb1', '/dev/ubdb1', '/dev/ubdb', (98,17)),
]
class CoreBlockDevice(CommonTestClass):
"""All unittests for cryptoboxtools
"""
def test_find_major_minor_of_device(self):
"""check the find_major_minor_of_device function
"""
func = blockdevice.get_blockdevice
# check if any of the common block devices is available
blockdevice_found = False
for sys_dir, dev_node, parent, major_minor in COMMON_DEV_INFOS:
if os.path.isdir(sys_dir):
self.assertTrue(func(dev_node).devnum == major_minor)
blockdevice_found = True
# we assume, that we found at least one blockdevice
self.assertTrue(blockdevice_found)
self.assertTrue(func("/dev/nothere") is None)
def test_is_part_of_blockdevice(self):
"""check the is_part_of_blockdevice function
"""
get_device = lambda devname: blockdevice.get_blockdevice(devname)
func = lambda parent, child: get_device(parent) and get_device(parent).is_parent_of(get_device(child))
func_raw = lambda parent: get_device(parent).is_parent_of
self.assertTrue(func(self.blockdevice, self.device))
self.assertFalse(func(self.blockdevice, self.blockdevice))
self.assertFalse(func(self.device, self.blockdevice))
self.assertFalse(func(self.device, self.device))
self.assertFalse(func(self.blockdevice, "/dev/hde1"))
# check if any of the common block devices is available
blockdevice_found = False
for sys_dir, dev_node, parent, major_minor in COMMON_DEV_INFOS:
if os.path.isdir(sys_dir):
self.assertFalse(func(dev_node, parent))
self.assertTrue(func(parent, dev_node))
blockdevice_found = True
self.assertFalse(func("ram0", "ram1"))
self.assertFalse(func_raw(self.blockdevice)(None))
self.assertRaises(CBInternalError, func_raw(self.blockdevice), "")

View file

@ -0,0 +1,192 @@
#!/usr/bin/env python
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Some unittests for the core CryptoBox modules.
"""
__revision__ = "$Id$"
import cryptobox.core.main
from cryptobox.core.exceptions import *
import cryptobox.core.settings
from cryptobox.tests.base import CommonTestClass
import os
# which other devices are safe (cannot cause harm)?
COMMON_ALLOWED_DEVICES = [ '/dev/ubdb' ]
class CryptoBoxDeviceTests(CommonTestClass):
"""Some unittests for the CryptoBox
"""
cb = cryptobox.core.main.CryptoBox()
def test_allowed_devices(self):
'''is_device_allowed should accept permitted devices'''
# check if one of the common devices exists and is allowed
blockdevice_found = False
for dev_node in (COMMON_ALLOWED_DEVICES):
if os.path.exists(dev_node):
self.assertTrue(self.cb.is_device_allowed(dev_node))
blockdevice_found = True
self.assertTrue(blockdevice_found)
def test_denied_devices(self):
'''is_device_allowed should fail with not explicitly allowed devices'''
self.assertFalse(self.cb.is_device_allowed("/dev/hdc"))
self.assertFalse(self.cb.is_device_allowed("/dev/loopa/../hdc"))
self.assertFalse(self.cb.is_device_allowed("/"))
## this device does not exist -> no permission check possible
self.assertFalse(self.cb.is_device_allowed("/dev/loop"))
class CryptoBoxConfigTests(CommonTestClass):
'''test here if everything with the config turns right'''
files = {
"configFileOK" : "cbox-test_ok.conf",
"configFileBroken" : "cbox-test_broken.conf",
"nameDBFile" : "cryptobox_volumes.db",
"pluginConf" : "cryptobox_plugins.conf",
"userDB" : "cryptobox_users.db",
"logFile" : "cryptobox.log",
"tmpdir" : "cryptobox-mnt" }
tmpdirname = ""
filenames = {}
configContentOK = """
[Main]
AllowedDevices = /dev/ram
DefaultVolumePrefix = "Data "
DefaultCipher = aes-cbc-essiv:sha256
[Locations]
SettingsDir = %s
MountParentDir = %s
TemplateDir = ../templates
LangDir = ../lang
DocDir = ../doc/html
PluginDir = ../plugins
EventDir = ../event-scripts
[Log]
Level = debug
Destination = file
Details = %s/cryptobox.log
[WebSettings]
Stylesheet = /cryptobox-misc/cryptobox.css
[Programs]
blkid = /sbin/blkid
cryptsetup = /sbin/cryptsetup
super = /usr/bin/super
CryptoBoxRootActions = CryptoBoxRootActions
"""
def setUp(self):
'''prepare the test
'''
CommonTestClass.setUp(self)
## generate all files in tmp and remember the names
import tempfile
self.tmpdirname = tempfile.mkdtemp(prefix="cbox-")
for tfile in self.files.keys():
self.filenames[tfile] = os.path.join(self.tmpdirname, self.files[tfile])
self.write_config()
def tearDown(self):
'''remove the created tmpfiles'''
# remove temp files
for tfile in self.filenames.values():
compl_name = os.path.join(self.tmpdirname, tfile)
if os.path.exists(compl_name):
os.remove(compl_name)
# remove temp dir
os.rmdir(self.tmpdirname)
CommonTestClass.tearDown(self)
def test_config_init(self):
'''Check various branches of config file loading'''
self.assertRaises(CBConfigUnavailableError,
cryptobox.core.main.CryptoBox,"/invalid/path/to/config/file")
self.assertRaises(CBConfigUnavailableError,
cryptobox.core.main.CryptoBox,"/etc/shadow")
## check one of the following things:
## 1) are we successfully using an existing config file?
## 2) do we break, if no config file is there?
## depending on the existence of a config file, only one of these conditions
## can be checked - hints for more comprehensive tests are appreciated :)
for cfile in ['cryptobox.conf']:
if os.path.exists(cfile):
cryptobox.core.main.CryptoBox()
break # this skips the 'else' clause
else:
self.assertRaises(CBConfigUnavailableError,
cryptobox.core.main.CryptoBox)
self.assertRaises(CBConfigUnavailableError,
cryptobox.core.main.CryptoBox,[])
def test_broken_configs(self):
"""Check various broken configurations
"""
self.write_config("SettingsDir", "SettingsDir=/foo/bar",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
self.write_config("Level", "Level = ho",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
self.write_config("Destination", "Destination = foobar",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
self.write_config("super", "super=/bin/invalid/no",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
self.write_config("CryptoBoxRootActions", "#not here",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBConfigError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
self.write_config("CryptoBoxRootActions", "CryptoBoxRootActions = /bin/false",
filename=self.filenames["configFileBroken"])
self.assertRaises(CBEnvironmentError, cryptobox.core.main.CryptoBox,
self.filenames["configFileBroken"])
def write_config(self, replace=None, newline=None, filename=None):
"""write a config file and (optional) replace a line in it"""
import re
if not filename:
filename = self.filenames["configFileOK"]
content = self.configContentOK % \
(self.tmpdirname, self.tmpdirname, self.tmpdirname)
if replace:
pattern = re.compile('^' + replace + '\\s*=.*$', flags=re.M)
content = re.sub(pattern, newline, content)
cfile = open(filename, "w")
cfile.write(content)
cfile.close()

View file

@ -0,0 +1,56 @@
#!/usr/bin/env python
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""This module handles the unittests of all features.
"""
__revision__ = "$Id$"
from cryptobox.tests.base import CommonTestClass
import cryptobox.plugins.manage
class CheckForUndefinedTestCases(CommonTestClass):
"""here we will add failing test functions for every non-existing testcase"""
def create_testcases():
"""Create functions that execute unittests for all features.
"""
plugins = cryptobox.plugins.manage.PluginManager(None, "../plugins").get_plugins()
glob_dict = globals()
loc_dict = locals()
for plugin in plugins:
test_class = plugin.get_test_class()
if test_class:
## add the testclass to the global dictionary
glob_dict["unittest" + plugin.get_name()] = test_class
else:
subname = "test_existence_%s" % plugin.get_name()
def test_existence(self):
"""check if the plugin (%s) contains tests""" % plugin.get_name()
self.fail("no tests defined for plugin: %s" % plugin.get_name())
## add this function to the class above
setattr(CheckForUndefinedTestCases, subname, test_existence)
#FIXME: the failure output always contains the same name for all plugins
create_testcases()

View file

@ -0,0 +1,62 @@
#!/usr/bin/env python
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Base class for all unittests involving the webserver.
This class uses twill.
"""
__revision__ = "$Id$"
from cryptobox.tests.base import WebInterfaceTestClass
class WebServer(WebInterfaceTestClass):
"""Basic tests for the webserver.
"""
def test_is_server_running(self):
'''the server should run under given name and port'''
self.register_auth(self.url)
self.cmd.go(self.url)
self.cmd.find("CBOX-STATUS")
## other URLs must not be checked, as we do not know, if they are valid
class BuiltinPages(WebInterfaceTestClass):
"""Basic test of builtin pages (no features).
"""
def test_goto_index(self):
'''display all devices'''
self.register_auth(self.url)
self.cmd.go(self.url)
self.cmd.find("The CryptoNAS")
self.cmd.go(self.url + "?weblang=de")
self.cmd.find("Die CryptoNAS")
self.cmd.go(self.url + "?weblang=sl")
self.cmd.find("CryptoNAS je projekt")
self.cmd.go(self.url + "?weblang=fr")
self.cmd.find("La CryptoNAS")

View file

@ -0,0 +1,152 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
this module contains some useful tools to be used during the tests
just inherit one of its classes and add some test functions
"""
__revision__ = "$Id$"
import os
import subprocess
def find_test_device(candidate):
"""Check for a valid test device - the data will get lost ...
the result is the parent blockdevice (containing the partition table)
and the single partition
"""
dev = candidate
if os.path.exists("/dev/%s" % dev) \
and os.access("/dev/%s" % dev, os.W_OK):
try:
## try if it is a symlink
return os.readlink("/dev/%s" % dev)
except OSError:
## not a symlink (usual)
return "/dev/%s" % dev
else:
raise Exception, "no valid device for testing found"
def is_config_partition(device):
"""Check if the device is a configuration partition.
"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/sbin/e2label',
device ])
(stdout, stderr) = proc.communicate()
return stdout.strip() == "cbox_config"
def umount(device):
"""Umount the specified device if possible - ignore errors
"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/bin/umount', '-d', device ])
proc.wait()
def prepare_partition(blockdevice):
"""Prepare the expected partition in the device (destroy all data)
Check if 'device' is a vfat partition - if not, then
partition 'blockdevice' and format 'device' as vfat
"""
if (get_fs_type(blockdevice + "1") == "vfat") \
and (get_fs_type(blockdevice + "2") == "ext3") \
and (get_fs_type(blockdevice + "3") is None) \
and (get_fs_type(blockdevice + "5") is None):
## everything is fine
return
else:
## repartitioning
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/sbin/sfdisk', blockdevice ])
# create two partitions (50MB fat, 50MB ext)
proc.stdin.write(",50,0xC\n,50,L\n")
(output, error) = proc.communicate()
if proc.returncode != 0:
raise Exception, "could not partition the device (%s): %s" \
% (blockdevice, output.strip())
##Make sure the kernel knows about the changes we just made
rereadpt_proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/sbin/blockdev', '--rereadpt', blockdevice ])
(output, error) = rereadpt_proc.communicate()
rereadpt_status = rereadpt_proc.returncode
if rereadpt_status != 0:
raise Exception, "could not reread partition table on %s: %s" \
% (blockdevice, output.strip())
## formatting
format_device(blockdevice + "1", "vfat")
format_device(blockdevice + "2", "ext3")
def get_fs_type(device):
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/sbin/blkid',
'-c', os.path.devnull,
'-w', os.path.devnull,
'-o', 'value',
'-s', 'TYPE',
device])
(output, error) = proc.communicate()
if (proc.returncode == 0) and output.strip():
## everything is fine
return output.strip()
else:
return None
def format_device(device, fs_type="vfat"):
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [ '/sbin/mkfs',
'-t', fs_type,
device ])
(output, error) = proc.communicate()
if proc.returncode != 0:
raise OSError, "could not format the device (%s): %s" \
% (device, output.strip())

View file

@ -0,0 +1,7 @@
"""The webinterface of the CryptoBox.
"""
__revision__ = "$Id$"
__all__ = [ 'dataset', 'languages', 'sites', 'testclass' ]

View file

@ -0,0 +1,229 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Manage the hdf dataset of the cryptobox web sites.
"""
__revision__ = "$Id$"
import os
import cryptobox.core.container as cbxContainer
class WebInterfaceDataset(dict):
"""this class contains all data that should be available for the clearsilver
templates
"""
def __init__(self, cbox, prefs, plugin_manager):
"""initialize the hdf dataset for the web site
"""
super(WebInterfaceDataset, self).__init__()
self.prefs = prefs
self.cbox = cbox
self.__set_config_values()
self.plugin_manager = plugin_manager
self.set_crypto_box_state()
self.set_plugin_data()
self.set_containers_state()
def set_crypto_box_state(self):
"""Set some hdf values according to the cryptobox as a whole.
"""
import cherrypy
import cryptobox.core.main
import cryptobox.web.languages
import cryptobox
self["Data.Version"] = cryptobox.__version__
## first: clean the dataset up - necessary if we were called more than once
for key in self.keys():
if key.startswith("Data.Languages."):
del self[key]
langs = self.cbox.prefs["WebSettings"]["Languages"][:]
langs.sort()
for (index, lang) in enumerate(langs):
try:
(langname, plural_info) = cryptobox.web.languages.LANGUAGE_INFO[lang]
self["Data.Languages.%d.link" % index] = langname
self["Data.Languages.%d.name" % index] = lang
self.cbox.log.info("language loaded: %s" % lang)
except KeyError:
## language was not found
self.cbox.log.warn(
"invalid language specified in configuration: %s" % lang)
## check the help setting
try:
if cherrypy.request.params["help"] == "1":
self["Settings.Help"] = "1"
self["Settings.LinkAttrs.help"] = "1"
except (AttributeError, KeyError):
## no setting or first start before request
## reset values - just in case we are called more than once
self["Settings.Help"] = "0"
if "Settings.LinkAttrs.help" in self:
del self["Settings.LinkAttrs.help"]
try:
self["Data.ScriptURL.Prot"] = cherrypy.request.scheme
host = cherrypy.request.headers["Host"]
self["Data.ScriptURL.Host"] = host.split(":", 1)[0]
complete_url = "%s://%s" % \
(self["Data.ScriptURL.Prot"], self["Data.ScriptURL.Host"])
try:
port = int(host.split(":", 1)[1])
complete_url += ":%s" % port
except (IndexError, ValueError):
if cherrypy.request.scheme == "http":
port = 80
elif cherrypy.request.scheme == "https":
port = 443
else:
## unknown scheme -> port 0
self.cbox.log.info("unknown protocol scheme used: %s" % \
(cherrypy.request.scheme,))
port = 0
self["Data.ScriptURL.Port"] = port
## retrieve the relative address of the CGI (or the cherrypy base address)
## remove the last part of the url and add a slash
path = "/".join(cherrypy.request.path.split("/")[:-1]) + "/"
self["Data.ScriptURL.Path"] = path
complete_url += path
self["Data.ScriptURL"] = complete_url
for (key, value) in cherrypy.request.params.items():
if isinstance(value, list):
self["Data.ScriptParams.%s" % key] = value[0]
else:
self["Data.ScriptParams.%s" % key] = str(value)
if cherrypy.request.headers.has_key("CRYPTOBOX-Location"):
self.cbox.log.debug("ProxyLocation: %s" % \
cherrypy.request.headers["CRYPTOBOX-Location"])
self["Data.Proxy.ScriptPath"] = \
cherrypy.request.headers["CRYPTOBOX-Location"]
if cherrypy.request.headers.has_key("X-Forwarded-Host"):
self.cbox.log.debug("ProxyHost: %s" % cherrypy.request.headers["X-Forwarded-Host"])
self["Data.Proxy.Host"] = cherrypy.request.headers["X-Forwarded-Host"]
except AttributeError:
self["Data.ScriptURL"] = ""
def set_current_disk_state(self, device):
"""Set some hdf values according to the currently active disk.
"""
for container in self.cbox.get_container_list():
if container.device == device:
is_encrypted = (container.get_type() == \
cbxContainer.CONTAINERTYPES["luks"]) and 1 or 0
is_plain = (container.get_type() == \
cbxContainer.CONTAINERTYPES["plain"]) and 1 or 0
is_mounted = container.is_mounted() and 1 or 0
is_busy = container.is_busy() and 1 or 0
self["Data.CurrentDisk.device"] = device.name
self["Data.CurrentDisk.name"] = container.get_name()
self["Data.CurrentDisk.fs_type"] = container.get_fs_type()
self["Data.CurrentDisk.encryption"] = is_encrypted
self["Data.CurrentDisk.plaintext"] = is_plain
self["Data.CurrentDisk.active"] = is_mounted
self["Data.CurrentDisk.busy"] = is_busy
self["Data.CurrentDisk.size"] = device.size_human
if is_mounted:
self.cbox.log.debug("Retrieving container's data: %s" % \
container.get_name())
(size, avail, used) = container.get_capacity()
percent = int(used)*100 / int(size)
self.cbox.log.debug(percent)
self["Data.CurrentDisk.capacity.used"] = used
self["Data.CurrentDisk.capacity.free"] = avail
self["Data.CurrentDisk.capacity.size"] = size
self["Data.CurrentDisk.capacity.percent"] = percent
else:
for key in self.keys():
if key.startswith("Data.CurrentDisk.capacity."):
del self[key]
break
self["Settings.LinkAttrs.device"] = device.name
def set_containers_state(self):
"""Set some hdf values according to the list of available containers.
"""
## first: clean the dataset up - necessary if we were called more than once
for key in self.keys():
if key.startswith("Data.Disks."):
del self[key]
avail_counter = 0
active_counter = 0
self.cbox.reread_container_list()
for container in self.cbox.get_container_list():
## useful if the container was changed during an action
container.reset_object()
is_encrypted = (container.get_type() == \
cbxContainer.CONTAINERTYPES["luks"]) and 1 or 0
is_plain = (container.get_type() == \
cbxContainer.CONTAINERTYPES["plain"]) and 1 or 0
is_mounted = container.is_mounted() and 1 or 0
is_busy = container.is_busy() and 1 or 0
self["Data.Disks.%d.device" % avail_counter] = \
container.device.name
self["Data.Disks.%d.name" % avail_counter] = container.get_name()
self["Data.Disks.%d.encryption" % avail_counter] = is_encrypted
self["Data.Disks.%d.plaintext" % avail_counter] = is_plain
self["Data.Disks.%d.busy" % avail_counter] = is_busy
self["Data.Disks.%d.active" % avail_counter] = is_mounted
self["Data.Disks.%d.size" % avail_counter] = \
container.device.size_human
if is_mounted:
active_counter += 1
avail_counter += 1
self["Data.activeDisksCount"] = active_counter
def set_plugin_data(self):
"""Set some hdf values according to the available features.
"""
## first: clean the dataset up - necessary if we were called more than once
for key in self.keys():
if key.startswith("Settings.PluginList."):
del self[key]
for plugin in self.plugin_manager.get_plugins():
if plugin is None:
self.cbox.log.warn("Invalid plugin detected: %s" % str(plugin))
continue
entry_name = "Settings.PluginList." + plugin.get_name()
self[entry_name] = plugin.get_name()
self[entry_name + ".Rank"] = plugin.get_rank()
self[entry_name + ".RequestAuth"] = plugin.is_auth_required() and "1" or "0"
for capy in plugin.plugin_capabilities:
self[entry_name + ".Types." + capy] = "1"
for visi in plugin.get_visibility():
self[entry_name + ".Visible." + visi] = "1"
def __set_config_values(self):
"""Set some hdf values according to configuration settings.
"""
self["Settings.TemplateDir"] = os.path.abspath(
self.prefs["Locations"]["TemplateDir"])
self["Settings.DocDir"] = os.path.abspath(self.prefs["Locations"]["DocDir"])
self["Settings.Stylesheet"] = self.prefs["WebSettings"]["Stylesheet"]
self["Settings.Language"] = self.prefs["WebSettings"]["Languages"][0]
self["Settings.SettingsDir"] = self.prefs["Locations"]["SettingsDir"]

View file

@ -0,0 +1,51 @@
#-*- coding: utf-8 -*-
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""supply information about existing languages
"""
__revision__ = "$Id$"
## every language information should contain (name, pluralformat)
LANGUAGE_INFO = {
"cs": ('Český', ('3', '(n==1) ? 0 : (n>=2 && n< =4) ? 1 : 2')),
"da": ('Dansk', ('2', '(n != 1)')),
"de": ('Deutsch', ('2', '(n != 1)')),
"en": ('English', ('2', '(n != 1)')),
"es": ('Español', ('2', '(n != 1)')),
"et": ('Eesti', ('2', '(n != 1)')),
"fi": ('Suomi', ('2', '(n != 1)')),
"fr": ('Français', ('2', '(n != 1)')),
"hu": ('Magyar', ('1', '0')),
"hr": ('Hrvatski', ('3', '(n%10==1 && n%100!=11 ? 0 : n%10>=2 && n%10<=4 && (n%100<10 || n%100>=20) ? 1 : 2)')),
"it": ('Italiano', ('2', '(n != 1)')),
"ja": ('日本語', ('1', '0')),
"nl": ('Nederlands', ('2', '(n != 1)')),
"pl": ('Polski', ('3', '(n==1 ? 0 : n%10>=2 && n%10< =4 '
+ '&& (n%100<10 || n%100>=20) ? 1 : 2)')),
"pt": ('Português', ('2', '(n != 1)')),
"ru": ('Русский', ('3', '(n%10==1 && n%100!=11 ? 0 : '
+ 'n%10>=2 && n%10< =4 && (n%100<10 || n%100>=20) ? 1 : 2)')),
"sl": ('Slovensko', ('4', '(n%100==1 ? 0 : n%100==2 ? 1 : n%100==3 || '
+ 'n%100==4 ? 2 : 3)')),
"sv": ('Svenska', ('2', '(n != 1)')),
}

View file

@ -0,0 +1,678 @@
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
""" this module handles all http requests and renders a website """
__revision__ = "$Id$"
import cryptobox.core.main
import cryptobox.web.dataset
import cryptobox.plugins.manage
import cryptobox.core.exceptions
import re
import cherrypy
import os
import sys
try:
import neo_cgi, neo_util, neo_cs
except ImportError:
_ERRMSG = "Could not import clearsilver module. \
Try 'apt-get install python-clearsilver'."
sys.stderr.write(_ERRMSG)
raise ImportError, _ERRMSG
GETTEXT_DOMAIN = 'cryptobox-server'
class PluginIconHandler:
"""deliver the icons of available plugins via cherrypy
the state (enabled/disabled) and the require-auth setting is ignored to
avoid repetitive reloading"""
def __init__(self, plugins):
for plugin in plugins.get_plugins():
if not plugin:
continue
plname = plugin.get_name()
## expose the get_icon function of this plugin
setattr(self, plname, plugin.get_icon)
class PluginDownloadHandler:
"""deliver downloadable files of available plugins via cherrypy
the state (enabled/disabled) and the require-auth setting is ignored
"""
def __init__(self, plugins):
for plugin in plugins.get_plugins():
if not plugin:
continue
plname = plugin.get_name()
## expose the download function of this plugin
setattr(self, plname, plugin.download)
class WebInterfaceSites:
"""handle all http requests and render pages
this includes:
- filtering common arguments
- calling feature actions
- translating content
all available features are dynamically exposed
"""
## this template is used under strange circumstances
defaultTemplate = "empty"
def __init__(self, conf_file=None):
## we should only use variables preceded by "__" to avoid name conflicts
## when loading features
self.cbox = cryptobox.core.main.CryptoBox(conf_file)
self.__cached_language_data = None
self.__dataset = None
## load the plugin manager - we will not try to detect new plugins on
## the fly ...
self.__plugin_manager = cryptobox.plugins.manage.PluginManager(
self.cbox, self.cbox.prefs["Locations"]["PluginDir"], self)
self.__reset_dataset()
## store the original http error handler
self._cp_on_http_error = self.new_http_error_handler
## set initial language order
self.lang_order = self.cbox.prefs["WebSettings"]["Languages"][:]
## publish plugin icons
self.icons = PluginIconHandler(self.__plugin_manager)
self.icons.exposed = True
## publish plugin downloads
self.downloads = PluginDownloadHandler(self.__plugin_manager)
self.downloads.exposed = True
## announce that the server started up
self.setup()
def setup(self):
"""Prepare the webinterface.
"""
self.cbox.setup()
for plugin in self.__plugin_manager.get_plugins():
if plugin:
plugin.handle_event("bootup")
def cleanup(self):
"""Shutdown the webinterface safely.
"""
self.cbox.log.info("Shutting down webinterface ...")
for plugin in self.__plugin_manager.get_plugins():
if plugin:
self.cbox.log.info("Cleaning up plugin '%s' ..." % plugin.get_name())
plugin.handle_event("shutdown")
self.cbox.cleanup()
def __reset_dataset(self):
"""this method has to be called at the beginning of every "site" action
important: only at the beginning of an action (to not loose information)
important: for _every_ "site" action (cherrypy is stateful)
also take care for the plugins, as they also contain datasets
"""
self.__load_plugins()
self.__dataset = cryptobox.web.dataset.WebInterfaceDataset(
self.cbox, self.cbox.prefs, self.__plugin_manager)
## check, if a configuration partition has become available
self.cbox.prefs.prepare_partition()
def __load_plugins(self):
"""reinitialize the list of available plugins
this includes the following:
- reload all plugins and check their state (disabled or not)
- reinitilize the datasets of all plugins
"""
#TODO: in the long-term we should create a separate object that only
# contains the plugin handlers - this avoids some hassle of namespace
# conflicts - this object will be the cherrypy.server.root
# finish this for v0.4
for plugin in self.__plugin_manager.get_plugins():
if not plugin:
continue
plname = plugin.get_name()
## remove the old plugin handler and attach a new one
try:
## check if there are name conflicts: e.g. a local variable has
## the same name as a plugin to be loaded -> skip these plugins
## if we would not check this here, nasty effects could occour
prev_obj = getattr(self, plname)
if not callable(prev_obj) or not prev_obj.exposed:
## name conflict - see below
raise NameError
## remove the plugin handler
delattr(self, plname)
except AttributeError:
## "self" does not contain the given "plname" element
## this is ok, as we are just cleaning up
pass
except NameError:
## the attribute "exposed" of the element self."plname" does
## not exist - it seems, that we have a name conflict
self.cbox.log.error("Skipping feature (%s) as its" % plname
+ " name conflicts with a local variable - see"
+ " module cryptobox.web.sites")
## skip this plugin
continue
## the old attribute was cleaned up - we can reinitialize it now
if plugin.is_enabled():
self.cbox.log.info("Plugin '%s' loaded" % plname)
## expose all features as URLs
setattr(self, plname, self.return_plugin_action(plugin))
getattr(self, plname).exposed = True
#TODO: check, if the stream_response feature really works
#for now the "stream_response" feature seems to be broken
#setattr(getattr(self, plname), "stream_respones", True)
else:
self.cbox.log.info("Plugin '%s' is disabled" % plname)
## nothing else has to be done
## sub pages requiring authentication may not be defined above
def __request_auth(self=None):
""" this is a function decorator to check authentication
"""
def check_credentials(site):
""" see description of _inner_wrapper - please simplify this!
"""
def _inner_wrapper(self, *args, **kargs):
"""this function was necessary while trying around with the
function decorator - if someone can implement the decorator
with less effort, then any suggestions are welcome!
"""
import base64
## define a "non-allowed" function
user, password = None, None
try:
## ignore the "Basic " (first six letters) part
resp = cherrypy.request.headers["Authorization"][6:]
(user, password) = base64.b64decode(resp).split(":", 1)
except KeyError:
## no "authorization" header was sent
pass
except TypeError:
## invalid base64 string
pass
except AttributeError:
## no cherrypy request header defined
pass
auth_dict = self.cbox.prefs.user_db["admins"]
if user in auth_dict.keys():
if self.cbox.prefs.user_db.get_digest(password) == auth_dict[user]:
## ok: return the choosen page
self.cbox.log.info("access granted for: %s" % user)
return site(self, *args, **kargs)
else:
self.cbox.log.info(
"wrong password supplied for: %s" % user)
else:
self.cbox.log.info("unknown user: %s" % str(user))
## wrong credentials: return "access denied"
cherrypy.response.headers["WWW-Authenticate"] = \
'''Basic realm="CryptoBox"'''
cherrypy.response.status = 401
return self.__render("access_denied")
return _inner_wrapper
return check_credentials
######################################################################
## put real sites down here and don't forget to expose them at the end
@cherrypy.expose
def index(self, weblang="", help="0", device=None):
"""the default page on startup - we show the list of available disks
"""
self.__reset_dataset()
self.__set_web_lang(weblang)
self.__check_environment()
## do not forget the language!
param_dict = {"weblang":weblang}
## render "disks" plugin by default
return self.return_plugin_action(
self.__plugin_manager.get_plugin("disks"))(**param_dict)
def new_http_error_handler(self, error_code, message):
"""handle http errors gracefully
404 - not found errors: ignored if url is below /cryptobox-misc/
other 404 errors: send the error code and return a nice informative page
500 - runtime errors: return "ok" exit code and show a polite excuse
others: are there any other possible http errors?
"""
import traceback
## we ignore uninteresting not-found errors
if (error_code == 404) and \
(cherrypy.request.path.startswith("/cryptobox-misc/") or \
cherrypy.request.path in ['/robots.txt','/favicon.ico']):
cherrypy.response.status = error_code
return
## an invalid action was requested
if error_code == 404:
## we send a not-found error (with the usual interface)
cherrypy.response.status = error_code
self.__dataset["Data.Warning"] = "InvalidAction"
cherrypy.response.body = self.__render("empty")
return
## are there still bugs in the code?
if error_code == 500:
## we fix the error code (200 is "OK")
cherrypy.response.status = 200
self.cbox.log.error(
"HTTP-ERROR[500] - runtime error: %s" % str(message))
## add a traceback and exception information to the lo
for log_line in traceback.format_exception(*sys.exc_info()):
self.cbox.log.error("\t%s" % log_line)
self.__dataset["Data.Warning"] = "RuntimeError"
cherrypy.response.body = self.__render("empty")
return
## unknown error type
cherrypy.response.status = error_code
self.cbox.log.warn("HTTP-ERROR[%d] - an unknown error occoured: %s" \
% (error_code, message))
cherrypy.response.body = self.__render("empty")
def return_plugin_action(self, plugin):
""" returns a function that is suitable for handling a cherrypy
page request
"""
def handler(self, weblang="", device=None, help="0", redirect=None,
message_keep=None, **args):
"""this function handles a cherrypy page request
"""
plugin.reset()
self.__reset_dataset()
self.__check_environment()
self.__set_web_lang(weblang)
## we always read the "device" setting - otherwise volume-plugin
## links would not work easily
## (see "volume_props" linking to "volume_format_fs")
## it will get ignored for non-volume plugins
plugin.device = None
if device and self.__set_device(device):
plugin.device = self.cbox.get_container(device).device
## check the device argument of volume plugins
if "volume" in plugin.plugin_capabilities:
## initialize the dataset of the selected device if necessary
if plugin.device:
self.__dataset.set_current_disk_state(plugin.device)
else:
## invalid (or missing) device setting
return self.__render(self.defaultTemplate)
## check if there is a "redirect" setting - this will override
## the return value of the do_action function
## (e.g. useful for umount-before-format)
override_next_template = None
if redirect:
override_next_template = { "plugin": redirect }
if "volume" in plugin.plugin_capabilities:
override_next_template["values"] = {"device":plugin.device.name}
## check for information to be kept after the last call
if message_keep:
for (key, value) in message_keep["dataset"].items():
self.__dataset[key] = value
## check if the device is busy
if plugin.device and self.cbox.get_container(plugin.device).is_busy():
return self.__render("volume_busy")
## call the plugin handler
next_template = plugin.do_action(**args)
## for 'volume' plugins: reread the dataset of the current disk
## additionally: set the default template for plugins
if "volume" in plugin.plugin_capabilities:
## maybe the state of the current volume was changed?
self.__dataset.set_current_disk_state(plugin.device)
if not next_template:
next_template = { "plugin":"volume_mount",
"values":{"device":plugin.device.name}}
else:
## some non-volume plugins change the internal state of other
## plugins - e.g.: plugin_manager
## if we do not call __load_plugins now, then it is possible
## to call a plugin directly after disabling it (only once)
self.__load_plugins()
self.__dataset.set_plugin_data()
## default page for non-volume plugins is the disk selection
if not next_template:
next_template = { "plugin":"disks", "values":{} }
#TODO: there is a lot of piece-by-piece updating around here
# for v0.4 we should just call __reset_dataset - but this would
# require to store the currently changed dataset values (e.g.
# weblang) somewhere else to not override it
## some non-volume plugins may change the state of containers
## the mount plugin may change the number of active disks - for the logo
self.__dataset.set_containers_state()
## was a redirect requested?
if override_next_template:
next_template = override_next_template
## if another plugins was choosen for 'next_template', then do it!
if isinstance(next_template, dict) \
and "plugin" in next_template.keys() \
and "values" in next_template.keys() \
and self.__plugin_manager.get_plugin(next_template["plugin"]):
value_dict = dict(next_template["values"])
## force the current weblang attribute - otherwise it gets lost
value_dict["weblang"] = self.lang_order[0]
## check for warnings/success messages, that should be kept
if "Data.Success" in plugin.hdf.keys() \
or "Data.Warning" in plugin.hdf.keys():
value_dict["message_keep"] = {"plugin":plugin, "dataset":{}}
for keep_key in ("Data.Warning", "Data.Success"):
if keep_key in plugin.hdf.keys():
self.cbox.log.info("keeping message: %s" % \
plugin.hdf[keep_key])
value_dict["message_keep"]["dataset"][keep_key] = \
plugin.hdf[keep_key]
new_plugin = self.__plugin_manager.get_plugin(next_template["plugin"])
return self.return_plugin_action(new_plugin)(**value_dict)
## save the currently active plugin name
self.__dataset["Data.ActivePlugin"] = plugin.get_name()
return self.__render(next_template, plugin)
## apply authentication?
if plugin.is_auth_required():
return lambda **args: self.__request_auth()(handler)(self, **args)
else:
return lambda **args: handler(self, **args)
@cherrypy.expose
def test(self, weblang="", help="0", device=None):
"""test authentication - this function may be safely removed
"""
self.__reset_dataset()
self.__set_web_lang(weblang)
self.__check_environment()
result = "<html><head><title>Test</title><body><ul>"
for key in cherrypy.request.headers:
result += "<li>%s - %s</li>" % (str(key), str(cherrypy.request.headers[key]))
result += "</ul></body></html>"
return result
@cherrypy.expose
def test_stream(self, weblang="", help="0", device=None):
"""just for testing purposes - to check if the "stream_response" feature
actually works - for now (September 02006) it does not seem to be ok
"""
import time
yield "<html><head><title>neu</title></head><body><p><ul>"
for num in range(10):
yield "<li>yes: %d - %s</li>" % (num, str(time.time()))
time.sleep(1)
yield "</ul></p></html>"
##################### input checker ##########################
def __check_environment(self):
"""inform the user of suspicious environmental problems
examples are: non-https, readonly-config, ...
"""
warnings = []
for pl in self.__plugin_manager.get_plugins():
warnings.extend(pl.get_warnings())
warnings.sort(reverse=True)
for (index, (warn_prio, warn_text)) in enumerate(warnings):
self.__dataset["Data.EnvironmentWarning.%d" % index] = warn_text
def __set_web_lang(self, value):
"""set the preferred priority of languages according to this order:
1. language selected via web interface
2. preferred browser language setting
3. languages defined in the config file
"""
## start with the configured language order
lang_order = self.cbox.prefs["WebSettings"]["Languages"][:]
self.cbox.log.debug(
"updating language preferences (default: %s)" % str(lang_order))
## put the preferred browser language in front
guess = self.__get_browser_language(lang_order)
if guess:
lang_order.remove(guess)
lang_order.insert(0, guess)
self.cbox.log.debug(
"raised priority of preferred browser language: %s" % guess)
## check if the 'weblang' setting is necessary (does it change the result
## of the language preference calculation?)
override_by_weblang = False
## is the chosen language (via web interface) valid? - put it in front
if value and (value in lang_order) and (not re.search(r'\W', value)):
## skip if the 'weblang' value is already at the top of the list
if lang_order.index(value) != 0:
override_by_weblang = True
lang_order.remove(value)
lang_order.insert(0, value)
self.cbox.log.debug(
"raised priority of selected language: %s" % value)
elif value:
self.cbox.log.info("invalid language selected: %s" % value)
## store current language setting
self.cbox.log.info(
"current language preference: %s" % str(lang_order))
self.lang_order = lang_order
self.__dataset["Settings.Language"] = lang_order[0]
## we do not have to add the LinkAttr if it is irrelevant
if override_by_weblang:
self.__dataset["Settings.LinkAttrs.weblang"] = lang_order[0]
def __get_browser_language(self, avail_langs):
"""guess the preferred language of the user (as sent by the browser)
take the first language, that is part of 'avail_langs'
"""
try:
pref_lang_header = cherrypy.request.headers["Accept-Language"]
except KeyError:
## no language header was specified
return None
## this could be a typical 'Accept-Language' header:
## de-de,de;q=0.8,en-us;q=0.5,en;q=0.3
regex = re.compile(r"\w+(-\w+)?(;q=[\d\.]+)?$")
pref_langs = [e.split(";", 1)[0]
for e in pref_lang_header.split(",")
if regex.match(e)]
## is one of these preferred languages available?
for lang in pref_langs:
if lang in avail_langs:
return lang
## we try to be nice: also look for "de" if "de-de" was specified ...
for lang in pref_langs:
## use only the first part of the language
short_lang = lang.split("-", 1)[0]
if short_lang in avail_langs:
return short_lang
## we give up
return None
def __set_device(self, device):
"""check a device name that was chosen via the web interface
issue a warning if the device is invalid"""
if device and re.match(r'[\w-]+$', device) \
and self.cbox.get_container(device):
self.cbox.log.debug("Select device: %s" % device)
return True
else:
self.cbox.log.warn("Invalid device: %s" % device)
self.__dataset["Data.Warning"] = "InvalidDevice"
return False
def __substitute_gettext(self, languages, text_domain, hdf):
"""substitute all texts in the hdf dataset with their translated
counterparts as returned by gettext
"""
import gettext
try:
translator = gettext.translation(text_domain, languages=languages)
except IOError, err_msg:
## no translation found
self.cbox.log.warn("unable to load language file: %s" % err_msg)
return hdf
def walk_tree(parent_name, hdf_node):
"""iterate through all nodes"""
def translate_node(node):
"""turn one single string into unicode"""
if not node.value():
return
for (key, value) in node.attrs():
## ignore all nodes with the 'LINK' attribute
## for now clearsilver is buggy regarding attributes
## buggy -> parsing of a hdf file fails silently
if key == "LINK":
return
## as long as the attributes do not work, we have to rely on
## some magic names to ignore translations
if (parent_name == "Link") and \
(node.name() in ["Rel", "Prot", "Abs"]):
return
try:
#TODO: we should use unicode - or not? - turn it on later
#node.setValue("", translator.ugettext(node.value()))
## quite obscure: ugettext can handle None - gettext breaks instead
node.setValue("", translator.gettext(node.value()))
except UnicodeEncodeError, err_msg:
self.cbox.log.info(
"Failed unicode encoding for gettext: %s - %s" \
% (node.value(),err_msg))
## fallback to default encoding
node.setValue("", translator.gettext(node.value()))
while hdf_node:
translate_node(hdf_node)
walk_tree(hdf_node.name(), hdf_node.child())
hdf_node = hdf_node.next()
walk_tree("", hdf)
def __get_language_data(self):
"""return the hdf dataset of the main interface and all plugins
translations are done according to self.lang_order
"""
## check if the language setting has changed - use cache if possible
if self.__cached_language_data and \
self.__cached_language_data["lang_order"] == self.lang_order:
self.cbox.log.debug(
"using cached language data: %s" % str(self.lang_order))
return self.__cached_language_data["hdf"]
self.cbox.log.debug("generating language data")
hdf = neo_util.HDF()
hdf.readFile(os.path.join(
self.cbox.prefs["Locations"]["TemplateDir"],"language.hdf"))
self.__substitute_gettext(self.lang_order, GETTEXT_DOMAIN, hdf)
## load the language data of all plugins
for plugin in self.__plugin_manager.get_plugins():
pl_lang = plugin.get_language_data()
self.__substitute_gettext(self.lang_order, "%s-feature-%s" % \
(GETTEXT_DOMAIN, plugin.get_name()), pl_lang)
hdf.copy("Plugins.%s" % plugin.get_name(), pl_lang)
self.cbox.log.debug(
"language data for plugin loaded: %s" % plugin.get_name())
## cache result for later retrieval
self.__cached_language_data = \
{"lang_order": self.lang_order, "hdf": hdf}
return hdf
def __render(self, render_info, plugin=None):
'''renders from clearsilver templates and returns the resulting html
'''
## is render_info a string (filename of the template) or a dictionary?
if isinstance(render_info, dict):
template = render_info["template"]
if render_info.has_key("generator"):
generator = render_info["generator"]
else:
generator = None
else:
(template, generator) = (render_info, None)
## load the language data
hdf = neo_util.HDF()
hdf.copy("Lang", self.__get_language_data())
## first: assume, that the template file is in the global
## template directory
self.__dataset["Settings.TemplateFile"] = os.path.abspath(os.path.join(
self.cbox.prefs["Locations"]["TemplateDir"],
template + ".cs"))
if plugin:
## check, if the plugin provides the template file -> overriding
plugin_cs_file = plugin.get_template_filename(template)
if plugin_cs_file:
self.__dataset["Settings.TemplateFile"] = plugin_cs_file
## add the current state of the plugins to the hdf dataset
self.__dataset["Data.Status.Plugins.%s" % plugin.get_name()] = \
plugin.get_status()
## load the dataset of the plugin
plugin.load_dataset(hdf)
self.cbox.log.info("rendering site: " + template)
cs_path = os.path.abspath(os.path.join(
self.cbox.prefs["Locations"]["TemplateDir"], "main.cs"))
if not os.access(cs_path, os.R_OK):
self.cbox.log.error(
"Couldn't read clearsilver file: %s" % cs_path)
yield "Couldn't read clearsilver file: %s" % cs_path
return
self.cbox.log.debug(self.__dataset)
for key in self.__dataset.keys():
hdf.setValue(key, str(self.__dataset[key]))
cs_data = neo_cs.CS(hdf)
cs_data.parseFile(cs_path)
## is there a generator containing additional information?
if not generator:
## all content in one flush
result_data = cs_data.render().splitlines()
## remove empty leading lines (avoids html warnings)
while not result_data[0].strip():
del result_data[0]
yield "\n".join(result_data)
else:
content_generate = generator()
dummy_line = """<!-- CONTENT_DUMMY -->"""
## now we do it linewise - checking for the content marker
for line in cs_data.render().splitlines():
if line.find(dummy_line) != -1:
yield line.replace(dummy_line, content_generate.next())
else:
yield line + "\n"

View file

@ -0,0 +1,44 @@
# this is a local configuration file for pylint to be used for checking the
# quality of the CryptoBox code
#
# just run:
# bin/do_pylint.sh cryptobox.core.main
# to check the module cryptobox.core.main
[MASTER]
# Add <file or directory> to the black list. It should be a base name, not a
# path. You may set this option multiple times.
ignore=CVS
ignore=.svn
[BASIC]
# Required attributes for module, separated by a comma
required-attributes=__revision__
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=88
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string='\t'
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO