#!/usr/bin/env python2.4

"""Execute a small set of programs that need root privileges.

Syntax:
    - program
    - device
    - [action]
    - [action args]

This script always returns with exit code 0 (true) if "check" is the only
argument.

Exit codes:
    0   - the requested program was executed successfully
    1   - the requested program returned a non-zero exit code
    100 - wrong usage or insufficient/forbidden privileges

The code deliberately sticks to syntax that is valid on Python 2.4 as well as
on newer interpreters (no string exceptions, no print statement).
"""

import os
import sys
import subprocess
import pwd
import grp
import types


# the only programs that may be executed through this script
allowedProgs = {
    "sfdisk": "/sbin/sfdisk",
    "cryptsetup": "/sbin/cryptsetup",
    "mount": "/bin/mount",
    "umount": "/bin/umount",
}


# file type codes: the file type bits of st_mode, shifted right by 12 bits
DEV_TYPES = {
    "pipe": 1,
    "char": 2,
    "dir": 4,
    "block": 6,
    "file": 8,
    "link": 10,
    "socket": 12,
}


class WrongArguments(Exception):
    """The caller supplied missing, superfluous or forbidden arguments."""


def isWriteable(device, force_dev_type=None):
    """Check if the calling user (not root!) has read and write access.

    The real (not the effective) user id is used for the check. Additionally
    the permissions of the default groups of the real uid are checked.
    This works nicely together with "super", as it changes (by default) only
    the effective uid (not the real uid).

    @device: path of the device/file to check
    @force_dev_type: optional value of DEV_TYPES - the check fails if the
        path is not of this type
    @return: True if the path exists, has the requested type and is readable
        and writeable for the real user - otherwise False
    """
    # first check, if the device/file exists
    if not os.path.exists(device):
        return False
    # check the type of the device - if necessary
    if force_dev_type is not None:
        try:
            mode = os.stat(device).st_mode
        except OSError:
            # the path vanished between the two checks
            return False
        # floor division keeps the type code an integer on every interpreter
        dev_type = mode % 65536 // 4096
        if dev_type != force_dev_type:
            return False
    # retrieve the information for the real user id
    (trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(os.getuid())
    # set the default groups of the caller for the check
    savedGroups = os.getgroups()
    os.setgroups(groupsOfTrustUser)
    try:
        # os.access uses the real uid/gid for its checks
        result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
    finally:
        # always reset the groups of this process
        os.setgroups(savedGroups)
    return result


def _call(cmd):
    """Run 'cmd' (a list of program and arguments) without a shell.

    @return: True if the program finished with exit code zero
    """
    proc = subprocess.Popen(shell=False, args=cmd)
    proc.communicate()
    return proc.returncode == 0


def _call_with_root_uid(cmd):
    """Run 'cmd' with the real uid set to the effective uid.

    'mount' and 'umount' want the real uid to be zero (root). The previous
    real uid is restored afterwards - even if the execution failed.
    """
    savedUID = os.getuid()
    os.setuid(os.geteuid())
    try:
        return _call(cmd)
    finally:
        os.setuid(savedUID)


def run_cryptsetup(args):
    """Execute cryptsetup as root.

    @args: list of arguments - they will be treated accordingly to the first
        element of this list (the action). The list itself is not modified.
    @return: True if cryptsetup finished with exit code zero
    @raise WrongArguments: for unknown actions, missing arguments or devices
        that are not writeable block devices for the calling user
    """
    if not args:
        raise WrongArguments("no action for cryptsetup supplied")
    if not isinstance(args, list):
        raise WrongArguments("invalid arguments supplied: %s" % (args, ))
    # work on a copy - the list of the caller stays untouched
    args = list(args)
    try:
        action = args.pop(0)
        device = None
        if action in ("luksFormat", "luksUUID", "luksAddKey", "isLuks"):
            device = args.pop(0)
            cmd_args = [action, device]
        elif action == "luksOpen":
            if len(args) < 2:
                raise WrongArguments("missing arguments")
            device = args.pop(0)
            destination = args.pop(0)
            cmd_args = [action, device, destination]
        elif action == "luksClose":
            if len(args) < 1:
                raise WrongArguments("missing arguments")
            destination = args.pop(0)
            # maybe add a check for the mapped device's permissions?
            # dmsetup deps self.device
            cmd_args = [action, destination]
        elif action == "luksDelKey":
            if len(args) < 2:
                raise WrongArguments("missing arguments")
            device = args.pop(0)
            key_slot = args.pop(0)
            # cryptsetup syntax: luksDelKey <device> <key slot number>
            cmd_args = [action, device, key_slot]
        else:
            raise WrongArguments("invalid action supplied: %s" % (action, ))
        # check if a device was defined - and check it
        if (device is not None) \
                and (not isWriteable(device, DEV_TYPES["block"])):
            raise WrongArguments(
                "%s is not a writeable block device" % (device, ))
        # NOTE(review): the remaining arguments of the caller are passed to
        # cryptsetup (running as root) without any filtering - consider a
        # whitelist of allowed options
        cs_args = [allowedProgs["cryptsetup"]] + args + cmd_args
    except (TypeError, IndexError):
        raise WrongArguments("invalid arguments supplied: %s" % (args, ))
    # execute cryptsetup with the given parameters
    return _call(cs_args)


def run_sfdisk(args):
    """Execute sfdisk for partitioning.

    not implemented yet - the arguments are ignored

    @return: always True
    """
    sys.stdout.write("ok - you are free to call sfdisk ...\n")
    sys.stdout.write("  not yet implemented ...\n")
    return True


def run_mount(args):
    """Execute mount as root.

    @args: list of exactly two items: the device and the mountpoint.
        The list itself is not modified.
    @return: True if mount finished with exit code zero
    @raise WrongArguments: for missing or superfluous arguments or if the
        device or the mountpoint is not writeable for the calling user
    """
    if not args:
        raise WrongArguments("no destination for mount supplied")
    if not isinstance(args, list):
        raise WrongArguments("invalid arguments supplied: %s" % (args, ))
    # work on a copy - the list of the caller stays untouched
    args = list(args)
    try:
        device = args.pop(0)
        # raises IndexError if the mountpoint is missing
        destination = args.pop(0)
        # check permissions for the device
        if not isWriteable(device, DEV_TYPES["block"]):
            raise WrongArguments(
                "%s is not a writeable block device" % (device, ))
        # check permissions for the mountpoint
        if not isWriteable(destination, DEV_TYPES["dir"]):
            raise WrongArguments(
                "the mountpoint (%s) is not writeable" % (destination, ))
        # check for additional (not allowed) arguments
        if len(args) != 0:
            raise WrongArguments(
                "too many arguments for 'mount': %s" % (args, ))
    except (TypeError, IndexError):
        raise WrongArguments("invalid arguments supplied: %s" % (args, ))
    # execute mount with the given parameters
    return _call_with_root_uid([allowedProgs["mount"], device, destination])


def run_umount(args):
    """Execute umount (lazy: parameter '-l') as root.

    @args: list of exactly one item: the mountpoint.
        The list itself is not modified.
    @return: True if umount finished with exit code zero
    @raise WrongArguments: for missing or superfluous arguments or if the
        mountpoint is not writeable for the calling user
    """
    if not args:
        raise WrongArguments("no mountpoint for umount supplied")
    if not isinstance(args, list):
        raise WrongArguments("invalid arguments supplied")
    # work on a copy - the list of the caller stays untouched
    args = list(args)
    try:
        destination = args.pop(0)
        # check permissions for the destination
        if not isWriteable(destination, DEV_TYPES["dir"]):
            raise WrongArguments(
                "the mountpoint (%s) is not writeable" % (destination, ))
        if len(args) != 0:
            raise WrongArguments("umount does not allow arguments")
    except (TypeError, IndexError):
        raise WrongArguments("invalid arguments supplied")
    # execute umount with the given parameters
    return _call_with_root_uid([allowedProgs["umount"], "-l", destination])


def getUserInfo(user):
    """Return information about the specified user.

    @user: (uid or name)
    @return: tuple of (name, uid, [group ids]) - the list contains the
        primary group and every group that lists the user as a member
    @raise KeyError: if no user was supplied or the user does not exist
    """
    if user is None:
        raise KeyError("no user supplied")
    # first check, if 'user' contains an id - then check for a name
    try:
        userinfo = pwd.getpwuid(user)
    except TypeError:
        # if a KeyError is raised again, then the supplied user was invalid
        userinfo = pwd.getpwnam(user)
    u_groups = [one_group.gr_gid for one_group in grp.getgrall()
                if userinfo.pw_name in one_group.gr_mem]
    if userinfo.pw_gid not in u_groups:
        u_groups.append(userinfo.pw_gid)
    return (userinfo.pw_name, userinfo.pw_uid, u_groups)


def main(argv=None):
    """Check the privileges and the arguments and run the requested program.

    @argv: complete argument list including the program name
        (default: sys.argv)
    @return: the exit code of the script (0, 1 or 100)
    """
    if argv is None:
        argv = sys.argv
    # do we have root privileges (effective uid is zero)?
    if os.geteuid() != 0:
        sys.stderr.write("the effective uid is not zero - you should use "
                         "'super' to call this script (%s)\n" % argv[0])
        return 100
    # remove program name
    args = list(argv[1:])
    # do not allow to use root permissions (real uid may not be zero)
    if os.getuid() == 0:
        sys.stderr.write(
            "the uid of the caller is zero (root) - this is not allowed\n")
        return 100
    # did the user call the "check" action?
    if (len(args) == 1) and (args[0].lower() == "check"):
        # exit silently
        return 0
    # check parameters count
    if len(args) < 2:
        sys.stderr.write(
            "Not enough arguments supplied (%s)!\n" % " ".join(args))
        return 100
    progRequest = args.pop(0)
    if progRequest not in allowedProgs:
        sys.stderr.write("Invalid program requested: %s\n" % progRequest)
        return 100
    runners = {
        "cryptsetup": run_cryptsetup,
        "sfdisk": run_sfdisk,
        "mount": run_mount,
        "umount": run_umount,
    }
    runner = runners.get(progRequest)
    if runner is None:
        # allowed program without an interface
        sys.stderr.write("The interface for this program (%s) is not yet "
                         "implemented!\n" % progRequest)
        return 100
    try:
        succeeded = runner(args)
    except WrongArguments:
        # sys.exc_info() is valid for old and new interpreters
        errstr = sys.exc_info()[1]
        sys.stderr.write("Execution failed: %s\n" % errstr)
        return 100
    if succeeded:
        return 0
    return 1


# **************** main **********************

# prevent import
if __name__ == "__main__":
    sys.exit(main())