lars
0c23c5e2c3
fixed: use 'cryptsetup luksUUID' to retrieve UUIDs of luks devices added: analyse request URL and add information to hdf dataset added: attribute 'device' is persistent (like 'weblang') to allow redirects between volume plugins changed: moved URLs of plugins from 'plugins/???' to '???' (flat hierarchy -> simple links) added: new action for all plugins: "redirect" - call specific plugin after completing the current one fixed: permission fix for vfat filesystems during mount / for ext2/3: after mount regular checks for config partition (mount, if it becomes available)
386 lines
12 KiB
Python
Executable file
386 lines
12 KiB
Python
Executable file
#!/usr/bin/env python2.4
|
|
|
|
"""Module for executing programs that need root privileges.

Syntax:
    - program
    - device
    - [action]
    - [action args]

This script always exits with exit code 0 (true) if "check" is the only argument.
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import pwd
|
|
import grp
|
|
import types
|
|
|
|
# Absolute paths of the external programs this script knows about.
# A program requested on the command line has to be one of the keys.
allowedProgs = {
    "blkid": "/sbin/blkid",
    "cryptsetup": "/sbin/cryptsetup",
    "mount": "/bin/mount",
    "sfdisk": "/sbin/sfdisk",
    "umount": "/bin/umount",
}
|
|
|
|
|
|
# File type codes: the value of "st_mode % 65536 / 4096" (the file type part
# of st_mode) for each kind of file - used to request a specific file type.
DEV_TYPES = {
    "pipe": 1,
    "char": 2,
    "dir": 4,
    "block": 6,
    "file": 8,
    "link": 10,
    "socket": 12,
}
|
|
|
|
|
|
def checkIfPluginIsSafe(plugin):
    """check if the plugin and its parents are only writeable for root

    @plugin: path of the plugin (or of one of its parent directories)
    @return: currently always True, as the check is disabled (see FIXME below)
    """
    #FIXME: for now we may skip this test - but users will not like it this way :)
    return True
    ## everything below is unreachable as long as the shortcut above exists
    info = os.stat(plugin)
    ## the owner has to be root
    if info.st_uid != 0:
        return False
    ## the group-write bit (16) only matters if the group is not root's
    if info.st_gid != 0 and info.st_mode & 16:
        return False
    ## the world-write bit (2) is never acceptable
    if info.st_mode & 2:
        return False
    ## the root directory has no parent left to be checked
    if plugin == os.path.sep:
        return True
    ## repeat the same checks for the parent directory
    parent = os.path.dirname(os.path.abspath(plugin))
    return checkIfPluginIsSafe(parent)
|
|
|
|
|
|
def checkIfPluginIsValid(plugin):
    """check if the file is a python script, that is marked as a cryptobox plugin

    The file is loaded as the python module "cbox_plugin" (its top-level code
    gets executed thereby) and it has to define PLUGIN_TYPE = "cryptobox".

    @plugin: path of the plugin file
    @return: True for a correctly marked plugin - False otherwise (this
        includes every error while loading the file)
    """
    import imp
    try:
        module = imp.load_source("cbox_plugin", plugin)
        ## a missing attribute raises an AttributeError -> not a valid plugin
        return getattr(module, "PLUGIN_TYPE") == "cryptobox"
    except Exception:
        return False
|
|
|
|
|
|
def call_plugin(args):
    """check if the plugin may be called - and do it finally ...

    @args: list - the first element is the path of the plugin, the remaining
        elements are handed over to the plugin as its arguments; the first
        element gets replaced by the absolute path of the plugin
    @return: True if the plugin finished with the exitcode zero, otherwise False
    @raise Exception: if no plugin was supplied or if one of the checks failed
    """
    ## an empty list would otherwise just cause an obscure IndexError
    if not args:
        raise Exception("no plugin specified")
    plugin = os.path.abspath(args[0])
    ## check existence and executability
    if not os.access(plugin, os.X_OK):
        raise Exception("could not find executable plugin (%s)" % plugin)
    ## check if the plugin (and its parents) are only writeable for root
    if not checkIfPluginIsSafe(plugin):
        raise Exception("the plugin (%s) was not safe - check its (and its parents') permissions" % plugin)
    ## check if the plugin is a python program, that is marked as a cryptobox plugin
    if not checkIfPluginIsValid(plugin):
        raise Exception("the plugin (%s) is not a correctly marked python script" % plugin)
    ## call the plugin via its absolute path - followed by its own arguments
    args[0] = plugin
    proc = subprocess.Popen(
        shell = False,
        args = args)
    proc.wait()
    return proc.returncode == 0
|
|
|
|
|
|
def isWriteable(device, force_dev_type=None):
    """check if the calling user (not root!) may read and write the device/file

    The real (not the effective) user id is used for the check. Additionally
    the groups of the real uid are taken into account. This works nicely
    together with "super", as it changes (by default) only the effective uid
    (not the real uid).

    @device: path of the device/file
    @force_dev_type: if not None, then the file type of 'device' has to be
        equal to this value (see DEV_TYPES)
    @return: True if 'device' exists, has the requested type and is readable
        and writeable for the calling user - otherwise False
    """
    ## something that does not exist is never writeable
    if not os.path.exists(device):
        return False
    ## compare the file type - if requested
    if force_dev_type is not None:
        file_type = (os.stat(device).st_mode % 65536) // 4096
        if file_type != force_dev_type:
            return False
    ## only the groups of the real user id are of interest here
    caller_groups = getUserInfo(os.getuid())[2]
    ## use the groups of the caller during the check - restore the old ones afterwards
    previous_groups = os.getgroups()
    os.setgroups(caller_groups)
    accessible = os.access(device, os.W_OK) and os.access(device, os.R_OK)
    os.setgroups(previous_groups)
    return accessible
|
|
|
|
|
|
def run_cryptsetup(args):
|
|
"""execute cryptsetup as root
|
|
|
|
@args: list of arguments - they will be treated accordingly to the first element
|
|
of this list (the action)"""
|
|
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
try:
|
|
action = args[0]
|
|
del args[0]
|
|
device = None
|
|
cmd_args = []
|
|
if action == "luksFormat":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksUUID":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksOpen":
|
|
if len(args) < 2: raise "WrongArguments", "missing arguments"
|
|
device = args[0]; del args[0]
|
|
destination = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
cmd_args.append(destination)
|
|
elif action == "luksClose":
|
|
if len(args) < 1: raise "WrongArguments", "missing arguments"
|
|
destination = args[0]; del args[0]
|
|
# maybe add a check for the mapped device's permissions?
|
|
# dmsetup deps self.device
|
|
cmd_args.append(action)
|
|
cmd_args.append(destination)
|
|
elif action == "luksAddKey":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksDelKey":
|
|
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
|
|
device = args[0]; del args[0]
|
|
cmd_args.insert(-1, action)
|
|
cmd_args.insert(-1, device)
|
|
elif action == "isLuks":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
|
|
# check if a device was defined - and check it
|
|
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
|
|
raise "WrongArguments", "%s is not a writeable block device" % (device, )
|
|
cs_args = [allowedProgs["cryptsetup"]]
|
|
cs_args.extend(args)
|
|
cs_args.extend(cmd_args)
|
|
except (TypeError, IndexError):
|
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
# execute cryptsetup with the given parameters
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = cs_args)
|
|
proc.wait()
|
|
## chown the devmapper block device to the cryptobox user
|
|
if (proc.returncode == 0) and (action == "luksOpen"):
|
|
os.chown(os.path.join(os.path.sep, "dev", "mapper", destination), os.getuid(), os.getgid())
|
|
return proc.returncode == 0
|
|
|
|
|
|
def run_sfdisk(args):
    """execute sfdisk for partitioning

    not implemented yet - only a short notice is written to stdout

    @args: ignored
    @return: always True
    """
    notice = ("ok - you are free to call sfdisk ...", " not yet implemented ...")
    for line in notice:
        sys.stdout.write(line + "\n")
    return True
|
|
|
|
|
|
def getFSType(device):
    """get the filesystem type of a device

    @device: path of the device
    @return: the output of 'blkid' (the value of the tag "TYPE") without
        surrounding whitespace - or None if blkid failed
    """
    ## os.devnull is used for both cache file options ("-c" and "-w") of blkid
    blkid_call = [allowedProgs["blkid"]]
    blkid_call.extend(["-s", "TYPE"])
    blkid_call.extend(["-o", "value"])
    blkid_call.extend(["-c", os.devnull])
    blkid_call.extend(["-w", os.devnull])
    blkid_call.append(device)
    proc = subprocess.Popen(
        shell = False,
        stdout = subprocess.PIPE,
        args = blkid_call)
    ## only stdout is captured
    output = proc.communicate()[0]
    if proc.returncode == 0:
        return output.strip()
    return None
|
|
|
|
|
|
def run_mount(args):
|
|
"""execute mount
|
|
"""
|
|
if not args: raise "WrongArguments", "no destination for mount supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
try:
|
|
device = args[0]
|
|
del args[0]
|
|
destination = args[0]
|
|
del args[0]
|
|
# check permissions for the device
|
|
if not isWriteable(device, DEV_TYPES["block"]):
|
|
raise "WrongArguments", "%s is not a writeable block device" % (device, )
|
|
## check permissions for the mountpoint
|
|
if not isWriteable(destination, DEV_TYPES["dir"]):
|
|
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
|
|
# check for additional (not allowed) arguments
|
|
if len(args) != 0:
|
|
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
|
|
except TypeError:
|
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
# execute mount with the given parameters
|
|
# first overwrite the real uid, as 'mount' wants this to be zero (root)
|
|
savedUID = os.getuid()
|
|
os.setuid(os.geteuid())
|
|
## we have to change the permissions of the mounted directory - otherwise it will
|
|
## not be writeable for the cryptobox user
|
|
## for 'vfat' we have to do this during mount
|
|
## for ext2/3 we have to do it afterward
|
|
## first: get the user/group of the target
|
|
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID)
|
|
trustGID = groupsOfTrustUser[0]
|
|
fsType = getFSType(device)
|
|
## define arguments
|
|
if fsType == "vfat":
|
|
## add the "uid/gid" arguments to the mount call
|
|
mount_args = [allowedProgs["mount"],
|
|
"-o", "uid=%d,gid=%d" % (trustUID, trustGID),
|
|
device,
|
|
destination]
|
|
else:
|
|
## all other filesystem types will be handled after mount
|
|
mount_args = [allowedProgs["mount"], device, destination]
|
|
# execute mount
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = mount_args)
|
|
proc.wait()
|
|
## return in case of an error
|
|
if proc.returncode != 0:
|
|
return False
|
|
## for vfat: we are done
|
|
if fsType == "vfat": return True
|
|
## for all other filesystem types: chown the mount directory
|
|
try:
|
|
os.chown(destination, trustUID, groupsOfTrustUser[0])
|
|
except OSError, errMsg:
|
|
sys.stderr.write("could not chown the mount destination (%s) to the specified user (%d/%d): %s\n" % (destination, trustUID, groupsOfTrustUser[0], errMsg))
|
|
sys.stderr.write("UID: %d\n" % (os.geteuid(),))
|
|
return False
|
|
## BEWARE: it would be nice, if we could restore the previous uid (not euid) but
|
|
## this would also override the euid (see 'man 2 setuid') - any ideas?
|
|
return True
|
|
|
|
|
|
def run_umount(args):
|
|
"""execute mount
|
|
"""
|
|
if not args: raise "WrongArguments", "no mountpoint for umount supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied"
|
|
try:
|
|
destination = args[0]
|
|
del args[0]
|
|
# check permissions for the destination
|
|
if not isWriteable(os.path.dirname(destination), DEV_TYPES["dir"]):
|
|
raise "WrongArguments", "the parent of the mountpoint (%s) is not writeable" % (destination, )
|
|
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments"
|
|
except TypeError:
|
|
raise "WrongArguments", "invalid arguments supplied"
|
|
# execute umount with the given parameters
|
|
# first overwrite the real uid, as 'umount' wants this to be zero (root)
|
|
savedUID = os.getuid()
|
|
os.setuid(os.geteuid())
|
|
# execute umount (with the parameter '-l' - lazy umount)
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = [allowedProgs["umount"], "-l", destination])
|
|
proc.wait()
|
|
# restore previous real uid
|
|
os.setuid(savedUID)
|
|
return proc.returncode == 0
|
|
|
|
|
|
def getUserInfo(user):
    """return information about the specified user

    @user: (uid or name)
    @return: tuple of (name, uid, [gids]) - the list contains the ids of all
        groups, that list the user as a member; the primary group of the user
        is appended, if it is not part of this list
    @raise KeyError: if no user was supplied or if the user does not exist
    """
    ## a real KeyError replaces the former string exception "KeyError"
    if user is None:
        raise KeyError("no user supplied")
    # first check, if 'user' contains an id - then check for a name
    try:
        userinfo = pwd.getpwuid(user)
    except TypeError:
        # if a KeyError is raised again, then the supplied user was invalid
        userinfo = pwd.getpwnam(user)
    u_groups = [one_group.gr_gid
            for one_group in grp.getgrall()
            if userinfo.pw_name in one_group.gr_mem]
    if userinfo.pw_gid not in u_groups:
        u_groups.append(userinfo.pw_gid)
    return (userinfo.pw_name, userinfo.pw_uid, u_groups)
|
|
|
|
|
|
# **************** main **********************
|
|
|
|
# prevent import
|
|
if __name__ == "__main__":
|
|
|
|
# do we have root privileges (effective uid is zero)?
|
|
if os.geteuid() != 0:
|
|
sys.stderr.write("the effective uid is not zero - you should use 'super' to call this script (%s)" % sys.argv[0])
|
|
sys.exit(100)
|
|
|
|
# remove program name
|
|
args = sys.argv[1:]
|
|
|
|
# do not allow to use root permissions (real uid may not be zero)
|
|
if os.getuid() == 0:
|
|
sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n")
|
|
sys.exit(100)
|
|
|
|
# check if there were arguments
|
|
if (len(args) == 0):
|
|
sys.stderr.write("No arguments supplied\n")
|
|
sys.exit(100)
|
|
|
|
# did the user call the "check" action?
|
|
if (len(args) == 1) and (args[0].lower() == "check"):
|
|
# exit silently
|
|
sys.exit(0)
|
|
|
|
if args[0].lower() == "plugin":
|
|
del args[0]
|
|
try:
|
|
isOK = call_plugin(args)
|
|
except Exception, errMsg:
|
|
sys.stderr.write("Execution of plugin failed: %s\n" % errMsg)
|
|
sys.exit(100)
|
|
if isOK:
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(1)
|
|
|
|
# check parameters count
|
|
if len(args) < 2:
|
|
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(args))
|
|
sys.exit(100)
|
|
|
|
progRequest = args[0]
|
|
del args[0]
|
|
|
|
if not progRequest in allowedProgs.keys():
|
|
sys.stderr.write("Invalid program requested: %s\n" % progRequest)
|
|
sys.exit(100)
|
|
|
|
if progRequest == "cryptsetup": runner = run_cryptsetup
|
|
elif progRequest == "sfdisk": runner = run_sfdisk
|
|
elif progRequest == "mount": runner = run_mount
|
|
elif progRequest == "umount": runner = run_umount
|
|
else:
|
|
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest)
|
|
sys.exit(100)
|
|
try:
|
|
if runner(args):
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(1)
|
|
except "WrongArguments", errstr:
|
|
sys.stderr.write("Execution failed: %s\n" % errstr)
|
|
sys.exit(100)
|
|
|