cryptonas-branches/staging-v0.3.5/bin/CryptoBoxRootActions

556 lines
18 KiB
Text
Raw Permalink Normal View History

2009-06-25 04:11:30 +02:00
#!/usr/bin/env python
#
# Copyright 2007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""module for executing some programs or scripts that need root privileges
Syntax:
check
- return exitcode zero if basic checks succeeded
program PROGRAM_NAME [ARGS]
- call the program (must be defined in "allowedProgs" below)
event EVENT_SCRIPT [ARGS]
- call an event script
plugin PLUGIN_NAME [ARGS]
- call a root_action script of a plugin
Exitcodes:
0 - execution was ok
1 - the executed program or action returned a failure exitcode
100 - improper calling or misconfiguration
of CryptoBoxRootActions (wrong arguments, wrong uid)
101 - failed to execute the given program - maybe it does not exist?
For more detailed information take a look at the manpage:
"man CryptoBoxRootActions"
"""
__revision__ = "$Id"
import os
import sys
import subprocess
import pwd
import grp
import types
## whitelist for the "program" action: maps the name a caller may request to
## the absolute path of the executable that is run for it
allowedProgs = {
"sfdisk": "/sbin/sfdisk",
"cryptsetup": "/sbin/cryptsetup",
"mount": "/bin/mount",
"umount": "/bin/umount",
"blkid": "/sbin/blkid",
"pvdisplay": "/sbin/pvdisplay",
}
## this line is necessary for running unittests or playing around with a local
## svn working copy - otherwise the security checks would be too strict
## (if it is True, checkIfFileIsSafe accepts every file without inspecting it)
OVERRIDE_FILECHECK = False
## numeric file types: the file type bits of "st_mode" divided by 4096 - this is
## the value that isWriteable compares with its "force_dev_type" argument
DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
## name of the marker file that must exist in the directory of an event script
## (see checkIfEventScriptIsValid)
EVENT_MARKER = '_event_scripts_'
## use this string as device name if you want to mount a ramdisk
MAGIC_TMPFS = "_tmpfs_"
def checkIfFileIsSafe(fname):
    """check if the file and its parents are only writeable for root

    The check walks from 'fname' up to the root directory. Every element of
    this chain must be owned by root (uid 0), must not be group-writeable
    unless its group is root (gid 0) and must not be world-writeable.

    @fname: path of the file to be checked (relative paths are resolved
        against the current working directory while walking upwards)
    @return: True if the whole chain is safe, otherwise False
    @raise OSError: if one element of the chain cannot be stat'ed
    """
    import stat
    ## the override setting may be turned on temporarily to allow unittests
    if OVERRIDE_FILECHECK:
        return True
    ## if the calling user id is 0 (root), then we do not have to check this,
    ## as root would be allowed to do this anyway
    ## this eases testing with a not-installed working copy in a uml environment
    if getCallingUserInfo()[1] == 0:
        return True
    current = fname
    while True:
        props = os.stat(current)
        ## owned by non-root?
        if props.st_uid != 0:
            return False
        ## group-writeable for a group other than root?
        ## (bit masks are used instead of "mode % 32 / 16", as the latter
        ## depends on integer division and breaks with true division)
        if (props.st_gid != 0) and (props.st_mode & stat.S_IWGRP):
            return False
        ## world-writeable?
        if props.st_mode & stat.S_IWOTH:
            return False
        ## are we at root-level (directory-wise)? If yes, then we are ok ...
        if current == os.path.sep:
            return True
        ## continue with the parent directory
        current = os.path.dirname(os.path.abspath(current))
def checkIfPluginIsValid(plugin):
    """check if the given file is a python script that is marked as a plugin

    The file is loaded as a python module (its top-level code gets executed!)
    and must define the module attribute PLUGIN_TYPE with the value "cryptobox".

    @plugin: path of the python file
    @return: True if the marker is present and correct; False if the file
        could not be read or parsed or if the marker is missing or different
    """
    try:
        try:
            ## the "imp" module was removed in Python 3.12 - prefer importlib
            import importlib.util
            from importlib.machinery import SourceFileLoader
        except ImportError:
            ## older interpreters (e.g. Python 2) only offer "imp"
            import imp
            module = imp.load_source("cbox_plugin", plugin)
        else:
            loader = SourceFileLoader("cbox_plugin", plugin)
            spec = importlib.util.spec_from_loader("cbox_plugin", loader)
            module = importlib.util.module_from_spec(spec)
            loader.exec_module(module)
    except (SyntaxError, IOError):
        return False
    ## a missing attribute is treated like a wrong marker
    return getattr(module, "PLUGIN_TYPE", None) == "cryptobox"
def checkIfEventScriptIsValid(plugin):
    """check if the marker file resides in the directory of the event script

    @plugin: path of the event script
    @return: True if a file named EVENT_MARKER exists next to the script
    """
    marker_file = os.path.join(os.path.dirname(plugin), EVENT_MARKER)
    return os.path.exists(marker_file)
def call_plugin(args):
    """check if the plugin may be called - and do it finally ...

    @args: list; the first item is the path of the plugin script, the
        remaining items are passed to it as arguments (the list itself is
        not modified)
    @return: True if the plugin returned the exitcode zero
    @raise Exception: if the plugin is missing, not executable, not safe
        or not marked as a cryptobox plugin
    """
    if not args:
        raise Exception("no plugin specified")
    plugin = os.path.abspath(args[0])
    ## check existence and if it is executable
    if not os.access(plugin, os.X_OK):
        raise Exception("could not find executable plugin (%s)" % plugin)
    ## check if the plugin (and its parents) are only writeable for root
    ## this can be overridden by OVERRIDE_FILECHECK
    if not checkIfFileIsSafe(plugin):
        raise Exception("the plugin (%s) is not safe - check its " % plugin
                + "(and its parents') permissions")
    ## check if the plugin is a python program, that is marked as a cryptobox plugin
    if not checkIfPluginIsValid(plugin):
        raise Exception("the plugin (%s) is not a correctly marked python script"
                % plugin)
    ## run the plugin (absolute path) with the remaining arguments - no shell
    command = [plugin] + list(args[1:])
    return subprocess.call(command, shell=False) == 0
def call_event(args):
    """check if the event script may be called - and do it finally ...

    @args: list; the first item is the path of the event script, the
        remaining items are passed to it as arguments (the list itself is
        not modified)
    @return: True if the event script returned the exitcode zero
    @raise Exception: if the script is missing, not executable, not located
        next to the marker file or not safe
    """
    if not args:
        raise Exception("no event script specified")
    event = os.path.abspath(args[0])
    ## check existence and if it is executable
    if not os.access(event, os.X_OK):
        raise Exception("could not find executable event script (%s)" % event)
    ## check if the script is valid (the marker file must be in the same directory)
    if not checkIfEventScriptIsValid(event):
        raise Exception("the event script (%s) does not reside in " % event
                + "a directory with the marker file (%s) - this " % EVENT_MARKER
                + "is not allowed due to abuse prevention")
    ## check if the event (and its parents) are only writeable for root
    if not checkIfFileIsSafe(event):
        raise Exception("the event (%s) is not safe - check its " % event
                + "(and its parents') permissions")
    ## run the script (absolute path) with the remaining arguments - no shell
    command = [event] + list(args[1:])
    return subprocess.call(command, shell=False) == 0
def isWriteable(path, force_dev_type=None):
    """check if the calling user (not root!) has write access to the device/file

    the real (not the effective) user id is used for the check
    additionally the permissions of the default groups of the real uid are checked
    it is sufficient, if the device/dir is owned by us
    this check works nicely together with "super", as it changes (by default) only
    the effective uid (not the real uid)

    @path: the file, directory or device to be checked
    @force_dev_type: optional numeric file type (one of the values of
        DEV_TYPES); if given, then 'path' must be of this type
    @return: True if the caller is root, owns 'path' or may read and write it;
        otherwise False (some failures are also reported via stderr)
    """
    import stat
    ## first check, if the device/file exists
    if not os.path.exists(path):
        sys.stderr.write("%s does not exist!\n" % path)
        return False
    props = os.stat(path)
    ## check the type of the path - if necessary
    ## (the file type bits of the mode, shifted down by twelve bits, are the
    ## same number as the former "st_mode % 65536 / 4096" with integer division)
    if (force_dev_type is not None) and \
            (force_dev_type != (stat.S_IFMT(props.st_mode) >> 12)):
        sys.stderr.write("%s does not have the numeric type '%d'!\n" \
                % (path, force_dev_type))
        return False
    ## retrieve the information for the real user id
    (trustUserName, trustUID, groupsOfTrustUser) = getCallingUserInfo()
    ## are we called by the root user? this would be ok
    if trustUID == 0:
        return True
    ## is the path owned by us?
    if props.st_uid == trustUID:
        return True
    ## set the default groups of the caller for the check (restore them later)
    savedGroups = os.getgroups()
    os.setgroups(groupsOfTrustUser)
    try:
        ## check permissions
        return os.access(path, os.W_OK) and os.access(path, os.R_OK)
    finally:
        ## reset the groups of this process - even if the check failed badly
        os.setgroups(savedGroups)
def run_cryptsetup(args):
    """execute cryptsetup as root

    @args: list of arguments - they will be treated accordingly to the first element
        of this list (the action); the action is followed by its positional
        parameters (see 'action_params' below); all further items are passed
        to cryptsetup in front of the action (the list is not modified)
    @return: True if cryptsetup returned the exitcode zero
    @raise ValueError: if the action or its parameters are invalid or if the
        given device is not a block device that is writeable for the caller
    """
    if not args:
        raise ValueError("no action for cryptsetup supplied")
    if not isinstance(args, list):
        raise ValueError("invalid arguments supplied: %s" % (args, ))
    ## the positional parameters that have to follow each allowed action
    action_params = {
        "luksFormat": ("device", ),
        "luksUUID": ("device", ),
        "luksOpen": ("device", "destination"),
        "luksClose": ("destination", ),
        "luksAddKey": ("device", ),
        "luksDelKey": ("device", "slot"),
        "isLuks": ("device", ),
    }
    action = args[0]
    try:
        if action not in action_params:
            raise ValueError("invalid action supplied: %s" % (action, ))
        param_names = action_params[action]
        param_values = args[1:1 + len(param_names)]
        if len(param_values) < len(param_names):
            raise ValueError("missing arguments")
        other_args = args[1 + len(param_names):]
        params = dict(zip(param_names, param_values))
        device = params.get("device")
        destination = params.get("destination")
        ## the name of the mapping is used below for a path below /dev/mapper
        ## that gets chown'ed - thus it must not contain any path components
        if (action == "luksOpen") and ((destination in ("", ".", "..")) \
                or (os.path.basename(destination) != destination)):
            raise ValueError("invalid name of the mapped device: %s" \
                    % (destination, ))
        # check if a device was defined - and check it
        # maybe add a check for the mapped device's permissions (luksClose)?
        # dmsetup deps self.device
        if (device is not None) and (not isWriteable(device, DEV_TYPES["block"])):
            raise ValueError("%s is not a writeable block device" % (device, ))
    except TypeError:
        ## e.g. unhashable or non-string items in the argument list
        raise ValueError("invalid arguments supplied: %s" % (args, ))
    ## additional arguments first, then the action and its parameters
    cs_args = [allowedProgs["cryptsetup"]] + other_args + [action] + param_values
    # execute cryptsetup with the given parameters
    succeeded = (subprocess.call(cs_args, shell=False) == 0)
    ## chown the devmapper block device to the cryptobox user
    if succeeded and (action == "luksOpen"):
        calling_user = getCallingUserInfo()
        os.chown(os.path.join(os.path.sep, "dev", "mapper", destination),
                calling_user[1], calling_user[2][0])
    return succeeded
def getFSType(device):
    """get the filesystem type of a device

    'blkid' is called for the device; os.devnull is passed for its options
    "-c" and "-w".

    @device: the device to be inspected
    @return: the output of blkid (surrounding whitespace removed) as a text
        string or None if blkid returned a non-zero exitcode
    """
    blkid_args = [ allowedProgs["blkid"],
            "-s", "TYPE",
            "-o", "value",
            "-c", os.devnull,
            "-w", os.devnull,
            device ]
    ## text mode: the callers compare the result with strings like "vfat"
    proc = subprocess.Popen(
            blkid_args,
            shell = False,
            stdout = subprocess.PIPE,
            universal_newlines = True)
    output = proc.communicate()[0]
    if proc.returncode != 0:
        return None
    return output.strip()
def run_mount(args):
    """execute mount

    @args: list of exactly two items: the device (or MAGIC_TMPFS for a
        ramdisk) and the mountpoint (the list is not modified)
    @return: True if the mount (and the following change of the owner of
        the mountpoint for non-vfat filesystems) succeeded
    @raise ValueError: if the arguments are invalid, if the device is not a
        block device that is writeable for the caller or if the mountpoint
        is not a writeable directory
    """
    if not args:
        raise ValueError("no destination for mount supplied")
    if not isinstance(args, list):
        raise ValueError("invalid arguments supplied: %s" % (args, ))
    if len(args) < 2:
        raise ValueError("no destination for mount supplied")
    # check for additional (not allowed) arguments
    if len(args) > 2:
        raise ValueError("too many arguments for 'mount': %s" % (args[2:], ))
    (device, destination) = args
    try:
        ## shall we mount a ramdisk?
        is_tmpfs = (device == MAGIC_TMPFS)
        # check permissions for the device
        if (not is_tmpfs) and (not isWriteable(device, DEV_TYPES["block"])):
            raise ValueError("%s is not a writeable block device" % (device, ))
        ## check permissions for the mountpoint
        if not isWriteable(destination, DEV_TYPES["dir"]):
            raise ValueError("the mountpoint (%s) is not writeable" \
                    % (destination, ))
    except TypeError:
        raise ValueError("invalid arguments supplied: %s" % (args, ))
    # execute mount with the given parameters
    # first overwrite the real uid, as 'mount' wants this to be zero (root)
    savedUID = os.getuid()
    os.setuid(os.geteuid())
    ## we have to change the permissions of the mounted directory - otherwise it will
    ## not be writeable for the cryptobox user
    ## for 'vfat' we have to do this during mount
    ## for ext2/3 we have to do it afterward
    ## first: get the user/group of the target
    (trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID)
    trustGID = groupsOfTrustUser[0]
    if is_tmpfs:
        fsType = "tmpfs"
    else:
        fsType = getFSType(device)
    ## define arguments
    if fsType == "vfat":
        ## add the "uid/gid" arguments to the mount call
        mount_args = [ allowedProgs["mount"],
                "-o", "uid=%d,gid=%d,umask=0000" % (trustUID, trustGID),
                device,
                destination ]
    elif fsType == "ntfs":
        ## use fuse (ntfs-3g) for ntfs
        mount_args = [ allowedProgs["mount"],
                "-t", "ntfs-3g",
                device,
                destination ]
    elif is_tmpfs:
        mount_args = [ allowedProgs["mount"],
                "-t", "tmpfs",
                "cryptobox-tmpfs", destination ]
    else:
        ## all other filesystem types will be handled after mount
        mount_args = [ allowedProgs["mount"], device, destination ]
    # execute mount - return in case of an error
    if subprocess.call(mount_args, shell=False) != 0:
        return False
    ## for vfat: we are done
    if fsType == "vfat":
        return True
    ## for all other filesystem types: chown the mount directory
    try:
        os.chown(destination, trustUID, trustGID)
    except OSError as errMsg:
        sys.stderr.write("could not chown the mount destination (%s) " % destination \
                + "to the specified user (%d/%d): " % (trustUID, trustGID) \
                + "%s\n" % str(errMsg))
        sys.stderr.write("UID: %d\n" % (os.geteuid(),))
        return False
    ## BEWARE: it would be nice, if we could restore the previous uid (not euid) but
    ## this would also override the euid (see 'man 2 setuid') - any ideas?
    return True
def run_umount(args):
    """execute umount

    @args: list of exactly one item: the mountpoint (the list is not modified)
    @return: True if umount returned the exitcode zero
    @raise ValueError: if the arguments are invalid or if the parent directory
        of the mountpoint is not a directory that is writeable for the caller
    """
    if not args:
        raise ValueError("no mountpoint for umount supplied")
    if not isinstance(args, list):
        raise ValueError("invalid arguments supplied")
    if len(args) != 1:
        raise ValueError("umount does not allow arguments")
    destination = args[0]
    try:
        # check permissions for the destination
        if not isWriteable(os.path.dirname(destination), DEV_TYPES["dir"]):
            raise ValueError("the parent of the mountpoint " \
                    + "(%s) is not writeable" % (destination, ))
    except TypeError:
        raise ValueError("invalid arguments supplied")
    # execute umount with the given parameters
    # first overwrite the real uid, as 'umount' wants this to be zero (root)
    savedUID = os.getuid()
    os.setuid(os.geteuid())
    # execute umount (with the parameter '-l' - lazy umount)
    exitcode = subprocess.call(
            [allowedProgs["umount"], "-l", destination],
            shell = False)
    # restore previous real uid
    os.setuid(savedUID)
    return exitcode == 0
def run_pvdisplay(args):
    """execute pvdisplay to check for physical LVM devices

    @args: must be empty - pvdisplay is always called with "--colon" only
    @return: True if pvdisplay returned the exitcode zero
    @raise ValueError: if any argument was supplied
    """
    if args:
        raise ValueError("no arguments may be supplied for 'pvdisplay'")
    ## call pvdisplay with the parameter "--colon"
    exitcode = subprocess.call(
            [ allowedProgs["pvdisplay"], "--colon" ],
            shell = False)
    return exitcode == 0
def getCallingUserInfo():
    """return information about the user that was calling this program

    If both environment variables SUPERCMD and ORIG_USER are set (as it is
    the case for a call via "super"), then the user named by ORIG_USER is
    looked up. Otherwise the real uid of this process is used.

    @return: tuple of (name, uid, (groups)) - see getUserInfo
    """
    environment = os.environ
    called_via_super = ("SUPERCMD" in environment) and ("ORIG_USER" in environment)
    if not called_via_super:
        ## no 'super' involved: take the current user
        return getUserInfo(os.getuid())
    ## take the user that was calling 'super'
    return getUserInfo(environment["ORIG_USER"])
def getUserInfo(user):
    """return information about the specified user

    @user: (uid or name)
    @return: tuple of (name, uid, (groups)) - the first item of the list of
        group ids is always the primary group of the user
    @raise KeyError: if no user was supplied, if the type of 'user' is not
        supported or if the user does not exist
    """
    if (user is None) or (user == ""):
        raise KeyError("no user supplied")
    ## if a KeyError is raised again in the following lines, then the supplied
    ## user was invalid
    if isinstance(user, int):
        # 'user' is a uid
        userinfo = pwd.getpwuid(user)
    elif isinstance(user, str):
        # 'user' is a name
        userinfo = pwd.getpwnam(user)
    else:
        raise KeyError("invalid user supplied: %s" % (user, ))
    ## all groups that list the user as a member (except for the primary group)
    u_groups = [one_group.gr_gid
            for one_group in grp.getgrall()
            if (userinfo.pw_name in one_group.gr_mem)
                and (one_group.gr_gid != userinfo.pw_gid)]
    ## the primary group is always put in front of the list, as the callers
    ## use the first item as the group of the user
    u_groups.insert(0, userinfo.pw_gid)
    return (userinfo.pw_name, userinfo.pw_uid, u_groups)
# **************** main **********************
def main(args):
    """handle one request and return the exitcode of the program

    @args: list of the command line arguments (without the program name)
    @return: 0 - execution was ok
        1 - the executed program or action returned a failure exitcode
        100 - improper calling (wrong arguments, wrong uid, unknown action)
        101 - failed to execute the given program
    """
    ## do we have root privileges (effective uid is zero)?
    if os.geteuid() != 0:
        sys.stderr.write("the effective uid is not zero - you should use " \
                + "'super' to call this script (%s)\n" % sys.argv[0])
        return 100
    ## note: a real uid of zero (root as the caller) is deliberately not rejected
    ## work with a copy - the list of the caller stays untouched
    args = list(args)
    ## check if there were arguments
    if not args:
        sys.stderr.write("No arguments supplied\n")
        return 100
    action = args[0].lower()
    ## did the user call the "check" action? - exit silently
    if action == "check":
        return 0
    ## all of the following actions require at least two arguments
    if len(args) < 2:
        sys.stderr.write("No program/plugin/event specified for execution\n")
        return 100
    params = args[1:]
    ## call a plugin root_action script or an event script
    if action in ("plugin", "event"):
        if action == "plugin":
            handler = call_plugin
        else:
            handler = call_event
        ## remember the name for the error message - the handler may change the list
        target = params[0]
        try:
            isOK = handler(params)
        except Exception as errMsg:
            sys.stderr.write("Execution of %s '%s' failed: %s\n" \
                    % (action, target, errMsg))
            return 100
        if isOK:
            return 0
        return 1
    ## call one of the allowed programs
    if action == "program":
        progRequest = params[0]
        progArgs = params[1:]
        if progRequest not in allowedProgs:
            sys.stderr.write("Invalid program requested: %s\n" % progRequest)
            return 100
        ## only these programs offer an interface for the caller
        runners = {
            "cryptsetup": run_cryptsetup,
            "mount": run_mount,
            "umount": run_umount,
            "pvdisplay": run_pvdisplay,
        }
        if progRequest not in runners:
            sys.stderr.write(("The interface for this program (%s) is " \
                    + "not yet implemented!\n") % progRequest)
            return 100
        try:
            if runners[progRequest](progArgs):
                return 0
            return 1
        except OSError as errstr:
            sys.stderr.write("Execution failed: %s\n" % errstr)
            return 101
        except (ValueError, TypeError, IndexError) as errstr:
            ## malformed arguments - TypeError is also what an interpreter
            ## raises for the attempt to raise a plain string as an exception
            sys.stderr.write("Invalid arguments: %s\n" % errstr)
            return 100
    ## none of the known actions was requested
    sys.stderr.write("Unknown action: %s\n" % args[0])
    return 100


# prevent import
if __name__ == "__main__":
    ## remove program name
    sys.exit(main(sys.argv[1:]))