cryptonas/bin/CryptoBoxRootActions

556 lines
18 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
#
# Copyright 2007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
2006-11-06 17:05:00 +01:00
"""module for executing some programs or scripts that need root privileges
2006-11-06 17:05:00 +01:00
Syntax:
check
- return exitcode zero if basic checks succeeded
2006-11-06 17:05:00 +01:00
program PROGRAM_NAME [ARGS]
- call the program (must be defined in "allowedProgs" below)
event EVENT_SCRIPT [ARGS]
- call an event script
plugin PLUGIN_NAME [ARGS]
- call a root_action script of a plugin
Exitcodes:
0 - execution was ok
1 - the executed program or action returned a failure exitcode
100 - improper calling or misconfiguration
of CryptoBoxRootAction (wrong arguments, wrong uid)
101 - failed to execute the given program - maybe it does not exist?
For more detailed information take a look at the manpage:
"man CryptoBoxRootActions"
2006-11-06 17:05:00 +01:00
"""
__revision__ = "$Id"
2006-11-06 17:05:00 +01:00
import os
import sys
import subprocess
import pwd
import grp
import types
allowedProgs = {
"sfdisk": "/sbin/sfdisk",
"cryptsetup": "/sbin/cryptsetup",
"mount": "/bin/mount",
"umount": "/bin/umount",
"blkid": "/sbin/blkid",
"pvdisplay": "/sbin/pvdisplay",
2006-11-06 17:05:00 +01:00
}
## this line is necessary for running unittests or playing around with a local
## svn working copy - otherwise the security checks would be too strict
OVERRIDE_FILECHECK = False
2006-11-06 17:05:00 +01:00
DEV_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
EVENT_MARKER = '_event_scripts_'
## use this string as device name if you want to mount a ramdisk
MAGIC_TMPFS = "_tmpfs_"
2006-11-06 17:05:00 +01:00
def checkIfFileIsSafe(fname):
"""check if the file and its parents are only writeable for root"""
## the override setting may be turned off temporarily to allow unittests
if OVERRIDE_FILECHECK:
return True
## if the calling user id is 0 (root), then we do not have to check this,
## as root would be allowed to do this anyway
## this eases testing with a not-installed working copy in a uml environment
if getCallingUserInfo()[1] == 0:
return True
props = os.stat(fname)
2006-11-06 17:05:00 +01:00
## check if it is owned by non-root
if props.st_uid != 0: return False
## check group-write permission if gid is not zero
if (props.st_gid != 0) and (props.st_mode % 32 / 16 > 0): return False
## check if it is world-writeable
if props.st_mode % 4 / 2 > 0: return False
## are we at root-level (directory-wise)? If yes, then we are ok ...
if fname == os.path.sep: return True
2006-11-06 17:05:00 +01:00
## check if the parent directory is ok - recursively :)
return checkIfFileIsSafe(os.path.dirname(os.path.abspath(fname)))
2006-11-06 17:05:00 +01:00
2007-01-30 00:34:15 +01:00
def checkIfPluginIsValid(plugin):
2006-11-06 17:05:00 +01:00
import imp
try:
x = imp.load_source("cbox_plugin", plugin)
except (SyntaxError, IOError):
2006-11-06 17:05:00 +01:00
return False
try:
if getattr(x, "PLUGIN_TYPE") == "cryptobox":
return True
else:
return False
except AttributeError:
2006-11-06 17:05:00 +01:00
return False
2007-01-30 00:34:15 +01:00
def checkIfEventScriptIsValid(plugin):
event_dir = os.path.dirname(plugin)
if os.path.exists(os.path.join(event_dir, EVENT_MARKER)):
return True
else:
return False
2006-11-06 17:05:00 +01:00
def call_plugin(args):
"""check if the plugin may be called - and do it finally ..."""
plugin = os.path.abspath(args[0])
del args[0]
## check existence and if it is executable
2006-11-06 17:05:00 +01:00
if not os.access(plugin, os.X_OK):
raise Exception, "could not find executable plugin (%s)" % plugin
## check if the plugin (and its parents) are only writeable for root
## this can be overridden by OVERRIDE_FILECHECK
if not checkIfFileIsSafe(plugin):
raise Exception, "the plugin (%s) is not safe - check its " % plugin \
+ "(and its parents') permissions"
2006-11-06 17:05:00 +01:00
## check if the plugin is a python program, that is marked as a cryptobox plugin
if not checkIfPluginIsValid(plugin):
raise Exception, "the plugin (%s) is not a correctly marked python script" % plugin
args.insert(0, plugin)
2006-11-06 17:05:00 +01:00
proc = subprocess.Popen(
shell = False,
args = args)
proc.wait()
return proc.returncode == 0
def call_event(args):
"""check if the event script may be called - and do it finally ..."""
event = os.path.abspath(args[0])
del args[0]
## check existence and if it is executable
if not os.access(event, os.X_OK):
raise Exception, "could not find executable event script (%s)" % event
## check if the script is valid (the marker file must be in the same directory)
if not checkIfEventScriptIsValid(event):
raise Exception, "the event script (%s) does not reside in" % event \
+ "a directory with the marker file (%s) - this " % EVENT_MARKER \
+ "is not allowed due to abuse prevention"
## check if the event (and its parents) are only writeable for root
if not checkIfFileIsSafe(event):
raise Exception, "the event (%s) is not safe - check its " % event \
+ "(and its parents') permissions"
args.insert(0, event)
proc = subprocess.Popen(
shell = False,
args = args)
proc.wait()
return proc.returncode == 0
def isWriteable(path, force_dev_type=None):
2006-11-06 17:05:00 +01:00
"""check if the calling user (not root!) has write access to the device/file
the real (not the effective) user id is used for the check
2006-11-06 17:05:00 +01:00
additionally the permissions of the default groups of the real uid are checked
it is sufficient, if the device/dir is owned by us
2006-11-06 17:05:00 +01:00
this check works nicely together with "super", as it changes (by default) only
the effective uid (not the real uid)
"""
## first check, if the device/file exists
if not os.path.exists(path):
sys.stderr.write("%s does not exist!\n" % path)
2006-11-06 17:05:00 +01:00
return False
## check the type of the path - if necessary
if (not force_dev_type is None) and \
(force_dev_type != os.stat(path).st_mode % 65536 / 4096):
sys.stderr.write("%s does not have the numeric type '%d'!\n" \
% (path, force_dev_type))
return False
## retrieve the information for the real user id
(trustUserName, trustUID, groupsOfTrustUser) = getCallingUserInfo()
## are we called by the root user? this would be ok
if trustUID == 0:
return True
## is the path owned by us?
2007-02-10 01:19:38 +01:00
if os.stat(path)[4] == trustUID:
return True
## set the default groups of the caller for the check (restore them later)
2006-11-06 17:05:00 +01:00
savedGroups = os.getgroups()
os.setgroups(groupsOfTrustUser)
## check permissions
result = os.access(path, os.W_OK) and os.access(path, os.R_OK)
## reset the groups of this process
2006-11-06 17:05:00 +01:00
os.setgroups(savedGroups)
return result
def run_cryptsetup(args):
"""execute cryptsetup as root
@args: list of arguments - they will be treated accordingly to the first element
of this list (the action)"""
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
if type(args) != types.ListType:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
2006-11-06 17:05:00 +01:00
try:
action = args[0]
del args[0]
device = None
cmd_args = []
if action == "luksFormat":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksUUID":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksOpen":
if len(args) < 2: raise "WrongArguments", "missing arguments"
device = args[0]; del args[0]
destination = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
cmd_args.append(destination)
elif action == "luksClose":
if len(args) < 1: raise "WrongArguments", "missing arguments"
destination = args[0]; del args[0]
# maybe add a check for the mapped device's permissions?
# dmsetup deps self.device
cmd_args.append(action)
cmd_args.append(destination)
elif action == "luksAddKey":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
elif action == "luksDelKey":
if len(args) < 2: raise "WrongArguments", "missing arguments"
2006-11-06 17:05:00 +01:00
device = args[0]; del args[0]
slot = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
cmd_args.append(slot)
2006-11-06 17:05:00 +01:00
elif action == "isLuks":
device = args[0]; del args[0]
cmd_args.append(action)
cmd_args.append(device)
else: raise "WrongArguments", "invalid action supplied: %s" % (action, )
# check if a device was defined - and check it
if (not device is None) and (not isWriteable(device, DEV_TYPES["block"])):
raise "WrongArguments", "%s is not a writeable block device" % (device, )
cs_args = [allowedProgs["cryptsetup"]]
cs_args.extend(args)
cs_args.extend(cmd_args)
except (TypeError, IndexError):
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute cryptsetup with the given parameters
proc = subprocess.Popen(
shell = False,
args = cs_args)
proc.wait()
## chown the devmapper block device to the cryptobox user
calling_user = getCallingUserInfo()
2006-11-06 17:05:00 +01:00
if (proc.returncode == 0) and (action == "luksOpen"):
os.chown(os.path.join(os.path.sep, "dev", "mapper", destination),
calling_user[1], calling_user[2][0])
2006-11-06 17:05:00 +01:00
return proc.returncode == 0
def getFSType(device):
"""get the filesystem type of a device"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
args = [ allowedProgs["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
device])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
return None
return stdout.strip()
def run_mount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no destination for mount supplied"
if type(args) != types.ListType:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
2006-11-06 17:05:00 +01:00
try:
device = args[0]
del args[0]
destination = args[0]
del args[0]
## shall we mount a ramdisk?
is_tmpfs = (device == MAGIC_TMPFS)
2006-11-06 17:05:00 +01:00
# check permissions for the device
if (not is_tmpfs) and (not isWriteable(device, DEV_TYPES["block"])):
2006-11-06 17:05:00 +01:00
raise "WrongArguments", "%s is not a writeable block device" % (device, )
## check permissions for the mountpoint
if not isWriteable(destination, DEV_TYPES["dir"]):
raise "WrongArguments", "the mountpoint (%s) is not writeable" \
% (destination, )
2006-11-06 17:05:00 +01:00
# check for additional (not allowed) arguments
if len(args) != 0:
raise "WrongArguments", "too many arguments for 'mount': %s" % (args, )
except TypeError:
raise "WrongArguments", "invalid arguments supplied: %s" % (args, )
# execute mount with the given parameters
# first overwrite the real uid, as 'mount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
## we have to change the permissions of the mounted directory - otherwise it will
## not be writeable for the cryptobox user
## for 'vfat' we have to do this during mount
## for ext2/3 we have to do it afterward
## first: get the user/group of the target
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(savedUID)
trustGID = groupsOfTrustUser[0]
if is_tmpfs:
fsType = "tmpfs"
else:
fsType = getFSType(device)
2006-11-06 17:05:00 +01:00
## define arguments
if fsType == "vfat":
## add the "uid/gid" arguments to the mount call
mount_args = [ allowedProgs["mount"],
"-o", "uid=%d,gid=%d,umask=0000" % (trustUID, trustGID),
2006-11-06 17:05:00 +01:00
device,
destination ]
## use fuse (ntfs-3g) for ntfs
elif fsType == "ntfs":
mount_args = [ allowedProgs["mount"],
"-t", "ntfs-3g",
device,
destination ]
elif is_tmpfs:
mount_args = [ allowedProgs["mount"],
"-t", "tmpfs",
"cryptobox-tmpfs", destination ]
2006-11-06 17:05:00 +01:00
else:
## all other filesystem types will be handled after mount
mount_args = [ allowedProgs["mount"], device, destination ]
2006-11-06 17:05:00 +01:00
# execute mount
proc = subprocess.Popen(
shell = False,
args = mount_args)
proc.wait()
## return in case of an error
if proc.returncode != 0:
return False
## for vfat: we are done
if fsType == "vfat": return True
## for all other filesystem types: chown the mount directory
try:
os.chown(destination, trustUID, groupsOfTrustUser[0])
except OSError, errMsg:
sys.stderr.write("could not chown the mount destination (%s) " % destination \
+ "to the specified user (%d/%d): " % (trustUID, groupsOfTrustUser[0]) \
+ "%s/n" % str(errMsg))
2006-11-06 17:05:00 +01:00
sys.stderr.write("UID: %d\n" % (os.geteuid(),))
return False
## BEWARE: it would be nice, if we could restore the previous uid (not euid) but
## this would also override the euid (see 'man 2 setuid') - any ideas?
return True
def run_umount(args):
"""execute mount
"""
if not args: raise "WrongArguments", "no mountpoint for umount supplied"
if type(args) != types.ListType:
raise "WrongArguments", "invalid arguments supplied"
2006-11-06 17:05:00 +01:00
try:
destination = args[0]
del args[0]
# check permissions for the destination
if not isWriteable(os.path.dirname(destination), DEV_TYPES["dir"]):
raise "WrongArguments", "the parent of the mountpoint " \
+ "(%s) is not writeable" % (destination, )
2006-11-06 17:05:00 +01:00
if len(args) != 0: raise "WrongArguments", "umount does not allow arguments"
except TypeError:
raise "WrongArguments", "invalid arguments supplied"
# execute umount with the given parameters
# first overwrite the real uid, as 'umount' wants this to be zero (root)
savedUID = os.getuid()
os.setuid(os.geteuid())
# execute umount (with the parameter '-l' - lazy umount)
proc = subprocess.Popen(
shell = False,
args = [allowedProgs["umount"], "-l", destination])
proc.wait()
# restore previous real uid
os.setuid(savedUID)
return proc.returncode == 0
def run_pvdisplay(args):
"""execute pvdisplay to check for physical LVM devices
"""
if len(args) > 0:
raise "WrongArguments", "no arguments may be supplied for 'pvdisplay'"
## call pvdisplay with the parameter "--colon"
proc = subprocess.Popen(
shell = False,
args = [ allowedProgs["pvdisplay"], "--colon" ])
proc.wait()
return proc.returncode == 0
def getCallingUserInfo():
"""return information about the user that was calling this program via "super"
@user: (uid or name)
@return: tuple of (name, uid, (groups))
"""
## are we called via 'super'?
if ("SUPERCMD" in os.environ) and ("ORIG_USER" in os.environ):
## return the user that was calling super
return getUserInfo(os.environ["ORIG_USER"])
else:
## return the current user
return getUserInfo(os.getuid())
2006-11-06 17:05:00 +01:00
def getUserInfo(user):
"""return information about the specified user
@user: (uid or name)
@return: tuple of (name, uid, (groups))
"""
if (user is None) or (user == ""):
raise "KeyError", "no user supplied"
## if a KeyError is raised again in the following lines, then the supplied
## user was invalid
if type(user) is int:
# 'user' is a uid
2006-11-06 17:05:00 +01:00
userinfo = pwd.getpwuid(user)
elif type(user) is str:
# 'user' is a name
2006-11-06 17:05:00 +01:00
userinfo = pwd.getpwnam(user)
u_groups = [one_group.gr_gid
2006-11-06 17:05:00 +01:00
for one_group in grp.getgrall()
if userinfo.pw_name in one_group.gr_mem]
if not userinfo.pw_gid in u_groups:
## put in front of the list
u_groups.insert(0,userinfo.pw_gid)
2006-11-06 17:05:00 +01:00
return (userinfo.pw_name, userinfo.pw_uid, u_groups)
# **************** main **********************
# prevent import
if __name__ == "__main__":
## do we have root privileges (effective uid is zero)?
2006-11-06 17:05:00 +01:00
if os.geteuid() != 0:
sys.stderr.write("the effective uid is not zero - you should use " \
+ "'super' to call this script (%s)" % sys.argv[0])
2006-11-06 17:05:00 +01:00
sys.exit(100)
## remove program name
2006-11-06 17:05:00 +01:00
args = sys.argv[1:]
# do not allow to use root permissions (real uid may not be zero)
#if os.getuid() == 0:
# sys.stderr.write("the uid of the caller is zero (root) - this is not allowed\n")
# sys.exit(100)
2006-11-06 17:05:00 +01:00
## check if there were arguments
2006-11-06 17:05:00 +01:00
if (len(args) == 0):
sys.stderr.write("No arguments supplied\n")
sys.exit(100)
## did the user call the "check" action?
2006-11-06 17:05:00 +01:00
if (len(args) == 1) and (args[0].lower() == "check"):
# exit silently
sys.exit(0)
## all of the following actions require at least two arguments
if len(args) < 2:
sys.stderr.write("No program/plugin/event specified for execution\n")
sys.exit(100)
## call a plugin root_action script
2006-11-06 17:05:00 +01:00
if args[0].lower() == "plugin":
del args[0]
try:
isOK = call_plugin(args)
except Exception, errMsg:
sys.stderr.write("Execution of plugin '%s' failed: %s\n" \
% (args[0], errMsg))
2006-11-06 17:05:00 +01:00
sys.exit(100)
if isOK:
sys.exit(0)
else:
sys.exit(1)
## call an event script
if args[0].lower() == "event":
del args[0]
try:
isOK = call_event(args)
except Exception, errMsg:
sys.stderr.write("Execution of event '%s' failed: %s\n" \
% (args[0], errMsg))
sys.exit(100)
if isOK:
sys.exit(0)
else:
sys.exit(1)
## call one of the allowed programs
if args[0].lower() == "program":
del args[0]
2006-11-06 17:05:00 +01:00
progRequest = args[0]
del args[0]
if not progRequest in allowedProgs.keys():
sys.stderr.write("Invalid program requested: %s\n" % progRequest)
sys.exit(100)
if progRequest == "cryptsetup": runner = run_cryptsetup
elif progRequest == "mount": runner = run_mount
elif progRequest == "umount": runner = run_umount
elif progRequest == "pvdisplay": runner = run_pvdisplay
2006-11-06 17:05:00 +01:00
else:
sys.stderr.write("The interface for this program (%s) is " \
+ "not yet implemented!\n" % progRequest)
2006-11-06 17:05:00 +01:00
sys.exit(100)
try:
if runner(args):
sys.exit(0)
else:
sys.exit(1)
2008-01-14 22:48:45 +01:00
except OSError, errstr:
2006-11-06 17:05:00 +01:00
sys.stderr.write("Execution failed: %s\n" % errstr)
sys.exit(101)
except "WrongArguments", errstr:
sys.stderr.write("Invalid arguments: %s\n" % errstr)
2006-11-06 17:05:00 +01:00
sys.exit(100)