lars
0fe6d426ed
moved config partition handling to CryptoBoxSettings implemented environment checks (writeable config, https (off for now)) chown mounted directory after mount to the cryptobox user
578 lines
16 KiB
Python
Executable file
578 lines
16 KiB
Python
Executable file
#!/usr/bin/env python2.4
|
|
|
|
## check python version
# This has to run before the remaining imports: 'subprocess' only exists
# since python 2.4, so an older interpreter gets a readable message instead
# of an ImportError.
import sys

# the individual version fields stay available as module level names
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info

# tuples compare element by element, so this covers "major < 2" as well as
# "major == 2 and minor < 4"
if (ver_major, ver_minor) < (2, 4):
    sys.stderr.write("You need a python version >= 2.4\nCurrent version is:\n %s\n" % sys.version)
    sys.exit(1)
|
|
|
|
import subprocess
|
|
import os
|
|
import re
|
|
import logging
|
|
from CryptoBoxExceptions import *
|
|
|
|
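"""exceptions raised by this module
(imported from CryptoBoxExceptions; the class names carry a "CB" prefix,
e.g. CBVolumeIsActive):
	VolumeIsActive
	NameActivelyUsed
	InvalidName
	InvalidPassword
	InvalidType
	ContainerError
	CreateError
	MountError
	UmountError
	ChangePasswordError
"""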
|
|
|
|
class CryptoBoxContainer:
    """one block device ("container") handled by the cryptobox

    A container wraps a device path and knows how to detect its type
    (via 'blkid' and 'cryptsetup isLuks'), how to format it as a plain or
    a luks volume and how to mount/umount it below the configured
    'MountParentDir'.

    The 'cbox' object handed to the constructor must offer:
      - prefs: nested mapping with the sections "Programs", "Locations"
        and "Main" that are read below
      - getNameForUUID / getUUIDForName / setNameForUUID: name database
      - getContainerList(filterName=...): lookup of containers by name

    'mount' and 'umount' are instance attributes bound by 'resetObject'
    according to the detected type (only for luks and plain containers).
    """

    # numeric ids of the container types
    Types = {
        "unused":0,
        "plain":1,
        "luks":2,
        "swap":3}

    # 'blkid' TYPE values that belong to the container types above
    __fsTypes = {
        "plain":["ext3", "ext2", "vfat", "reiser"],
        "swap":["swap"]}
    # TODO: more filesystem types? / check 'reiser'

    # an opened luks mapping is expected at <__dmDir>/<name of the container>
    __dmDir = "/dev/mapper"


    def __init__(self, device, cbox):
        """store the device and the cryptobox object and probe the device

        device: path of the block device (passed to blkid, cryptsetup, mount)
        cbox: the owning cryptobox object (see the class description)
        """
        self.device = device
        self.cbox = cbox
        self.log = logging.getLogger("CryptoBox")
        self.resetObject()


    def getName(self):
        """return the current name of the container"""
        return self.name


    def setName(self, new_name):
        """rename the container and store the new name in the database

        Nothing happens if the name does not change.

        raises CBVolumeIsActive if this container is mounted
        raises CBInvalidName if the name contains anything besides letters,
            digits, '_', '.', '-' and spaces (or is empty)
        raises CBNameActivelyUsed if a mounted container already uses the name
        raises CBContainerError if the database refuses the new name
        """
        if new_name == self.name: return
        if self.isMounted():
            raise CBVolumeIsActive("the container must be inactive during renaming")
        if not re.search(r'^[a-zA-Z0-9_\.\- ]+$', new_name):
            raise CBInvalidName("the supplied new name contains illegal characters")
        # check for active partitions with the same name
        prev_name_owner = self.cbox.getContainerList(filterName=new_name)
        if prev_name_owner:
            for owner in prev_name_owner:
                if owner.isMounted():
                    raise CBNameActivelyUsed("the supplied new name is already in use for an active partition")
        if not self.cbox.setNameForUUID(self.uuid, new_name):
            raise CBContainerError("failed to change the volume name for unknown reasons")
        self.name = new_name


    def getDevice(self):
        """return the device path of the container"""
        return self.device


    def getType(self):
        """return the numeric type of the container (see 'Types')"""
        return self.type


    def isMounted(self):
        """return True if the mountpoint of this container is a mountpoint"""
        return os.path.ismount(self.__getMountPoint())


    def getCapacity(self):
        """return the current capacity state of the volume

        The values are taken from 'statvfs' of the mountpoint - so they
        describe this volume only while it is mounted.
        The result is a tuple of values in megabyte:
            (size, available, used)
        """
        info = os.statvfs(self.__getMountPoint())
        megabyte = 1024 * 1024
        # floor division gives integer megabytes for every python version
        return (
            int(info.f_bsize * info.f_blocks // megabyte),
            int(info.f_bsize * info.f_bavail // megabyte),
            int(info.f_bsize * (info.f_blocks - info.f_bavail) // megabyte))


    def getSize(self):
        """return the size of the block device (_not_ of the filesystem)

        the result is a value in megabyte
        an error is indicated by "-1"
        """
        # imported here (as in the original code) instead of at module level
        import CryptoBoxTools
        return CryptoBoxTools.getBlockDeviceSize(self.device)


    def resetObject(self):
        """recheck the information about this container

        this is especially useful after changing the type via 'create'

        'mount' and 'umount' are (re)bound only for luks and plain
        containers - for any other type they are left untouched.
        """
        self.uuid = self.__getUUID()
        self.type = self.__getTypeOfPartition()
        self.name = self.__getNameOfContainer()
        if self.type == self.Types["luks"]:
            self.mount = self.__mountLuks
            self.umount = self.__umountLuks
        elif self.type == self.Types["plain"]:
            self.mount = self.__mountPlain
            self.umount = self.__umountPlain


    def create(self, type, password=None):
        """format the device as a container of the given type

        type: Types["luks"] or Types["plain"]
        password: required for luks containers, ignored for plain ones

        raises CBInvalidType for every other type
        (the errors of the format operations are passed through)
        """
        if type == self.Types["luks"]:
            self.__createLuks(password)
            self.resetObject()
            return
        if type == self.Types["plain"]:
            self.__createPlain()
            self.resetObject()
            return
        # '%s' instead of '%d': a non-numeric type must not cause a TypeError
        raise CBInvalidType("invalid container type (%s) supplied" % (type, ))


    def changePassword(self, oldpw, newpw):
        """replace the password of a luks container

        The new key is added first (unlocked by the old password), then
        the key slot of the old password is removed.
        Nothing happens if both passwords are equal.

        raises CBInvalidType if this is not a luks container
        raises CBInvalidPassword if one of the passwords is empty
        raises CBVolumeIsActive if the container is mounted
        raises CBChangePasswordError if cryptsetup fails or if the used
            key slot cannot be found in its output
        """
        if self.type != self.Types["luks"]:
            raise CBInvalidType("changing of password is possible only for luks containers")
        if not oldpw:
            raise CBInvalidPassword("no old password supplied for password change")
        if not newpw:
            raise CBInvalidPassword("no new password supplied for password change")
        # return if new and old passwords are the same
        if oldpw == newpw: return
        if self.isMounted():
            raise CBVolumeIsActive("this container is currently active")
        # remove any potential open luks mapping
        self.__umountLuks()
        # add the new key
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["super"],
                self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                "cryptsetup",
                "luksAddKey",
                self.device,
                "--batch-mode"],
            "%s\n%s" % (oldpw, newpw),
            True)
        if returncode != 0:
            errorMsg = "Could not add a new luks key: %s - %s" % (output.strip(), errout.strip(), )
            self.log.error(errorMsg)
            raise CBChangePasswordError(errorMsg)
        ## retrieve the key slot we used for unlocking
        # 're.search' returns None without a match - check this before
        # touching the groups
        keys_found = re.search(r'key slot (\d{1,3}) unlocked', output)
        if keys_found is None:
            raise CBChangePasswordError("could not get the old key slot")
        keyslot = int(keys_found.group(1))
        # remove the old key
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["cryptsetup"],
                "--batch-mode",
                "luksDelKey",
                self.device,
                "%d" % (keyslot, )])
        if returncode != 0:
            errorMsg = "Could not remove the old luks key: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBChangePasswordError(errorMsg)


    # ****************** internal stuff *********************

    def __runCommand(self, args, stdin_data=None, capture_stdout=False):
        """run an external program and wait for it

        args: the program and its arguments (no shell is involved)
        stdin_data: if not None, it is written to the stdin of the program
        capture_stdout: return the output if True, otherwise send it to
            os.devnull (if that cannot be opened, a warning is logged and
            the program inherits our stdout)

        returns the tuple (returncode, stdout, stderr) - stdout is None if
        it was not captured

        'communicate' drains the pipes while waiting, so a talkative program
        cannot block on a full pipe. The devnull handle is closed on every
        exit path.
        """
        devnull = None
        if capture_stdout:
            stdout_target = subprocess.PIPE
        else:
            try:
                devnull = open(os.devnull, "w")
            except IOError:
                self.log.warning("Could not open %s" % (os.devnull, ))
            stdout_target = devnull
        if stdin_data is None:
            stdin_source = None
        else:
            stdin_source = subprocess.PIPE
        try:
            proc = subprocess.Popen(
                shell = False,
                stdin = stdin_source,
                stdout = stdout_target,
                stderr = subprocess.PIPE,
                args = args)
            (output, errout) = proc.communicate(stdin_data)
        finally:
            if devnull is not None:
                devnull.close()
        return (proc.returncode, output, errout)


    def __getNameOfContainer(self):
        """retrieve the name of the container by querying the database

        If the database knows no name for the uuid, then the first unused
        "<DefaultVolumePrefix><number>" is stored and returned.
        """
        def_name = self.cbox.getNameForUUID(self.uuid)
        if def_name: return def_name
        # there is no name defined for this uuid - we will propose a good one
        prefix = self.cbox.prefs["Main"]["DefaultVolumePrefix"]
        counter = 1
        guess = prefix + str(counter)
        while self.cbox.getUUIDForName(guess):
            counter += 1
            guess = prefix + str(counter)
        self.cbox.setNameForUUID(self.uuid, guess)
        return guess


    def __getUUID(self):
        """return UUID for luks partitions, ext2/3 and vfat filesystems

        If 'blkid' fails or prints nothing, the device path with "_"
        instead of the path separators is returned.
        """
        emergency_default = self.device.replace(os.path.sep, "_")
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["blkid"],
                "-s", "UUID",
                "-o", "value",
                "-c", os.devnull,
                "-w", os.devnull,
                self.device],
            None,
            True)
        if returncode != 0:
            self.log.warning("retrieving of partition uuid via 'blkid' failed: %s" % (errout.strip(), ))
            return emergency_default
        result = output.strip()
        if result: return result
        return emergency_default


    def __getTypeOfPartition(self):
        """retrieve the type of the given partition (see CryptoBoxContainer.Types)"""
        if self.__isLuksPartition(): return self.Types["luks"]
        typeOfPartition = self.__getTypeIdOfPartition()
        if typeOfPartition in self.__fsTypes["plain"]:
            return self.Types["plain"]
        if typeOfPartition in self.__fsTypes["swap"]:
            return self.Types["swap"]
        return self.Types["unused"]


    def __getTypeIdOfPartition(self):
        """returns the type of the partition (see 'man blkid')

        None is returned if 'blkid' fails.
        """
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["blkid"],
                "-s", "TYPE",
                "-o", "value",
                "-c", os.devnull,
                "-w", os.devnull,
                self.device],
            None,
            True)
        if returncode != 0:
            self.log.warning("retrieving of partition type via 'blkid' failed: %s" % (errout.strip(), ))
            return None
        return output.strip()


    def __isLuksPartition(self):
        """check if the given device is a luks partition"""
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["cryptsetup"],
                "--batch-mode",
                "isLuks",
                self.device])
        return returncode == 0


    def __getMountPoint(self):
        """return the name of the mountpoint of this volume"""
        return os.path.join(self.cbox.prefs["Locations"]["MountParentDir"], self.name)


    def __prepareMountPoint(self):
        """remove stale mount directories and create the mountpoint

        raises CBMountError if the mountpoint does not exist afterwards
        """
        self.__cleanMountDirs()
        if not os.path.exists(self.__getMountPoint()):
            os.mkdir(self.__getMountPoint())
        if not os.path.exists(self.__getMountPoint()):
            errorMsg = "Could not create mountpoint (%s)" % (self.__getMountPoint(), )
            self.log.error(errorMsg)
            raise CBMountError(errorMsg)


    def __mountFilesystem(self, source):
        """mount 'source' at the mountpoint; raises CBMountError on failure"""
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["super"],
                self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                "mount",
                source,
                self.__getMountPoint()])
        if returncode != 0:
            errorMsg = "Could not mount the filesystem: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBMountError(errorMsg)


    def __umountFilesystem(self):
        """umount the mountpoint if it is mounted; raises CBUmountError on failure"""
        if not self.isMounted(): return
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["super"],
                self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                "umount",
                self.__getMountPoint()])
        if returncode != 0:
            errorMsg = "Could not umount the filesystem: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBUmountError(errorMsg)


    def __openLuksMapping(self, password):
        """run 'luksOpen' for this device; returns (returncode, stderr)"""
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["super"],
                self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                "cryptsetup",
                "luksOpen",
                self.device,
                self.name,
                "--batch-mode"],
            password)
        return (returncode, errout)


    def __mountLuks(self, password):
        """mount a luks partition

        raises CBInvalidPassword if the password is empty
        raises CBVolumeIsActive if the container is already mounted
        raises CBMountError if the mountpoint, the mapping or the mount fails
        """
        if not password:
            raise CBInvalidPassword("no password supplied for luksOpen")
        if self.isMounted(): raise CBVolumeIsActive("this container is already active")
        # remove any potential open luks mapping
        self.__umountLuks()
        self.__prepareMountPoint()
        (returncode, errout) = self.__openLuksMapping(password)
        if returncode != 0:
            errorMsg = "Could not open the luks mapping: %s" % (errout.strip(), )
            self.log.warning(errorMsg)
            raise CBMountError(errorMsg)
        self.__mountFilesystem(os.path.join(self.__dmDir, self.name))


    def __umountLuks(self):
        """umount a luks partition

        The filesystem is umounted (if mounted) and the luks mapping is
        closed (if it exists).

        raises CBUmountError if one of both steps fails
        """
        self.__umountFilesystem()
        if os.path.exists(os.path.join(self.__dmDir, self.name)):
            (returncode, output, errout) = self.__runCommand(
                [
                    self.cbox.prefs["Programs"]["super"],
                    self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                    "cryptsetup",
                    "luksClose",
                    self.name,
                    "--batch-mode"])
            if returncode != 0:
                errorMsg = "Could not remove the luks mapping: %s" % (errout.strip(), )
                self.log.warning(errorMsg)
                raise CBUmountError(errorMsg)


    def __mountPlain(self):
        """mount a plaintext partition

        raises CBVolumeIsActive if the container is already mounted
        raises CBMountError if the mountpoint or the mount fails
        """
        if self.isMounted(): raise CBVolumeIsActive("this container is already active")
        self.__prepareMountPoint()
        self.__mountFilesystem(self.device)


    def __umountPlain(self):
        """umount a plaintext partition; raises CBUmountError on failure"""
        self.__umountFilesystem()


    def __createPlain(self):
        """make a plaintext partition

        raises CBVolumeIsActive if the container is mounted
        raises CBCreateError if the filesystem cannot be created
        """
        if self.isMounted():
            raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["mkfs-data"],
                self.device])
        if returncode != 0:
            errorMsg = "Could not create the filesystem: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)


    def __createLuks(self, password):
        """make a luks partition

        The luks header is written, the mapping is opened, the filesystem
        is created inside and the mapping is closed again.

        raises CBInvalidPassword if the password is empty
        raises CBVolumeIsActive if the container is mounted
        raises CBCreateError if one of the steps fails
        """
        if not password:
            raise CBInvalidPassword("no password supplied for new luks mapping")
        if self.isMounted():
            raise CBVolumeIsActive("deactivate the partition before filesystem initialization")
        # remove any potential open luks mapping
        self.__umountLuks()
        # create the luks header
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["super"],
                self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
                "cryptsetup",
                "luksFormat",
                self.device,
                "--batch-mode",
                "--cipher", self.cbox.prefs["Main"]["DefaultCipher"],
                "--iter-time", "2000"],
            password)
        if returncode != 0:
            errorMsg = "Could not create the luks header: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)
        # open the luks container for mkfs
        (returncode, errout) = self.__openLuksMapping(password)
        if returncode != 0:
            errorMsg = "Could not open the new luks mapping: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)
        # make the filesystem
        (returncode, output, errout) = self.__runCommand(
            [
                self.cbox.prefs["Programs"]["mkfs-data"],
                os.path.join(self.__dmDir, self.name)])
        # remove the mapping - for every exit status
        self.__umountLuks()
        if returncode != 0:
            errorMsg = "Could not create the filesystem: %s" % (errout.strip(), )
            self.log.error(errorMsg)
            raise CBCreateError(errorMsg)


    def __cleanMountDirs(self):
        """remove all unnecessary subdirs of the mount parent directory

        this should be called for every (u)mount

        Only real directories (no links) that are no mountpoints are
        removed. A directory that cannot be removed (e.g. because it is not
        empty) is reported as a warning instead of aborting the caller.
        """
        parent_dir = self.cbox.prefs["Locations"]["MountParentDir"]
        for entry in os.listdir(parent_dir):
            abs_dir = os.path.join(parent_dir, entry)
            if (not os.path.islink(abs_dir)) and os.path.isdir(abs_dir) and (not os.path.ismount(abs_dir)):
                try:
                    os.rmdir(abs_dir)
                except OSError:
                    # 'sys.exc_info' keeps this working with python 2.4
                    self.log.warning("Could not remove the unused mount directory (%s): %s" % (abs_dir, sys.exc_info()[1]))
|
|
|
|
|