#
# Copyright 2007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

'''
These classes detect and filter available blockdevices.
'''

__revision__ = "$Id$"

import logging
import os
import stat
import subprocess
import time

import cryptobox.core.settings


LOGGER = logging.getLogger("CryptoBox")

DEFAULT_SYSBLOCK_DIR = '/sys/block'
DEFAULT_DEVNODE_DIR = '/dev'
MINIMUM_STORAGE_SIZE = 20
MAJOR_DEVNUM_RAM = 1
MAJOR_DEVNUM_LOOP = 7
MAJOR_DEVNUM_MD_RAID = 9

## cache settings
CACHE_ENABLED = True
CACHE_EXPIRE_SECONDS = 60
CACHE_MONITOR_FILE = '/proc/partitions'

## useful for manual profiling
IS_VISIBLE = True

## caching is quite important for the following implementation
## the object will be initialized later below
CACHE = None


def _read_file(path):
    """return the complete content of the file 'path'

    The file handle is closed immediately after reading.
    Raises IOError (an alias of OSError in Python 3) if reading fails.
    """
    with open(path) as handle:
        return handle.read()


def _run_command(args):
    """run an external program and collect its result

    'args' is the argument list (the program is the first item).
    Returns a tuple (returncode, stdout, stderr); both streams are text.
    Raises OSError if the program could not be started.
    """
    proc = subprocess.Popen(
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        ## deliver text instead of bytes under Python 3
        universal_newlines=True,
        args=args)
    ## "communicate" drains both pipes - a plain "wait" could block forever
    ## as soon as the child fills one of the pipe buffers
    (output, error) = proc.communicate()
    return proc.returncode, output, error


class Blockdevices:
    """handle all blockdevices of this system
    """

    def __init__(self, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
            devnode_dir=DEFAULT_DEVNODE_DIR):
        """collect all valid blockdevices below 'sysblock_dir'

        'devnode_dir' is the directory searched for the device nodes.
        """
        self.sysblock_dir = sysblock_dir
        self.devnode_dir = devnode_dir
        self.devices = []
        for devdir in find_blockdevices(self.sysblock_dir):
            blockdevice = get_blockdevice(devdir,
                    self.sysblock_dir, self.devnode_dir)
            if (blockdevice is not None) and blockdevice.is_valid():
                self.devices.append(blockdevice)

    def get_devices(self):
        """return a copy of the device list
        """
        return self.devices[:]

    def get_storage_devices(self):
        """return a list of devices with the 'storage' flag
        """
        return [dev for dev in self.devices if dev.is_storage()]

    def get_partitionable_devices(self):
        """return a list of devices with the 'partitionable' flag
        """
        return [dev for dev in self.devices if dev.is_partitionable()]


class Blockdevice:
    """a single blockdevice described by its sysfs directory
    """

    def __init__(self, dev, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
            devnode_dir=DEFAULT_DEVNODE_DIR):
        """initialize the blockdevice

        'dev' is the sysfs directory of the device (it contains the
        file 'dev'); its basename is used as the name of the device.
        """
        self.devdir = dev
        self.devnode_dir = devnode_dir
        self.sysblock_dir = sysblock_dir
        self.name = os.path.basename(self.devdir)
        ## "reset" below will fill these values
        self.devnum = None
        self.size = None
        self.size_human = None
        self.range = None
        self.slaves = None
        self.holders = None
        self.children = None
        self.devnodes = None
        self.uuid = None
        self.label = None
        self.reset()

    def reset(self):
        """reread the data of the device

        The cached flags of this device are dropped first.
        """
        CACHE.reset(["blockdevice_info", self.name])
        ## the order matters: later values depend on earlier ones
        ## (e.g. "children" needs "holders", "devnodes" needs "devnum")
        self.devnum = self.__get_major_minor()
        self.size = self.__get_size()
        self.size_human = self.__get_size_human()
        self.range = self.__get_device_range()
        self.slaves = self.__get_dev_related("slaves")
        self.holders = self.__get_dev_related("holders")
        self.children = self.__get_children()
        self.devnodes = self.__get_device_nodes()
        self.uuid = self.__get_uuid()
        self.label = self.__get_label()

    def is_valid(self):
        """check if the device is usable and valid

        causes of invalidity: ram device, loop device, removable device
        """
        if not self.devnodes:
            return False
        ## check valid devnum
        try:
            major, minor = self.devnum
        except TypeError:
            return False
        if (major == 0) and (minor == 0):
            return False
        ## ram and loop devices are ignored
        if major in (MAJOR_DEVNUM_RAM, MAJOR_DEVNUM_LOOP):
            return False
        ## removable devices are ignored (due to long timeouts)
        if self.is_removable():
            return False
        return True

    def is_storage(self):
        """return if this device can be used as a storage
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_storage"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        result = self.__check_storage()
        ## store result and return
        CACHE.set(cache_link, result)
        return result

    def __check_storage(self):
        """determine the (uncached) 'storage' flag of the device
        """
        if self.range > 1:
            ## partitionable blockdevice
            return False
        if self.size < MINIMUM_STORAGE_SIZE:
            ## extended partition, unused loop device
            return False
        if self.devnum[0] == MAJOR_DEVNUM_RAM:
            ## ram device
            return False
        ## are we the device mapper of a luks device?
        for slave in self.slaves:
            slave_dev = get_blockdevice(slave,
                    self.sysblock_dir, self.devnode_dir)
            ## the slave may not be resolvable (get_blockdevice returns None)
            if (slave_dev is not None) and slave_dev.is_luks():
                return False
        ## if we are a luks device with exactly one child, then
        ## we are a storage
        if (len(self.children) == 1) and self.is_luks():
            return True
        ## a parent blockdevice is no storage
        return not self.children

    def is_partitionable(self):
        """is the device partitionable
        """
        return self.range > 1

    def is_lvm_pv(self):
        """return if the device is a physical volume of a LVM
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_lvm_pv"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        ## is one of the devnodes of the device a physical volume?
        result = False
        for one_lvm_pv in find_lvm_pv():
            if one_lvm_pv in self.devnodes:
                result = True
                break
        CACHE.set(cache_link, result)
        return result

    def is_lvm_lv(self):
        """return if the device is a logical volume of a LVM
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_lvm_lv"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        result = False
        ## logical LVM volumes always depend on their physical volumes
        if self.slaves:
            ## collect the device nodes of all (resolvable) slaves
            slave_nodes = []
            for one_slave in self.slaves:
                slave_dev = get_blockdevice(one_slave,
                        self.sysblock_dir, self.devnode_dir)
                if slave_dev is not None:
                    slave_nodes.extend(slave_dev.devnodes)
            ## is one of the LVM physical volumes a device node of
            ## our slave(s)?
            for one_lvm_pv in find_lvm_pv():
                if one_lvm_pv in slave_nodes:
                    result = True
                    break
        CACHE.set(cache_link, result)
        return result

    def is_md_raid(self):
        """check if the device is the base of a md raid device
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_md_raid"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        result = False
        if (self.range <= 1) and (self.size >= MINIMUM_STORAGE_SIZE):
            ## is one of our holders a md raid device?
            for hold in self.holders:
                holder = get_blockdevice(hold,
                        self.sysblock_dir, self.devnode_dir)
                if (holder is not None) \
                        and (holder.devnum[0] == MAJOR_DEVNUM_MD_RAID):
                    result = True
                    break
        ## store result and return
        CACHE.set(cache_link, result)
        return result

    def is_luks(self):
        """check if the device is a luks container
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_luks"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        if self.range > 1:
            result = False
        elif self.size < MINIMUM_STORAGE_SIZE:
            result = False
        elif not self.devnodes:
            ## without a device node there is nothing to ask cryptsetup for
            result = False
        elif self.is_lvm_pv():
            result = False
        elif self.is_md_raid():
            result = False
        else:
            ## is the device a luks volume?
            prefs = _load_preferences()
            try:
                returncode = _run_command([
                        prefs["Programs"]["cryptsetup"],
                        "--batch-mode",
                        "isLuks",
                        self.devnodes[0]])[0]
            except OSError as err_msg:
                LOGGER.warning("Failed to call '%s' to check for luks: %s",
                        prefs["Programs"]["cryptsetup"], err_msg)
                result = False
            else:
                result = returncode == 0
        ## store result and return
        CACHE.set(cache_link, result)
        return result

    def is_removable(self):
        """check if the device is marked as 'removable'
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_removable"]
        cached = CACHE.get(cache_link)
        if cached is not None:
            return cached
        result = False
        removable_file = os.path.join(self.devdir, "removable")
        if os.path.isfile(removable_file):
            try:
                result = _read_file(removable_file).strip() == "1"
            except IOError:
                result = False
        ## every outcome is cached (not only the negative ones)
        CACHE.set(cache_link, result)
        return result

    def __get_dev_related(self, subdir):
        """return the content of sub directories (e.g. 'holders' or 'slaves')

        an empty list is returned if the directory cannot be read
        """
        try:
            return os.listdir(os.path.join(self.devdir, subdir))
        except OSError:
            return []

    def __get_size_human(self):
        """return a human readable string representing the size of the device
        """
        if self.size > 5120:
            return "%dGB" % (self.size // 1024)
        return "%dMB" % self.size

    def __get_size(self):
        """return the size (in MB) of the blockdevice

        zero is returned if the size cannot be determined
        """
        ## NOTE(review): the kernel documents the sysfs 'size' attribute in
        ## units of 512-byte sectors, so the value below is probably twice
        ## the real size in MB - confirm before changing, since
        ## MINIMUM_STORAGE_SIZE is compared against this value
        try:
            size_kb = int(_read_file(os.path.join(self.devdir, 'size')))
        except (IOError, OSError, ValueError):
            ## unreadable or malformed size file
            return 0
        return size_kb // 1024

    def __get_major_minor(self):
        """return the major and minor of the device

        (0, 0) is returned if the numbers cannot be determined
        """
        default = (0, 0)
        try:
            content = _read_file(os.path.join(self.devdir, "dev"))
            ## a missing colon raises ValueError during unpacking
            major, minor = content.split(":", 1)
            return int(major), int(minor)
        except (IOError, TypeError, ValueError):
            return default

    def __get_device_range(self):
        """number of possible subdevices

        partitionable blockdevices have a range > 1
        """
        default = 1
        try:
            return int(_read_file(os.path.join(self.devdir, "range")))
        except (IOError, ValueError):
            return default

    def __get_children(self):
        """return all devices depending on the current one

        all holders, subdevices and children of subdevices
        """
        direct_children = []
        for child_dir in find_blockdevices(self.devdir):
            child = get_blockdevice(child_dir,
                    self.sysblock_dir, self.devnode_dir)
            if child is not None:
                direct_children.append(child.name)
        direct_children.extend(self.holders)
        children = direct_children[:]
        for child_name in direct_children:
            child = get_blockdevice(child_name,
                    self.sysblock_dir, self.devnode_dir)
            ## unresolvable names stay in the list, but add no descendants
            if child is not None:
                children.extend(child.children)
        return children

    def __get_device_nodes(self):
        """get all device nodes with the major/minor combination of the device
        """
        result = []
        major, minor = self.devnum
        ## symlinked directories are not descended into, but symlinks to
        ## device nodes are resolved by "os.stat" and thus part of the result
        for dirname, _dirnames, fnames in os.walk(self.devnode_dir):
            for fname in fnames:
                fullname = os.path.join(dirname, fname)
                try:
                    node = os.stat(fullname)
                except OSError:
                    ## e.g. dangling symlinks
                    continue
                ## check if it is a blockdevice and compare major/minor
                if stat.S_ISBLK(node.st_mode) \
                        and (os.major(node.st_rdev) == major) \
                        and (os.minor(node.st_rdev) == minor):
                    result.append(fullname)
        return result

    def __get_uuid(self):
        """determine the unique identifier of this device

        returns None in case of error or for invalid devices (see "is_valid")
        """
        if not self.is_valid():
            return None
        ## partitionable devices do not have a UUID
        if self.is_partitionable():
            return None
        ## UUIDs of physical LVM volumes can only be determined via pvdisplay
        if self.is_lvm_pv():
            return self.__get_uuid_lvm_pv()
        ## UUIDs of luks devices can be determined via luksUUID
        if self.is_luks():
            return self.__get_uuid_luks()
        return self.__get_uuid_default()

    def __get_uuid_luks(self):
        """determine the unique identifier of luks devices

        returns None if the UUID cannot be determined
        """
        prefs = _load_preferences()
        try:
            returncode, output, error = _run_command([
                    prefs["Programs"]["cryptsetup"],
                    "luksUUID",
                    self.devnodes[0]])
        except OSError as err_msg:
            LOGGER.warning("Failed to call '%s' to determine UUID: %s",
                    prefs["Programs"]["cryptsetup"], err_msg)
            return None
        if returncode != 0:
            LOGGER.warning("Execution of '%s' for '%s' failed: %s",
                    prefs["Programs"]["cryptsetup"], self.devnodes[0], error)
            return None
        ## an empty output is turned into None
        return output.strip() or None

    def __get_uuid_lvm_pv(self):
        """determine the unique identifier of physical LVM volumes

        returns None if the UUID cannot be determined
        """
        prefs = _load_preferences()
        try:
            returncode, output, error = _run_command([
                    prefs["Programs"]["super"],
                    prefs["Programs"]["CryptoBoxRootActions"],
                    "program",
                    "pvdisplay"])
        except OSError as err_msg:
            ## no lookup of further settings here: the error handler itself
            ## must not fail
            LOGGER.warning("Failed to call 'pvdisplay' via 'super' to "
                    "determine UUID: %s", err_msg)
            return None
        if returncode != 0:
            LOGGER.warning("Execution of 'pvdisplay' failed: %s", error)
            return None
        for line in output.splitlines():
            items = line.strip().split(":")
            ## lines with twelve colon separated fields are evaluated: the
            ## first field is compared with our device nodes, the last one
            ## is returned as UUID
            if (len(items) == 12) and (items[0] in self.devnodes):
                return items[11]
        ## not found
        return None

    def __query_blkid(self, tag, description):
        """ask 'blkid' for the value of a tag of the first device node

        'tag' is the blkid tag (e.g. "UUID" or "LABEL"); 'description' is
        only used for log messages.
        returns None for errors and for empty values
        """
        prefs = _load_preferences()
        try:
            returncode, output, error = _run_command([
                    prefs["Programs"]["blkid"],
                    "-s", tag,
                    "-o", "value",
                    "-c", os.devnull,
                    "-w", os.devnull,
                    self.devnodes[0]])
        except OSError as err_msg:
            LOGGER.warning("Failed to call '%s' to determine %s: %s",
                    prefs["Programs"]["blkid"], description, err_msg)
            return None
        if returncode != 0:
            LOGGER.warning("Execution of '%s' for '%s' failed: %s",
                    prefs["Programs"]["blkid"], self.devnodes[0],
                    error.strip())
            return None
        return output.strip() or None

    def __get_uuid_default(self):
        """determine the unique identifier for non-special devices

        luks and lvm_pv devices must be treated differently
        """
        return self.__query_blkid("UUID", "UUID")

    def __get_label(self):
        """determine the label of a filesystem contained in a device

        return None for errors, empty labels and for luks or invalid devices
        """
        if not self.is_valid():
            return None
        if self.is_luks():
            return None
        return self.__query_blkid("LABEL", "label")

    def __eq__(self, device):
        """compare two blockdevice objects

        two devices are equal if they have the same name
        """
        return self.name == device.name

    def __ne__(self, device):
        """the opposite of "__eq__" (not derived automatically by Python 2)
        """
        return not self.__eq__(device)

    def __hash__(self):
        """hash by name - consistent with "__eq__"
        """
        return hash(self.name)

    def __str__(self):
        """display the name of the device
        """
        return self.name

    def info(self):
        """display some information about the device

        returns a multi line string containing all attributes and the names
        of all "is_*" flags that are set for this device
        """
        parts = [
            "%s:\n" % self.name,
            "\t%s:\t%s\n" % ("blockdir", self.devdir),
            "\t%s:\t%s\n" % ("major/minor", self.devnum),
            "\t%s:\t\t%s\n" % ("label", self.label),
            "\t%s:\t\t%s\n" % ("UUID", self.uuid),
            "\t%s:\t\t%s\n" % ("range", self.range),
            "\t%s:\t\t%s\n" % ("size", self.size),
            "\t%s:\t\t%s\n" % ("slaves", self.slaves),
            "\t%s:\t%s\n" % ("holders", self.holders),
            "\t%s:\t%s\n" % ("children", self.children),
            "\t%s:\t%s\n" % ("device nodes", self.devnodes),
            "\tflags:\t\t",
        ]
        ## call every self-check method and list the positive ones
        for funcname in dir(self):
            if not funcname.startswith("is_"):
                continue
            checker = getattr(self, funcname)
            if callable(checker) and checker():
                parts.append("%s " % funcname[3:])
        parts.append("\n")
        return "".join(parts)


class BlockdeviceCache:
    """manage cached results of blockdevice queries

    the cache expires every 60 seconds or as soon as CACHE_MONITOR_FILE changes
    """

    def __init__(self):
        self.values = {}
        self.expires = None
        self.partitions_save = None
        self.reset()

    def reset(self, link=None):
        """empty the cache and reset the expire time

        If "link" is given, only this branch of the cache is emptied and
        the expire time stays untouched.
        """
        if not link:
            self.values = {}
            try:
                self.partitions_save = _read_file(CACHE_MONITOR_FILE)
            except IOError as err_msg:
                LOGGER.warning("Failed to read '%s': %s",
                        CACHE_MONITOR_FILE, err_msg)
                self.partitions_save = ""
            self.expires = int(time.time()) + CACHE_EXPIRE_SECONDS
        else:
            ## we do not reset the expire date
            self.set(link, {})

    def __is_expired(self):
        """check if the cache is expired

        The cache is expired if its lifetime is over or if the content of
        CACHE_MONITOR_FILE changed since the last reset.
        """
        ## the time check comes first: it must work even if the monitor
        ## file is not readable
        if self.expires < int(time.time()):
            return True
        try:
            return _read_file(CACHE_MONITOR_FILE) != self.partitions_save
        except IOError as err_msg:
            LOGGER.warning("Failed to read '%s': %s",
                    CACHE_MONITOR_FILE, err_msg)
            return False

    def get(self, link):
        """return a cached value

        "link" is an array of the hierarchy of the accessed item
        e.g. link = ["blockdevices", "hda"]
        return None if the value is not in the cache or if CACHE_ENABLED
        is False
        """
        if not CACHE_ENABLED:
            return None
        if self.__is_expired():
            self.reset()
        ## walk down the tree
        ref = self.values
        for element in link:
            if element not in ref:
                return None
            ref = ref[element]
        return ref

    def set(self, link, item):
        """store an item in the cache

        "link" is an array of the hierarchy of the accessed item
        e.g. link = ["blockdevices", "hda"]
        nothing is stored if CACHE_ENABLED is False
        """
        if not CACHE_ENABLED:
            return
        ## walk down the tree
        ref = self.values
        for element in link[:-1]:
            if element not in ref:
                ## create a non-existing sub element
                ref[element] = {}
            ref = ref[element]
        ## store the item
        ref[link[-1]] = item


def get_blockdevice(dev, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
        devnode_dir=DEFAULT_DEVNODE_DIR):
    """return the (cached) Blockdevice object of a device

    "dev" is either the absolute path of the sysfs directory of the device
    or the name of a device below "sysblock_dir".
    return None if there is no such device
    """
    if os.path.isabs(dev):
        ## a device directory must contain the file "dev"
        if not os.path.isfile(os.path.join(dev, "dev")):
            return None
        devdir = dev
    else:
        for one_devdir in find_blockdevices(sysblock_dir):
            if os.path.basename(one_devdir) == dev:
                devdir = one_devdir
                break
        else:
            return None
    cache_link = ["blockdevices", os.path.basename(devdir)]
    blockdevice = CACHE.get(cache_link)
    if blockdevice is None:
        blockdevice = Blockdevice(devdir, sysblock_dir, devnode_dir)
        CACHE.set(cache_link, blockdevice)
    return blockdevice


def find_blockdevices(top_dir):
    """return the directories of all blockdevices below "top_dir"

    A directory describes a blockdevice if it contains the file "dev".
    "top_dir" itself is never part of the result.
    """
    cache_link = ["blockdevice_dirs", top_dir]
    cached = CACHE.get(cache_link)
    if cached is not None:
        return cached[:]
    dev_dirs = []
    ## NOTE(review): symlinked directories are not followed (as before) -
    ## verify this against the sysfs layout of the targeted kernels
    for dirname, _dirnames, fnames in os.walk(top_dir):
        ## ignore the top level directory to avoid infinite recursion for
        ## get_children
        if os.path.samefile(dirname, top_dir):
            continue
        ## add directories containing the file 'dev' to the list
        if ('dev' in fnames) \
                and os.path.isfile(os.path.join(dirname, 'dev')):
            dev_dirs.append(dirname)
    CACHE.set(cache_link, dev_dirs)
    return dev_dirs[:]


def find_lvm_pv():
    """return the blockdevice names of all physical LVM volumes

    an empty list is returned if "pvdisplay" fails
    """
    cache_link = ["lvm", "pv"]
    cached = CACHE.get(cache_link)
    if cached is not None:
        return cached[:]
    prefs = _load_preferences()
    try:
        returncode, output, error = _run_command([
                prefs["Programs"]["super"],
                prefs["Programs"]["CryptoBoxRootActions"],
                "program",
                "pvdisplay"])
    except OSError as err_msg:
        LOGGER.info("Failed to call 'pvdisplay' via 'super': %s" % err_msg)
        result = []
    else:
        if returncode != 0:
            LOGGER.info("Execution of 'pvdisplay' failed: %s" % error.strip())
            result = []
        else:
            ## the device name is the first colon separated field
            result = [line.split(":", 1)[0].strip()
                    for line in output.splitlines()]
    CACHE.set(cache_link, result)
    return result[:]


def _load_preferences():
    """return the current settings of the CryptoBox

    If no settings are loaded yet, the configuration file "cryptobox.conf"
    of the source tree is used as a fallback. The fallback changes the
    current working directory of the process.
    """
    prefs = cryptobox.core.settings.get_current_settings()
    if prefs is not None:
        ## now the preferences are loaded
        return prefs
    ## we have to load an emergency fallback for proper function
    ## this is mainly useful for local testing
    root_dir = os.path.realpath(os.path.join(
            cryptobox.__path__[0], os.path.pardir, os.path.pardir))
    config_file = os.path.join(root_dir, "bin", "cryptobox.conf")
    ## we have to chdir to the 'bin' directory - otherwise the paths in
    ## cryptobox.conf do not work
    os.chdir(os.path.dirname(config_file))
    return cryptobox.core.settings.CryptoBoxSettings(config_file)


## initialize cache
CACHE = BlockdeviceCache()


if __name__ == '__main__':
    ## list the properties of all available devices
    ## this is just for testing purposes
    blocks = Blockdevices().get_devices()

    ## do we want to show the result?
    def show(text=""):
        """print the text if IS_VISIBLE is set
        """
        if IS_VISIBLE:
            print(text)

    if blocks:
        ## show all devices and their properties
        show("Properties of all devices:")
        for device in blocks:
            show(device.info())

        ## discover all self-check methods
        example = blocks[0]
        flag_checker = [method for method in dir(example)
                if callable(getattr(example, method))
                    and method.startswith("is_")]

        ## list all checks and the respective devices
        for check in flag_checker:
            show("List of '%s' devices:" % check[3:])
            for device in blocks:
                if getattr(device, check)():
                    show("\t%s" % device)
            show()