cryptonas-branches/pythonrewrite/bin2/CryptoBoxContainer.py

593 lines
16 KiB
Python

from CryptoBoxLogger import CryptoBoxLogger
try:
import subprocess
except:
print "Couldn't import 'subprocess'. You need a python version >= 2.4"
import os
import re
"""exceptions:
VolumeIsActive
NameActivelyUsed
InvalidName
InvalidPassword
InvalidType
CreateError
MountError
ChangePasswordError
"""
class CryptoBoxContainer:
Progs = {
"cryptsetup":"/sbin/cryptsetup",
"mkfs-data":"/sbin/mkfs.ext3",
"mkfs-config":"/sbin/mkfs.ext2",
"blkid":"/sbin/blkid",
"mount":"/bin/mount",
"umount":"/bin/umount"}
Types = {
"unused":0,
"plain":1,
"luks":2,
"swap":3}
__fsTypes = {
"plain":["ext3", "ext2", "vfat", "reiser"],
"swap":["swap"]}
"TODO: mehr Dateisystemtypen? / 'reiser' pruefen"
__dmDir = "/dev/mapper"
def __init__(self, device, cbox):
self.device = device
self.cbox = cbox
self.debugMessage = self.cbox.debugMessage
self.__dropPrivileges = self.cbox.dropPrivileges
self.__risePrivileges = self.cbox.risePrivileges
self.__resetObject()
def getName(self):
return self.name
def setName(self, new_name):
"TODO: den Test-Code pruefen"
if new_name == self.name: return
if self.isMounted():
raise "VolumeIsActive", "the container must be inactive during renaming"
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise "InvalidName", "the supplied new name contains illegal characters"
"check for active partitions with the same name"
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
if prev_name_owner:
for a in prev_name_owner:
if a.isMounted():
raise "NameActivelyUsed", "the supplied new name is already in use for an active partition"
self.cbox.setNameForUUID(self.uuid, new_name)
self.name = new_name
def getDevice(self):
return self.device
def getType(self):
return self.type
def isMounted(self):
return os.path.ismount(self.__getMountPoint())
def create(self, type, password=None):
if type == self.Types["luks"]:
self.__createLuks(password)
self.__resetObject()
return
if type == self.Types["plain"]:
self.__createPlain()
self.__resetObject()
return
raise "InvalidType", "invalid container type (%d) supplied" % (type, )
def changePassword(self, oldpw, newpw):
if self.type != self.Types["luks"]:
raise "InvalidType", \
"changing of password is possible only for luks containers"
if not oldpw:
raise "InvalidPassword", "no old password supplied for password change"
if not newpw:
raise "InvalidPassword", "no new password supplied for password change"
"return if new and old passwords are the same"
if oldpw == newpw: return
if self.isMounted():
raise "VolumeIsActive", "this container is currently active"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"luksAddKey",
self.device])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
self.__dropPrivileges()
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "ChangePasswordError", errorMsg
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise "ChangePasswordError", "could not get the old key slot"
"remove the old key"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"luksDelKey",
self.device,
"%d" % (keyslot, )])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "ChangePasswordError", errorMsg
" ****************** internal stuff ********************* "
def __resetObject(self):
""" recheck the information about this container
this is especially useful after changing the type via 'create' """
self.uuid = self.__getUUID()
self.type = self.__getTypeOfPartition()
self.name = self.__getNameOfContainer()
if self.type == self.Types["luks"]:
self.mount = self.__mountLuks
self.umount = self.__umountLuks
if self.type == self.Types["plain"]:
self.mount = self.__mountPlain
self.umount = self.__umountPlain
def __getNameOfContainer(self):
"retrieve the name of the container by querying the database"
def_name = self.cbox.getNameForUUID(self.uuid)
if def_name: return def_name
"there is no name defined for this uuid - we will propose a good one"
prefix = self.cbox.cbxPrefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.getUUIDForName(guess):
counter += 1
else:
unused_found = True
self.cbox.setNameForUUID(self.uuid, guess)
return guess
def __getUUID(self):
"""return UUID for luks partitions, ext2/3 and vfat filesystems"""
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=devnull,
args=[self.Progs["blkid"],
"-s", "UUID",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
result = proc.stdout.read().strip()
self.__dropPrivileges()
if proc.returncode != 0:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
if result: return result
return self.device.replace(os.path.sep, "_")
def __getTypeOfPartition(self):
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
if self.__isLuksPartition(): return self.Types["luks"]
typeOfPartition = self.__getTypeIdOfPartition()
if typeOfPartition in self.__fsTypes["plain"]:
return self.Types["plain"]
if typeOfPartition in self.__fsTypes["swap"]:
return self.Types["swap"]
return self.Types["unused"]
def __getTypeIdOfPartition(self):
"returns the type of the partition (see 'man blkid')"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.Progs["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
self.__dropPrivileges()
if proc.returncode != 0:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
return output
def __isLuksPartition(self):
"check if the given device is a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"isLuks",
self.device])
proc.wait()
self.__dropPrivileges()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], self.name)
def __mountLuks(self, password):
"mount a luks partition"
if not password:
raise "InvalidPassword", "no password supplied for luksOpen"
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
self.__umountLuks()
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.debugMessage("error", errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"luksOpen",
"--batch-mode",
self.device,
self.name])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mount"],
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
def __umountLuks(self):
"umount a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
if self.isMounted():
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [self.Progs["umount"], "-l", self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
if os.path.exists(os.path.join(self.__dmDir, self.name)):
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"luksClose",
self.name])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.debugMessage("error", errorMsg)
raise "MountError", errorMsg
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mount"],
self.device,
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
def __umountPlain(self):
"umount a plaintext partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
if self.isMounted():
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["umount"],
"-l",
self.__getMountPoint()])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
raise "MountError", errorMsg
devnull.close()
def __createPlain(self):
"make a plaintext partition"
if self.isMounted():
raise "VolumeIsActive", \
"deactivate the partition before filesystem initialization"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mkfs-data"],
self.device])
proc.wait()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "CreateError", errorMsg
devnull.close()
def __createLuks(self, password):
"make a luks partition"
if not password:
raise "InvalidPassword", "no password supplied for new luks mapping"
if self.isMounted():
raise "VolumeIsActive", \
"deactivate the partition before filesystem initialization"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.debugMessage(
CryptoBoxLogger.DebugLevels["warn"],
"Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"--cipher", self.cbox.cbxPrefs["System"]["DefaultCipher"],
"--iter-time", "2000",
"luksFormat",
self.device])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.debugMessage("error", errorMsg)
raise "CreateError", errorMsg
"open the luks container for mkfs"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["cryptsetup"],
"--batch-mode",
"luksOpen",
self.device,
self.name])
proc.stdin.write(password)
(output, errout) = proc.communicate()
self.__dropPrivileges()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
raise "CreateError", errorMsg
"make the filesystem"
self.__risePrivileges()
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.Progs["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
self.__dropPrivileges()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
"remove the luks mapping"
raise "CreateError", errorMsg
devnull.close()
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.cbxPrefs["System"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)): os.rmdir(abs_dir)