#
# Copyright 2007 sense.lab e.V.
#
# This file is part of the CryptoBox.
#
# The CryptoBox is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# The CryptoBox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#

'''
These classes detect and filter available blockdevices.
'''

__revision__ = "$Id$"

#TODO: use logger to report interesting behaviour

import os
import stat
import subprocess
import time

import cryptobox.core.settings


DEFAULT_SYSBLOCK_DIR = '/sys/block'
DEFAULT_DEVNODE_DIR = '/dev'
## devices whose 'size' attribute is below this value are never treated as
## storage, md raid member or luks container
MINIMUM_STORAGE_SIZE = 20
MAJOR_DEVNUM_RAM = 1
MAJOR_DEVNUM_LOOP = 7
MAJOR_DEVNUM_MD_RAID = 9
USE_CACHE = True
CACHE_EXPIRE_SECONDS = 60
#TODO: remove this after profiling
IS_VISIBLE = True

## caching is quite important for the following implementation
CACHED_VALUES = {}


def _read_file(path):
    """return the whole content of a file and always close the handle

    raises IOError/OSError if the file cannot be opened or read
    """
    handle = open(path)
    try:
        return handle.read()
    finally:
        handle.close()


class Blockdevices:
    """handle all blockdevices of this system

    The constructor collects every device directory found below
    "sysblock_dir" and keeps those devices that pass "is_valid".
    """

    def __init__(self, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
            devnode_dir=DEFAULT_DEVNODE_DIR):
        self.sysblock_dir = sysblock_dir
        self.devnode_dir = devnode_dir
        self.devices = []
        for devdir in find_blockdevices(self.sysblock_dir):
            blockdevice = get_blockdevice(devdir,
                    self.sysblock_dir, self.devnode_dir)
            if (blockdevice is not None) and blockdevice.is_valid():
                self.devices.append(blockdevice)

    def get_devices(self):
        """return a copy of the device list
        """
        return self.devices[:]


class Blockdevice:
    """a single blockdevice described by its sysfs directory

    All attributes are read once during construction:
        devdir      - the sysfs directory of the device
        name        - basename of "devdir"
        devnum      - tuple (major, minor); (0, 0) if unknown
        size        - integer content of the 'size' attribute (0 if unknown)
        range       - integer content of the 'range' attribute (1 if unknown)
        slaves      - names found in the 'slaves' subdirectory
        holders     - names found in the 'holders' subdirectory
        children    - names of all subdevices, holders and their children
        devnodes    - paths below "devnode_dir" with matching major/minor
    """

    def __init__(self, dev, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
            devnode_dir=DEFAULT_DEVNODE_DIR):
        """initialize the blockdevice

        "dev" is the sysfs directory of the device
        """
        self.devdir = dev
        self.devnode_dir = devnode_dir
        self.sysblock_dir = sysblock_dir
        self.name = os.path.basename(self.devdir)
        self.devnum = self.__get_major_minor()
        self.size = self.__get_size()
        self.range = self.__get_device_range()
        self.slaves = self.__get_dev_related("slaves")
        self.holders = self.__get_dev_related("holders")
        ## "children" depends on "holders" - keep this order
        self.children = self.__get_children()
        self.devnodes = self.__get_device_nodes()

    def is_valid(self):
        """check if the device is usable and valid

        A device is invalid if it has no device node, if its major/minor
        is (0, 0) or if it is a ram or loop device.
        """
        if not self.devnodes:
            return False
        ## check valid devnum
        try:
            major, minor = self.devnum
        except TypeError:
            return False
        if (major == 0) and (minor == 0):
            return False
        ## ram and loop devices are ignored
        if major in (MAJOR_DEVNUM_RAM, MAJOR_DEVNUM_LOOP):
            return False
        return True

    def is_storage(self):
        """return if this device can be used as a storage

        The result is cached.
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_storage"]
        cached = _get_cached_value(cache_link)
        if cached is not None:
            return cached
        result = self.__detect_storage()
        _set_cached_value(cache_link, result)
        return result

    def __detect_storage(self):
        """do the real (uncached) work of "is_storage"
        """
        if self.range > 1:
            ## partitionable blockdevice
            return False
        if self.size < MINIMUM_STORAGE_SIZE:
            ## extended partition, unused loop device
            return False
        if self.devnum[0] == MAJOR_DEVNUM_RAM:
            ## ram device
            return False
        ## are we the device mapper of a luks device?
        for slave in self.slaves:
            slave_dev = get_blockdevice(slave,
                    self.sysblock_dir, self.devnode_dir)
            ## a slave without a known sysfs directory cannot be checked
            if (slave_dev is not None) and slave_dev.is_luks():
                return False
        ## if we are a luks device with exactly one child, then
        ## we are a storage
        if (len(self.children) == 1) and self.is_luks():
            return True
        if self.children:
            ## a parent blockdevice
            return False
        return True

    def is_partitionable(self):
        """is the device partitionable

        partitionable devices have a range > 1
        """
        return self.range > 1

    def is_lvm_pv(self):
        """return if the device is a physical volume of a LVM

        The result is cached.
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_lvm_pv"]
        cached = _get_cached_value(cache_link)
        if cached is not None:
            return cached
        ## is one of the devnodes of the device a physical volume?
        result = False
        for one_lvm_pv in find_lvm_pv():
            if one_lvm_pv in self.devnodes:
                result = True
                break
        _set_cached_value(cache_link, result)
        return result

    def is_lvm_lv(self):
        """return if the device is a logical volume of a LVM

        The result is cached.
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_lvm_lv"]
        cached = _get_cached_value(cache_link)
        if cached is not None:
            return cached
        result = False
        ## logical LVM volumes always depend on their physical volumes
        if self.slaves:
            ## collect the device nodes of all slaves only once
            slave_nodes = []
            for one_slave in self.slaves:
                slave_dev = get_blockdevice(one_slave,
                        self.sysblock_dir, self.devnode_dir)
                if slave_dev is not None:
                    slave_nodes.extend(slave_dev.devnodes)
            ## is one of the LVM physical volumes a device node of our
            ## slave(s)? (skip the pvdisplay call if there is nothing
            ## to compare with)
            if slave_nodes:
                for one_lvm_pv in find_lvm_pv():
                    if one_lvm_pv in slave_nodes:
                        result = True
                        break
        _set_cached_value(cache_link, result)
        return result

    def is_md_raid(self):
        """check if the device is the base of a md raid device

        The result is cached.
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_md_raid"]
        cached = _get_cached_value(cache_link)
        if cached is not None:
            return cached
        result = False
        if (self.range <= 1) and (self.size >= MINIMUM_STORAGE_SIZE):
            ## is one of our holders a md raid device?
            for hold in self.holders:
                holder_dev = get_blockdevice(hold,
                        self.sysblock_dir, self.devnode_dir)
                if (holder_dev is not None) \
                        and (holder_dev.devnum[0] == MAJOR_DEVNUM_MD_RAID):
                    result = True
                    break
        ## store result and return
        _set_cached_value(cache_link, result)
        return result

    def is_luks(self):
        """check if the device is a luks container

        The result is cached.
        The check runs "cryptsetup isLuks" on the first device node.
        """
        ## check the cache first
        cache_link = ["blockdevice_info", self.name, "is_luks"]
        cached = _get_cached_value(cache_link)
        if cached is not None:
            return cached
        if self.range > 1:
            result = False
        elif self.size < MINIMUM_STORAGE_SIZE:
            result = False
        elif not self.devnodes:
            ## there is no device node that cryptsetup could inspect
            result = False
        elif self.is_lvm_pv():
            result = False
        elif self.is_md_raid():
            result = False
        else:
            ## is the device a luks volume?
            prefs = _load_preferences()
            proc = subprocess.Popen(
                    shell = False,
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE,
                    args = [ prefs["Programs"]["cryptsetup"],
                        "--batch-mode", "isLuks", self.devnodes[0]])
            ## "communicate" drains and closes both pipes - a plain "wait"
            ## can block forever if the child fills a pipe
            proc.communicate()
            result = proc.returncode == 0
        ## store result and return
        _set_cached_value(cache_link, result)
        return result

    def __get_dev_related(self, subdir):
        """return the content of sub directories (e.g. 'holders' or 'slaves')

        an empty list is returned if the directory cannot be listed
        """
        try:
            return os.listdir(os.path.join(self.devdir, subdir))
        except OSError:
            return []

    def __get_size(self):
        """return the integer content of the 'size' attribute of the device

        NOTE(review): the original comment called the unit "kB" - this is
        not verified here (sysfs is usually documented as 512 byte sectors).
        Zero is returned if the attribute is missing or not a number.
        """
        default = 0
        try:
            return int(_read_file(os.path.join(self.devdir, 'size')))
        except (IOError, OSError, ValueError):
            ## opening a file raises IOError (not OSError) with python 2
            return default

    def __get_major_minor(self):
        """return the major and minor of the device

        (0, 0) is returned if the 'dev' attribute is missing or malformed
        """
        default = (0, 0)
        try:
            content = _read_file(os.path.join(self.devdir, "dev"))
        except (IOError, OSError):
            return default
        try:
            ## a missing colon (failed unpacking) raises ValueError, too
            major, minor = content.split(":", 1)
            return int(major), int(minor)
        except ValueError:
            return default

    def __get_device_range(self):
        """number of possible subdevices

        partitionable blockdevices have a range > 1
        One is returned if the 'range' attribute is missing or malformed.
        """
        default = 1
        try:
            return int(_read_file(os.path.join(self.devdir, "range")))
        except (IOError, OSError, ValueError):
            return default

    def __get_children(self):
        """return all devices depending on the current one

        all holders, subdevices and children of subdevices
        """
        direct_children = []
        for child in find_blockdevices(self.devdir):
            child_dev = get_blockdevice(child,
                    self.sysblock_dir, self.devnode_dir)
            if child_dev is not None:
                direct_children.append(child_dev.name)
        direct_children.extend(self.holders)
        children = direct_children[:]
        for dchild in direct_children:
            child_dev = get_blockdevice(dchild,
                    self.sysblock_dir, self.devnode_dir)
            ## names that cannot be resolved have no known children
            if child_dev is not None:
                children.extend(child_dev.children)
        return children

    def __get_device_nodes(self):
        """get all device nodes with the major/minor combination of the device

        The whole tree below "devnode_dir" is searched (symlinked directories
        are not entered). Symlinks pointing to a matching block device are
        part of the result, as "os.stat" follows them.
        """
        result = []
        major, minor = self.devnum
        for dirname, dummy_dirs, fnames in os.walk(self.devnode_dir):
            ## a block device is never a directory - checking the
            ## non-directory entries is sufficient
            for fname in fnames:
                fullname = os.path.join(dirname, fname)
                try:
                    node_stat = os.stat(fullname)
                except OSError:
                    ## e.g. a dangling symlink
                    continue
                ## check if it is a blockdevice and compare major/minor
                if stat.S_ISBLK(node_stat.st_mode) \
                        and (os.major(node_stat.st_rdev) == major) \
                        and (os.minor(node_stat.st_rdev) == minor):
                    result.append(fullname)
        return result

    def __str__(self):
        """display the name of the device
        """
        return self.name

    def info(self):
        """display some information about the device

        returns a multi-line string; all "is_*" checks are evaluated
        """
        output = "%s:\n" % self.name
        output += "\t%s:\t%s\n" % ("blockdir", self.devdir)
        output += "\t%s:\t%s\n" % ("major/minor", self.devnum)
        output += "\t%s:\t\t%s\n" % ("range", self.range)
        output += "\t%s:\t\t%s\n" % ("size", self.size)
        output += "\t%s:\t\t%s\n" % ("slaves", self.slaves)
        output += "\t%s:\t%s\n" % ("holders", self.holders)
        output += "\t%s:\t%s\n" % ("children", self.children)
        output += "\t%s:\t%s\n" % ("device nodes", self.devnodes)
        output += "\tflags:\t\t"
        for funcname in [ "storage", "md_raid", "partitionable",
                "luks", "lvm_pv", "lvm_lv"]:
            if getattr(self, "is_%s" % funcname)():
                output += "%s " % funcname
        output += "\n"
        return output


def get_blockdevice(dev, sysblock_dir=DEFAULT_SYSBLOCK_DIR,
        devnode_dir=DEFAULT_DEVNODE_DIR):
    """return the (cached) Blockdevice object of a device

    "dev" is either the absolute path of a sysfs device directory or the
    name of a device below "sysblock_dir".
    None is returned if no matching device directory exists.
    """
    if os.path.isabs(dev):
        ## a device directory must contain the file 'dev'
        if not os.path.isfile(os.path.join(dev, "dev")):
            return None
        devdir = dev
    else:
        for one_devdir in find_blockdevices(sysblock_dir):
            if os.path.basename(one_devdir) == dev:
                devdir = one_devdir
                break
        else:
            return None
    devname = os.path.basename(devdir)
    blockdevice = _get_cached_value(["blockdevices", devname])
    if blockdevice is None:
        blockdevice = Blockdevice(devdir, sysblock_dir, devnode_dir)
        _set_cached_value(["blockdevices", devname], blockdevice)
    return blockdevice


def find_blockdevices(top_dir):
    """return all directories below "top_dir" containing a file named 'dev'

    "top_dir" itself is never part of the result and symlinked directories
    are not entered. The result is cached; a copy of the list is returned.
    """
    cached = _get_cached_value(["blockdevice_dirs", top_dir])
    if cached is not None:
        return cached[:]
    dev_dirs = []
    ## "os.walk" does not enter symlinks - no manual filtering is necessary
    for dirname, dummy_dirs, fnames in os.walk(top_dir):
        ## ignore the top level directory to avoid infinite recursion for
        ## get_children
        if os.path.samefile(dirname, top_dir):
            continue
        ## add directories containing the file 'dev' to the list
        if ('dev' in fnames) \
                and os.path.isfile(os.path.join(dirname, 'dev')):
            dev_dirs.append(dirname)
    _set_cached_value(["blockdevice_dirs", top_dir], dev_dirs)
    return dev_dirs[:]


def find_lvm_pv():
    """return the blockdevice names of all physical LVM volumes

    The names are the text in front of the first colon of every output line
    of "pvdisplay" (called via the configured "super" wrapper).
    An empty list is returned if the program cannot be executed or if it
    fails. The result is cached; a copy of the list is returned.
    """
    cached = _get_cached_value(["lvm", "pv"])
    if cached is not None:
        return cached[:]
    #TODO: should we check, if LVM is supported at all?
    #	e.g. by checking the existence of pvdisplay?
    prefs = _load_preferences()
    result = []
    output = None
    try:
        proc = subprocess.Popen(
                shell = False,
                stdout = subprocess.PIPE,
                ## get text (instead of bytes) with every python version
                universal_newlines = True,
                args = [ prefs["Programs"]["super"],
                    prefs["Programs"]["CryptoBoxRootActions"],
                    "program",
                    "pvdisplay" ])
        ## read the output while waiting - otherwise a full pipe would
        ## block the child forever
        output = proc.communicate()[0]
        if proc.returncode != 0:
            # TODO: add a logging warning
            output = None
    except OSError:
        # TODO: add a logging warning
        ## the program could not be started - there is no process to inspect
        output = None
    if output is not None:
        for line in output.splitlines():
            result.append(line.split(":", 1)[0].strip())
    _set_cached_value(["lvm", "pv"], result)
    return result[:]


def _get_cached_value(link):
    """return a cached value

    "link" is an array of the hierachie of the accessed item
    e.g. link = ["blockdevices", "hda"]

    return None if the value is not in the cache or if USE_CACHE is False

    The whole cache is emptied before the lookup if it is expired.
    """
    if not USE_CACHE:
        return None
    if "expires" in CACHED_VALUES:
        if CACHED_VALUES["expires"] < int(time.time()):
            reset_cache()
    else:
        ## first access: start the expiry timer
        __reset_cache_timer()
    ref = CACHED_VALUES
    for element in link:
        if element in ref:
            ref = ref[element]
        else:
            return None
    return ref


def reset_cache():
    """drop all cached values and restart the expiry timer
    """
    ## refresh the cache
    for item in CACHED_VALUES:
        CACHED_VALUES[item] = {}
    __reset_cache_timer()


def __reset_cache_timer():
    """let the cache expire CACHE_EXPIRE_SECONDS seconds from now
    """
    CACHED_VALUES["expires"] = int(time.time()) + CACHE_EXPIRE_SECONDS


def _set_cached_value(link, item):
    """store an item in the cache

    "link" is an array of the hierachie of the accessed item
    e.g. link = ["blockdevices", "hda"]

    nothing is stored if USE_CACHE is False
    """
    if not USE_CACHE:
        return
    ref = CACHED_VALUES
    for element in link[:-1]:
        if element not in ref:
            ## create a non-existing sub element
            ref[element] = {}
        ref = ref[element]
    ## store the item
    ref[link[-1]] = item


def _load_preferences():
    """return the current cryptobox settings

    If no settings are loaded yet, then "bin/cryptobox.conf" (relative to
    the root of the source tree) is used as a fallback. This fallback
    changes the current working directory of the process.
    """
    prefs = cryptobox.core.settings.get_current_settings()
    if prefs is not None:
        ## now the preferences are loaded
        return prefs
    ## we have to load an emergency fallback for proper function
    ## this is mainly useful for local testing
    root_dir = os.path.realpath(os.path.join(cryptobox.__path__[0],
            os.path.pardir, os.path.pardir))
    config_file = os.path.join(root_dir, "bin", "cryptobox.conf")
    ## we have to chdir to the 'bin' directory - otherwise the paths in
    ## cryptobox.conf do not work
    os.chdir(os.path.dirname(config_file))
    return cryptobox.core.settings.CryptoBoxSettings(config_file)


if __name__ == '__main__':
    ## list the properties of all available devices
    ## this is just for testing purposes
    blocks = Blockdevices().get_devices()

    ## do we want to show the result?
    def show(text=""):
        """print the text if the output is enabled"""
        if IS_VISIBLE:
            ## a parenthesized single string prints the same with
            ## python 2 and python 3
            print(text)

    if blocks:
        ## show all devices and their properties
        show("Properties of all devices:")
        for device in blocks:
            show(device.info())
        ## discover all self-check methods
        example = blocks[0]
        flag_checker = [ method for method in dir(example)
                if callable(getattr(example, method))
                    and method.startswith("is_")]
        ## list all checks and the respective devices
        for check in flag_checker:
            show("List of '%s' devices:" % check[3:])
            for device in blocks:
                if getattr(device, check)():
                    show("\t%s" % device)
            show()