lars
f95f33ba71
names of cryptobox.conf settings changed cryptsetup interface for CryptoBoxRootActions implemented
180 lines
5.2 KiB
Python
180 lines
5.2 KiB
Python
#!/usr/bin/env python
|
|
"""module for executing the programs, that need root privileges
|
|
|
|
Syntax:
|
|
- program
|
|
- device
|
|
- [action]
|
|
- [action args]
|
|
"""
|
|
|
|
# current problem: how to identify the "cryptobox" user - to check its permissions?
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import pwd
|
|
import grp
|
|
import types
|
|
|
|
allowedProgs = {
|
|
"sfdisk": "/sbin/sfdisk",
|
|
"cryptsetup": "/sbin/cryptsetup",
|
|
}
|
|
|
|
|
|
# check if this user has write-access to the device
|
|
allowedUser = "lars"
|
|
# supply gids - otherwise the default groups of the above user are used
|
|
#allowedGroups = [6, 1000]
|
|
|
|
|
|
def isWriteableBlock(device):
|
|
fstat = os.stat(device)
|
|
(dev_type, perm_u, perm_g, perm_o) = (
|
|
fstat.st_mode % 65536 / 4096,
|
|
fstat.st_mode % 512 / 64,
|
|
fstat.st_mode % 512 % 64 / 8,
|
|
fstat.st_mode % 512 % 64 % 8)
|
|
(owner_uid, owner_gid) = (fstat.st_uid, fstat.st_gid)
|
|
(trustUserName, trustUID, groupsOfTrustUser) = getUserInfo(allowedUser)
|
|
trustGIDs = []
|
|
try:
|
|
"if 'allowedGroups' are defined, then ignore the previously aquired groups"
|
|
"TODO: for now only group ids (no names) are allowed"
|
|
trustGIDs.extend(allowedGroups)
|
|
except NameError:
|
|
"'allowedGroups' was not defined - we use the default ones of the user"
|
|
trustGIDs.extend(groupsOfTrustUser)
|
|
except TypeError:
|
|
"if it fails, then 'allowedGroups' was really not well defined"
|
|
trustGIDs.append(allowedGID)
|
|
DEF_TYPES = { "pipe":1, "char":2, "dir":4, "block":6, "file":8, "link":10, "socket":12}
|
|
if dev_type != DEF_TYPES["block"]: return False
|
|
## could the following check be replaced by os.access?
|
|
"does the owner id match?"
|
|
if owner_uid == trustUID:
|
|
"is the write bit (2) set for the owner?"
|
|
return perm_u & 2
|
|
if owner_gid in trustGIDs:
|
|
return perm_g & 2
|
|
"we seem to be one of 'others' - so just check the bit"
|
|
return perm_o & 2
|
|
|
|
|
|
def run_cryptsetup(device, args):
|
|
"""execute cryptsetup as root
|
|
|
|
@args: list of arguments - they will be treated accordingly to the first element
|
|
of this list (the action)"""
|
|
if not args: raise "WrongArguments", "no action for cryptsetup supplied"
|
|
if type(args) != types.ListType: raise "WrongArguments", "invalid arguments supplied"
|
|
try:
|
|
action = args[0]
|
|
del args[0]
|
|
cs_args = [allowedProgs["cryptsetup"]]
|
|
cs_args.extend(args)
|
|
if action == "luksFormat":
|
|
cs_args.append(action)
|
|
cs_args.append(device)
|
|
elif action == "luksUUID":
|
|
cs_args.append(action)
|
|
cs_args.append(device)
|
|
elif action == "luksOpen":
|
|
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
|
|
cs_args.insert(-1, action)
|
|
cs_args.insert(-1, device)
|
|
elif action == "luksClose":
|
|
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
|
|
cs_args.insert(-1, action)
|
|
elif action == "luksAddKey":
|
|
cs_args.append(action)
|
|
cs_args.append(device)
|
|
elif action == "luksDelKey":
|
|
if len(cs_args) < 2: raise "WrongArguments", "missing arguments"
|
|
cs_args.insert(-1, action)
|
|
cs_args.insert(-1, device)
|
|
elif action == "isLuks":
|
|
cs_args.append(action)
|
|
cs_args.append(device)
|
|
else: raise "WrongArguments", "invalid action supplied"
|
|
except TypeError:
|
|
raise "WrongArguments", "invalid arguments supplied"
|
|
# execute cryptsetup with the given parameters
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
args = cs_args)
|
|
proc.communicate()
|
|
return proc.returncode == 0
|
|
|
|
|
|
def run_sfdisk(device, args):
|
|
print "ok - you are free to call sfdisk ..."
|
|
print " not yet implemented ..."
|
|
return True
|
|
|
|
|
|
def getUserInfo(user):
|
|
"""return information about the specified user
|
|
|
|
@user: (uid or name)
|
|
@return: tuple of (name, uid, (groups))
|
|
"""
|
|
if user is None: raise "KeyError", "no user supplied"
|
|
# first check, if 'user' contains an id - then check for a name
|
|
try:
|
|
userinfo = pwd.getpwuid(user)
|
|
except TypeError:
|
|
# if a KeyError is raised again, then the supplied user was invalid
|
|
userinfo = pwd.getpwnam(user)
|
|
u_groups =[one_group.gr_gid
|
|
for one_group in grp.getgrall()
|
|
if userinfo.pw_name in one_group.gr_mem]
|
|
if not userinfo.pw_gid in u_groups: u_groups.append(userinfo.pw_gid)
|
|
return (userinfo.pw_name, userinfo.pw_uid, u_groups)
|
|
|
|
|
|
# **************** main **********************
|
|
|
|
# prevent import
|
|
if __name__ == "__main__":
|
|
|
|
# remove program name
|
|
sys.argv.remove(sys.argv[0])
|
|
|
|
if len(sys.argv) < 2:
|
|
sys.stderr.write("Not enough arguments supplied (%s)!\n" % " ".join(sys.argv))
|
|
sys.exit(100)
|
|
|
|
progRequest = sys.argv[0]
|
|
deviceRequest = sys.argv[1]
|
|
sys.argv.remove(progRequest)
|
|
sys.argv.remove(deviceRequest)
|
|
|
|
if not progRequest in allowedProgs.keys():
|
|
sys.stderr.write("Invalid program requested: %s\n" % progRequest)
|
|
sys.exit(100)
|
|
|
|
if not os.path.exists(deviceRequest):
|
|
sys.stderr.write("The specified device (%s) does not exist!\n" % deviceRequest)
|
|
sys.exit(100)
|
|
|
|
if not isWriteableBlock(deviceRequest):
|
|
sys.stderr.write("This device (%s) must be a writeable block device!\n" % deviceRequest)
|
|
sys.exit(100)
|
|
|
|
if progRequest == "cryptsetup": runner = run_cryptsetup
|
|
elif progRequest == "sfdisk": runner = run_sfdisk
|
|
else:
|
|
sys.stderr.write("The interface for this program (%s) is not yet implemented!\n" % progRequest)
|
|
sys.exit(100)
|
|
try:
|
|
if runner(deviceRequest, sys.argv):
|
|
sys.exit(0)
|
|
else:
|
|
sys.exit(1)
|
|
except "WrongArguments", errstr:
|
|
sys.stderr.write("Execution failed: %s\n" % errstr)
|
|
sys.exit(100)
|
|
|