cryptonas-branches/pythonrewrite/bin/CryptoBoxContainer.py
lars 0fe6d426ed added mounting and unmounting of config partition
moved config partition handling to CryptoBoxSettings
implemented environment checks (writeable config, https (off for now))
chown mounted directory after mount to the cryptobox user
2006-11-03 14:27:19 +00:00

579 lines
16 KiB
Python
Executable file

#!/usr/bin/env python2.4
## check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import subprocess
import os
import re
import logging
from CryptoBoxExceptions import *
"""exceptions:
VolumeIsActive
NameActivelyUsed
InvalidName
InvalidPassword
InvalidType
CreateError
MountError
ChangePasswordError
"""
class CryptoBoxContainer:
Types = {
"unused":0,
"plain":1,
"luks":2,
"swap":3}
__fsTypes = {
"plain":["ext3", "ext2", "vfat", "reiser"],
"swap":["swap"]}
# TODO: more filesystem types? / check 'reiser'
__dmDir = "/dev/mapper"
def __init__(self, device, cbox):
self.device = device
self.cbox = cbox
self.log = logging.getLogger("CryptoBox")
self.resetObject()
def getName(self):
return self.name
def setName(self, new_name):
if new_name == self.name: return
if self.isMounted():
raise CBVolumeIsActive("the container must be inactive during renaming")
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise CBInvalidName("the supplied new name contains illegal characters")
"check for active partitions with the same name"
prev_name_owner = self.cbox.getContainerList(filterName=new_name)
if prev_name_owner:
for a in prev_name_owner:
if a.isMounted():
raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
if not self.cbox.setNameForUUID(self.uuid, new_name):
raise CBContainerError("failed to change the volume name for unknown reasons")
self.name = new_name
def getDevice(self):
return self.device
def getType(self):
return self.type
def isMounted(self):
return os.path.ismount(self.__getMountPoint())
def getCapacity(self):
"""return the current capacity state of the volume
the volume may not be mounted
the result is a tuple of values in megabyte:
(size, available, used)
"""
info = os.statvfs(self.__getMountPoint())
return (
int(info.f_bsize*info.f_blocks/1024/1024),
int(info.f_bsize*info.f_bavail/1024/1024),
int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))
def getSize(self):
"""return the size of the block device (_not_ of the filesystem)
the result is a value in megabyte
an error is indicated by "-1"
"""
import CryptoBoxTools
return CryptoBoxTools.getBlockDeviceSize(self.device)
def resetObject(self):
""" recheck the information about this container
this is especially useful after changing the type via 'create' """
self.uuid = self.__getUUID()
self.type = self.__getTypeOfPartition()
self.name = self.__getNameOfContainer()
if self.type == self.Types["luks"]:
self.mount = self.__mountLuks
self.umount = self.__umountLuks
elif self.type == self.Types["plain"]:
self.mount = self.__mountPlain
self.umount = self.__umountPlain
def create(self, type, password=None):
if type == self.Types["luks"]:
self.__createLuks(password)
self.resetObject()
return
if type == self.Types["plain"]:
self.__createPlain()
self.resetObject()
return
raise CBInvalidType("invalid container type (%d) supplied" % (type, ))
def changePassword(self, oldpw, newpw):
if self.type != self.Types["luks"]:
raise CBInvalidType("changing of password is possible only for luks containers")
if not oldpw:
raise CBInvalidPassword("no old password supplied for password change")
if not newpw:
raise CBInvalidPassword("no new password supplied for password change")
"return if new and old passwords are the same"
if oldpw == newpw: return
if self.isMounted():
raise CBVolumeIsActive("this container is currently active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksAddKey",
self.device,
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
## retrieve the key slot we used for unlocking
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise CBChangePasswordError("could not get the old key slot")
"remove the old key"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"luksDelKey",
self.device,
"%d" % (keyslot, )])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
" ****************** internal stuff ********************* "
def __getNameOfContainer(self):
"retrieve the name of the container by querying the database"
def_name = self.cbox.getNameForUUID(self.uuid)
if def_name: return def_name
"there is no name defined for this uuid - we will propose a good one"
prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.getUUIDForName(guess):
counter += 1
else:
unused_found = True
self.cbox.setNameForUUID(self.uuid, guess)
return guess
def __getUUID(self):
"""return UUID for luks partitions, ext2/3 and vfat filesystems"""
emergency_default = self.device.replace(os.path.sep, "_")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "UUID",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
result = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return emergency_default
devnull.close()
if result: return result
return emergency_default
def __getTypeOfPartition(self):
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
if self.__isLuksPartition(): return self.Types["luks"]
typeOfPartition = self.__getTypeIdOfPartition()
if typeOfPartition in self.__fsTypes["plain"]:
return self.Types["plain"]
if typeOfPartition in self.__fsTypes["swap"]:
return self.Types["swap"]
return self.Types["unused"]
def __getTypeIdOfPartition(self):
"returns the type of the partition (see 'man blkid')"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
return output
def __isLuksPartition(self):
"check if the given device is a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"isLuks",
self.device])
proc.wait()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)
def __mountLuks(self, password):
"mount a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for luksOpen")
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
self.__umountLuks()
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
def __umountLuks(self):
"umount a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
if os.path.exists(os.path.join(self.__dmDir, self.name)):
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksClose",
self.name,
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
self.device,
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
def __umountPlain(self):
"umount a plaintext partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
def __createPlain(self):
"make a plaintext partition"
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
self.device])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
devnull.close()
def __createLuks(self, password):
"make a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for new luks mapping")
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksFormat",
self.device,
"--batch-mode",
"--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
"--iter-time", "2000"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"open the luks container for mkfs"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"make the filesystem"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
"remove the luks mapping"
raise CBCreateError(errorMsg)
devnull.close()
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
os.rmdir(abs_dir)