# # Copyright 2006 sense.lab e.V. # # This file is part of the CryptoBox. # # The CryptoBox is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # The CryptoBox is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with the CryptoBox; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # import cryptobox.core.main import cryptobox.web.dataset import cryptobox.plugins.manage from cryptobox.core.exceptions import * import re import cherrypy import types import os try: import neo_cgi, neo_util, neo_cs except ImportError: errorMsg = "Could not import clearsilver module. Try 'apt-get install python-clearsilver'." self.log.error(errorMsg) sys.stderr.write(errorMsg) raise ImportError, errorMsg GETTEXT_DOMAIN = 'cryptobox-server' class PluginIconHandler: def __init__(self, plugins): for plugin in plugins.getPlugins(): if not plugin: continue plname = plugin.getName() ## expose the getIcon function of this plugin setattr(self, plname, plugin.getIcon) class WebInterfaceSites: ''' ''' ## this template is used under strange circumstances defaultTemplate = "empty" def __init__(self, conf_file=None): import logging,sys self.cbox = cryptobox.core.main.CryptoBoxProps(conf_file) self.log = logging.getLogger("CryptoBox") self.prefs = self.cbox.prefs self.__resetDataset() ## store the original http error handler self._cp_on_http_error = self.newHTTPErrorHandler ## set initial language order self.langOrder = self.cbox.prefs["WebSettings"]["Languages"][:] def __resetDataset(self): """this method has to be called at the beginning of every "site" action important: only at the beginning of an action (to not loose information) important: for _every_ "site" action (cherrypy is stateful) also take care for the plugins, as they also contain datasets """ self.__loadPlugins() self.dataset = cryptobox.web.dataset.WebInterfaceDataset(self.cbox, self.prefs, self.pluginList.getPlugins()) ## publish plugin icons self.icons = PluginIconHandler(self.pluginList) self.icons.exposed = True ## check, if a configuration partition has become available self.cbox.prefs.preparePartition() def __loadPlugins(self): self.pluginList = cryptobox.plugins.manage.PluginManager(self.cbox, self.prefs["Locations"]["PluginDir"], self) for plugin in self.pluginList.getPlugins(): if not plugin: continue plname = plugin.getName() if plugin.isEnabled(): self.cbox.log.info("Plugin '%s' loaded" % plname) ## this should be the "easiest" way to expose all plugins as URLs setattr(self, plname, self.return_plugin_action(plugin)) setattr(getattr(self, plname), "exposed", True) # TODO: check, if this really works - for now the "stream_response" feature seems to be broken #setattr(getattr(self, plname), "stream_respones", True) else: self.cbox.log.info("Plugin '%s' is disabled" % plname) ## remove the plugin, if it was active before setattr(self, plname, None) ## this is a function decorator to check authentication ## it has to be defined before any page definition requiring authentification def __requestAuth(self=None): def check_credentials(site): def _inner_wrapper(self, *args, **kargs): import base64 ## define a "non-allowed" function user, password = None, None try: resp = cherrypy.request.headers["Authorization"][6:] # ignore "Basic " (user, password) = base64.b64decode(resp).split(":",1) except KeyError: ## no "authorization" header was sent pass except TypeError: ## invalid base64 string pass except AttributeError: ## no cherrypy response header defined pass authDict = self.cbox.prefs.userDB["admins"] if user in authDict.keys(): if self.cbox.prefs.userDB.getDigest(password) == authDict[user]: ## ok: return the choosen page self.cbox.log.info("access granted for: %s" % user) return site(self, *args, **kargs) else: self.cbox.log.info("wrong password supplied for: %s" % user) else: self.cbox.log.info("unknown user: %s" % str(user)) ## wrong credentials: return "access denied" cherrypy.response.headers["WWW-Authenticate"] = '''Basic realm="CryptoBox"''' cherrypy.response.status = 401 return self.__render("access_denied") return _inner_wrapper return check_credentials ###################################################################### ## put real sites down here and don't forget to expose them at the end @cherrypy.expose def index(self, weblang=""): self.__resetDataset() self.__setWebLang(weblang) self.__checkEnvironment() ## do not forget the language! param_dict = {"weblang":weblang} ## render "disks" plugin by default return self.return_plugin_action(self.pluginList.getPlugin("disks"))(**param_dict) def newHTTPErrorHandler(self, errorCode, message): """handle http errors gracefully 404 - not found errors: ignored if url is below /cryptobox-misc/ other 404 errors: send the error code and return a nice informative page 500 - runtime errors: return "ok" exit code and show a polite excuse others: are there any other possible http errors? """ import traceback, sys ## we ignore uninteresting not-found errors if (errorCode == 404) and \ (cherrypy.request.path.startswith("/cryptobox-misc/") or \ cherrypy.request.path in ['/robots.txt','/favicon.ico']): cherrypy.response.status = errorCode return ## an invalid action was requested if errorCode == 404: ## we send a not-found error (with the usual interface) cherrypy.response.status = errorCode self.dataset["Data.Warning"] = "InvalidAction" cherrypy.response.body = self.__render("empty") return ## are there still bugs in the code? if errorCode == 500: ## we fix the error code (200 is "OK") cherrypy.response.status = 200 self.cbox.log.error("HTTP-ERROR[500] - a runtime error occoured: %s" % str(message)) ## add a traceback and exception information to the lo for a in traceback.format_exception(*sys.exc_info()): self.cbox.log.error("\t%s" % a) self.dataset["Data.Warning"] = "RuntimeError" cherrypy.response.body = self.__render("empty") return ## unknown error type cherrypy.response.status = errorCode self.cbox.log.warn("HTTP-ERROR[%d] - an unknown error occoured: %s" % (errorCode, message)) cherrypy.response.body = self.__render("empty") def return_plugin_action(self, plugin): def handler(self, **args): self.__resetDataset() self.__checkEnvironment() args_orig = dict(args) ## set web interface language try: self.__setWebLang(args["weblang"]) del args["weblang"] except KeyError: self.__setWebLang("") ## we always read the "device" setting - otherwise volume-plugin links ## would not work easily (see "volume_props" linking to "volume_format_fs") ## it will get ignored for non-volume plugins try: plugin.device = None if self.__setDevice(args["device"]): plugin.device = args["device"] del args["device"] except KeyError: pass ## check the device argument of volume plugins if "volume" in plugin.pluginCapabilities: ## initialize the dataset of the selected device if necessary if plugin.device: self.dataset.setCurrentDiskState(plugin.device) else: ## invalid (or missing) device setting return self.__render(self.defaultTemplate) ## check if there is a "redirect" setting - this will override the return ## value of the doAction function (e.g. useful for umount-before-format) try: if args["redirect"]: override_nextTemplate = { "plugin":args["redirect"] } if "volume" in plugin.pluginCapabilities: override_nextTemplate["values"] = {"device":plugin.device} del args["redirect"] except KeyError: override_nextTemplate = None ## check for information to be kept after the last call try: keep_values = args["message_keep"] del args["message_keep"] for key, value in keep_values["dataset"].items(): self.dataset[key] = value except KeyError: keep_values = None ## call the plugin handler nextTemplate = plugin.doAction(**args) ## for 'volume' plugins: reread the dataset of the current disk ## additionally: set the default template for plugins if "volume" in plugin.pluginCapabilities: ## maybe the state of the current volume was changed? self.dataset.setCurrentDiskState(plugin.device) if not nextTemplate: nextTemplate = { "plugin":"volume_mount", "values":{"device":plugin.device}} else: ## maybe a non-volume plugin changed some plugin settings (e.g. plugin_manager) self.dataset.setPluginData() ## update the container hdf-dataset (maybe a plugin changed the state of a container) self.dataset.setContainersState() ## default page for non-volume plugins is the disk selection if not nextTemplate: nextTemplate = { "plugin":"disks", "values":{} } ## was a redirect requested? if override_nextTemplate: nextTemplate = override_nextTemplate ## if another plugins was choosen for 'nextTemplate', then do it! if isinstance(nextTemplate, types.DictType) \ and "plugin" in nextTemplate.keys() \ and "values" in nextTemplate.keys() \ and self.pluginList.getPlugin(nextTemplate["plugin"]): valueDict = dict(nextTemplate["values"]) ## force the current weblang attribute - otherwise it gets lost valueDict["weblang"] = self.dataset["Settings.Language"] ## check for warnings/success messages, that should be kept if "Data.Warning" in plugin.hdf.keys() \ or "Data.Success" in plugin.hdf.keys(): self.cbox.log.info("keep warning message") valueDict["message_keep"] = { "plugin":plugin, "dataset":{}} for keep_key in ("Data.Warning", "Data.Success"): if keep_key in plugin.hdf.keys(): valueDict["message_keep"]["dataset"][keep_key] = plugin.hdf[keep_key] new_plugin = self.pluginList.getPlugin(nextTemplate["plugin"]) return self.return_plugin_action(new_plugin)(**valueDict) ## save the currently active plugin name self.dataset["Data.ActivePlugin"] = plugin.getName() return self.__render(nextTemplate, plugin) ## apply authentication? if plugin.isAuthRequired(): return lambda **args: self.__requestAuth()(handler)(self, **args) else: return lambda **args: handler(self, **args) ## test authentication @cherrypy.expose @__requestAuth def test(self, weblang=""): self.__resetDataset() self.__setWebLang(weblang) self.__checkEnvironment() return "test passed" @cherrypy.expose def test_stream(self): """just for testing purposes - to check if the "stream_response" feature actually works - for now (September 02006) it does not seem to be ok""" import time yield "neu

" ##################### input checker ########################## def __checkEnvironment(self): """here we should place all interesting checks to inform the user of problems examples are: non-https, readonly-config, ... """ ## this check is done _after_ "resetDataSet" -> a possible config partition was ## loaded before if self.cbox.prefs.requiresPartition() and not self.cbox.prefs.getActivePartition(): self.dataset["Data.EnvironmentWarning"] = "ReadOnlyConfig" # TODO: turn this on soon (add "not") - for now it is annoying if self.__checkHTTPS(): self.dataset["Data.EnvironmentWarning"] = "NoSSL" def __checkHTTPS(self): ## check the request scheme if cherrypy.request.scheme == "https": return True ## check an environment setting - this is quite common behind proxies try: if os.environ["HTTPS"]: return True except KeyError: pass ## check http header TODO (check pound for the name) try: if cherrypy.request.headers["TODO"]: return True except KeyError: pass ## the connection seems to be unencrypted return False def __setWebLang(self, value): """set the preferred priority of languages according to the following order: 1. language selected via web interface 2. preferred browser language setting 3. languages defined in the config file """ ## start with the configured language order langOrder = self.cbox.prefs["WebSettings"]["Languages"][:] self.cbox.log.debug("updating language preferences (default: %s)" % str(langOrder)) ## put the preferred browser language in front guess = self.__getPreferredBrowserLanguage(langOrder) if guess: langOrder.remove(guess) langOrder.insert(0,guess) self.cbox.log.debug("raised priority of preferred browser language: %s" % guess) ## is the chosen language (via web interface) valid? - put it in front if value and (value in langOrder) and (not re.search(u'\W',value)): langOrder.remove(value) langOrder.insert(0,value) self.cbox.log.debug("raised priority of selected language: %s" % value) elif value: self.cbox.log.info("invalid language selected: %s" % value) ## store current language setting self.cbox.log.debug("current language preference: %s" % str(langOrder)) self.langOrder = langOrder self.dataset["Settings.Language"] = langOrder[0] self.dataset["Settings.LinkAttrs.weblang"] = langOrder[0] def __getPreferredBrowserLanguage(self, availLangs): """guess the preferred language of the user (as sent by the browser) take the first language, that is part of 'availLangs' """ try: pref_lang_header = cherrypy.request.headers["Accept-Language"] except KeyError: ## no language header was specified return None ## this could be a typical 'Accept-Language' header: ## de-de,de;q=0.8,en-us;q=0.5,en;q=0.3 regex = re.compile(u"\w+(-\w+)?(;q=[\d\.]+)?$") pref_langs = [e.split(";",1)[0] for e in pref_lang_header.split(",") if regex.match(e)] ## is one of these preferred languages available? for lang in pref_langs: if lang in availLangs: return lang ## we try to be nice: also look for "de" if "de-de" was specified ... for lang in pref_langs: ## use only the first part of the language short_lang = lang.split("-",1)[0] if short_lang in availLangs: return short_lang ## we give up return None def __setDevice(self, device): """check a device name that was chosen via the web interface issue a warning if the device is invalid""" if device and re.match(u'[\w /\-]+$', device) and self.cbox.getContainer(device): self.log.debug("select device: %s" % device) return True else: self.log.warn("invalid device: %s" % device) self.dataset["Data.Warning"] = "InvalidDevice" return False def __substituteGettext(self, languages, textDomain, hdf): """substitute all texts in the hdf dataset with their translated counterparts as returned by gettext """ import gettext try: translator = gettext.translation(textDomain, languages=languages) except IOError, errMsg: ## no translation found self.cbox.log.warn("unable to load language file: %s" % errMsg) return hdf def walk_tree(hdf_node): def translate_node(node): for (key,value) in node.attrs(): if key == 'LINK': return try: node.setValue("",translator.ugettext(node.value())) except UnicodeEncodeError, errMsg: self.cbox.log.info("Failed unicode encoding for gettext: %s - %s" % (node.value(),errMsg)) ## fallback to default encoding node.setValue("",translator.gettext(node.value())) while hdf_node: translate_node(hdf_node) walk_tree(hdf_node.child()) hdf_node = hdf_node.next() walk_tree(hdf) def __getLanguageData(self): """return the hdf dataset of the main interface and all plugins translations are done according to self.langOrder """ ## check if the language setting was changed - use cached data if possible try: if self.cachedLanguageData["langOrder"] == self.langOrder: self.cbox.log.debug("using cached language data: %s" % str(self.langOrder)) return self.cachedLanguageData["hdf"] except AttributeError: pass self.cbox.log.debug("generating language data") hdf = neo_util.HDF() hdf.readFile(os.path.join(self.prefs["Locations"]["TemplateDir"],"language.hdf")) self.__substituteGettext(self.langOrder, GETTEXT_DOMAIN, hdf) ## load the language data of all plugins for p in self.pluginList.getPlugins(): pl_lang = p.getLanguageData() self.__substituteGettext(self.langOrder, "%s-feature-%s" % (GETTEXT_DOMAIN, p.getName()), pl_lang) hdf.copy("Plugins.%s" % p.getName(), pl_lang) self.cbox.log.debug("language data for plugin loaded: %s" % p.getName()) ## cache result for later retrieval self.cachedLanguageData = {"langOrder": self.langOrder, "hdf": hdf} return hdf def __render(self, renderInfo, plugin=None): '''renders from clearsilver templates and returns the resulting html ''' ## is renderInfo a string (filename of the template) or a dictionary? if type(renderInfo) == types.DictType: template = renderInfo["template"] if renderInfo.has_key("generator"): generator = renderInfo["generator"] else: generator = None else: (template, generator) = (renderInfo, None) ## load the language data hdf = neo_util.HDF() hdf.copy("Lang", self.__getLanguageData()) ## first: assume, that the template file is in the global template directory self.dataset["Settings.TemplateFile"] = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], template + ".cs")) if plugin: ## check, if the plugin provides the template file -> overriding plugin_cs_file = plugin.getTemplateFileName(template) if plugin_cs_file: self.dataset["Settings.TemplateFile"] = plugin_cs_file ## add the current state of the plugins to the hdf dataset self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus() ## load the dataset of the plugin plugin.loadDataSet(hdf) self.log.info("rendering site: " + template) cs_path = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs")) if not os.access(cs_path, os.R_OK): log.error("Couldn't read clearsilver file: %s" % cs_path) yield "Couldn't read clearsilver file: %s" % cs_path return self.log.debug(self.dataset) for key in self.dataset.keys(): hdf.setValue(key,str(self.dataset[key])) cs = neo_cs.CS(hdf) cs.parseFile(cs_path) ## is there a generator containing additional information? if not generator: ## all content in one flush yield cs.render() else: content_generate = generator() dummy_line = """""" ## now we do it linewise - checking for the content marker for line in cs.render().splitlines(): if line.find(dummy_line) != -1: yield line.replace(dummy_line, content_generate.next()) else: yield line + "\n"