cryptonas-branches/pythonrewrite/bin/WebInterfaceSites.py

496 lines
16 KiB
Python
Executable File

import CryptoBox
import WebInterfaceDataset
import re
import Plugins
from CryptoBoxExceptions import *
class WebInterfacePlugins:
    '''publishes every known plugin as an attribute that can serve as URL handler

    For each name returned by "plugins.allPlugins()" an attribute of the same
    name is created. Its value is the object returned by
    "handler_func(name)", which additionally gets "exposed = True" set.
    '''

    def __init__(self, log, plugins, handler_func):
        for plugin_name in plugins.allPlugins():
            log.info("Plugin '%s' loaded" % plugin_name)
            ## this should be the "easiest" way to expose all plugins as URLs:
            ## attach the handler first, then flag the attached object
            setattr(self, plugin_name, handler_func(plugin_name))
            attached = getattr(self, plugin_name)
            attached.exposed = True
class WebInterfaceSites:
'''
url2func = {'index':'show_status','doc':'show_doc','logs':'show_log'}
'''
def __init__(self):
    '''creates the CryptoBox backend, the logger, the dataset and the plugin handlers

    The attribute "plugins" holds one exposed handler per plugin; its
    "index" entry is pointed at the "system" site.
    '''
    import logging
    self.cbox = CryptoBox.CryptoBoxProps()
    self.log = logging.getLogger("CryptoBox")
    self.prefs = self.cbox.prefs
    self.__resetDataset()
    ## the plugin manager is fed with the configured plugin directory
    plugin_dir = self.prefs["Locations"]["PluginDir"]
    self.pluginList = Plugins.PluginManager(plugin_dir)
    plugin_sites = WebInterfacePlugins(self.log, self.pluginList, self.return_plugin_action)
    self.plugins = plugin_sites
    self.plugins.index = self.system
def __resetDataset(self):
    """replaces self.dataset with a freshly created WebInterfaceDataset

    this method has to be called at the beginning of every "site" action
    important: only at the beginning of an action (to not lose information)
    important: for _every_ "site" action (cherrypy is stateful)
    """
    self.dataset = WebInterfaceDataset.WebInterfaceDataset(self.cbox, self.prefs)
def __isHDAvailable(self):
    '''placeholder: currently always returns True (no real check is done)
    '''
    #TODO: implement this
    return True
def __check_config(self):
    '''placeholder: currently always returns True (the box counts as configured)
    '''
    #TODO: from now on a cryptobox is always configured
    return True
def __check_init_running(self):
    '''placeholder: currently always returns False (no running initialization is detected)
    '''
    #TODO: implement this check (is mkfs still running?)
    return False
######################################################################
## put real sites down here and don't forget to expose them at the end
def logs(self, weblang=""):
'''displays a HTML version of the logfile
'''
self.__resetDataset()
self.__setWebLang(weblang)
self.dataset["Data.Log"] = "<br/>".join(self.cbox.getLogData(lines=30, maxSize=2000))
return self.__render("show_log")
def status(self, weblang=""):
'''shows the current status of the box
'''
self.__resetDataset()
self.__setWebLang(weblang)
if not self.__check_config():
self.dataset["Data.Warning"] = "NotInitialized"
return self.__render("form_init")
elif self.__check_init_running():
self.dataset["Data.Warning"] = "InitNotFinished"
self.dataset["Data.Redirect.Action"] = "form_config"
self.dataset["Data.Redirect.Delay"] = "30"
return self.__render("empty")
else:
self.dataset["Data.Redirect.Delay"] = "60"
return self.__render("show_status")
def doc(self,page="",weblang=""):
'''prints the offline wikipage
'''
import re
self.__resetDataset()
self.__setWebLang(weblang)
## check for invalid characters
if page and not re.search(u'\W', page):
self.dataset["Data.Doc.Page"] = page
else:
## display this page as default help page
self.dataset["Data.Doc.Page"] ="CryptoBoxUser"
return self.__render("show_doc")
def system(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("form_system")
def index(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_status")
def show_volume(self, device="", weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def show_volumes(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return self.__render("show_volumes")
def volume_name_set(self, device="", volume_name="", weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
volume_name = volume_name.strip()
if self.__checkVolumeName(volume_name):
container = self.cbox.getContainer(device)
try:
container.setName(volume_name)
# TODO: specify the possible exceptions
except Exception, errMsg:
self.log.warn("failed to rename the volume '%s' to '%s: %s'" % (device, volume_name, errMsg))
self.dataset["Data.Warning"] = "SetVolumeNameFailed"
else:
self.log.info("successfully renamed volume '%s' to '%s'" % (device, volume_name))
self.dataset.setCurrentDiskState(device)
else:
self.dataset["Data.Warning"] = "InvalidVolumeName"
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def mount_do(self, device, crypto_password=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if container.isMounted():
self.dataset["Data.Warning"] = "IsMounted"
self.log.warn("the device (%s) is already mounted" % device)
else:
try:
if container.getType() == container.Types["luks"]:
## encrypted luks container
if not crypto_password:
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
self.log.warn("no password was supplied for mounting of device '%s'" % device)
return self.__render("show_volume")
else:
container.mount(crypto_password)
elif container.getType() == container.Types["plain"]:
## plain container
container.mount()
else:
## mounting is not possible
# TODO: wrong warning message - replace it
self.dataset["Data.Warning"] = "MountFailed"
self.log.warn("this type of container (%s) cannot be mounted - sorry!" % device)
except (Exception, "MountError"):
self.dataset["Data.Warning"] = "MountFailed"
self.log.warn("failed to mount the device (%s)" % device)
else:
self.log.info("successfully mounted the container (%s)" % device)
self.dataset.setCurrentDiskState(device)
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
return self.__render("show_volume")
def volume_init_ask(self, device, encryption=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if container.isMounted():
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
return self.__render("show_volume")
else:
if encryption is None:
self.dataset["Data.Init.isCrypto"] = 0
else:
self.dataset["Data.Init.isCrypto"] = 1
return self.__render("form_init")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def init_do(self, device, confirm, crypto_password=None, crypto_password2=None, encryption=None, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
## set 'Data.Init.isCrypto' - just in case, we have to show the same form again
if encryption is None:
self.dataset["Data.Init.isCrypto"] = 0
else:
self.dataset["Data.Init.isCrypto"] = 1
if container.isMounted():
self.dataset["Data.Warning"] = "VolumeMayNotBeMounted"
self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device)
return self.__render("form_init")
else:
# TODO: we have to compare 'confirm' with the value in the language file - IMPORTANT!
if not confirm:
self.dataset["Data.Warning"] = "InitNotConfirmed"
self.log.warn("the confirmation sentence for initialization of the device '%s' was wrong" % device)
return self.__render("form_init")
try:
if not encryption is None:
if not crypto_password:
self.dataset["Data.Warning"] = "EmptyCryptoPassword"
self.log.warn("no crypto password was supplied for initialization of device '%s'" % device)
return self.__render("form_init")
if crypto_password != crypto_password2:
self.dataset["Data.Warning"] = "DifferentCryptoPasswords"
self.log.warn("the crypto password was not repeated correctly for initialization of device '%s'" % device)
return self.__render("form_init")
container.create(container.Types["luks"], crypto_password)
else:
container.create(container.Types["plain"])
# TODO: specify the exception
except Exception, errMsg:
# TODO: wrong error/warning message - change it
self.dataset["Data.Error"] = "InitFailed"
self.log.warn("initialization of device '%s' failed" % device)
self.log.warn("reason: %s" % errMsg)
return self.__render("form_init")
else:
self.log.info("successfully initialized device '%s'" % device)
# reread the dataset
self.dataset.setCurrentDiskState(device)
return self.__render("show_volume")
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
def test(self, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
return "test passed"
def umount_do(self, device, weblang=""):
self.__resetDataset()
self.__setWebLang(weblang)
if self.__setDevice(device):
container = self.cbox.getContainer(device)
if not container.isMounted():
self.dataset["Data.Warning"] = "NotMounted"
self.log.warn("the device (%s) is currently not mounted" % device)
else:
try:
if container.getType() == container.Types["luks"]:
## encrypted luks container
container.umount()
elif container.getType() == container.Types["plain"]:
## plain container
container.umount()
else:
## mounting is not possible
# TODO: wrong warning message - replace it
self.dataset["Data.Warning"] = "UmountFailed"
self.log.warn("this type of container (%s) cannot be umounted - sorry!" % device)
except (Exception, "UmountError"):
self.dataset["Data.Warning"] = "UmountFailed"
self.log.warn("failed to unmount the device (%s)" % device)
else:
self.log.info("successfully unmounted the container (%s)" % device)
# reread the dataset
self.dataset.setCurrentDiskState(device)
else:
if self.cbox.getContainerList():
return self.__render("show_volumes")
else:
return self.__render("show_status")
return self.__render("show_volume")
def return_plugin_action(self, plugin_name):
def handler(**args):
self.__resetDataset()
try:
self.__setWebLang(args["weblang"])
del args["weblang"]
except KeyError:
pass
plugin = self.pluginList.getPlugin(plugin_name)
nextTemplate = plugin.doAction(self.dataset, self.cbox, **args)
return self.__render(nextTemplate, plugin_name)
return handler
'''
## DONE: these functions are pythonized
#################### show_log #######################
##################### doc ############################
##################### poweroff ######################
##################### reboot ########################
## but there are even more TODO
#-------------------------------------------------------#
# here you may define all cases that require a harddisk #
#-------------------------------------------------------#
################ umount_do #######################
elif action == "unmount_do":
if not device:
self.log.debug("invalid device chosen: %s" device
settings["Data.Warning"] = "InvalidDevice"
settings["Data.Action"] = "empty"
elif not True: #TODO: replace True with check_config()
settings["Data.Warning"] = "NotInitialized"
settings["Data.Action"] = "form_init"
elif True: #TODO: replace True with check_init_running()
settings["Data.Warning"] = "InitNotFinished"
settings["Data.Action"] = "empty"
settings["Data.Redirect.Action"] = "form_config"
settings["Data.Redirect.Delay"] = "30"
elif not True: #TODO: replace True with check_mounted(device)
settings["Data.Warning"] = "NotMounted"
settings["Data.Action"] = "show_volume"
else: #unmount
#TODO: replace this line with umount_vol(device)
if True: #TODO: replace True with check_mounted(device)
settings["Data.Warning"] = "UmountFailed"
settings["Data.Action"] = "show_volume"
else:
settings["Data.Action"] = "show_volume"
################ mount_do ########################
elif action == "mount_do":
if device:
pass #TODO: is_encrypted = check_device_encryption(device)
else:
self.log.debug("invalid device chosen: %s" device
settings["Data.Warning"] = "InvalidDevice"
settings["Data.Action"] = "empty"
elif not True: #TODO: replace True with check_config()
settings["Data.Warning"] = "NotInitialized"
settings["Data.Action"] = "form_init"
#at cryptobox.pl line 568
'''
##################### input checker ##########################
def __setWebLang(self, value):
## TODO: add some code to evaluate the language setting of the browser
guess = value
availLangs = self.cbox.getAvailableLanguages()
## TODO: add some warnings for an invalid choosen language
if not guess or \
not guess in availLangs or \
re.search(u'\W', guess):
guess = self.prefs["WebSettings"]["Language"]
## maybe the language is still not valid
if not guess in availLangs:
self.log.warn("the configured language is invalid: %s" % guess)
guess = availLangs[0]
self.dataset["Settings.Language"] = guess
self.dataset["Settings.DocLang"] = guess
self.dataset["Settings.LinkAttrs.weblang"] = guess
def __setDevice(self, device):
if device and re.match(u'[\w /\-]+$', device) and self.cbox.getContainer(device):
self.log.debug("select device: %s" % device)
self.dataset.setCurrentDiskState(device)
return True
else:
self.log.warn("invalid device: %s" % device)
self.dataset["Data.Warning"] = "InvalidDevice"
return False
def __checkVolumeName(self, name):
if name and re.match(u'[\w \-]+$', name):
return True
else:
return False
def __render(self, template, plugin=None):
'''renders from clearsilver templates and returns the resulting html
Gets a dictionary with all settings, nessessary for rendering.
In fact the dictionary is a copy of the CryptoBoxWerbserverSite
object, that calls this render method. This might be bloat but
this way the render method has always a complete, actual set of values.
'''
import os
try:
import neo_cgi, neo_util, neo_cs
except ImportError:
errorMsg = "Could not import clearsilver module. Try 'apt-get install python-clearsilver'."
self.log.error(errorMsg)
sys.stderr.write(errorMsg)
raise ImportError, errorMsg
plugin_cs_file = False
if plugin:
plugin_cs_file = self.pluginList.getTemplateFileName(plugin, template)
default_cs_file = os.path.join(self.prefs["Locations"]["TemplateDir"], template + ".cs")
self.dataset["Data.TemplateFile"] = plugin_cs_file or default_cs_file
self.log.info("rendering site: " + template)
cs_path = os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs")
if not os.access(cs_path, os.R_OK):
log.error("Couldn't read clearsilver file: %s" % cs_path)
return "Couldn't read clearsilver file: %s" % cs_path
# use the user selected language instead of the configured
hdf_path = os.path.join(self.prefs["Locations"]["LangDir"], self.dataset["Settings.Language"] + ".hdf")
if not os.access(hdf_path, os.R_OK):
log.error("Couldn't read language file: %s" % hdf_path)
return "Couldn't read language file: %s" % hdf_path
## add the current state of the plugins to the hdf dataset
self.dataset.setPluginState(self.pluginList)
## update the container information
self.dataset.setContainersState()
hdf = neo_util.HDF()
hdf.readFile(hdf_path)
self.log.debug(self.dataset)
for key in self.dataset.keys():
hdf.setValue(key,str(self.dataset[key]))
## load languaga data of plugins
self.pluginList.loadLanguageData(hdf, lang=self.dataset["Settings.Language"])
cs = neo_cs.CS(hdf)
cs.parseFile(cs_path)
return cs.render()
############################################################################
## to make the sites visible through the webserver they must be exposed here
## (every statement flags one of the site functions defined above with
## "exposed = True" - sites missing in this list stay unflagged)
index.exposed = True
doc.exposed = True
logs.exposed = True
system.exposed = True
status.exposed = True
show_volume.exposed = True
volume_name_set.exposed = True
mount_do.exposed = True
volume_init_ask.exposed = True
init_do.exposed = True
umount_do.exposed = True
show_volumes.exposed = True
test.exposed = True
"""
## TODO: check this before anything else
if self.cbox.getAvailableDocLanguages():
self.dataset["Data.Error"] = "NoDocumentation"
return self.__render("show_status")
"""