#!/usr/bin/env python2.4 ## check python version import sys (ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)): sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version) sys.exit(1) import subprocess import os import re import logging """exceptions: VolumeIsActive NameActivelyUsed InvalidName InvalidPassword InvalidType CreateError MountError ChangePasswordError """ class CryptoBoxContainer: Types = { "unused":0, "plain":1, "luks":2, "swap":3} __fsTypes = { "plain":["ext3", "ext2", "vfat", "reiser"], "swap":["swap"]} # TODO: more filesystem types? / check 'reiser' __dmDir = "/dev/mapper" def __init__(self, device, cbox): self.device = device self.cbox = cbox self.log = logging.getLogger("CryptoBox") self.Progs = self.cbox.prefs["Programs"] self.resetObject() def getName(self): return self.name def setName(self, new_name): if new_name == self.name: return if self.isMounted(): raise "VolumeIsActive", "the container must be inactive during renaming" if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name): raise "InvalidName", "the supplied new name contains illegal characters" "check for active partitions with the same name" prev_name_owner = self.cbox.getContainerList(filterName=new_name) if prev_name_owner: for a in prev_name_owner: if a.isMounted(): raise "NameActivelyUsed", "the supplied new name is already in use for an active partition" self.cbox.setNameForUUID(self.uuid, new_name) self.name = new_name def getDevice(self): return self.device def getType(self): return self.type def isMounted(self): return os.path.ismount(self.__getMountPoint()) def getCapacity(self): """return the current capacity state of the volume the result is a tuple of values in megabyte: (size, available, used) """ info = os.statvfs(self.__getMountPoint()) return ( int(info.f_bsize*info.f_blocks/1024/1024), int(info.f_bsize*info.f_bavail/1024/1024), int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024)) def resetObject(self): """ recheck the information about this container this is especially useful after changing the type via 'create' """ self.uuid = self.__getUUID() self.type = self.__getTypeOfPartition() self.name = self.__getNameOfContainer() if self.type == self.Types["luks"]: self.mount = self.__mountLuks self.umount = self.__umountLuks if self.type == self.Types["plain"]: self.mount = self.__mountPlain self.umount = self.__umountPlain def create(self, type, password=None): if type == self.Types["luks"]: self.__createLuks(password) self.resetObject() return if type == self.Types["plain"]: self.__createPlain() self.resetObject() return raise "InvalidType", "invalid container type (%d) supplied" % (type, ) def changePassword(self, oldpw, newpw): if self.type != self.Types["luks"]: raise "InvalidType", \ "changing of password is possible only for luks containers" if not oldpw: raise "InvalidPassword", "no old password supplied for password change" if not newpw: raise "InvalidPassword", "no new password supplied for password change" "return if new and old passwords are the same" if oldpw == newpw: return if self.isMounted(): raise "VolumeIsActive", "this container is currently active" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) "remove any potential open luks mapping" self.__umountLuks() "create the luks header" proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "cryptsetup", "luksAddKey", self.device, "--batch-mode"]) proc.stdin.write("%s\n%s" % (oldpw, newpw)) (output, errout) = proc.communicate() if proc.returncode != 0: errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), ) self.log.error(errorMsg) raise "ChangePasswordError", errorMsg ## retrieve the key slot we used for unlocking keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups() if keys_found: keyslot = int(keys_found[0]) else: raise "ChangePasswordError", "could not get the old key slot" "remove the old key" proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["cryptsetup"], "--batch-mode", "luksDelKey", self.device, "%d" % (keyslot, )]) proc.wait() if proc.returncode != 0: errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), ) self.log.error(errorMsg) raise "ChangePasswordError", errorMsg " ****************** internal stuff ********************* " def __getNameOfContainer(self): "retrieve the name of the container by querying the database" def_name = self.cbox.getNameForUUID(self.uuid) if def_name: return def_name "there is no name defined for this uuid - we will propose a good one" prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"] unused_found = False counter = 1 while not unused_found: guess = prefix + str(counter) if self.cbox.getUUIDForName(guess): counter += 1 else: unused_found = True self.cbox.setNameForUUID(self.uuid, guess) return guess def __getUUID(self): """return UUID for luks partitions, ext2/3 and vfat filesystems""" emergency_default = self.device.replace(os.path.sep, "_") devnull = None try: devnull = open(os.devnull, "w") except IOError: self.warn("Could not open %s" % (os.devnull, )) proc = subprocess.Popen( shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, args=[self.Progs["blkid"], "-s", "UUID", "-o", "value", "-c", os.devnull, "-w", os.devnull, self.device]) proc.wait() result = proc.stdout.read().strip() if proc.returncode != 0: self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), )) return emergency_default devnull.close() if result: return result return emergency_default def __getTypeOfPartition(self): "retrieve the type of the given partition (see CryptoBoxContainer.Types)" if self.__isLuksPartition(): return self.Types["luks"] typeOfPartition = self.__getTypeIdOfPartition() if typeOfPartition in self.__fsTypes["plain"]: return self.Types["plain"] if typeOfPartition in self.__fsTypes["swap"]: return self.Types["swap"] return self.Types["unused"] def __getTypeIdOfPartition(self): "returns the type of the partition (see 'man blkid')" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) proc = subprocess.Popen( shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, args=[self.Progs["blkid"], "-s", "TYPE", "-o", "value", "-c", os.devnull, "-w", os.devnull, self.device]) proc.wait() output = proc.stdout.read().strip() if proc.returncode != 0: self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), )) return None devnull.close() return output def __isLuksPartition(self): "check if the given device is a luks partition" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = devnull, args = [ self.Progs["cryptsetup"], "--batch-mode", "isLuks", self.device]) proc.wait() devnull.close() return proc.returncode == 0 def __getMountPoint(self): "return the name of the mountpoint of this volume" return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name) def __mountLuks(self, password): "mount a luks partition" if not password: raise "InvalidPassword", "no password supplied for luksOpen" if self.isMounted(): raise "VolumeIsActive", "this container is already active" self.__umountLuks() try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) self.__cleanMountDirs() if not os.path.exists(self.__getMountPoint()): os.mkdir(self.__getMountPoint()) if not os.path.exists(self.__getMountPoint()): errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), ) self.log.error(errorMsg) raise "MountError", errorMsg proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "cryptsetup", "luksOpen", self.device, self.name, "--batch-mode"]) proc.stdin.write(password) (output, errout) = proc.communicate() if proc.returncode != 0: errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "mount", os.path.join(self.__dmDir, self.name), self.__getMountPoint()]) proc.wait() if proc.returncode != 0: errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg devnull.close() def __umountLuks(self): "umount a luks partition" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) if self.isMounted(): proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "umount", self.__getMountPoint()]) proc.wait() if proc.returncode != 0: errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg if os.path.exists(os.path.join(self.__dmDir, self.name)): proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "cryptsetup", "luksClose", self.name, "--batch-mode"]) proc.wait() if proc.returncode != 0: errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg devnull.close() def __mountPlain(self): "mount a plaintext partition" if self.isMounted(): raise "VolumeIsActive", "this container is already active" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) self.__cleanMountDirs() if not os.path.exists(self.__getMountPoint()): os.mkdir(self.__getMountPoint()) if not os.path.exists(self.__getMountPoint()): errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), ) self.log.error(errorMsg) raise "MountError", errorMsg proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "mount", self.device, self.__getMountPoint()]) proc.wait() if proc.returncode != 0: errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg devnull.close() def __umountPlain(self): "umount a plaintext partition" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) if self.isMounted(): proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "umount", self.__getMountPoint()]) proc.wait() if proc.returncode != 0: errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.warn(errorMsg) raise "MountError", errorMsg devnull.close() def __createPlain(self): "make a plaintext partition" if self.isMounted(): raise "VolumeIsActive", \ "deactivate the partition before filesystem initialization" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["mkfs-data"], self.device]) proc.wait() if proc.returncode != 0: errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.error(errorMsg) raise "CreateError", errorMsg devnull.close() def __createLuks(self, password): "make a luks partition" if not password: raise "InvalidPassword", "no password supplied for new luks mapping" if self.isMounted(): raise "VolumeIsActive", \ "deactivate the partition before filesystem initialization" devnull = None try: devnull = open(os.devnull, "w") except IOError: self.log.warn("Could not open %s" % (os.devnull, )) "remove any potential open luks mapping" self.__umountLuks() "create the luks header" proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "cryptsetup", "luksFormat", self.device, "--batch-mode", "--cipher", self.cbox.prefs["Main"]["DefaultCipher"], "--iter-time", "2000"]) proc.stdin.write(password) (output, errout) = proc.communicate() if proc.returncode != 0: errorMsg = "Could not create the luks header: %s" % (errout.strip(), ) self.log.error(errorMsg) raise "CreateError", errorMsg "open the luks container for mkfs" proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["super"], self.Progs["CryptoBoxRootActions"], "cryptsetup", "luksOpen", self.device, self.name, "--batch-mode"]) proc.stdin.write(password) (output, errout) = proc.communicate() if proc.returncode != 0: errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), ) self.log.error(errorMsg) raise "CreateError", errorMsg "make the filesystem" proc = subprocess.Popen( shell = False, stdin = None, stdout = devnull, stderr = subprocess.PIPE, args = [ self.Progs["mkfs-data"], os.path.join(self.__dmDir, self.name)]) proc.wait() "remove the mapping - for every exit status" self.__umountLuks() if proc.returncode != 0: errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), ) self.log.error(errorMsg) "remove the luks mapping" raise "CreateError", errorMsg devnull.close() def __cleanMountDirs(self): """ remove all unnecessary subdirs of the mount parent directory this should be called for every (u)mount """ subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"]) for dir in subdirs: abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], dir) if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)): os.rmdir(abs_dir)