import CryptoBox import WebInterfaceDataset import re import Plugins from CryptoBoxExceptions import * class WebInterfacePlugins: def __init__(self, log, plugins, handler_func): for plugin in plugins.getPlugins(): if not plugin: continue plname = plugin.getName() log.info("Plugin '%s' loaded" % plname) ## this should be the "easiest" way to expose all plugins as URLs setattr(self, plname, handler_func(plugin)) setattr(getattr(self, plname), "exposed", True) # TODO: check, if this really works - for now the "stream_response" feature seems to be broken #setattr(getattr(self, plname), "stream_respones", True) class WebInterfaceSites: ''' url2func = {'index':'show_status','doc':'show_doc','logs':'show_log'} ''' def __init__(self): import logging self.cbox = CryptoBox.CryptoBoxProps() self.log = logging.getLogger("CryptoBox") self.prefs = self.cbox.prefs self.__resetDataset() def __resetDataset(self): """this method has to be called at the beginning of every "site" action important: only at the beginning of an action (to not loose information) important: for _every_ "site" action (cherrypy is stateful) also take care for the plugins, as they also contain datasets """ self.pluginList = Plugins.PluginManager(self.cbox, self.prefs["Locations"]["PluginDir"]) self.plugins = WebInterfacePlugins(self.log, self.pluginList, self.return_plugin_action) ## publish the url "/system" as an alias for "/plugins" self.plugins.index = self.system self.dataset = WebInterfaceDataset.WebInterfaceDataset(self.cbox, self.prefs, self.pluginList.getPlugins()) def __check_config(self): #TODO: from now on a cryptobox is always configured return True def __check_init_running(self): #TODO: implement this check (is mkfs still running?) return False ###################################################################### ## put real sites down here and don't forget to expose them at the end def status(self, weblang=""): '''shows the current status of the box ''' self.__resetDataset() self.__setWebLang(weblang) if not self.__check_config(): self.dataset["Data.Warning"] = "NotInitialized" return self.__render("form_init") elif self.__check_init_running(): self.dataset["Data.Warning"] = "InitNotFinished" self.dataset["Data.Redirect.Action"] = "form_config" self.dataset["Data.Redirect.Delay"] = "30" return self.__render("empty") else: self.dataset["Data.Redirect.Delay"] = "60" return self.__render("show_status") def doc(self,page="",weblang=""): '''prints the offline wikipage ''' import re self.__resetDataset() self.__setWebLang(weblang) ## check for invalid characters if page and not re.search(u'\W', page): self.dataset["Data.Doc.Page"] = page else: ## display this page as default help page self.dataset["Data.Doc.Page"] ="CryptoBoxUser" return self.__render("show_doc") def system(self, weblang=""): self.__resetDataset() self.__setWebLang(weblang) return self.__render("form_system") def index(self, weblang=""): self.__resetDataset() self.__setWebLang(weblang) return self.__render("show_status") def show_volume(self, device="", weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): return self.__render("show_volume") else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") def show_volumes(self, weblang=""): self.__resetDataset() self.__setWebLang(weblang) return self.__render("show_volumes") def volume_name_set(self, device="", volume_name="", weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): volume_name = volume_name.strip() if self.__checkVolumeName(volume_name): container = self.cbox.getContainer(device) try: container.setName(volume_name) except CBContainerError, errMsg: self.log.warn("failed to rename the volume '%s' to '%s: %s'" % (device, volume_name, errMsg)) self.dataset["Data.Warning"] = "SetVolumeNameFailed" else: self.log.info("successfully renamed volume '%s' to '%s'" % (device, volume_name)) self.dataset.setCurrentDiskState(device) else: self.dataset["Data.Warning"] = "InvalidVolumeName" return self.__render("show_volume") else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") def mount_do(self, device, crypto_password=None, weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): container = self.cbox.getContainer(device) if container.isMounted(): self.dataset["Data.Warning"] = "IsMounted" self.log.warn("the device (%s) is already mounted" % device) else: try: if container.getType() == container.Types["luks"]: ## encrypted luks container if not crypto_password: self.dataset["Data.Warning"] = "EmptyCryptoPassword" self.log.warn("no password was supplied for mounting of device '%s'" % device) return self.__render("show_volume") else: container.mount(crypto_password) elif container.getType() == container.Types["plain"]: ## plain container container.mount() else: ## mounting is not possible self.dataset["Data.Warning"] = "InvalidType" self.log.warn("this type of container (%s) cannot be mounted - sorry!" % device) except (Exception, "MountError"): self.dataset["Data.Warning"] = "MountFailed" self.log.warn("failed to mount the device (%s)" % device) else: self.log.info("successfully mounted the container (%s)" % device) self.dataset.setCurrentDiskState(device) else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") return self.__render("show_volume") def volume_init_ask(self, device, encryption=None, weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): container = self.cbox.getContainer(device) if container.isMounted(): self.dataset["Data.Warning"] = "VolumeMayNotBeMounted" self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device) return self.__render("show_volume") else: if encryption is None: self.dataset["Data.Init.isCrypto"] = 0 else: self.dataset["Data.Init.isCrypto"] = 1 return self.__render("form_init") else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") def init_do(self, device, confirm="", crypto_password=None, crypto_password2=None, encryption=None, weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): container = self.cbox.getContainer(device) ## set 'Data.Init.isCrypto' - just in case, we have to show the same form again if encryption is None: self.dataset["Data.Init.isCrypto"] = 0 else: self.dataset["Data.Init.isCrypto"] = 1 if container.isMounted(): self.dataset["Data.Warning"] = "VolumeMayNotBeMounted" self.log.warn("initialization is not possible as long as the device (%s) is mounted" % device) return self.__render("form_init") else: if confirm != self.__getLanguageValue("Text.ConfirmInit"): self.dataset["Data.Warning"] = "InitNotConfirmed" self.log.warn("the confirmation sentence for initialization of the device '%s' was wrong" % device) return self.__render("form_init") try: if not encryption is None: if not crypto_password: self.dataset["Data.Warning"] = "EmptyCryptoPassword" self.log.warn("no crypto password was supplied for initialization of device '%s'" % device) return self.__render("form_init") if crypto_password != crypto_password2: self.dataset["Data.Warning"] = "DifferentCryptoPasswords" self.log.warn("the crypto password was not repeated correctly for initialization of device '%s'" % device) return self.__render("form_init") container.create(container.Types["luks"], crypto_password) else: container.create(container.Types["plain"]) except CBContainerError, errMsg: self.dataset["Data.Warning"] = "CreateFailed" self.log.warn("initialization of device '%s' failed" % device) self.log.warn("reason: %s" % errMsg) return self.__render("form_init") else: self.log.info("successfully initialized device '%s'" % device) # reread the dataset self.dataset.setCurrentDiskState(device) return self.__render("show_volume") else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") def test(self, weblang=""): import cherrypy self.__resetDataset() self.__setWebLang(weblang) return "test passed" def umount_do(self, device, weblang=""): self.__resetDataset() self.__setWebLang(weblang) if self.__setDevice(device): container = self.cbox.getContainer(device) if not container.isMounted(): self.dataset["Data.Warning"] = "NotMounted" self.log.warn("the device (%s) is currently not mounted" % device) else: try: if container.getType() == container.Types["luks"]: ## encrypted luks container container.umount() elif container.getType() == container.Types["plain"]: ## plain container container.umount() else: ## umounting is not possible self.dataset["Data.Warning"] = "InvalidType" self.log.warn("this type of container (%s) cannot be umounted - sorry!" % device) except (Exception, "UmountError"): self.dataset["Data.Warning"] = "UmountFailed" self.log.warn("failed to unmount the device (%s)" % device) else: self.log.info("successfully unmounted the container (%s)" % device) # reread the dataset self.dataset.setCurrentDiskState(device) else: if self.cbox.getContainerList(): return self.__render("show_volumes") else: return self.__render("show_status") return self.__render("show_volume") def return_plugin_action(self, plugin): def handler(**args): self.__resetDataset() try: self.__setWebLang(args["weblang"]) del args["weblang"] except KeyError: pass nextTemplate = plugin.doAction(**args) ## set the default template if not nextTemplate: nextTemplate = "form_system" return self.__render(nextTemplate, plugin) return handler ##################### input checker ########################## def __setWebLang(self, value): guess = value availLangs = self.cbox.getAvailableLanguages() ## no language specified: check browser language if not guess: guess = self.__getPreferredBrowserLanguage(availLangs) if not guess or \ not guess in availLangs or \ re.search(u'\W', guess): self.cbox.log.info("invalid language choosen: %s" % guess) guess = self.prefs["WebSettings"]["Language"] ## maybe the language is still not valid if not guess in availLangs: self.log.warn("the configured language is invalid: %s" % guess) guess = "en" ## maybe there is no english dataset??? if not guess in availLangs: self.log.warn("couldn't find the english dataset") guess = availLangs[0] self.dataset["Settings.Language"] = guess ## we only have to save it, if it was specified correctly and explicitly if value == guess: self.dataset["Settings.LinkAttrs.weblang"] = guess def __getPreferredBrowserLanguage(self, availLangs): """guess the preferred language of the user (as sent by the browser) take the first language, that is part of 'availLangs' """ import cherrypy try: pref_lang_header = cherrypy.request.headers["Accept-Language"] except KeyError: ## no language header was specified return None ## this could be a typical 'Accept-Language' header: ## de-de,de;q=0.8,en-us;q=0.5,en;q=0.3 regex = re.compile(u"\w+(-\w+)?(;q=[\d\.]+)?$") pref_langs = [e.split(";",1)[0] for e in pref_lang_header.split(",") if regex.match(e)] ## is one of these preferred languages available? for lang in pref_langs: if lang in availLangs: return lang ## we try to be nice: also look for "de" if "de-de" was specified ... for lang in pref_langs: ## use only the first part of the language short_lang = lang.split("-",1)[0] if short_lang in availLangs: return short_lang ## we give up return None def __setDevice(self, device): if device and re.match(u'[\w /\-]+$', device) and self.cbox.getContainer(device): self.log.debug("select device: %s" % device) self.dataset.setCurrentDiskState(device) return True else: self.log.warn("invalid device: %s" % device) self.dataset["Data.Warning"] = "InvalidDevice" return False def __checkVolumeName(self, name): if name and re.match(u'[\w \-]+$', name): return True else: return False def __getLanguageValue(self, value): hdf = self.__getLanguageData(self.dataset["Settings.Language"]) return hdf.getValue(value, "") def __getLanguageData(self, web_lang="en"): import neo_cgi, neo_util, os default_lang = "en" conf_lang = self.prefs["WebSettings"]["Language"] hdf = neo_util.HDF() langDir = os.path.abspath(self.prefs["Locations"]["LangDir"]) langFiles = [] ## first: read default language (en) if (default_lang != conf_lang) and (default_lang != web_lang): langFiles.append(os.path.join(langDir, default_lang + ".hdf")) ## second: read language as defined in the config file if (conf_lang != web_lang): langFiles.append(os.path.join(langDir, conf_lang + ".hdf")) ## third: read language as configured via web interface langFiles.append(os.path.join(langDir, web_lang + ".hdf")) for langFile in langFiles: if os.access(langFile, os.R_OK): hdf.readFile(langFile) else: log.warn("Couldn't read language file: %s" % langFile) return hdf def __render(self, renderInfo, plugin=None): '''renders from clearsilver templates and returns the resulting html ''' import os, types try: import neo_cgi, neo_util, neo_cs except ImportError: errorMsg = "Could not import clearsilver module. Try 'apt-get install python-clearsilver'." self.log.error(errorMsg) sys.stderr.write(errorMsg) raise ImportError, errorMsg ## is renderInfo a string (filename of the template) or a dictionary? if type(renderInfo) == types.DictType: template = renderInfo["template"] if renderInfo.has_key("generator"): generator = renderInfo["generator"] else: generator = False else: (template, generator) = (renderInfo, None) ## load the language data hdf = neo_util.HDF() hdf.copy("Lang", self.__getLanguageData(self.dataset["Settings.Language"])) ## first: assume, that the template file is in the global template directory self.dataset["Settings.TemplateFile"] = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], template + ".cs")) if plugin: ## check, if the plugin provides the template file -> overriding plugin_cs_file = plugin.getTemplateFileName(template) if plugin_cs_file: self.dataset["Settings.TemplateFile"] = plugin_cs_file ## add the current state of the plugins to the hdf dataset self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus() ## load the language data pl_lang = plugin.getLanguageData(self.dataset["Settings.Language"]) if pl_lang: hdf.copy("Lang.Plugins.%s" % plugin.getName(), pl_lang) ## load the dataset of the plugin plugin.loadDataSet(hdf) self.log.info("rendering site: " + template) cs_path = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs")) if not os.access(cs_path, os.R_OK): log.error("Couldn't read clearsilver file: %s" % cs_path) yield "Couldn't read clearsilver file: %s" % cs_path return ## update the container hdf-dataset (necessary if a plugin changed the state of a container) self.dataset.setContainersState() self.log.debug(self.dataset) for key in self.dataset.keys(): hdf.setValue(key,str(self.dataset[key])) cs = neo_cs.CS(hdf) cs.parseFile(cs_path) ## is there a generator containing additional information? if generator is None: ## all content in one flush yield cs.render() else: content_generate = generator() dummy_line = """""" ## now we do it linewise - checking for the content marker for line in cs.render().splitlines(): if line.find(dummy_line) != -1: yield line.replace(dummy_line, content_generate.next()) else: yield line + "\n" def test_stream(self): """just for testing purposes - to check if the "stream_response" feature actually works - for now (September 02006) it does not seem to be ok""" import time yield "neu

" ############################################################################ ## to make the sites visible through the webserver they must be exposed here index.exposed = True doc.exposed = True system.exposed = True status.exposed = True show_volume.exposed = True volume_name_set.exposed = True mount_do.exposed = True volume_init_ask.exposed = True init_do.exposed = True umount_do.exposed = True show_volumes.exposed = True test.exposed = True test_stream.exposed = True