lars
0fe6d426ed
moved config partition handling to CryptoBoxSettings implemented environment checks (writeable config, https (off for now)) chown mounted directory after mount to the cryptobox user
351 lines
11 KiB
Python
Executable file
351 lines
11 KiB
Python
Executable file
#!/usr/bin/env python2.4
|
|
|
|
"""module for executing the programs, that need root privileges
|
|
|
|
Syntax:
|
|
- program
|
|
- device
|
|
- [action]
|
|
- [action args]
|
|
|
|
this script will always return with an exitcode 0 (true), if "check" is the only argument
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import pwd
|
|
import grp
|
|
import types
|
|
|
|
allowedProgs = {
|
|
"sfdisk": "/sbin/sfdisk",
|
|
"cryptsetup": "/sbin/cryptsetup",
|
|
"mount": "/bin/mount",
|
|
"umount": "/bin/umount",
|
|
}
|
|
|
|
|
|
DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
|
|
|
|
|
|
def checkIfPluginIsSafe(plugin):
|
|
"""check if the plugin and its parents are only writeable for root"""
|
|
#FIXME: for now we may skip this test - but users will not like it this way :)
|
|
return True
|
|
props = os.stat(plugin)
|
|
## check if it is owned by non-root
|
|
if props.st_uid != 0: return False
|
|
## check group-write permission if gid is not zero
|
|
if (props.st_gid != 0) and (props.st_mode % 32 / 16 > 0): return False
|
|
## check if it is world-writeable
|
|
if props.st_mode % 4 / 2 > 0: return False
|
|
## are we at root-level (directory-wise)? If yes, then we are ok ...
|
|
if plugin == os.path.sep: return True
|
|
## check if the parent directory is ok - recursively :)
|
|
return checkIfPluginIsSafe(os.path.dirname(os.path.abspath(plugin)))
|
|
|
|
|
|
def checkIfPluginIsValid(plugin):
|
|
import imp
|
|
try:
|
|
x = imp.load_source("cbox_plugin",plugin)
|
|
except Exception:
|
|
return False
|
|
try:
|
|
if getattr(x, "PLUGIN_TYPE") == "cryptobox":
|
|
return True
|
|
else:
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def call_plugin(args):
|
|
"""check if the plugin may be called - and do it finally ..."""
|
|
plugin = os.path.abspath(args[0])
|
|
del args[0]
|
|
## check existence and excutability
|
|
if not os.access(plugin, os.X_OK):
|
|
raise Exception, "could not find executable plugin (%s)" % plugin
|
|
## check if the plugin (and its parents) are only writeable for root
|
|
if not checkIfPluginIsSafe(plugin):
|
|
raise Exception, "the plugin (%s) was not safe - check its (and its parents') permissions" % plugin
|
|
## check if the plugin is a python program, that is marked as a cryptobox plugin
|
|
if not checkIfPluginIsValid(plugin):
|
|
raise Exception, "the plugin (%s) is not a correctly marked python script" % plugin
|
|
args.insert(0,plugin)
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = args)
|
|
proc.wait()
|
|
return proc.returncode == 0
|
|
|
|
|
|
def isWriteable(device, force_dev_type=None):
|
|
"""check if the calling user (not root!) has write access to the device/file
|
|
|
|
the real (not the effictive) user id is used for the check
|
|
additionally the permissions of the default groups of the real uid are checked
|
|
this check works nicely together with "super", as it changes (by default) only
|
|
the effective uid (not the real uid)
|
|
"""
|
|
# first check, if the device/file exists
|
|
if not os.path.exists(device):
|
|
return False
|
|
# check the type of the device - if necessary
|
|
if not force_dev_type is None:
|
|
dev_type = os.stat(device).st_mode % 65536 / 4096
|
|
if dev_type != force_dev_type: return False
|
|
# retrieve the information for the real user id
|
|
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(os.getuid())
|
|
# set the default groups of the caller for the check (restore them later)
|
|
savedGroups = os.getgroups()
|
|
os.setgroups(groupsOfTrustUser)
|
|
# check permissions
|
|
result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
|
|
# reset the groups of this process
|
|
os.setgroups(savedGroups)
|
|
return result
|
|
|
|
|
|
def run_cryptsetup(args):
|
|
"""execute cryptsetup as root
|
|
|
|
@args: list of arguments - they will be treated accordingly to the first element
|
|
of this list (the action)"""
|
|
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
try:
|
|
action = args[0]
|
|
del args[0]
|
|
device = None
|
|
cmd_args = []
|
|
if action == "luksFormat":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksUUID":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksOpen":
|
|
if len(args) < 2: raise "WrongArguments", "missing arguments"
|
|
device = args[0]; del args[0]
|
|
destination = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
cmd_args.append(destination)
|
|
elif action == "luksClose":
|
|
if len(args) < 1: raise "WrongArguments", "missing arguments"
|
|
destination = args[0]; del args[0]
|
|
# maybe add a check for the mapped device's permissions?
|
|
# dmsetup deps self.device
|
|
cmd_args.append(action)
|
|
cmd_args.append(destination)
|
|
elif action == "luksAddKey":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
elif action == "luksDelKey":
|
|
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
|
|
device = args[0]; del args[0]
|
|
cmd_args.insert(-1, action)
|
|
cmd_args.insert(-1, device)
|
|
elif action == "isLuks":
|
|
device = args[0]; del args[0]
|
|
cmd_args.append(action)
|
|
cmd_args.append(device)
|
|
else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
|
|
# check if a device was defined - and check it
|
|
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
|
|
raise "WrongArguments", "%s is not a writeable block device" % (device, )
|
|
cs_args = [allowedProgs["cryptsetup"]]
|
|
cs_args.extend(args)
|
|
cs_args.extend(cmd_args)
|
|
except (TypeError, IndexError):
|
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
# execute cryptsetup with the given parameters
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = cs_args)
|
|
proc.wait()
|
|
## chown the devmapper block device to the cryptobox user
|
|
if (proc.returncode == 0) and (action == "luksOpen"):
|
|
os.chown(os.path.join(os.path.sep, "dev", "mapper", destination), os.getuid(), os.getgid())
|
|
return proc.returncode == 0
|
|
|
|
|
|
def run_sfdisk(args):
|
|
"""execute sfdisk for partitioning
|
|
|
|
not implemented yet"""
|
|
print "ok - you are free to call sfdisk ..."
|
|
print " not yet implemented ..."
|
|
return True
|
|
|
|
|
|
def run_mount(args):
|
|
"""execute mount
|
|
"""
|
|
if not args: raise "WrongArguments", "no destination for mount supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
try:
|
|
device = args[0]
|
|
del args[0]
|
|
destination = args[0]
|
|
del args[0]
|
|
# check permissions for the device
|
|
if not isWriteable(device, DEV_TYPES["block"]):
|
|
raise "WrongArguments", "%s is not a writeable block device" % (device, )
|
|
# check permissions for the mountpoint
|
|
if not isWriteable(destination, DEV_TYPES["dir"]):
|
|
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, )
|
|
# check for additional (not allowed) arguments
|
|
if len(args) != 0:
|
|
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
|
|
except TypeError:
|
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
|
|
# execute mount with the given parameters
|
|
# first overwrite the real uid, as 'mount' wants this to be zero (root)
|
|
savedUID = os.getuid()
|
|
os.setuid(os.geteuid())
|
|
# execute mount
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = [allowedProgs["mount"], device, destination])
|
|
proc.wait()
|
|
## return in case of an error
|
|
if proc.returncode != 0:
|
|
return False
|
|
## chown the mounted directory - otherwise it will not be writeable for
|
|
## the cryptobox user (at least for the configuration partition this is
|
|
## absolutely necessary) TODO: check if this is valid for data, too
|
|
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID)
|
|
try:
|
|
os.chown(destination, trustUID, groupsOfTrustUser[0])
|
|
except OSError, errMsg:
|
|
sys.stderr.write("could not chown the mount destination (%s) to the specified user (%d/%d): %s\n" % (destination, trustUID, groupsOfTrustUser[0], errMsg))
|
|
sys.stderr.write("UID: %d\n" % (os.geteuid(),))
|
|
return False
|
|
## BEWARE: it would be nice, if we could restore the previous uid (not euid) but
|
|
## this would also override the euid (see 'man 2 setuid') - any ideas?
|
|
return True
|
|
|
|
|
|
def run_umount(args):
|
|
"""execute mount
|
|
"""
|
|
if not args: raise "WrongArguments", "no mountpoint for umount supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied"
|
|
try:
|
|
destination = args[0]
|
|
del args[0]
|
|
# check permissions for the destination
|
|
if not isWriteable(os.path.dirname(destination), DEV_TYPES["dir"]):
|
|
raise "WrongArguments", "the parent of the mountpoint (%s) is not writeable" % (destination, )
|
|
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments"
|
|
except TypeError:
|
|
raise "WrongArguments", "invalid arguments supplied"
|
|
# execute umount with the given parameters
|
|
# first overwrite the real uid, as 'umount' wants this to be zero (root)
|
|
savedUID = os.getuid()
|
|
os.setuid(os.geteuid())
|
|
# execute umount (with the parameter '-l' - lazy umount)
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = [allowedProgs["umount"], "-l", destination])
|
|
proc.wait()
|
|
# restore previous real uid
|
|
os.setuid(savedUID)
|
|
return proc.returncode == 0
|
|
|
|
|
|
def getUserInfo(user):
|
|
"""return information about the specified user
|
|
|
|
@user: (uid or name)
|
|
@return: tuple of (name, uid, (groups))
|
|
"""
|
|
if user is None: raise "KeyError", "no user supplied"
|
|
# first check, if 'user' contains an id - then check for a name
|
|
try:
|
|
userinfo = pwd.getpwuid(user)
|
|
except TypeError:
|
|
# if a KeyError is raised again, then the supplied user was invalid
|
|
userinfo = pwd.getpwnam(user)
|
|
u_groups =[one_group.gr_gid
|
|
for one_group in grp.getgrall()
|
|
if userinfo.pw_name in one_group.gr_mem]
|
|
if not userinfo.pw_gid in u_groups: u_groups.append(userinfo.pw_gid)
|
|
return (userinfo.pw_name, userinfo.pw_uid, u_groups)
|
|
|
|
|
|
# **************** main **********************
|
|
|
|
# prevent import
|
|
if __name__ == "__main__":
|
|
|
|
# do we have root privileges (effective uid is zero)?
|
|
if os.geteuid() != 0:
|
|
sys.stderr.write("the effective uid is not zero - you should use 'super' to call this script (%s)" % sys.argv[0])
|
|
sys.exit(100)
|
|
|
|
# remove program name
|
|
args = sys.argv[1:]
|
|
|
|
# do not allow to use root permissions (real uid may not be zero)
|
|
if os.getuid() == 0:
|
|
sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n")
|
|
sys.exit(100)
|
|
|
|
# check if there were arguments
|
|
if (len(args) == 0):
|
|
sys.stderr.write("No arguments supplied\n")
|
|
sys.exit(100)
|
|
|
|
# did the user call the "check" action?
|
|
if (len(args) == 1) and (args[0].lower() == "check"):
|
|
# exit silently
|
|
sys.exit(0)
|
|
|
|
if args[0].lower() == "plugin":
|
|
del args[0]
|
|
try:
|
|
isOK = call_plugin(args)
|
|
except Exception, errMsg:
|
|
sys.stderr.write("Execution of plugin failed: %s\n" % errMsg)
|
|
sys.exit(100)
|
|
if isOK:
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(1)
|
|
|
|
# check parameters count
|
|
if len(args) < 2:
|
|
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(args))
|
|
sys.exit(100)
|
|
|
|
progRequest = args[0]
|
|
del args[0]
|
|
|
|
if not progRequest in allowedProgs.keys():
|
|
sys.stderr.write("Invalid program requested: %s\n" % progRequest)
|
|
sys.exit(100)
|
|
|
|
if progRequest == "cryptsetup": runner = run_cryptsetup
|
|
elif progRequest == "sfdisk": runner = run_sfdisk
|
|
elif progRequest == "mount": runner = run_mount
|
|
elif progRequest == "umount": runner = run_umount
|
|
else:
|
|
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest)
|
|
sys.exit(100)
|
|
try:
|
|
if runner(args):
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(1)
|
|
except "WrongArguments", errstr:
|
|
sys.stderr.write("Execution failed: %s\n" % errstr)
|
|
sys.exit(100)
|
|
|