added python version checks

added startup test to CryptoBox
integrate CryptoBoxRootActions into CryptoBox (see example-super.tab for details)
implemented "mount" and "umount" in CryptoBoxRootActions
adapted CryptoBoxRootActions checks to 'super'
This commit is contained in:
lars 2006-08-23 13:27:25 +00:00
parent accbb7c515
commit 84028f4a92
9 changed files with 263 additions and 96 deletions

49
pythonrewrite/bin2/CryptoBox.py Normal file → Executable file
View file

@ -1,25 +1,36 @@
#!/usr/bin/env python #!/usr/bin/env python2.4
''' '''
This is the web interface for a fileserver managing encrypted filesystems. This is the web interface for a fileserver managing encrypted filesystems.
It was originally written in bash/perl. Now a complete rewrite is in It was originally written in bash/perl. Now a complete rewrite is in
progress. So things might be confusing here. Hopefully not for long. progress. So things might be confusing here. Hopefully not for long.
:) :)
TODO: replace all "sys.exit"-calls by a unified handler
''' '''
# check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import CryptoBoxContainer import CryptoBoxContainer
import types import types
import re import re
import os import os
import sys
import unittest import unittest
import logging import logging
import subprocess
CONF_LOCATIONS = [ CONF_LOCATIONS = [
"./cryptobox.conf", "./cryptobox.conf",
"~/.cryptobox.conf", "~/.cryptobox.conf",
"/etc/cryptobox/cryptobox.conf"] "/etc/cryptobox/cryptobox.conf"]
SUDO_WRAPPER = ["sudo", "/home/lars/subversion/cryptobox/branches/pythonrewrite/bin2/CryptoBoxRootActions.py"]
class CryptoBox: class CryptoBox:
'''this class rules them all! '''this class rules them all!
@ -29,6 +40,7 @@ class CryptoBox:
def __init__(self, config_file=None): def __init__(self, config_file=None):
self.__initLogging() self.__initLogging()
self.__initPreferences(config_file) self.__initPreferences(config_file)
self.__runTests()
def __initLogging(self): def __initLogging(self):
@ -106,19 +118,42 @@ class CryptoBox:
if not log_level in log_level_avail: if not log_level in log_level_avail:
self.log.error("invalid log level: %s is not in %s" % (self.cbxPrefs["Log"]["Level"], log_level_avail)) self.log.error("invalid log level: %s is not in %s" % (self.cbxPrefs["Log"]["Level"], log_level_avail))
sys.exit(1) sys.exit(1)
self.log.setLevel(getattr(logging, log_level))
self.log.addHandler(logging.FileHandler(self.cbxPrefs["Log"]["Details"]))
except TypeError: except TypeError:
self.log.error("invalid log level: %s" % self.cbxPrefs["Log"]["Level"]) self.log.error("invalid log level: %s" % self.cbxPrefs["Log"]["Level"])
sys.exit(1) sys.exit(1)
try:
new_handler = logging.FileHandler(self.cbxPrefs["Log"]["Details"])
new_handler.setLevel(getattr(logging, log_level))
# this formatter does not work for now
new_handler.setFormatter = '%(asctime)s %(module)s %(levelname)s %(message)s'
self.log.addHandler(new_handler)
except IOError: except IOError:
self.log.error("could not open logfile: %s" % self.cbxPrefs["Log"]["Details"]) self.log.error("could not open logfile: %s" % self.cbxPrefs["Log"]["Details"])
sys.exit(1) sys.exit(1)
# RFC: what is this method useful for? # do some initial checks
# as it claims, just for demonstrating inheritance effects def __runTests(self):
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.error("Could not open %s for writing!" % os.devnull)
sys.exit(1)
proc = subprocess.Popen(
shell = False,
stdout = devnull,
stderr = devnull,
args = [
self.cbxPrefs["Programs"]["super"],
self.cbxPrefs["Programs"]["CryptoBoxRootActions"],
"check"])
proc.wait()
if proc.returncode != 0:
self.log.error("Could not call CryptoBoxRootActions by 'super' - maybe you did not add the appropriate line to /etc/super.tab?")
sys.exit(1)
# this method just demonstrates inheritance effects - may be removed
def cbx_inheritance_test(self): def cbx_inheritance_test(self):
print "you lucky widow" print "you lucky widow"

80
pythonrewrite/bin2/CryptoBoxContainer.py Normal file → Executable file
View file

@ -1,8 +1,17 @@
from CryptoBoxLogger import CryptoBoxLogger #!/usr/bin/env python2.4
try:
import subprocess """
except: TODO: implement "getCapacity"
print "Couldn't import 'subprocess'. You need a python version >= 2.4" """
# check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import subprocess
import os import os
import re import re
import logging import logging
@ -114,10 +123,12 @@ class CryptoBoxContainer:
stdout = subprocess.PIPE, stdout = subprocess.PIPE,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["cryptsetup"], self.Progs["super"],
"--batch-mode", self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksAddKey", "luksAddKey",
self.device]) self.device,
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw)) proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate() (output, errout) = proc.communicate()
if proc.returncode != 0: if proc.returncode != 0:
@ -299,11 +310,13 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["cryptsetup"], self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen", "luksOpen",
"--batch-mode",
self.device, self.device,
self.name]) self.name,
"--batch-mode"])
proc.stdin.write(password) proc.stdin.write(password)
(output, errout) = proc.communicate() (output, errout) = proc.communicate()
if proc.returncode != 0: if proc.returncode != 0:
@ -316,7 +329,9 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["mount"], self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"mount",
os.path.join(self.__dmDir, self.name), os.path.join(self.__dmDir, self.name),
self.__getMountPoint()]) self.__getMountPoint()])
proc.wait() proc.wait()
@ -340,7 +355,11 @@ class CryptoBoxContainer:
stdin = None, stdin = None,
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [self.Progs["umount"], "-l", self.__getMountPoint()]) args = [
self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait() proc.wait()
if proc.returncode != 0: if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), ) errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
@ -353,10 +372,12 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["cryptsetup"], self.Progs["super"],
"--batch-mode", self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksClose", "luksClose",
self.name]) self.name,
"--batch-mode"])
proc.wait() proc.wait()
if proc.returncode != 0: if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), ) errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
@ -386,7 +407,9 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["mount"], self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"mount",
self.device, self.device,
self.__getMountPoint()]) self.__getMountPoint()])
proc.wait() proc.wait()
@ -411,8 +434,9 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["umount"], self.Progs["super"],
"-l", self.Progs["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()]) self.__getMountPoint()])
proc.wait() proc.wait()
if proc.returncode != 0: if proc.returncode != 0:
@ -469,12 +493,14 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["cryptsetup"], self.Progs["super"],
self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksFormat",
self.device,
"--batch-mode", "--batch-mode",
"--cipher", self.cbox.cbxPrefs["System"]["DefaultCipher"], "--cipher", self.cbox.cbxPrefs["System"]["DefaultCipher"],
"--iter-time", "2000", "--iter-time", "2000"])
"luksFormat",
self.device])
proc.stdin.write(password) proc.stdin.write(password)
(output, errout) = proc.communicate() (output, errout) = proc.communicate()
if proc.returncode != 0: if proc.returncode != 0:
@ -488,11 +514,13 @@ class CryptoBoxContainer:
stdout = devnull, stdout = devnull,
stderr = subprocess.PIPE, stderr = subprocess.PIPE,
args = [ args = [
self.Progs["cryptsetup"], self.Progs["super"],
"--batch-mode", self.Progs["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen", "luksOpen",
self.device, self.device,
self.name]) self.name,
"--batch-mode"])
proc.stdin.write(password) proc.stdin.write(password)
(output, errout) = proc.communicate() (output, errout) = proc.communicate()
if proc.returncode != 0: if proc.returncode != 0:

219
pythonrewrite/bin2/CryptoBoxRootActions.py Normal file → Executable file
View file

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python2.4
"""module for executing the programs, that need root privileges """module for executing the programs, that need root privileges
Syntax: Syntax:
@ -6,9 +6,9 @@ Syntax:
- device - device
- [action] - [action]
- [action args] - [action args]
"""
# current problem: how to identify the "cryptobox" user - to check its permissions? this script will always return with an exitcode 0 (true), if "check" is the only argument
"""
import os import os
import sys import sys
@ -20,24 +20,30 @@ import types
allowedProgs = { allowedProgs = {
"sfdisk": "/sbin/sfdisk", "sfdisk": "/sbin/sfdisk",
"cryptsetup": "/sbin/cryptsetup", "cryptsetup": "/sbin/cryptsetup",
"mount": "/bin/mount",
"umount": "/bin/umount",
} }
# check if this user has write-access to the device DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
allowedUser = "lars"
# supply gids - otherwise the default groups of the above user are used
#allowedGroups = [6, 1000]
def isWriteableBlock(device): def isWriteable(device, force_dev_type=None):
fstat = os.stat(device) """check if the calling user (not root!) has write access to the device/file
(dev_type, perm_u, perm_g, perm_o) = (
fstat.st_mode % 65536 / 4096, the real (not the effictive) user id is used for the check
fstat.st_mode % 512 / 64, additionally the permissions of the default groups of the real uid are checked
fstat.st_mode % 512 % 64 / 8, this check works nicely together with "super", as it changes (by default) only
fstat.st_mode % 512 % 64 % 8) the effective uid (not the real uid)"""
(owner_uid, owner_gid) = (fstat.st_uid, fstat.st_gid) # first check, if the device/file exists
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(allowedUser) if not os.path.exists(device):
return False
# check the type of the device - if necessary
if not force_dev_type is None:
dev_type = os.stat(device).st_mode % 65536 / 4096
if dev_type != force_dev_type: return False
# retrieve the information for the real user id
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(os.getuid())
trustGIDs = [] trustGIDs = []
try: try:
"if 'allowedGroups' are defined, then ignore the previously aquired groups" "if 'allowedGroups' are defined, then ignore the previously aquired groups"
@ -49,57 +55,72 @@ def isWriteableBlock(device):
except TypeError: except TypeError:
"if it fails, then 'allowedGroups' was really not well defined" "if it fails, then 'allowedGroups' was really not well defined"
trustGIDs.append(allowedGID) trustGIDs.append(allowedGID)
DEF_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12} # set the default groups of the caller for the check (restore them later)
if dev_type != DEF_TYPES["block"]: return False savedGroups = os.getgroups()
## could the following check be replaced by os.access? os.setgroups(trustGIDs)
"does the owner id match?" # check permissions
if owner_uid == trustUID: result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
"is the write bit (2) set for the owner?" # reset the groups of this process
return perm_u & 2 os.setgroups(savedGroups)
if owner_gid in trustGIDs: return result
return perm_g & 2
"we seem to be one of 'others' - so just check the bit"
return perm_o & 2
def run_cryptsetup(device, args): def run_cryptsetup(args):
"""execute cryptsetup as root """execute cryptsetup as root
@args: list of arguments - they will be treated accordingly to the first element @args: list of arguments - they will be treated accordingly to the first element
of this list (the action)""" of this list (the action)"""
if not args: raise "WrongArguments", "no action for cryptsetup supplied" if not args: raise "WrongArguments", "no action for cryptsetup supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied" if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try: try:
action = args[0] action = args[0]
del args[0] del args[0]
cs_args = [allowedProgs["cryptsetup"]] device = None
cs_args.extend(args) cmd_args = []
if action == "luksFormat": if action == "luksFormat":
cs_args.append(action) device = args[0]; del args[0]
cs_args.append(device) cmd_args.append(action)
cmd_args.append(device)
elif action == "luksUUID": elif action == "luksUUID":
cs_args.append(action) device = args[0]; del args[0]
cs_args.append(device) cmd_args.append(action)
cmd_args.append(device)
elif action == "luksOpen": elif action == "luksOpen":
if len(cs_args) < 2: raise "WrongArguments", "missing arguments" if len(args) < 2: raise "WrongArguments", "missing arguments"
cs_args.insert(-1, action) device = args[0]; del args[0]
cs_args.insert(-1, device) destination = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
cmd_args.append(destination)
elif action == "luksClose": elif action == "luksClose":
if len(cs_args) < 2: raise "WrongArguments", "missing arguments" if len(args) < 1: raise "WrongArguments", "missing arguments"
cs_args.insert(-1, action) destination = args[0]; del args[0]
# maybe add a check for the mapped device's permissions?
# dmsetup deps self.device
cmd_args.append(action)
cmd_args.append(destination)
elif action == "luksAddKey": elif action == "luksAddKey":
cs_args.append(action) device = args[0]; del args[0]
cs_args.append(device) cmd_args.append(action)
cmd_args.append(device)
elif action == "luksDelKey": elif action == "luksDelKey":
if len(cs_args) < 2: raise "WrongArguments", "missing arguments" if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
cs_args.insert(-1, action) device = args[0]; del args[0]
cs_args.insert(-1, device) cmd_args.insert(-1, action)
cmd_args.insert(-1, device)
elif action == "isLuks": elif action == "isLuks":
cs_args.append(action) device = args[0]; del args[0]
cs_args.append(device) cmd_args.append(action)
else: raise "WrongArguments", "invalid action supplied" cmd_args.append(device)
except TypeError: else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
raise "WrongArguments", "invalid arguments supplied" # check if a device was defined - and check it
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
cs_args = [allowedProgs["cryptsetup"]]
cs_args.extend(args)
cs_args.extend(cmd_args)
except (TypeError, IndexError):
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute cryptsetup with the given parameters # execute cryptsetup with the given parameters
proc = subprocess.Popen( proc = subprocess.Popen(
shell = False, shell = False,
@ -108,12 +129,78 @@ def run_cryptsetup(device, args):
return proc.returncode == 0 return proc.returncode == 0
def run_sfdisk(device, args): def run_sfdisk(args):
"""execute sfdisk for partitioning
not implemented yet"""
print "ok - you are free to call sfdisk ..." print "ok - you are free to call sfdisk ..."
print " not yet implemented ..." print " not yet implemented ..."
return True return True
def run_mount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no destination for mount supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
try:
device = args[0]
del args[0]
destination = args[0]
del args[0]
# check permissions for the device
if not isWriteable(device, DEV_TYPES["block"]):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
# check permissions for the mountpoint
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
# check for additional (not allowed) arguments
if len(args) != 0:
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
except TypeError:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute mount with the given parameters
# first overwrite the real uid, as 'mount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
# execute mount
proc = subprocess.Popen(
shell = False,
args = [allowedProgs["mount"], device, destination])
proc.communicate()
# restore previous real uid
os.setuid(savedUID)
return proc.returncode == 0
def run_umount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no mountpoint for umount supplied"
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied"
try:
destination = args[0]
del args[0]
# check permissions for the destination
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments"
except TypeError:
raise "WrongArguments", "invalid arguments supplied"
# execute umount with the given parameters
# first overwrite the real uid, as 'umount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
# execute umount (with the parameter '-l' - lazy umount)
proc = subprocess.Popen(
shell = False,
args = [allowedProgs["umount"], "-l", destination])
proc.communicate()
# restore previous real uid
os.setuid(savedUID)
return proc.returncode == 0
def getUserInfo(user): def getUserInfo(user):
"""return information about the specified user """return information about the specified user
@ -139,37 +226,45 @@ def getUserInfo(user):
# prevent import # prevent import
if __name__ == "__main__": if __name__ == "__main__":
# do we have root privileges (effective uid is zero)?
if os.geteuid() != 0:
sys.stderr.write("the effective uid is not zero - you should use 'super' to call this script (%s)" % sys.argv[0])
sys.exit(100)
# remove program name # remove program name
sys.argv.remove(sys.argv[0]) sys.argv.remove(sys.argv[0])
# do not allow to use root permissions (real uid may not be zero)
if os.getuid() == 0:
sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n")
sys.exit(100)
# did the user call the "check" action?
if (len(sys.argv) == 1) and (sys.argv[0].lower() == "check"):
# exit silently
sys.exit(0)
# check parameters count
if len(sys.argv) < 2: if len(sys.argv) < 2:
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(sys.argv)) sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(sys.argv))
sys.exit(100) sys.exit(100)
progRequest = sys.argv[0] progRequest = sys.argv[0]
deviceRequest = sys.argv[1] del sys.argv[0]
sys.argv.remove(progRequest)
sys.argv.remove(deviceRequest)
if not progRequest in allowedProgs.keys(): if not progRequest in allowedProgs.keys():
sys.stderr.write("Invalid program requested: %s\n" % progRequest) sys.stderr.write("Invalid program requested: %s\n" % progRequest)
sys.exit(100) sys.exit(100)
if not os.path.exists(deviceRequest):
sys.stderr.write("The specified device (%s) does not exist!\n" % deviceRequest)
sys.exit(100)
if not isWriteableBlock(deviceRequest):
sys.stderr.write("This device (%s) must be a writeable block device!\n" % deviceRequest)
sys.exit(100)
if progRequest == "cryptsetup": runner = run_cryptsetup if progRequest == "cryptsetup": runner = run_cryptsetup
elif progRequest == "sfdisk": runner = run_sfdisk elif progRequest == "sfdisk": runner = run_sfdisk
elif progRequest == "mount": runner = run_mount
elif progRequest == "umount": runner = run_umount
else: else:
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest) sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest)
sys.exit(100) sys.exit(100)
try: try:
if runner(deviceRequest, sys.argv): if runner(sys.argv):
sys.exit(0) sys.exit(0)
else: else:
sys.exit(1) sys.exit(1)

2
pythonrewrite/bin2/CryptoBoxWebserver.py Normal file → Executable file
View file

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python2.4
import os,sys import os,sys
import CryptoBox import CryptoBox

View file

@ -69,4 +69,7 @@ mkfs-config = /sbin/mkfs.ext2
blkid = /sbin/blkid blkid = /sbin/blkid
mount = /bin/mount mount = /bin/mount
umount = /bin/umount umount = /bin/umount
super = /usr/bin/super
# this is the "program" name as defined in the /etc/super.tab
CryptoBoxRootActions = CryptoBoxRootActions

View file

@ -0,0 +1,2 @@
# adapt the following line to your local setup and add it to /etc/super.tab
CryptoBoxRootActions /your/local/path/to/CryptoBoxRootActions.py yourUserName

0
pythonrewrite/bin2/filehandling.py Normal file → Executable file
View file

2
pythonrewrite/bin2/test.py Normal file → Executable file
View file

@ -1,3 +1,5 @@
#!/usr/bin/env python2.4
""" """
BEWARE: this script may overwrite the data of one of your loop devices. You BEWARE: this script may overwrite the data of one of your loop devices. You
should restrict the AllowedDevices directive in cryptobox.conf to exclude should restrict the AllowedDevices directive in cryptobox.conf to exclude

2
pythonrewrite/bin2/unittests.CryptoBox.py Normal file → Executable file
View file

@ -1,3 +1,5 @@
#!/usr/bin/env python2.4
import unittest import unittest
class CryptoBoxPropsDeviceTests(unittest.TestCase): class CryptoBoxPropsDeviceTests(unittest.TestCase):