lars
b951efdd9c
syslog support added moved program locations to config file removed obsolete CryptoBoxPreferences added some requests for comments (RFC)
553 lines
16 KiB
Python
553 lines
16 KiB
Python
from CryptoBoxLogger import CryptoBoxLogger
|
|
try:
|
|
import subprocess
|
|
except:
|
|
print "Couldn't import 'subprocess'. You need a python version >= 2.4"
|
|
import os
|
|
import re
|
|
|
|
"""exceptions:
|
|
VolumeIsActive
|
|
NameActivelyUsed
|
|
InvalidName
|
|
InvalidPassword
|
|
InvalidType
|
|
CreateError
|
|
MountError
|
|
ChangePasswordError
|
|
"""
|
|
|
|
class CryptoBoxContainer:
|
|
|
|
Types = {
|
|
"unused":0,
|
|
"plain":1,
|
|
"luks":2,
|
|
"swap":3}
|
|
|
|
|
|
__fsTypes = {
|
|
"plain":["ext3", "ext2", "vfat", "reiser"],
|
|
"swap":["swap"]}
|
|
"TODO: mehr Dateisystemtypen? / 'reiser' pruefen"
|
|
|
|
__dmDir = "/dev/mapper"
|
|
|
|
|
|
def __init__(self, device, cbox):
|
|
self.device = device
|
|
self.cbox = cbox
|
|
self.debugMessage = self.cbox.debugMessage
|
|
self.Progs = self.cbox.cbxPrefs["Programs"]
|
|
self.__resetObject()
|
|
|
|
|
|
def getName(self):
|
|
return self.name
|
|
|
|
|
|
def setName(self, new_name):
|
|
"TODO: den Test-Code pruefen"
|
|
if new_name == self.name: return
|
|
if self.isMounted():
|
|
raise "VolumeIsActive", "the container must be inactive during renaming"
|
|
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
|
|
raise "InvalidName", "the supplied new name contains illegal characters"
|
|
"check for active partitions with the same name"
|
|
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
|
|
if prev_name_owner:
|
|
for a in prev_name_owner:
|
|
if a.isMounted():
|
|
raise "NameActivelyUsed", "the supplied new name is already in use for an active partition"
|
|
self.cbox.setNameForUUID(self.uuid, new_name)
|
|
self.name = new_name
|
|
|
|
|
|
def getDevice(self):
|
|
return self.device
|
|
|
|
|
|
def getType(self):
|
|
return self.type
|
|
|
|
|
|
def isMounted(self):
|
|
return os.path.ismount(self.__getMountPoint())
|
|
|
|
|
|
def create(self, type, password=None):
|
|
if type == self.Types["luks"]:
|
|
self.__createLuks(password)
|
|
self.__resetObject()
|
|
return
|
|
if type == self.Types["plain"]:
|
|
self.__createPlain()
|
|
self.__resetObject()
|
|
return
|
|
raise "InvalidType", "invalid container type (%d) supplied" % (type, )
|
|
|
|
|
|
def changePassword(self, oldpw, newpw):
|
|
if self.type != self.Types["luks"]:
|
|
raise "InvalidType", \
|
|
"changing of password is possible only for luks containers"
|
|
if not oldpw:
|
|
raise "InvalidPassword", "no old password supplied for password change"
|
|
if not newpw:
|
|
raise "InvalidPassword", "no new password supplied for password change"
|
|
"return if new and old passwords are the same"
|
|
if oldpw == newpw: return
|
|
if self.isMounted():
|
|
raise "VolumeIsActive", "this container is currently active"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
"remove any potential open luks mapping"
|
|
self.__umountLuks()
|
|
"create the luks header"
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = subprocess.PIPE,
|
|
stdout = subprocess.PIPE,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"luksAddKey",
|
|
self.device])
|
|
proc.stdin.write("%s\n%s" % (oldpw, newpw))
|
|
(output, errout) = proc.communicate()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
|
|
raise "ChangePasswordError", errorMsg
|
|
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
|
|
if keys_found:
|
|
keyslot = int(keys_found[0])
|
|
else:
|
|
raise "ChangePasswordError", "could not get the old key slot"
|
|
"remove the old key"
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"luksDelKey",
|
|
self.device,
|
|
"%d" % (keyslot, )])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
|
|
raise "ChangePasswordError", errorMsg
|
|
|
|
|
|
|
|
" ****************** internal stuff ********************* "
|
|
|
|
def __resetObject(self):
|
|
""" recheck the information about this container
|
|
this is especially useful after changing the type via 'create' """
|
|
self.uuid = self.__getUUID()
|
|
self.type = self.__getTypeOfPartition()
|
|
self.name = self.__getNameOfContainer()
|
|
if self.type == self.Types["luks"]:
|
|
self.mount = self.__mountLuks
|
|
self.umount = self.__umountLuks
|
|
if self.type == self.Types["plain"]:
|
|
self.mount = self.__mountPlain
|
|
self.umount = self.__umountPlain
|
|
|
|
|
|
def __getNameOfContainer(self):
|
|
"retrieve the name of the container by querying the database"
|
|
def_name = self.cbox.getNameForUUID(self.uuid)
|
|
if def_name: return def_name
|
|
"there is no name defined for this uuid - we will propose a good one"
|
|
prefix = self.cbox.cbxPrefs["Main"]["DefaultVolumePrefix"]
|
|
unused_found = False
|
|
counter = 1
|
|
while not unused_found:
|
|
guess = prefix + str(counter)
|
|
if self.cbox.getUUIDForName(guess):
|
|
counter += 1
|
|
else:
|
|
unused_found = True
|
|
self.cbox.setNameForUUID(self.uuid, guess)
|
|
return guess
|
|
|
|
|
|
def __getUUID(self):
|
|
"""return UUID for luks partitions, ext2/3 and vfat filesystems"""
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
proc = subprocess.Popen(
|
|
shell=False,
|
|
stdin=None,
|
|
stdout=subprocess.PIPE,
|
|
stderr=devnull,
|
|
args=[self.Progs["blkid"],
|
|
"-s", "UUID",
|
|
"-o", "value",
|
|
"-c", os.devnull,
|
|
"-w", os.devnull,
|
|
self.device])
|
|
proc.wait()
|
|
result = proc.stdout.read().strip()
|
|
if proc.returncode != 0:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
|
|
return None
|
|
devnull.close()
|
|
if result: return result
|
|
return self.device.replace(os.path.sep, "_")
|
|
|
|
|
|
def __getTypeOfPartition(self):
|
|
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
|
|
if self.__isLuksPartition(): return self.Types["luks"]
|
|
typeOfPartition = self.__getTypeIdOfPartition()
|
|
if typeOfPartition in self.__fsTypes["plain"]:
|
|
return self.Types["plain"]
|
|
if typeOfPartition in self.__fsTypes["swap"]:
|
|
return self.Types["swap"]
|
|
return self.Types["unused"]
|
|
|
|
|
|
def __getTypeIdOfPartition(self):
|
|
"returns the type of the partition (see 'man blkid')"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
proc = subprocess.Popen(
|
|
shell=False,
|
|
stdin=None,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
args=[self.Progs["blkid"],
|
|
"-s", "TYPE",
|
|
"-o", "value",
|
|
"-c", os.devnull,
|
|
"-w", os.devnull,
|
|
self.device])
|
|
proc.wait()
|
|
output = proc.stdout.read().strip()
|
|
if proc.returncode != 0:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
|
|
return None
|
|
devnull.close()
|
|
return output
|
|
|
|
|
|
def __isLuksPartition(self):
|
|
"check if the given device is a luks partition"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = devnull,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"isLuks",
|
|
self.device])
|
|
proc.wait()
|
|
devnull.close()
|
|
return proc.returncode == 0
|
|
|
|
|
|
def __getMountPoint(self):
|
|
"return the name of the mountpoint of this volume"
|
|
return os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], self.name)
|
|
|
|
|
|
def __mountLuks(self, password):
|
|
"mount a luks partition"
|
|
if not password:
|
|
raise "InvalidPassword", "no password supplied for luksOpen"
|
|
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
|
|
self.__umountLuks()
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
self.__cleanMountDirs()
|
|
if not os.path.exists(self.__getMountPoint()):
|
|
os.mkdir(self.__getMountPoint())
|
|
if not os.path.exists(self.__getMountPoint()):
|
|
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
|
|
self.debugMessage("error", errorMsg)
|
|
raise "MountError", errorMsg
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = subprocess.PIPE,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"luksOpen",
|
|
"--batch-mode",
|
|
self.device,
|
|
self.name])
|
|
proc.stdin.write(password)
|
|
(output, errout) = proc.communicate()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["mount"],
|
|
os.path.join(self.__dmDir, self.name),
|
|
self.__getMountPoint()])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __umountLuks(self):
|
|
"umount a luks partition"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
if self.isMounted():
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [self.Progs["umount"], "-l", self.__getMountPoint()])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
if os.path.exists(os.path.join(self.__dmDir, self.name)):
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"luksClose",
|
|
self.name])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __mountPlain(self):
|
|
"mount a plaintext partition"
|
|
if self.isMounted(): raise "VolumeIsActive", "this container is already active"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
self.__cleanMountDirs()
|
|
if not os.path.exists(self.__getMountPoint()):
|
|
os.mkdir(self.__getMountPoint())
|
|
if not os.path.exists(self.__getMountPoint()):
|
|
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
|
|
self.debugMessage("error", errorMsg)
|
|
raise "MountError", errorMsg
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["mount"],
|
|
self.device,
|
|
self.__getMountPoint()])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __umountPlain(self):
|
|
"umount a plaintext partition"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
if self.isMounted():
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["umount"],
|
|
"-l",
|
|
self.__getMountPoint()])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["warn"], errorMsg)
|
|
raise "MountError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __createPlain(self):
|
|
"make a plaintext partition"
|
|
if self.isMounted():
|
|
raise "VolumeIsActive", \
|
|
"deactivate the partition before filesystem initialization"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["mkfs-data"],
|
|
self.device])
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
|
|
raise "CreateError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __createLuks(self, password):
|
|
"make a luks partition"
|
|
if not password:
|
|
raise "InvalidPassword", "no password supplied for new luks mapping"
|
|
if self.isMounted():
|
|
raise "VolumeIsActive", \
|
|
"deactivate the partition before filesystem initialization"
|
|
devnull = None
|
|
try:
|
|
devnull = open(os.devnull, "w")
|
|
except IOError:
|
|
self.debugMessage(
|
|
CryptoBoxLogger.DebugLevels["warn"],
|
|
"Could not open %s" % (os.devnull, ))
|
|
"remove any potential open luks mapping"
|
|
self.__umountLuks()
|
|
"create the luks header"
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = subprocess.PIPE,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"--cipher", self.cbox.cbxPrefs["System"]["DefaultCipher"],
|
|
"--iter-time", "2000",
|
|
"luksFormat",
|
|
self.device])
|
|
proc.stdin.write(password)
|
|
(output, errout) = proc.communicate()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
|
|
self.debugMessage("error", errorMsg)
|
|
raise "CreateError", errorMsg
|
|
"open the luks container for mkfs"
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = subprocess.PIPE,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["cryptsetup"],
|
|
"--batch-mode",
|
|
"luksOpen",
|
|
self.device,
|
|
self.name])
|
|
proc.stdin.write(password)
|
|
(output, errout) = proc.communicate()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
|
|
raise "CreateError", errorMsg
|
|
"make the filesystem"
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = None,
|
|
stdout = devnull,
|
|
stderr = subprocess.PIPE,
|
|
args = [
|
|
self.Progs["mkfs-data"],
|
|
os.path.join(self.__dmDir, self.name)])
|
|
proc.wait()
|
|
"remove the mapping - for every exit status"
|
|
self.__umountLuks()
|
|
if proc.returncode != 0:
|
|
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
|
|
self.debugMessage(CryptoBoxLogger.DebugLevels["error"], errorMsg)
|
|
"remove the luks mapping"
|
|
raise "CreateError", errorMsg
|
|
devnull.close()
|
|
|
|
|
|
def __cleanMountDirs(self):
|
|
""" remove all unnecessary subdirs of the mount parent directory
|
|
this should be called for every (u)mount """
|
|
subdirs = os.listdir(self.cbox.cbxPrefs["System"]["MountParentDir"])
|
|
for dir in subdirs:
|
|
abs_dir = os.path.join(self.cbox.cbxPrefs["System"]["MountParentDir"], dir)
|
|
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)): os.rmdir(abs_dir)
|
|
|
|
|