cryptonas/bin/CryptoBoxContainer.py

652 lines
19 KiB
Python
Executable file

#!/usr/bin/env python2.4
## check python version
import sys
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
sys.exit(1)
import subprocess
import os
import re
import logging
from CryptoBoxExceptions import *
"""exceptions:
CBVolumeIsActive
CBNameActivelyUsed
CBInvalidName
CBInvalidPassword
CBInvalidType
CBCreateError
CBMountError
CBChangePasswordError
"""
class CryptoBoxContainer:
Types = {
"unused":0,
"plain":1,
"luks":2,
"swap":3}
__fsTypes = {
"plain":["ext3", "ext2", "vfat", "reiser"],
"swap":["swap"]}
# TODO: more filesystem types? / check 'reiser'
__dmDir = "/dev/mapper"
def __init__(self, device, cbox):
self.device = device
self.cbox = cbox
self.log = logging.getLogger("CryptoBox")
self.resetObject()
def getName(self):
return self.name
def __setAttributes(self):
try:
## is there already an entry in the database?
self.attributes = self.cbox.prefs.volumesDB[self.getName()]
self.attributes["uuid"] = self.uuid
except KeyError:
## set default values
self.attributes = { "uuid": self.uuid }
self.cbox.prefs.volumesDB[self.getName()] = self.attributes
def setName(self, new_name):
old_name = self.getName()
if new_name == self.name: return
## renaming is not possible, if the volume is active, as the mountpoint name
## is the same as the volume name
if self.isMounted():
raise CBVolumeIsActive("the container must not be active during renaming")
if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
raise CBInvalidName("the supplied new name contains illegal characters")
## check for another partitions with the same name
if self.cbox.getContainerList(filterName=new_name):
raise CBNameIsInUse("the supplied new name is already in use for anonther partition")
## maybe there a is an entry in the volumes database (but the partition is not active
try:
## remove possibly existing inactive database item
del self.cbox.prefs.volumesDB[new_name]
except KeyError:
## no entry - so nothing happens
pass
## set new name
self.name = new_name
## remove old database entry
try:
del self.cbox.prefs.volumesDB[old_name]
except KeyError:
pass
## set new volumes database entry
self.cbox.prefs.volumesDB[new_name] = self.attributes
self.cbox.prefs.volumesDB.write()
def getDevice(self):
return self.device
def getType(self):
return self.type
def isMounted(self):
return os.path.ismount(self.__getMountPoint())
def getCapacity(self):
"""return the current capacity state of the volume
the volume may not be mounted
the result is a tuple of values in megabyte:
(size, available, used)
"""
info = os.statvfs(self.__getMountPoint())
return (
int(info.f_bsize*info.f_blocks/1024/1024),
int(info.f_bsize*info.f_bavail/1024/1024),
int(info.f_bsize*(info.f_blocks-info.f_bavail)/1024/1024))
def getSize(self):
"""return the size of the block device (_not_ of the filesystem)
the result is a value in megabyte
an error is indicated by "-1"
"""
import CryptoBoxTools
return CryptoBoxTools.getBlockDeviceSize(self.device)
def resetObject(self):
""" recheck the information about this container
this is especially useful after changing the type via 'create' """
self.uuid = self.__getUUID()
self.type = self.__getTypeOfPartition()
self.name = self.__getNameOfContainer()
self.__setAttributes()
if self.type == self.Types["luks"]:
self.mount = self.__mountLuks
self.umount = self.__umountLuks
elif self.type == self.Types["plain"]:
self.mount = self.__mountPlain
self.umount = self.__umountPlain
def create(self, type, password=None):
old_name = self.getName()
if type == self.Types["luks"]:
self.__createLuks(password)
elif type == self.Types["plain"]:
self.__createPlain()
else:
raise CBInvalidType("invalid container type (%d) supplied" % (type, ))
## no exception was raised during creation -> we can continue
## reset the properties (encryption state, ...) of the device
self.resetObject()
## restore the old name (must be after resetObject)
self.setName(old_name)
def changePassword(self, oldpw, newpw):
if self.type != self.Types["luks"]:
raise CBInvalidType("changing of password is possible only for luks containers")
if not oldpw:
raise CBInvalidPassword("no old password supplied for password change")
if not newpw:
raise CBInvalidPassword("no new password supplied for password change")
"return if new and old passwords are the same"
if oldpw == newpw: return
if self.isMounted():
raise CBVolumeIsActive("this container is currently active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksAddKey",
self.device,
"--batch-mode"])
proc.stdin.write("%s\n%s" % (oldpw, newpw))
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
## retrieve the key slot we used for unlocking
keys_found = re.search(r'key slot (\d{1,3}) unlocked', output).groups()
if keys_found:
keyslot = int(keys_found[0])
else:
raise CBChangePasswordError("could not get the old key slot")
"remove the old key"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"luksDelKey",
self.device,
"%d" % (keyslot, )])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the old luks key: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBChangePasswordError(errorMsg)
" ****************** internal stuff ********************* "
def __getNameOfContainer(self):
"""retrieve the name of the container by querying the database
call this function only for the initial setup of the container object"""
found_name = None
for key in self.cbox.prefs.volumesDB.keys():
if self.cbox.prefs.volumesDB[key]["uuid"] == self.uuid:
found_name = key
if found_name: return found_name
## there is no name defined for this uuid - we will propose a good one
prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
unused_found = False
counter = 1
while not unused_found:
guess = prefix + str(counter)
if self.cbox.prefs.volumesDB.has_key(guess):
counter += 1
else:
unused_found = True
return guess
def __getUUID(self):
if self.__getTypeOfPartition() == self.Types["luks"]:
guess = self.__getLuksUUID()
else:
guess = self.__getNonLuksUUID()
## did we get a valid value?
if guess:
return guess
else:
## emergency default value
return self.device.replace(os.path.sep, "_")
def __getLuksUUID(self):
"""get uuid for luks devices"""
proc = subprocess.Popen(
shell = False,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
args = [self.cbox.prefs["Programs"]["cryptsetup"],
"luksUUID",
self.device])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
self.cbox.log.info("could not retrieve luks uuid (%s): %s", (self.device, stderr.strip()))
return None
return stdout.strip()
def __getNonLuksUUID(self):
"""return UUID for ext2/3 and vfat filesystems"""
try:
devnull = open(os.devnull, "w")
except IOError:
self.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "UUID",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
(stdout, stderr) = proc.communicate()
devnull.close()
## execution failed?
if proc.returncode != 0:
self.log.info("retrieving of partition type (%s) via 'blkid' failed: %s - maybe it is encrypted?" % (self.device, stderr.strip()))
return None
## return output of blkid
return stdout.strip()
def __getTypeOfPartition(self):
"retrieve the type of the given partition (see CryptoBoxContainer.Types)"
if self.__isLuksPartition(): return self.Types["luks"]
typeOfPartition = self.__getTypeIdOfPartition()
if typeOfPartition in self.__fsTypes["plain"]:
return self.Types["plain"]
if typeOfPartition in self.__fsTypes["swap"]:
return self.Types["swap"]
return self.Types["unused"]
def __getTypeIdOfPartition(self):
"returns the type of the partition (see 'man blkid')"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
args=[self.cbox.prefs["Programs"]["blkid"],
"-s", "TYPE",
"-o", "value",
"-c", os.devnull,
"-w", os.devnull,
self.device])
proc.wait()
output = proc.stdout.read().strip()
if proc.returncode != 0:
self.log.warn("retrieving of partition type via 'blkid' failed: %s" % (proc.stderr.read().strip(), ))
return None
devnull.close()
return output
def __isLuksPartition(self):
"check if the given device is a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = devnull,
args = [
self.cbox.prefs["Programs"]["cryptsetup"],
"--batch-mode",
"isLuks",
self.device])
proc.wait()
devnull.close()
return proc.returncode == 0
def __getMountPoint(self):
"return the name of the mountpoint of this volume"
return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)
def __mountLuks(self, password):
"mount a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for luksOpen")
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
self.__umountLuks()
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
self.cbox.sendEventNotification("premount", self.__getEventArgs())
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
os.path.join(self.__dmDir, self.name),
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
self.cbox.sendEventNotification("postmount", self.__getEventArgs())
def __umountLuks(self):
"umount a luks partition"
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.cbox.sendEventNotification("preumount", self.__getEventArgs())
if self.isMounted():
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
if os.path.exists(os.path.join(self.__dmDir, self.name)):
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksClose",
self.name,
"--batch-mode"])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not remove the luks mapping: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
self.cbox.sendEventNotification("postumount", self.__getEventArgs())
def __mountPlain(self):
"mount a plaintext partition"
if self.isMounted(): raise CBVolumeIsActive("this container is already active")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.__cleanMountDirs()
if not os.path.exists(self.__getMountPoint()):
os.mkdir(self.__getMountPoint())
if not os.path.exists(self.__getMountPoint()):
errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
self.log.error(errorMsg)
raise CBMountError(errorMsg)
self.cbox.sendEventNotification("premount", self.__getEventArgs())
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"mount",
self.device,
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not mount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBMountError(errorMsg)
devnull.close()
self.cbox.sendEventNotification("postmount", self.__getEventArgs())
def __umountPlain(self):
"umount a plaintext partition"
if not self.isMounted():
self.cbox.log.info("trying to umount while volume (%s) is mounted" % self.getDevice())
return
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
self.cbox.sendEventNotification("preumount", self.__getEventArgs())
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"umount",
self.__getMountPoint()])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not umount the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.warn(errorMsg)
raise CBUmountError(errorMsg)
devnull.close()
self.cbox.sendEventNotification("postumount", self.__getEventArgs())
def __createPlain(self):
"make a plaintext partition"
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
self.device])
proc.wait()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
devnull.close()
def __createLuks(self, password):
"make a luks partition"
if not password:
raise CBInvalidPassword("no password supplied for new luks mapping")
if self.isMounted():
raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
devnull = None
try:
devnull = open(os.devnull, "w")
except IOError:
self.log.warn("Could not open %s" % (os.devnull, ))
"remove any potential open luks mapping"
self.__umountLuks()
"create the luks header"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksFormat",
self.device,
"--batch-mode",
"--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
"--iter-time", "2000"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"open the luks container for mkfs"
proc = subprocess.Popen(
shell = False,
stdin = subprocess.PIPE,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"cryptsetup",
"luksOpen",
self.device,
self.name,
"--batch-mode"])
proc.stdin.write(password)
(output, errout) = proc.communicate()
if proc.returncode != 0:
errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
self.log.error(errorMsg)
raise CBCreateError(errorMsg)
"make the filesystem"
proc = subprocess.Popen(
shell = False,
stdin = None,
stdout = devnull,
stderr = subprocess.PIPE,
args = [
self.cbox.prefs["Programs"]["mkfs-data"],
os.path.join(self.__dmDir, self.name)])
proc.wait()
"remove the mapping - for every exit status"
self.__umountLuks()
if proc.returncode != 0:
errorMsg = "Could not create the filesystem: %s" % (proc.stderr.read().strip(), )
self.log.error(errorMsg)
"remove the luks mapping"
raise CBCreateError(errorMsg)
devnull.close()
def __cleanMountDirs(self):
""" remove all unnecessary subdirs of the mount parent directory
this should be called for every (u)mount """
subdirs = os.listdir(self.cbox.prefs["Locations"]["MountParentDir"])
for dir in subdirs:
abs_dir = os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], dir)
if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
os.rmdir(abs_dir)
def __getEventArgs(self):
"""return an array of arguments for hook scripts handling pre/post-mount/umount
events"""
typeText = [e for e in self.Types.keys() if self.Types[e] == self.getType()][0]
return [self.getDevice(), self.getName(), typeText, self.__getMountPoint()]