use ConfigObj for config file management

implement privilege dropping
test script added
This commit is contained in:
lars 2006-08-17 10:38:05 +00:00
parent 7443b4684e
commit 07a63dbd9c
6 changed files with 338 additions and 149 deletions

View file

@ -5,10 +5,11 @@ import re
"""exceptions:
VolumeIsActive
InvalidName
NameActivelyUsed
InvalidName
InvalidPassword
InvalidType
CreateError
MountError
ChangePasswordError
"""
@ -39,10 +40,12 @@ class CryptoBoxContainer:
__dmDir = "/dev/mapper"
def __init__(self, device, cbox, logger):
def __init__(self, device, cbox):
self.device = device
self.logger = logger
self.cbox = cbox
self.debugMessage = self.cbox.debugMessage
self.__dropPrivileges = self.cbox.dropPrivileges
self.__risePrivileges = self.cbox.risePrivileges
self.__resetObject()
@ -55,7 +58,7 @@ class CryptoBoxContainer:
if new_name == self.name: return
if self.isMounted():
raise "VolumeIsActive", "the container must be inactive during renaming"
if not re.search(r'^[a-zA-Z0-9_\.\- ]$', new_name):
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise "InvalidName", "the supplied new name contains illegal characters"
"check for active partitions with the same name"
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
@ -107,12 +110,13 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
@ -124,12 +128,11 @@ class CryptoBoxContainer:
"luksAddKey",
self.device])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
self.__dropPrivileges()
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "ChangePasswordError", errorMsg
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
@ -137,6 +140,7 @@ class CryptoBoxContainer:
else:
raise "ChangePasswordError", "could not get the old key slot"
"remove the old key"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
@ -149,11 +153,10 @@ class CryptoBoxContainer:
self.device,
"%d" % (keyslot, )])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "ChangePasswordError", errorMsg
@ -179,7 +182,7 @@ class CryptoBoxContainer:
def_name = self.cbox.getNameForUUID(self.uuid)
if def_name: return def_name
"there is no name defined for this uuid - we will propose a good one"
prefix = self.cbox.getConfigValue("DefaultNamePrefix")
prefix = self.cbox.cbxPrefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
@ -198,9 +201,10 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell=False,
stdin=None,
@ -213,12 +217,13 @@ class CryptoBoxContainer:
"-w", os.devnull,
self.device])
proc.wait()
result = proc.stdout.read().strip()
self.__dropPrivileges()
if proc.returncode != 0:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
result = proc.stdout.read().strip()
devnull.close()
if result: return result
return self.device.replace(os.path.sep, "_")
@ -241,9 +246,10 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell=False,
stdin=None,
@ -256,12 +262,13 @@ class CryptoBoxContainer:
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
self.__dropPrivileges()
if proc.returncode != 0:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
output = proc.stdout.read().strip()
devnull.close()
return output
@ -272,9 +279,10 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
@ -286,13 +294,14 @@ class CryptoBoxContainer:
"isLuks",
self.device])
proc.wait()
self.__dropPrivileges()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.getConfigValue("MountParent"), self.name)
return os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], self.name)
def __mountLuks(self, password):
@ -304,9 +313,17 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.debugMessage("error", errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
@ -320,27 +337,26 @@ class CryptoBoxContainer:
self.name])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mount"],
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
@ -351,24 +367,25 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
if self.isMounted():
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
stderr = subprocess.PIPE,
args = [self.Progs["umount"], "-l", self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
if os.path.exists(os.path.join(self.__dmDir, self.name)):
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
@ -380,27 +397,32 @@ class CryptoBoxContainer:
"luksClose",
self.name])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.debugMessage("error", errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
@ -411,11 +433,10 @@ class CryptoBoxContainer:
self.device,
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
@ -426,25 +447,26 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["umount"],
"-l",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["warn"],
errorMsg)
raise "MountError", errorMsg
if self.isMounted():
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["umount"],
"-l",
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
@ -457,9 +479,10 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
@ -469,11 +492,10 @@ class CryptoBoxContainer:
self.Progs["mkfs-data"],
self.device])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "CreateError", errorMsg
devnull.close()
@ -489,12 +511,13 @@ class CryptoBoxContainer:
try:
devnull = open(os.devnull, "w")
except IOError:
self.logger.printMessage(
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
@ -503,19 +526,19 @@ class CryptoBoxContainer:
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"--cipher", self.cbox.getConfigValue("DefaultCipher"),
"--cipher", self.cbox.cbxPrefs["System"]["DefaultCipher"],
"--iter-time", "2000",
"luksFormat",
self.device])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage("error", errorMsg)
raise "CreateError", errorMsg
"open the luks container for mkfs"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
@ -529,29 +552,28 @@ class CryptoBoxContainer:
self.name])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "CreateError", errorMsg
"make the filesystem"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
self.__dropPrivileges()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.logger.printMessage(
CryptoBoxLogger.DebugLevels["error"],
errorMsg)
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
"remove the luks mapping"
raise "CreateError", errorMsg
devnull.close()
@ -560,9 +582,9 @@ class CryptoBoxContainer:
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.getConfigValue("MountParent"))
subdirs = os.listdir(self.cbox.cbxPrefs["System"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.getConfigValue("MountParent"), dir)
abs_dir = os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)): os.rmdir(abs_dir)