# # Copyright 2006 sense.lab e.V. # # This file is part of the CryptoBox. # # The CryptoBox is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # The CryptoBox is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with the CryptoBox; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # """The partition feature of the CryptoBox. """ __revision__ = "$Id$" import subprocess import os import re import logging import cryptobox.core.blockdevice as blockdevice_tools import cryptobox.plugins.base from cryptobox.core.exceptions import * PARTTYPES = { "windows" : ["0xC", "vfat"], "linux" : ["L", "ext3"]} CONFIGPARTITION = { "size" : 5, # size of configuration partition (if necessary) in MB "type" : "L", "fs" : "ext2"} class partition(cryptobox.plugins.base.CryptoBoxPlugin): """The partition feature of the CryptoBox. """ plugin_capabilities = [ "system" ] plugin_visibility = [ "preferences" ] request_auth = True rank = 80 def do_action(self, **args): """Show the partitioning form and execute the requested action. """ ## load default hdf values self.__prepare_dataset() ## retrieve some values from 'args' - defaults are empty self.blockdevice = self.__get_selected_device(args) if self.blockdevice: self.with_config_partition = self.__is_with_config_partition() self.blockdevice_size = self.__get_available_device_size(self.blockdevice) else: self.with_config_partition = False self.blockdevice_size = 0 self.cbox.log.debug( "partition plugin: selected device=%s" % str(self.blockdevice)) ## no (or invalid) device was supplied if not self.blockdevice: return self.__action_select_device() ## exit if the blockdevice is not writeable if not os.access(self.blockdevice.get_device(), os.W_OK): self.hdf["Data.Warning"] = "DeviceNotWriteable" return self.__action_select_device() ## no confirm setting? if not args.has_key("confirm") or (args["confirm"] != "1"): self.hdf["Data.Warning"] = "Plugins.partition.FormatNotConfirmed" return self.__action_select_device() elif args.has_key("easy"): return self.__action_easy_setup() elif args.has_key("add_part"): return self.__action_add_partition(args) elif args.has_key("finish"): return self.__action_finish(args) elif args.has_key("cancel"): return self.__action_select_device() ## check if we should remove a partition del_args = [ e for e in args.keys() if re.match(r"del_part_[\d]+$", e) ] if len(del_args) == 1: try: num_part = int(del_args[0][9:]) except ValueError: self.cbox.log.warn( "partition: invalid partition number to delete (%s)" % del_args[0]) return self.__action_select_device() return self.__action_del_partition(args, num_part) else: ## for "select_device" and for invalid targets return self.__action_select_device() def get_status(self): """The status of this plugin is the selected device and some information. """ if not self.blockdevice: return "no blockdevice selected" else: return "%s / %s / %s" % (self.blockdevice.name, self.blockdevice_size, self.with_config_partition) def get_warnings(self): warnings = [] ## this check is done _after_ "reset_dataset" -> if there is ## a config partition, then it was loaded before if self.cbox.prefs.requires_partition() \ and not self.cbox.prefs.get_active_partition(): warnings.append((50, "Plugins.%s.ReadOnlyConfig" % self.get_name())) ## check required programs if not os.path.isfile(self.root_action.SFDISK_BIN): warnings.append((53, "Plugins.%s.MissingProgramSfdisk" % self.get_name())) if not os.path.isfile(self.root_action.MKFS_BIN): warnings.append((56, "Plugins.%s.MissingProgramMkfs" % self.get_name())) if not os.path.isfile(self.root_action.LABEL_BIN): warnings.append((40, "Plugins.%s.MissingProgramE2label" % self.get_name())) return warnings def __prepare_dataset(self): """Set some hdf values. """ self.hdf["Data.AdditionalStylesheets.%s" % self.get_name()] = \ os.path.join(self.plugin_dir, "partition.css") self.hdf[self.hdf_prefix + "PluginDir"] = self.plugin_dir def __get_selected_device(self, args): """Check the selected device (valid, not busy, ...). """ try: blockdevice_name = args["block_device"] found = [ dev for dev in blockdevice_tools.Blockdevices().get_partitionable_devices() if dev.name == blockdevice_name ] if not found: return None blockdevice = found[0] except KeyError: return None if not self.__is_device_valid(blockdevice): return None if self.__is_device_busy(blockdevice): self.hdf["Data.Warning"] = "Plugins.partition.DiskIsBusy" return None return blockdevice def __is_device_valid(self, blockdevice): """Check if the device is valid and allowed. """ if not blockdevice: return False if not self.cbox.is_device_allowed(blockdevice): return False return True def __is_device_busy(self, blockdevice): """check if the device (or one of its partitions) is mounted """ ## the config partition is ignored, as it will get unmounted if necessary for dev in blockdevice.children: container = self.cbox.get_container( blockdevice_tools.get_blockdevice(dev)) if container and (container.is_mounted() or container.is_busy()): return True return False def __action_select_device(self): """Show a form to select the device for partitioning. """ block_devices = [ e for e in blockdevice_tools.Blockdevices().get_partitionable_devices() if self.cbox.is_device_allowed(e) ] counter = 0 for dev in block_devices: self.hdf[self.hdf_prefix + "BlockDevices.%d.name" % counter] = \ dev.name self.hdf[self.hdf_prefix + "BlockDevices.%d.size" % counter] = \ dev.size_human self.cbox.log.debug("found a suitable block device: %s" % \ dev.get_device()) counter += 1 if self.with_config_partition: self.hdf[self.hdf_prefix + "CreateConfigPartition"] = "1" ## there is no disk available if not block_devices: self.hdf["Data.Warning"] = "Plugins.partition.NoDisksAvailable" return "select_device" def __action_add_partition(self, args): """Add a selected partition to the currently proposed partition table. """ self.hdf[self.hdf_prefix + "Device"] = self.blockdevice.name self.hdf[self.hdf_prefix + "Device.Size"] = self.blockdevice_size parts = self.__get_partitions_from_args(args) self.__set_partition_data(parts) return "set_partitions" def __action_del_partition(self, args, part_num): """Remove a partition from the proposed partition table. """ self.hdf[self.hdf_prefix + "Device"] = self.blockdevice.name self.hdf[self.hdf_prefix + "Device.Size"] = self.blockdevice_size parts = self.__get_partitions_from_args(args) ## valid partition number to be deleted? if part_num < len(parts): del parts[part_num] self.__set_partition_data(parts) return "set_partitions" def __action_finish(self, args): """Write the partition table. """ parts = self.__get_partitions_from_args(args) if parts: self.__set_partition_data(parts) ## umount config partition if necessary config_partition = self.cbox.prefs.get_active_partition() if [ dev for dev in self.blockdevice.children if config_partition in blockdevice_tools.get_blockdevice(dev).devnodes ]: self.cbox.prefs.umount_partition() if not self.__run_fdisk(parts): self.hdf["Data.Warning"] = "Plugins.partition.PartitioningFailed" self.cbox.log.warn( "partition: failed to partition device: %s" % self.blockdevice) return self.__action_add_partition(args) else: ## tricky problem: if the device was partitioned, then a created config ## partition is still part of the containerlist, as the label is not ## checked again - very ugly!!! So we will call reReadContainerList ## after formatting the last partition - see below #self.cbox.reread_container_list() format_ok = True counter = 0 ## initialize the generator format_part_gen = self.__format_partitions(parts) while counter < len(parts): ## first part: get the device name counter += 1 ## second part: do the real formatting of a partition result = format_part_gen.next() ## after the first partiton, we can reRead the containerList ## (as the possible config partition was already created) if self.with_config_partition and (counter == 1): ## important: reRead the containerList - but somehow it ## breaks the flow (hanging process) #self.cbox.reReadContainerList() ## write config data self.cbox.prefs.mount_partition() try: self.cbox.prefs.write() self.cbox.log.info("settings stored on config partition") except IOError: self.cbox.log.warn( "Failed to store settings on new config partition") ## return the result if not result: format_ok = False self.cbox.reread_container_list() if format_ok: self.hdf["Data.Success"] = "Plugins.partition.Partitioned" return { "plugin":"system_preferences", "values":[] } else: self.hdf["Data.Warning"] = "Plugins.partition.FormattingFailed" return "empty" else: return self.__action_add_partition(args) def __action_easy_setup(self): """Do automatic partitioning (create only one big partition). """ import types ## we do not have to take special care for a possible config partition parts = [ { "size": self.blockdevice_size, "type": "windows" } ] ## umount config partition if necessary config_partition = self.cbox.prefs.get_active_partition() if [ dev for dev in self.blockdevice.children if config_partition in blockdevice_tools.get_blockdevice(dev).devnodes ]: self.cbox.prefs.umount_partition() ## partition it if not self.__run_fdisk(parts): self.hdf["Data.Warning"] = "Plugins.partition.PartitioningFailed" return None ## "formatPartitions" is a generator, returning device names and boolean values result = [e for e in self.__format_partitions(parts) if type(e) == types.BooleanType] if self.with_config_partition: self.cbox.prefs.mount_partition() if not self.cbox.prefs.write(): self.cbox.log.warn("Failed to store settings on new config partition") ## check if there is a "False" return value if False in result: ## operation failed self.hdf["Data.Warning"] = "Plugins.partition.FormattingFailed" self.cbox.log.info("easy partitioning failed") return "set_partitions" else: ## operation was successful self.hdf["Data.Success"] = "Plugins.partition.EasySetup" self.cbox.log.info("easy partitioning succeeded") ## do not show the disk overview immediately ## it does not get updated that fast return { "plugin":"system_preferences", "values":[] } def __set_partition_data(self, parts): """Set some hdf values for the currently proposed partition table. """ avail_size = self.blockdevice_size i = 0 for part in parts: self.cbox.log.debug(part) self.hdf[self.hdf_prefix + "Parts.%d.Size" % i] = part["size"] self.hdf[self.hdf_prefix + "Parts.%d.Type" % i] = part["type"] avail_size -= part["size"] i += 1 self.hdf[self.hdf_prefix + "availSize"] = avail_size if self.with_config_partition: self.hdf[self.hdf_prefix + "CreateConfigPartition"] = "1" for ptype in PARTTYPES.keys(): self.hdf[self.hdf_prefix + "Types.%s" % ptype] = ptype ## store the currently existing partitions of the choosen block device current_containers = [ blockdevice_tools.get_blockdevice(dev) for dev in self.blockdevice.children if blockdevice_tools.get_blockdevice(dev).is_storage() ] for (index, cont) in enumerate(current_containers): self.hdf[self.hdf_prefix + "ExistingContainers.%d" % index] = \ cont.name def __get_partitions_from_args(self, args): """Filter the given arguments and construct a partition table. """ parts = [] done = False avail_size = self.blockdevice_size i = -1 while not done: i += 1 try: ## skip every unconfirmed (probably the last) partition if we should not add it if args.has_key("part%d_unconfirmed" % i) and \ not args.has_key("add_part"): continue size = int(args["part%d_size" % i]) part_type = args["part%d_type" % i] if int(size) > avail_size: self.hdf["Data.Warning"] = "Plugins.partition.PartitionTooBig" continue if int(size) < 10: self.hdf["Data.Warning"] = "Plugins.partition.PartitionTooSmall" continue if not part_type in PARTTYPES.keys(): continue parts.append({"size":size, "type":part_type}) avail_size -= size except TypeError: pass except KeyError: done = True return parts def __get_available_device_size(self, device): """calculate the available size (MB) of the device also consider a (possible) configuration partition """ device_size = device.size if self.with_config_partition: device_size -= CONFIGPARTITION["size"] if device_size < 0: return 0 return device_size def __is_with_config_partition(self): """check if we have to create a configuration partition """ if self.cbox.prefs.requires_partition(): active = self.cbox.prefs.get_active_partition() ## we need a partition, if there is no active one if not active: return True ## check if the active one is part of the current device if [ dev for dev in self.blockdevice.children if active in blockdevice_tools.get_blockdevice(dev).devnodes ]: return True else: return False return False def __run_fdisk(self, parts): """Call fdisk to partition the device. """ ## check if the device is completely filled (to avoid some empty last blocks) avail_size = self.blockdevice_size for one_part in parts: avail_size -= one_part["size"] self.cbox.log.debug("remaining size: %d" % avail_size) is_filled = avail_size == 0 proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, args = [ self.cbox.prefs["Programs"]["super"], self.cbox.prefs["Programs"]["CryptoBoxRootActions"], "plugin", os.path.join(self.plugin_dir, "root_action.py"), "partition", self.blockdevice.get_device()]) for line in self.__get_sfdisk_layout(parts, is_filled): proc.stdin.write(line + "\n") #TODO: if running inside of an uml, then sfdisk hangs at "nanosleep({3,0})" # very ugly - maybe a uml bug? # it seems, like this can be avoided by running uml with the param "aio=2.4" (output, error) = proc.communicate() fdisk_status = proc.returncode if fdisk_status != 0: self.cbox.log.debug("partitioning failed: %s" % error) ##Every time we update a partition table, force the kernel ##to reread it and update /proc/partitions. This particularly ##applies to internal hard disks. rereadpt_proc = subprocess.Popen( shell = False, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, args = [ self.cbox.prefs["Programs"]["super"], self.cbox.prefs["Programs"]["CryptoBoxRootActions"], "plugin", os.path.join(self.plugin_dir, "root_action.py"), "rereadpt", self.blockdevice.get_device()]) (output, error) = rereadpt_proc.communicate() rereadpt_status = rereadpt_proc.returncode if rereadpt_status != 0: self.cbox.log.info("failed to reread the modified partition table: %s" % error) # refresh the device list self.cbox.reread_container_list() # update the blockdevice - especially the new partitioned children self.blockdevice.reset() return (fdisk_status == 0) def __get_sfdisk_layout(self, param_parts, is_filled): """this generator returns the input lines for sfdisk """ parts = param_parts[:] ## first a (possible) configuration partition - so it will be reusable if self.with_config_partition: ## fill the main table (including a config partition) yield ",%d,%s" % (CONFIGPARTITION["size"], CONFIGPARTITION["type"]) ## one primary partition if is_filled and (len(parts) == 1): ## fill the rest of the device yield ",,%s,*" % PARTTYPES[parts[0]["type"]][0] else: ## only use the specified size yield ",%d,%s,*" % (parts[0]["size"], PARTTYPES[parts[0]["type"]][0]) del parts[0] ## no extended partition, if there is only one disk if not parts: return ## an extended container for the rest yield ",,E" ## an empty partition in main table yield ";" ## maybe another empty partition if there is no config partition if not self.with_config_partition: yield ";" while parts: if is_filled and (len(parts) == 1): yield ",,%s" % (PARTTYPES[parts[0]["type"]][0],) else: yield ",%d,%s" % (parts[0]["size"], PARTTYPES[parts[0]["type"]][0]) del parts[0] def __format_partitions(self, param_parts): """Format all partitions of the device. """ parts = param_parts[:] part_num = 1 ## maybe a config partition? if self.with_config_partition: dev_name = self.__get_partition_device(self.blockdevice, part_num) self.cbox.log.info("formatting config partition (%s)" % dev_name) if self.__format_one_partition(dev_name, CONFIGPARTITION["fs"]): self.__set_label_of_partition(dev_name, self.cbox.prefs["Main"]["ConfigVolumeLabel"]) part_num += 1 ## the first data partition dev_name = self.__get_partition_device(self.blockdevice, part_num) part_type = PARTTYPES[parts[0]["type"]][1] self.cbox.log.info("formatting partition (%s) as '%s'" % (dev_name, part_type)) yield self.__format_one_partition(dev_name, part_type) del parts[0] ## other data partitions part_num += 1 while parts: dev_name = self.__get_partition_device(self.blockdevice, part_num) part_type = PARTTYPES[parts[0]["type"]][1] self.cbox.log.info("formatting partition (%s) as '%s'" % \ (dev_name, part_type)) yield self.__format_one_partition(dev_name, part_type) part_num += 1 del parts[0] return def __get_partition_device(self, blockdev, number): """Return the devicename of a specific partition of a device No tests are performed, whether the partition exists or not. """ valid_children = [] ## filter the storage devices for child in blockdev.children: childdev = blockdevice_tools.get_blockdevice(child) self.cbox.log.debug("Checking child device: %s - %s" % \ (blockdev, child)) if childdev and childdev.is_storage(): valid_children.append(childdev) valid_children.sort() self.cbox.log.debug("Valid children of %s: %s" % \ (blockdev, [child.name for child in valid_children])) if number <= len(valid_children): childdev = valid_children[number-1] if childdev: return childdev.get_device() ## return some guessed value - we should never get here ... guess = "%s%d" % (blockdev.get_device(), number) self.cbox.log.warn("Failed to get the partition name (%s, %d)" % \ (blockdev, number) + " - proceeding with guessed name: %s" % \ guess) return guess def __format_one_partition(self, dev_name, fs_type): """Format a single partition """ import cryptobox.core.container ## first: retrieve UUID - it can be removed from the database afterwards prev_name = [e.get_name() for e in self.cbox.get_container_list() if e.get_device() == dev_name] ## call "mkfs" try: format_dev = blockdevice_tools.get_blockdevice(dev_name) if format_dev is None: self.cbox.log.warn("Could not find the specified device: %s" % \ str(format_dev)) return False else: cont = cryptobox.core.container.CryptoBoxContainer(format_dev, self.cbox) cont.create(cryptobox.core.container.CONTAINERTYPES["plain"], fs_type=fs_type) except (CBInvalidType, CBCreateError, CBVolumeIsActive), err_msg: self.cbox.log.warn(err_msg) return False ## remove unused volume entry in case it exists if prev_name and self.cbox.prefs.volumes_db.has_key(prev_name[0]): del self.cbox.prefs.volumes_db[prev_name[0]] return True def __set_label_of_partition(self, dev_name, label): """Set the label of a partition - useful for the config partition. """ proc = subprocess.Popen( shell = False, stdout = subprocess.PIPE, stderr = subprocess.PIPE, args = [ self.cbox.prefs["Programs"]["super"], self.cbox.prefs["Programs"]["CryptoBoxRootActions"], "plugin", os.path.join(self.plugin_dir, "root_action.py"), "label", dev_name, label]) (output, error) = proc.communicate() if proc.returncode == 0: return True else: self.cbox.log.warn("failed to create filesystem on %s: %s" % (dev_name, error)) return False