#!/usr/bin/env python2.4
"""handling of a single CryptoBox volume (plaintext or luks) on a block device"""

## check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
    sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
    sys.exit(1)

import subprocess
import os
import re
import logging
from CryptoBoxExceptions import *

## exceptions raised by the code below (all of them are expected to be
## provided by the wildcard import of CryptoBoxExceptions):
##    CBVolumeIsActive
##    CBNameActivelyUsed
##    CBInvalidName
##    CBInvalidPassword
##    CBInvalidType
##    CBCreateError
##    CBMountError
##    CBUmountError
##    CBChangePasswordError
##    CBContainerError


class CryptoBoxContainer:
    """representation of one block device that may be used as a volume

    The public interface consists of the getters/setters below, 'create',
    'changePassword', 'resetObject' and the attributes 'mount' and 'umount'.
    'mount'/'umount' are bound by 'resetObject' according to the detected
    type - they are only set for "luks" and "plain" containers.
    Note: 'mount' expects a password for luks containers and takes no
    argument for plain containers.
    """

    ## the numeric ids of the known container types
    Types = {
        "unused":0,
        "plain":1,
        "luks":2,
        "swap":3}

    ## mapping of blkid filesystem names to container types
    __fsTypes = {
        "plain":["ext3", "ext2", "vfat", "reiser"],
        "swap":["swap"]}
    # TODO: more filesystem types? / check 'reiser'

    ## directory containing the device mapper nodes of opened luks volumes
    __dmDir = "/dev/mapper"


    def __init__(self, device, cbox):
        """initialize the container for the given device

        device: path of the block device
        cbox: the parent object - it must provide 'prefs' and the
            name/uuid database functions used below
        """
        self.device = device
        self.cbox = cbox
        self.log = logging.getLogger("CryptoBox")
        self.resetObject()


    def getName(self):
        """return the current name of the container"""
        return self.name


    def setName(self, new_name):
        """change the name of the container

        raises:
            CBVolumeIsActive - the container is currently mounted
            CBInvalidName - the new name contains illegal characters
            CBNameActivelyUsed - a mounted container already uses this name
            CBContainerError - the name database refused the change
        """
        if new_name == self.name:
            return
        if self.isMounted():
            raise CBVolumeIsActive("the container must be inactive during renaming")
        if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
            raise CBInvalidName("the supplied new name contains illegal characters")
        ## check for active partitions with the same name
        prev_name_owner = self.cbox.getContainerList(filterName=new_name)
        if prev_name_owner:
            for owner in prev_name_owner:
                if owner.isMounted():
                    raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
        if not self.cbox.setNameForUUID(self.uuid, new_name):
            raise CBContainerError("failed to change the volume name for unknown reasons")
        self.name = new_name


    def getDevice(self):
        """return the path of the block device"""
        return self.device


    def getType(self):
        """return the type id of the container (see CryptoBoxContainer.Types)"""
        return self.type


    def isMounted(self):
        """return True if the mount point of this container is in use"""
        return os.path.ismount(self.__getMountPoint())


    def getCapacity(self):
        """return the current capacity state of the volume

        the result is a tuple of values in megabyte: (size, available, used)

        The values are read via 'statvfs' from the mount point of the volume.
        NOTE(review): the original comment said "the volume may not be
        mounted" - presumably the volume is expected to be mounted, since
        otherwise the numbers describe the filesystem that contains the
        mount point directory. To be confirmed.
        """
        info = os.statvfs(self.__getMountPoint())
        return (
            int(info.f_bsize*info.f_blocks/1024/1024),
            int(info.f_bsize*info.f_bavail/1024/1024),
            int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))


    def getSize(self):
        """return the size of the block device (_not_ of the filesystem)

        the result is a value in megabyte
        an error is indicated by "-1"
        """
        ## the import is kept local (as in the original code)
        import CryptoBoxTools
        return CryptoBoxTools.getBlockDeviceSize(self.device)


    def resetObject(self):
        """recheck the information about this container

        this is especially useful after changing the type via 'create'
        """
        ## the name lookup depends on the uuid - keep this order
        self.uuid = self.__getUUID()
        self.type = self.__getTypeOfPartition()
        self.name = self.__getNameOfContainer()
        ## 'mount' and 'umount' are only (re)bound for luks and plain volumes
        if self.type == self.Types["luks"]:
            self.mount = self.__mountLuks
            self.umount = self.__umountLuks
        elif self.type == self.Types["plain"]:
            self.mount = self.__mountPlain
            self.umount = self.__umountPlain


    def create(self, type, password=None):
        """initialize the device as a new container of the given type

        type: Types["luks"] or Types["plain"]
        password: required for luks containers

        raises CBInvalidType for any other type - the errors of the
        specific creation functions are passed through
        """
        old_name = self.getName()
        if type == self.Types["luks"]:
            self.__createLuks(password)
        elif type == self.Types["plain"]:
            self.__createPlain()
        else:
            raise CBInvalidType("invalid container type (%d) supplied" % (type, ))
        ## no exception was raised during creation -> we can continue
        ## reset the properties (encryption state, ...) of the device
        self.resetObject()
        ## restore the old name (must be after resetObject)
        self.setName(old_name)


    def changePassword(self, oldpw, newpw):
        """replace the password of a luks container

        The new key is added first - afterwards the key slot of the old
        password is removed.

        raises:
            CBInvalidType - the container is not a luks container
            CBInvalidPassword - one of the passwords is empty
            CBVolumeIsActive - the container is currently mounted
            CBChangePasswordError - one of the cryptsetup calls failed
        """
        if self.type != self.Types["luks"]:
            raise CBInvalidType("changing of password is possible only for luks containers")
        if not oldpw:
            raise CBInvalidPassword("no old password supplied for password change")
        if not newpw:
            raise CBInvalidPassword("no new password supplied for password change")
        ## return if new and old passwords are the same
        if oldpw == newpw:
            return
        if self.isMounted():
            raise CBVolumeIsActive("this container is currently active")
        ## remove any potential open luks mapping
        self.__umountLuks()
        ## add the new key - the old password unlocks the device
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "cryptsetup",
                "luksAddKey",
                self.device,
                "--batch-mode"),
            stdin_data = "%s\n%s" % (oldpw, newpw),
            capture_stdout = True)
        if returncode != 0:
            errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
            self.log.error(errorMsg)
            raise CBChangePasswordError(errorMsg)
        ## retrieve the key slot we used for unlocking
        ## (the search result is None if the output has an unexpected format)
        slot_match = re.search(r'key slot (\d{1,3}) unlocked', output)
        if slot_match is None:
            raise CBChangePasswordError("could not get the old key slot")
        keyslot = int(slot_match.group(1))
        ## remove the old key
        ## NOTE(review): in contrast to the other privileged calls, cryptsetup
        ## is executed directly here (not via 'super') - confirm that this
        ## is intended
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["cryptsetup"],
            "--batch-mode",
            "luksDelKey",
            self.device,
            "%d" % (keyslot, )])
        if returncode != 0:
            errorMsg = "Could not remove the old luks key: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBChangePasswordError(errorMsg)


    ## ****************** internal stuff *********************

    def __runCommand(self, args, stdin_data=None, capture_stdout=False):
        """execute an external program and wait for its termination

        args: the program and its arguments (list - no shell is involved)
        stdin_data: a string that is written to the program's stdin
            (None: stdin is not redirected)
        capture_stdout: collect the output instead of discarding it

        returns a tuple: (returncode, stdout, stderr)
        'stdout' is None unless 'capture_stdout' is set

        'communicate' is used for all pipe handling - waiting for the
        process before reading its pipes could block forever if the
        program emits more data than the pipe buffer can hold.
        """
        devnull = None
        stdin_source = None
        stdout_target = subprocess.PIPE
        if stdin_data is not None:
            stdin_source = subprocess.PIPE
        if not capture_stdout:
            try:
                devnull = open(os.devnull, "w")
            except IOError:
                ## continue anyway: the output goes to our own stdout
                self.log.warning("Could not open %s" % (os.devnull, ))
            stdout_target = devnull
        try:
            proc = subprocess.Popen(
                shell = False,
                stdin = stdin_source,
                stdout = stdout_target,
                stderr = subprocess.PIPE,
                args = args)
            (output, errout) = proc.communicate(stdin_data)
        finally:
            ## never leak the file handle - even in case of errors
            if devnull is not None:
                devnull.close()
        return (proc.returncode, output, errout)


    def __getRootActionArgs(self, *action):
        """return the argument list for a call of CryptoBoxRootActions via 'super'"""
        return [
            self.cbox.prefs["Programs"]["super"],
            self.cbox.prefs["Programs"]["CryptoBoxRootActions"]] + list(action)


    def __getNameOfContainer(self):
        """retrieve the name of the container by querying the database

        if there is no name for the uuid yet, then a new one is generated
        and stored in the database
        """
        def_name = self.cbox.getNameForUUID(self.uuid)
        if def_name:
            return def_name
        ## there is no name defined for this uuid - we will propose a good one
        prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
        counter = 1
        guess = prefix + str(counter)
        ## use the first numbered name that is not in the database
        while self.cbox.getUUIDForName(guess):
            counter += 1
            guess = prefix + str(counter)
        self.cbox.setNameForUUID(self.uuid, guess)
        return guess


    def __getUUID(self):
        """return the uuid of the device

        the device name (with replaced path separators) is used if no
        uuid could be retrieved
        """
        if self.__getTypeOfPartition() == self.Types["luks"]:
            guess = self.__getLuksUUID()
        else:
            guess = self.__getNonLuksUUID()
        ## did we get a valid value?
        if guess:
            return guess
        ## emergency default value
        return self.device.replace(os.path.sep, "_")


    def __getLuksUUID(self):
        """get uuid for luks devices

        returns None if cryptsetup failed
        """
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["cryptsetup"],
            "luksUUID",
            self.device],
            capture_stdout = True)
        if returncode != 0:
            self.log.info("could not retrieve luks uuid (%s): %s" % (self.device, errout.strip()))
            return None
        return output.strip()


    def __getNonLuksUUID(self):
        """return the UUID reported by 'blkid' (None if blkid failed)"""
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["blkid"],
            "-s", "UUID",
            "-o", "value",
            "-c", os.devnull,
            "-w", os.devnull,
            self.device],
            capture_stdout = True)
        ## execution failed?
        if returncode != 0:
            self.log.info("retrieving of partition type (%s) via 'blkid' failed: %s - maybe it is encrypted?" % (self.device, errout.strip()))
            return None
        ## return output of blkid
        return output.strip()


    def __getTypeOfPartition(self):
        """retrieve the type of the given partition (see CryptoBoxContainer.Types)"""
        if self.__isLuksPartition():
            return self.Types["luks"]
        typeOfPartition = self.__getTypeIdOfPartition()
        if typeOfPartition in self.__fsTypes["plain"]:
            return self.Types["plain"]
        if typeOfPartition in self.__fsTypes["swap"]:
            return self.Types["swap"]
        return self.Types["unused"]


    def __getTypeIdOfPartition(self):
        """returns the type of the partition (see 'man blkid')

        returns None if blkid failed
        """
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["blkid"],
            "-s", "TYPE",
            "-o", "value",
            "-c", os.devnull,
            "-w", os.devnull,
            self.device],
            capture_stdout = True)
        if returncode != 0:
            self.log.warning("retrieving of partition type via 'blkid' failed: %s" % (errout.strip(), ))
            return None
        return output.strip()


    def __isLuksPartition(self):
        """check if the given device is a luks partition"""
        ## only the exit code is of interest - all output is ignored
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["cryptsetup"],
            "--batch-mode",
            "isLuks",
            self.device])
        return returncode == 0


    def __getMountPoint(self):
        """return the name of the mountpoint of this volume"""
        return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)


    def __prepareMountPoint(self):
        """clean up the mount parent directory and create the mount point

        returns the path of the mount point
        raises CBMountError if the mount point does not exist afterwards
        """
        self.__cleanMountDirs()
        mount_point = self.__getMountPoint()
        if not os.path.exists(mount_point):
            os.mkdir(mount_point)
        if not os.path.exists(mount_point):
            errorMsg = "Could not create mountpoint (%s)" % (mount_point, )
            self.log.error(errorMsg)
            raise CBMountError(errorMsg)
        return mount_point


    def __mountLuks(self, password):
        """mount a luks partition

        raises:
            CBInvalidPassword - no password was given
            CBVolumeIsActive - the container is already mounted
            CBMountError - opening the mapping or mounting failed
        """
        if not password:
            raise CBInvalidPassword("no password supplied for luksOpen")
        if self.isMounted():
            raise CBVolumeIsActive("this container is already active")
        ## remove a stale mapping that may be left from a previous failure
        self.__umountLuks()
        mount_point = self.__prepareMountPoint()
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "cryptsetup",
                "luksOpen",
                self.device,
                self.name,
                "--batch-mode"),
            stdin_data = password)
        if returncode != 0:
            errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBMountError(errorMsg)
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "mount",
                os.path.join(self.__dmDir, self.name),
                mount_point))
        if returncode != 0:
            errorMsg = "Could not mount the filesystem: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBMountError(errorMsg)


    def __umountLuks(self):
        """umount a luks partition and remove its device mapping

        both steps are skipped if they are not necessary
        raises CBUmountError if one of the steps failed
        """
        if self.isMounted():
            (returncode, output, errout) = self.__runCommand(
                self.__getRootActionArgs(
                    "umount",
                    self.__getMountPoint()))
            if returncode != 0:
                errorMsg = "Could not umount the filesystem: %s" % (errout.strip(), )
                self.log.warning(errorMsg)
                raise CBUmountError(errorMsg)
        if os.path.exists(os.path.join(self.__dmDir, self.name)):
            (returncode, output, errout) = self.__runCommand(
                self.__getRootActionArgs(
                    "cryptsetup",
                    "luksClose",
                    self.name,
                    "--batch-mode"))
            if returncode != 0:
                errorMsg = "Could not remove the luks mapping: %s" % (errout.strip(), )
                self.log.warning(errorMsg)
                raise CBUmountError(errorMsg)


    def __mountPlain(self):
        """mount a plaintext partition

        raises:
            CBVolumeIsActive - the container is already mounted
            CBMountError - the mount point could not be created or
                mounting failed
        """
        if self.isMounted():
            raise CBVolumeIsActive("this container is already active")
        mount_point = self.__prepareMountPoint()
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "mount",
                self.device,
                mount_point))
        if returncode != 0:
            errorMsg = "Could not mount the filesystem: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBMountError(errorMsg)


    def __umountPlain(self):
        """umount a plaintext partition

        nothing happens if the container is not mounted
        raises CBUmountError if umount failed
        """
        if self.isMounted():
            (returncode, output, errout) = self.__runCommand(
                self.__getRootActionArgs(
                    "umount",
                    self.__getMountPoint()))
            if returncode != 0:
                errorMsg = "Could not umount the filesystem: %s" % (errout.strip(), )
                self.log.warning(errorMsg)
                raise CBUmountError(errorMsg)


    def __createPlain(self):
        """make a plaintext partition

        raises:
            CBVolumeIsActive - the container is currently mounted
            CBCreateError - the creation of the filesystem failed
        """
        if self.isMounted():
            raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["mkfs-data"],
            self.device])
        if returncode != 0:
            errorMsg = "Could not create the filesystem: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)


    def __createLuks(self, password):
        """make a luks partition

        the luks header is written, the new mapping is opened, a
        filesystem is created and the mapping is closed again

        raises:
            CBInvalidPassword - no password was given
            CBVolumeIsActive - the container is currently mounted
            CBCreateError - one of the creation steps failed
        """
        if not password:
            raise CBInvalidPassword("no password supplied for new luks mapping")
        if self.isMounted():
            raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
        ## remove any potential open luks mapping
        self.__umountLuks()
        ## create the luks header
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "cryptsetup",
                "luksFormat",
                self.device,
                "--batch-mode",
                "--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
                "--iter-time", "2000"),
            stdin_data = password)
        if returncode != 0:
            errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)
        ## open the luks container for mkfs
        (returncode, output, errout) = self.__runCommand(
            self.__getRootActionArgs(
                "cryptsetup",
                "luksOpen",
                self.device,
                self.name,
                "--batch-mode"),
            stdin_data = password)
        if returncode != 0:
            errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)
        ## make the filesystem
        (returncode, output, errout) = self.__runCommand([
            self.cbox.prefs["Programs"]["mkfs-data"],
            os.path.join(self.__dmDir, self.name)])
        ## remove the mapping - for every exit status
        self.__umountLuks()
        if returncode != 0:
            errorMsg = "Could not create the filesystem: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)


    def __cleanMountDirs(self):
        """remove all unnecessary subdirs of the mount parent directory

        this should be called for every (u)mount

        symlinks and active mount points are left untouched
        ('os.rmdir' raises OSError for directories that are not empty)
        """
        parent_dir = self.cbox.prefs["Locations"]["MountParentDir"]
        for entry in os.listdir(parent_dir):
            abs_dir = os.path.join(parent_dir, entry)
            if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
                os.rmdir(abs_dir)