You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
448 lines
14 KiB
448 lines
14 KiB
#!/usr/bin/env python2.4
#
# Copyright 2006 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
|
|
|
|
|
"""module for executing the programs, that need root privileges |
|
|
|
Syntax: |
|
- program |
|
- device |
|
- [action] |
|
- [action args] |
|
|
|
This script always exits with exit code 0 (true) if "check" is the only argument
(provided that the effective uid is zero).
|
""" |
|
|
|
# NOTE(review): this looks like a version-control keyword placeholder
# ("$Id$" may have been intended) - confirm before relying on this value.
__revision__ = "$Id"
|
|
|
import os |
|
import sys |
|
import subprocess |
|
import pwd |
|
import grp |
|
import types |
|
|
|
## the external programs that may be requested on the command line:
## the key is the name used by the caller, the value is the absolute path
## of the executable
allowedProgs = dict(
    sfdisk="/sbin/sfdisk",
    cryptsetup="/sbin/cryptsetup",
    mount="/bin/mount",
    umount="/bin/umount",
    blkid="/sbin/blkid",
)

## numeric file type codes - "isWriteable" compares them with the file type
## part of "st_mode" (st_mode % 65536 / 4096)
DEV_TYPES = dict(
    pipe=1,
    char=2,
    dir=4,
    block=6,
    file=8,
    link=10,
    socket=12,
)

## name of the marker file, that must exist in the directory of an event
## script (see "checkIfEventScriptIsValid")
EVENT_MARKER = '_event_scripts_'
|
|
|
|
|
def checkIfFileIsSafe(fname):
    """check if the file and all of its parent directories are only writeable for root

    The file itself and then every parent directory up to the root of the
    filesystem must fulfill all of these conditions:
     - the owner is root (uid 0)
     - it is not group-writeable (unless the owning group is gid 0)
     - it is not world-writeable

    @fname: the path of the file or directory to be checked
    @return: True if all conditions are met for every path component, otherwise False

    "os.stat" raises an OSError, if one of the paths does not exist.
    """
    import stat
    path = fname
    while True:
        props = os.stat(path)
        ## owned by non-root?
        if props.st_uid != 0:
            return False
        ## group-writeable, while the group is not the one of root?
        ## (bit masks are used instead of modulo/division arithmetic, as the
        ## latter only works with integer division)
        if (props.st_gid != 0) and (props.st_mode & stat.S_IWGRP):
            return False
        ## world-writeable?
        if props.st_mode & stat.S_IWOTH:
            return False
        ## continue with the parent directory - the top of the directory tree
        ## is reached, as soon as a path is its own parent (this also
        ## terminates for spellings of the root like "//")
        current = os.path.abspath(path)
        parent = os.path.dirname(current)
        if parent == current:
            return True
        path = parent
|
|
|
|
|
def checkIfPluginIsValid(plugin):
    """check if the file is a python script, that is marked as a cryptobox plugin

    The file is loaded as a python module (named "cbox_plugin"). It is
    accepted, if it defines the module-level attribute PLUGIN_TYPE with the
    value "cryptobox".

    @plugin: path of the plugin file
    @return: True for a correctly marked plugin; False if the file could not
        be read or parsed or if the marker is missing or has another value
    """
    import imp
    try:
        module = imp.load_source("cbox_plugin", plugin)
    except (SyntaxError, IOError):
        return False
    ## a missing attribute is treated like a wrong value
    return getattr(module, "PLUGIN_TYPE", None) == "cryptobox"
|
|
|
|
|
def checkIfEventScriptIsValid(plugin):
    """check if the marker file exists in the directory of the event script

    @plugin: path of the event script
    @return: True if a file named like EVENT_MARKER is located in the same
        directory as the script, otherwise False
    """
    marker_file = os.path.join(os.path.dirname(plugin), EVENT_MARKER)
    return os.path.exists(marker_file)
|
|
|
|
|
def call_plugin(args):
    """verify that the plugin may be executed and run it

    @args: list - the first item is the path of the plugin, the remaining
        items are passed to the plugin as its arguments (the first item is
        replaced by the absolute path of the plugin)
    @return: True if the plugin finished with the exitcode zero

    An Exception is raised, if the plugin is not executable, if it (or one of
    its parent directories) is writeable for someone else than root or if it
    is not marked as a cryptobox plugin.
    """
    plugin = os.path.abspath(args[0])
    del args[0]
    ## the plugin must exist and it must be executable
    if not os.access(plugin, os.X_OK):
        raise Exception("could not find executable plugin (%s)" % plugin)
    ## only root may be able to change the plugin or its parent directories
    if not checkIfFileIsSafe(plugin):
        raise Exception("the plugin (%s) is not safe - check its (and its parents') permissions" % plugin)
    ## the plugin must be a python program with the cryptobox plugin marker
    if not checkIfPluginIsValid(plugin):
        raise Exception("the plugin (%s) is not a correctly marked python script" % plugin)
    args.insert(0, plugin)
    proc = subprocess.Popen(args, shell=False)
    return proc.wait() == 0
|
|
|
|
|
def call_event(args):
    """verify that the event script may be executed and run it

    @args: list - the first item is the path of the event script, the
        remaining items are passed to the script as its arguments (the first
        item is replaced by the absolute path of the script)
    @return: True if the script finished with the exitcode zero

    An Exception is raised, if the script is not executable, if the marker
    file is missing in its directory or if the script (or one of its parent
    directories) is writeable for someone else than root.
    """
    event = os.path.abspath(args[0])
    del args[0]
    ## the script must exist and it must be executable
    if not os.access(event, os.X_OK):
        raise Exception("could not find executable event script (%s)" % event)
    ## the marker file must be located in the directory of the script
    if not checkIfEventScriptIsValid(event):
        raise Exception("the event script (%s) does not reside in a directory with the marker file (%s) - this is not allowed due to abuse prevention" % (event, EVENT_MARKER))
    ## only root may be able to change the script or its parent directories
    if not checkIfFileIsSafe(event):
        raise Exception("the event (%s) is not safe - check its (and its parents') permissions" % event)
    args.insert(0, event)
    proc = subprocess.Popen(args, shell=False)
    return proc.wait() == 0
|
|
|
|
|
def isWriteable(device, force_dev_type=None):
    """check if the calling user (not root!) has read and write access to the device/file

    The real (not the effective) user id is used for the check. Additionally
    the supplementary groups of the process are temporarily replaced by the
    groups of the real uid.
    This check works nicely together with "super", as it changes (by default)
    only the effective uid (not the real uid).

    @device: path of the device/file/directory to be checked
    @force_dev_type: None (any type is accepted) or one of the values of
        DEV_TYPES - any other file type causes the result False
    @return: True if the path exists, has the requested type and is readable
        and writeable for the calling user, otherwise False
    """
    # first check, if the device/file exists
    if not os.path.exists(device):
        return False
    # check the type of the device - if necessary
    if force_dev_type is not None:
        # the file type is the second lowest group of four bits within the
        # lower 16 bits of st_mode - explicit floor division keeps it an integer
        dev_type = (os.stat(device).st_mode % 65536) // 4096
        if dev_type != force_dev_type:
            return False
    # retrieve the groups of the real user id
    groupsOfTrustUser = getUserInfo(os.getuid())[2]
    # set the groups of the caller for the check
    savedGroups = os.getgroups()
    os.setgroups(groupsOfTrustUser)
    try:
        # "os.access" uses the real uid/gid for its checks
        result = os.access(device, os.W_OK) and os.access(device, os.R_OK)
    finally:
        # always restore the groups of this process - even if the check failed
        os.setgroups(savedGroups)
    return result
|
|
|
|
|
def run_cryptsetup(args): |
|
"""execute cryptsetup as root |
|
|
|
@args: list of arguments - they will be treated accordingly to the first element |
|
of this list (the action)""" |
|
if not args: raise "WrongArguments", "no action for cryptsetup supplied" |
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, ) |
|
try: |
|
action = args[0] |
|
del args[0] |
|
device = None |
|
cmd_args = [] |
|
if action == "luksFormat": |
|
device = args[0]; del args[0] |
|
cmd_args.append(action) |
|
cmd_args.append(device) |
|
elif action == "luksUUID": |
|
device = args[0]; del args[0] |
|
cmd_args.append(action) |
|
cmd_args.append(device) |
|
elif action == "luksOpen": |
|
if len(args) < 2: raise "WrongArguments", "missing arguments" |
|
device = args[0]; del args[0] |
|
destination = args[0]; del args[0] |
|
cmd_args.append(action) |
|
cmd_args.append(device) |
|
cmd_args.append(destination) |
|
elif action == "luksClose": |
|
if len(args) < 1: raise "WrongArguments", "missing arguments" |
|
destination = args[0]; del args[0] |
|
# maybe add a check for the mapped device's permissions? |
|
# dmsetup deps self.device |
|
cmd_args.append(action) |
|
cmd_args.append(destination) |
|
elif action == "luksAddKey": |
|
device = args[0]; del args[0] |
|
cmd_args.append(action) |
|
cmd_args.append(device) |
|
elif action == "luksDelKey": |
|
if len(args) < 2: raise "WrongArguments", "missing arguments" |
|
device = args[0]; del args[0] |
|
cmd_args.insert(-1, action) |
|
cmd_args.insert(-1, device) |
|
elif action == "isLuks": |
|
device = args[0]; del args[0] |
|
cmd_args.append(action) |
|
cmd_args.append(device) |
|
else: raise "WrongArguments", "invalid action supplied: %s" % (action, ) |
|
# check if a device was defined - and check it |
|
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])): |
|
raise "WrongArguments", "%s is not a writeable block device" % (device, ) |
|
cs_args = [allowedProgs["cryptsetup"]] |
|
cs_args.extend(args) |
|
cs_args.extend(cmd_args) |
|
except (TypeError, IndexError): |
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, ) |
|
# execute cryptsetup with the given parameters |
|
proc = subprocess.Popen( |
|
shell = False, |
|
args = cs_args) |
|
proc.wait() |
|
## chown the devmapper block device to the cryptobox user |
|
if (proc.returncode == 0) and (action == "luksOpen"): |
|
os.chown(os.path.join(os.path.sep, "dev", "mapper", destination), os.getuid(), os.getgid()) |
|
return proc.returncode == 0 |
|
|
|
|
|
def run_sfdisk(args):
    """placeholder for the execution of sfdisk (partitioning)

    This is not implemented yet: the arguments are ignored, two informational
    lines are written to the standard output and True is returned.
    """
    for line in ("ok - you are free to call sfdisk ...", " not yet implemented ..."):
        sys.stdout.write(line + "\n")
    return True
|
|
|
|
|
def getFSType(device):
    """determine the filesystem type of a device by calling blkid

    @device: path of the device
    @return: the output of blkid (only the value of the TYPE tag is
        requested) without surrounding whitespace or None if blkid returned
        a non-zero exitcode
    """
    ## os.devnull is used as cache file for reading ("-c") and writing ("-w")
    blkid_call = [allowedProgs["blkid"],
            "-s", "TYPE",
            "-o", "value",
            "-c", os.devnull,
            "-w", os.devnull,
            device]
    proc = subprocess.Popen(blkid_call, shell=False, stdout=subprocess.PIPE)
    output = proc.communicate()[0]
    if proc.returncode != 0:
        return None
    return output.strip()
|
|
|
|
|
def run_mount(args): |
|
"""execute mount |
|
""" |
|
if not args: raise "WrongArguments", "no destination for mount supplied" |
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied: %s" % (args, ) |
|
try: |
|
device = args[0] |
|
del args[0] |
|
destination = args[0] |
|
del args[0] |
|
# check permissions for the device |
|
if not isWriteable(device, DEV_TYPES["block"]): |
|
raise "WrongArguments", "%s is not a writeable block device" % (device, ) |
|
## check permissions for the mountpoint |
|
if not isWriteable(destination, DEV_TYPES["dir"]): |
|
raise "WrongArguments", "the mountpoint (%s) is not writeable" % (destination, ) |
|
# check for additional (not allowed) arguments |
|
if len(args) != 0: |
|
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, ) |
|
except TypeError: |
|
raise "WrongArguments", "invalid arguments supplied: %s" % (args, ) |
|
# execute mount with the given parameters |
|
# first overwrite the real uid, as 'mount' wants this to be zero (root) |
|
savedUID = os.getuid() |
|
os.setuid(os.geteuid()) |
|
## we have to change the permissions of the mounted directory - otherwise it will |
|
## not be writeable for the cryptobox user |
|
## for 'vfat' we have to do this during mount |
|
## for ext2/3 we have to do it afterward |
|
## first: get the user/group of the target |
|
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID) |
|
trustGID = groupsOfTrustUser[0] |
|
fsType = getFSType(device) |
|
## define arguments |
|
if fsType == "vfat": |
|
## add the "uid/gid" arguments to the mount call |
|
mount_args = [allowedProgs["mount"], |
|
"-o", "uid=%d,gid=%d,umask=0000" % (trustUID, trustGID), |
|
device, |
|
destination] |
|
else: |
|
## all other filesystem types will be handled after mount |
|
mount_args = [allowedProgs["mount"], device, destination] |
|
# execute mount |
|
proc = subprocess.Popen( |
|
shell = False, |
|
args = mount_args) |
|
proc.wait() |
|
## return in case of an error |
|
if proc.returncode != 0: |
|
return False |
|
## for vfat: we are done |
|
if fsType == "vfat": return True |
|
## for all other filesystem types: chown the mount directory |
|
try: |
|
os.chown(destination, trustUID, groupsOfTrustUser[0]) |
|
except OSError, errMsg: |
|
sys.stderr.write("could not chown the mount destination (%s) to the specified user (%d/%d): %s\n" % (destination, trustUID, groupsOfTrustUser[0], errMsg)) |
|
sys.stderr.write("UID: %d\n" % (os.geteuid(),)) |
|
return False |
|
## BEWARE: it would be nice, if we could restore the previous uid (not euid) but |
|
## this would also override the euid (see 'man 2 setuid') - any ideas? |
|
return True |
|
|
|
|
|
def run_umount(args): |
|
"""execute mount |
|
""" |
|
if not args: raise "WrongArguments", "no mountpoint for umount supplied" |
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied" |
|
try: |
|
destination = args[0] |
|
del args[0] |
|
# check permissions for the destination |
|
if not isWriteable(os.path.dirname(destination), DEV_TYPES["dir"]): |
|
raise "WrongArguments", "the parent of the mountpoint (%s) is not writeable" % (destination, ) |
|
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments" |
|
except TypeError: |
|
raise "WrongArguments", "invalid arguments supplied" |
|
# execute umount with the given parameters |
|
# first overwrite the real uid, as 'umount' wants this to be zero (root) |
|
savedUID = os.getuid() |
|
os.setuid(os.geteuid()) |
|
# execute umount (with the parameter '-l' - lazy umount) |
|
proc = subprocess.Popen( |
|
shell = False, |
|
args = [allowedProgs["umount"], "-l", destination]) |
|
proc.wait() |
|
# restore previous real uid |
|
os.setuid(savedUID) |
|
return proc.returncode == 0 |
|
|
|
|
|
def getUserInfo(user):
    """return information about the specified user

    @user: (uid or name)
    @return: tuple of (name, uid, [gids]) - the list contains the ids of all
        groups, that list the user as a member; the primary group of the user
        is appended, if it is not part of these groups

    A KeyError is raised, if no user was supplied (None) or if the user does
    not exist.
    """
    # a real KeyError (instead of a string exception named "KeyError") can be
    # caught by "except KeyError" - just like the error for an unknown user
    if user is None:
        raise KeyError("no user supplied")
    # first check, if 'user' contains an id - then check for a name
    try:
        userinfo = pwd.getpwuid(user)
    except TypeError:
        # if a KeyError is raised again, then the supplied user was invalid
        userinfo = pwd.getpwnam(user)
    u_groups = [one_group.gr_gid
            for one_group in grp.getgrall()
            if userinfo.pw_name in one_group.gr_mem]
    if userinfo.pw_gid not in u_groups:
        u_groups.append(userinfo.pw_gid)
    return (userinfo.pw_name, userinfo.pw_uid, u_groups)
|
|
|
|
|
# **************** main ********************** |
|
|
|
# prevent import |
|
if __name__ == "__main__": |
|
|
|
# do we have root privileges (effective uid is zero)? |
|
if os.geteuid() != 0: |
|
sys.stderr.write("the effective uid is not zero - you should use 'super' to call this script (%s)" % sys.argv[0]) |
|
sys.exit(100) |
|
|
|
# remove program name |
|
args = sys.argv[1:] |
|
|
|
# do not allow to use root permissions (real uid may not be zero) |
|
#if os.getuid() == 0: |
|
# sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n") |
|
# sys.exit(100) |
|
|
|
# check if there were arguments |
|
if (len(args) == 0): |
|
sys.stderr.write("No arguments supplied\n") |
|
sys.exit(100) |
|
|
|
# did the user call the "check" action? |
|
if (len(args) == 1) and (args[0].lower() == "check"): |
|
# exit silently |
|
sys.exit(0) |
|
|
|
if args[0].lower() == "plugin": |
|
del args[0] |
|
try: |
|
isOK = call_plugin(args) |
|
except Exception, errMsg: |
|
sys.stderr.write("Execution of plugin failed: %s\n" % errMsg) |
|
sys.exit(100) |
|
if isOK: |
|
sys.exit(0) |
|
else: |
|
sys.exit(1) |
|
|
|
if args[0].lower() == "event": |
|
del args[0] |
|
try: |
|
isOK = call_event(args) |
|
except Exception, errMsg: |
|
sys.stderr.write("Execution of event script failed: %s\n" % errMsg) |
|
sys.exit(100) |
|
if isOK: |
|
sys.exit(0) |
|
else: |
|
sys.exit(1) |
|
|
|
# check parameters count |
|
if len(args) < 2: |
|
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(args)) |
|
sys.exit(100) |
|
|
|
progRequest = args[0] |
|
del args[0] |
|
|
|
if not progRequest in allowedProgs.keys(): |
|
sys.stderr.write("Invalid program requested: %s\n" % progRequest) |
|
sys.exit(100) |
|
|
|
if progRequest == "cryptsetup": runner = run_cryptsetup |
|
elif progRequest == "sfdisk": runner = run_sfdisk |
|
elif progRequest == "mount": runner = run_mount |
|
elif progRequest == "umount": runner = run_umount |
|
else: |
|
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest) |
|
sys.exit(100) |
|
try: |
|
if runner(args): |
|
sys.exit(0) |
|
else: |
|
sys.exit(1) |
|
except "WrongArguments", errstr: |
|
sys.stderr.write("Execution failed: %s\n" % errstr) |
|
sys.exit(100) |
|
|
|
|