## cryptonas/src/cryptobox/web/sites.py
##
## 498 lines
## 18 KiB
## Python

import cryptobox.core.main
import cryptobox.web.dataset
import cryptobox.plugins.manage
from cryptobox.core.exceptions import *
import re
import cherrypy
import types
import os
try:
    import neo_cgi, neo_util, neo_cs
except ImportError:
    ## there is no 'self' at module level and 'sys' is not imported at the top
    ## of this file - both names are resolved locally, so that the intended
    ## ImportError (instead of a NameError) reaches the caller
    import logging
    import sys
    errorMsg = "Could not import clearsilver module. Try 'apt-get install python-clearsilver'."
    logging.getLogger("CryptoBox").error(errorMsg)
    sys.stderr.write(errorMsg)
    raise ImportError(errorMsg)
## gettext text domain of the main web interface; the domains of the plugins
## are derived from it as "<domain>-feature-<plugin name>"
GETTEXT_DOMAIN = 'cryptobox-server'
class PluginIconHandler:
    """publish the icons of all plugins

    every plugin of the given plugin list is represented by an attribute, that
    is named after the plugin and refers to the 'getIcon' function of the plugin
    """

    def __init__(self, plugins):
        ## empty entries of the plugin list are ignored
        for entry in (item for item in plugins.getPlugins() if item):
            ## expose the getIcon function of this plugin
            setattr(self, entry.getName(), entry.getIcon)
class WebInterfaceSites:
    '''the pages ("sites") of the web interface, published via cherrypy

    every enabled plugin is attached as an exposed attribute named after the
    plugin (see __loadPlugins); the pages are rendered from clearsilver
    templates (see __render)
    '''
    ## this template is used under strange circumstances
    defaultTemplate = "empty"
def __init__(self, conf_file=None):
    """create the CryptoBox core object, the logger and the initial dataset

    conf_file is handed over to 'CryptoBoxProps' without any changes
    """
    import logging
    import sys
    core = cryptobox.core.main.CryptoBoxProps(conf_file)
    self.cbox = core
    self.log = logging.getLogger("CryptoBox")
    self.prefs = core.prefs
    ## load the plugins and create the dataset for the first time
    self.__resetDataset()
    ## publish our own handler for http errors under the name
    ## '_cp_on_http_error' (presumably the hook name cherrypy looks for - confirm)
    self._cp_on_http_error = self.newHTTPErrorHandler
    ## the initial language order is taken from the configuration
    self.langOrder = core.prefs["WebSettings"]["Languages"]
def __resetDataset(self):
    """create a fresh dataset - to be called at the beginning of every "site" action

    important: call it only at the beginning of an action (otherwise the
        information collected so far gets lost)
    important: call it for _every_ "site" action (cherrypy is stateful)
    the plugins are reloaded, too, as they contain datasets of their own
    """
    self.__loadPlugins()
    plugins = self.pluginList.getPlugins()
    self.dataset = cryptobox.web.dataset.WebInterfaceDataset(
            self.cbox, self.prefs, plugins)
    ## publish plugin icons
    icon_handler = PluginIconHandler(self.pluginList)
    icon_handler.exposed = True
    self.icons = icon_handler
    ## check, if a configuration partition has become available
    self.cbox.prefs.preparePartition()
def __loadPlugins(self):
    """create a new plugin manager and publish the enabled plugins as URLs

    an enabled plugin becomes an exposed attribute, that is named after the plugin
    the attribute of a disabled plugin is set to None
    """
    plugin_dir = self.prefs["Locations"]["PluginDir"]
    self.pluginList = cryptobox.plugins.manage.PluginManager(self.cbox, plugin_dir, self)
    for plugin in self.pluginList.getPlugins():
        if not plugin:
            continue
        plname = plugin.getName()
        if not plugin.isEnabled():
            self.cbox.log.info("Plugin '%s' is disabled" % plname)
            ## remove the plugin, if it was active before
            setattr(self, plname, None)
            continue
        self.cbox.log.info("Plugin '%s' loaded" % plname)
        ## this should be the "easiest" way to expose all plugins as URLs
        action = self.return_plugin_action(plugin)
        setattr(self, plname, action)
        action.exposed = True
        # TODO: check, if this really works - for now the "stream_response" feature seems to be broken
        #action.stream_response = True
## this is a function decorator to check authentication
## it has to be defined before any page definition requiring authentication
def __requestAuth(self=None):
    """return a decorator, that protects a "site" function by http basic authentication

    usage: '@__requestAuth()' in front of a page definition or
    'self.__requestAuth()(site)' at runtime - the 'self' argument of this
    function is not used; the wrapped function receives its own 'self'

    the wrapper calls 'site', if the request contains the credentials of a user
    listed in the "admins" section of the user database
    otherwise the status 401 and the "access_denied" page are returned
    """
    def check_credentials(site):
        def _inner_wrapper(self, *args, **kargs):
            import base64
            user, password = None, None
            try:
                resp = cherrypy.request.headers["Authorization"][6:] # ignore "Basic "
                (user, password) = base64.b64decode(resp).split(":",1)
            except KeyError:
                ## no "authorization" header was sent
                pass
            except (TypeError, ValueError):
                ## invalid base64 string or no ":" between user and password
                ## (the failed unpacking leaves 'user' and 'password' untouched)
                pass
            except AttributeError:
                ## no cherrypy response header defined
                pass
            authDict = self.cbox.prefs.userDB["admins"]
            if user not in authDict.keys():
                self.cbox.log.info("unknown user: %s" % str(user))
            elif self.cbox.prefs.userDB.getDigest(password) != authDict[user]:
                self.cbox.log.info("wrong password supplied for: %s" % user)
            else:
                ## ok: return the chosen page
                self.cbox.log.info("access granted for: %s" % user)
                return site(self, *args, **kargs)
            ## wrong credentials: return "access denied"
            cherrypy.response.headers["WWW-Authenticate"] = '''Basic realm="CryptoBox"'''
            cherrypy.response.status = 401
            return self.__render("access_denied")
        return _inner_wrapper
    return check_credentials
######################################################################
## put real sites down here and don't forget to expose them at the end
@cherrypy.expose
def index(self, weblang=""):
    """the start page: it shows the output of the "disks" plugin"""
    self.__resetDataset()
    self.__setWebLang(weblang)
    self.__checkEnvironment()
    ## render "disks" plugin by default - and do not forget the language!
    disks_action = self.return_plugin_action(self.pluginList.getPlugin("disks"))
    return disks_action(weblang=weblang)
def newHTTPErrorHandler(self, errorCode, message):
    """handle http errors gracefully

    404 - below /cryptobox-misc/ (or robots.txt/favicon.ico): only the status is set
    404 - anywhere else: send the error code and return a nice informative page
    500 - runtime errors: log the traceback, return the "ok" status code and
        show a polite excuse
    others: send the error code, log a warning and render the empty page
    """
    import traceback, sys
    if errorCode == 404:
        req_path = cherrypy.request.path
        ## we ignore uninteresting not-found errors
        if req_path.startswith("/cryptobox-misc/") \
                or req_path in ['/robots.txt','/favicon.ico']:
            cherrypy.response.status = errorCode
            return
        ## an invalid action was requested:
        ## we send a not-found error (with the usual interface)
        cherrypy.response.status = errorCode
        self.dataset["Data.Warning"] = "InvalidAction"
    elif errorCode == 500:
        ## are there still bugs in the code?
        ## we fix the error code (200 is "OK")
        cherrypy.response.status = 200
        self.cbox.log.error("HTTP-ERROR[500] - a runtime error occoured: %s" % str(message))
        ## add a traceback and exception information to the log
        for tb_line in traceback.format_exception(*sys.exc_info()):
            self.cbox.log.error("\t%s" % tb_line)
        self.dataset["Data.Warning"] = "RuntimeError"
    else:
        ## unknown error type
        cherrypy.response.status = errorCode
        self.cbox.log.warn("HTTP-ERROR[%d] - an unknown error occoured: %s" % (errorCode, message))
    ## all remaining cases answer with the (nearly) empty default page
    cherrypy.response.body = self.__render("empty")
def return_plugin_action(self, plugin):
    """return a callable, that executes 'plugin' for a request and renders the result

    the callable accepts the request parameters as keyword arguments
    the following parameters are consumed here - all others are handed over
    to 'plugin.doAction':
        weblang - the language of the web interface
        device - the selected device (stored in 'plugin.device' if it is valid)
        redirect - the name of a plugin to be called after this one
        message_keep - (internal) warning/success messages of the previous plugin
    if the plugin requires authentication, then the callable is wrapped
    by the authentication check
    """
    def handler(self, **args):
        self.__resetDataset()
        self.__checkEnvironment()
        ## set web interface language
        self.__setWebLang(args.pop("weblang", ""))
        ## we always read the "device" setting - otherwise volume-plugin links
        ## would not work easily (see "volume_props" linking to "volume_format_fs")
        ## it will get ignored for non-volume plugins
        plugin.device = None
        if "device" in args:
            device = args.pop("device")
            if self.__setDevice(device):
                plugin.device = device
        ## check the device argument of volume plugins
        if "volume" in plugin.pluginCapabilities:
            ## initialize the dataset of the selected device if necessary
            if plugin.device:
                self.dataset.setCurrentDiskState(plugin.device)
            else:
                ## invalid (or missing) device setting
                return self.__render(self.defaultTemplate)
        ## check if there is a "redirect" setting - this will override the return
        ## value of the doAction function (e.g. useful for umount-before-format)
        ## the override is always defined (even for an empty "redirect" value)
        ## and it always contains "values" - otherwise the redirection below
        ## would be skipped for non-volume plugins
        override_nextTemplate = None
        redirect = args.pop("redirect", None)
        if redirect:
            override_nextTemplate = { "plugin":redirect, "values":{} }
            if "volume" in plugin.pluginCapabilities:
                override_nextTemplate["values"] = {"device":plugin.device}
        ## check for information to be kept after the last call
        ## only a dictionary (as created below) is accepted - anything else
        ## was supplied via the URL and gets ignored
        keep_values = args.pop("message_keep", None)
        if isinstance(keep_values, dict):
            for key, value in keep_values.get("dataset", {}).items():
                self.dataset[key] = value
        ## call the plugin handler
        nextTemplate = plugin.doAction(**args)
        ## for 'volume' plugins: reread the dataset of the current disk
        ## additionally: set the default template for plugins
        if "volume" in plugin.pluginCapabilities:
            ## maybe the state of the current volume was changed?
            self.dataset.setCurrentDiskState(plugin.device)
            if not nextTemplate:
                nextTemplate = { "plugin":"volume_mount", "values":{"device":plugin.device}}
        else:
            ## maybe a non-volume plugin changed some plugin settings (e.g. plugin_manager)
            self.dataset.setPluginData()
            ## update the container hdf-dataset (maybe a plugin changed the state of a container)
            self.dataset.setContainersState()
            ## default page for non-volume plugins is the disk selection
            if not nextTemplate:
                nextTemplate = { "plugin":"disks", "values":{} }
        ## was a redirect requested?
        if override_nextTemplate:
            if self.pluginList.getPlugin(override_nextTemplate["plugin"]):
                nextTemplate = override_nextTemplate
            else:
                ## an unknown target cannot be rendered - stay with the
                ## result of the plugin
                self.cbox.log.warn("ignoring redirect to unknown plugin: %s" % str(redirect))
        ## if another plugin was chosen for 'nextTemplate', then do it!
        if isinstance(nextTemplate, dict) \
                and "plugin" in nextTemplate.keys() \
                and "values" in nextTemplate.keys() \
                and self.pluginList.getPlugin(nextTemplate["plugin"]):
            valueDict = dict(nextTemplate["values"])
            ## force the current weblang attribute - otherwise it gets lost
            valueDict["weblang"] = self.dataset["Settings.Language"]
            ## check for warnings/success messages, that should be kept
            kept_messages = {}
            for keep_key in ("Data.Warning", "Data.Success"):
                if keep_key in plugin.hdf.keys():
                    kept_messages[keep_key] = plugin.hdf[keep_key]
            if kept_messages:
                self.cbox.log.info("keep warning message")
                valueDict["message_keep"] = { "plugin":plugin, "dataset":kept_messages}
            new_plugin = self.pluginList.getPlugin(nextTemplate["plugin"])
            return self.return_plugin_action(new_plugin)(**valueDict)
        ## save the currently active plugin name
        self.dataset["Data.ActivePlugin"] = plugin.getName()
        return self.__render(nextTemplate, plugin)
    ## apply authentication?
    if plugin.isAuthRequired():
        return lambda **args: self.__requestAuth()(handler)(self, **args)
    else:
        return lambda **args: handler(self, **args)
## test authentication
@cherrypy.expose
@__requestAuth()
def test(self, weblang=""):
    """test page for the authentication - returns "test passed" for valid credentials

    '__requestAuth' is a decorator factory: it has to be called to get the
    real decorator - without the call this page would be replaced by the
    factory's inner 'check_credentials' function
    """
    self.__resetDataset()
    self.__setWebLang(weblang)
    self.__checkEnvironment()
    return "test passed"
@cherrypy.expose
def test_stream(self):
    """just for testing purposes - to check if the "stream_response" feature
    actually works - for now (September 02006) it does not seem to be ok

    ten list items are generated - one per second
    """
    import time
    yield "<html><head><title>neu</title></head><body><p><ul>"
    counter = 0
    while counter < 10:
        yield "<li>yes: %d - %s</li>" % (counter, str(time.time()))
        time.sleep(1)
        counter += 1
    yield "</ul></p></html>"
##################### input checker ##########################
def __checkEnvironment(self):
    """here we should place all interesting checks to inform the user of problems
    examples are: non-https, readonly-config, ...

    the result is stored in the dataset as "Data.EnvironmentWarning"
    both checks use the same key - the "NoSSL" warning overrides "ReadOnlyConfig"
    """
    ## this check is done _after_ "resetDataSet" -> a possible config partition was
    ## loaded before
    if self.cbox.prefs.requiresPartition() and not self.cbox.prefs.getActivePartition():
        self.dataset["Data.EnvironmentWarning"] = "ReadOnlyConfig"
    ## warn only if the connection is _not_ encrypted - the former (inverted)
    ## check issued the "NoSSL" warning for https connections
    if not self.__checkHTTPS():
        self.dataset["Data.EnvironmentWarning"] = "NoSSL"
def __checkHTTPS(self):
    """return True, if the current request seems to use an encrypted connection

    the request scheme, the environment setting "HTTPS" and a (not yet
    defined) http header are checked
    """
    ## check the request scheme
    if cherrypy.request.scheme == "https":
        return True
    ## check an environment setting - this is quite common behind proxies
    if os.environ.get("HTTPS"):
        return True
    ## check http header TODO (check pound for the name)
    try:
        proxy_hint = cherrypy.request.headers["TODO"]
    except KeyError:
        proxy_hint = None
    ## without any hint the connection seems to be unencrypted
    return bool(proxy_hint)
def __setWebLang(self, value):
    """set the preferred priority of languages according to the following order:
        1. language selected via web interface
        2. preferred browser language setting
        3. languages defined in the config file
    the result is stored in 'self.langOrder' and in the dataset
    """
    ## start with (a copy of) the configured language order
    order = self.cbox.prefs["WebSettings"]["Languages"][:]

    def move_to_front(lang):
        order.remove(lang)
        order.insert(0, lang)

    ## put the preferred browser language in front
    browser_lang = self.__getPreferredBrowserLanguage(order)
    if browser_lang:
        move_to_front(browser_lang)
    ## is the chosen language (via web interface) valid? - put it in front
    if value:
        if (value in order) and (not re.search(u'\W',value)):
            move_to_front(value)
        else:
            self.cbox.log.info("invalid language selected: %s" % value)
    ## store current language setting
    self.langOrder = order
    first_lang = order[0]
    self.dataset["Settings.Language"] = first_lang
    self.dataset["Settings.LinkAttrs.weblang"] = first_lang
def __getPreferredBrowserLanguage(self, availLangs):
    """guess the preferred language of the user (as sent by the browser)
    take the first language, that is part of 'availLangs'

    the languages are checked in the order of the "Accept-Language" header -
    the quality values ("q=...") are not evaluated
    None is returned, if the header is missing or no language matches
    """
    try:
        pref_lang_header = cherrypy.request.headers["Accept-Language"]
    except KeyError:
        ## no language header was specified
        return None
    ## this could be a typical 'Accept-Language' header:
    ## de-de,de;q=0.8,en-us;q=0.5,en;q=0.3
    ## whitespace after "," and around ";" is allowed, too - without the
    ## stripping these entries would be dropped silently
    regex = re.compile(r"\w+(-\w+)?(\s*;\s*q=[\d\.]+)?$")
    pref_langs = []
    for entry in pref_lang_header.split(","):
        entry = entry.strip()
        if regex.match(entry):
            pref_langs.append(entry.split(";",1)[0].strip())
    ## is one of these preferred languages available?
    for lang in pref_langs:
        if lang in availLangs:
            return lang
    ## we try to be nice: also look for "de" if "de-de" was specified ...
    for lang in pref_langs:
        ## use only the first part of the language
        short_lang = lang.split("-",1)[0]
        if short_lang in availLangs:
            return short_lang
    ## we give up
    return None
def __setDevice(self, device):
    """check a device name that was chosen via the web interface

    return True for a well-formed name of a known container
    otherwise issue a warning (log and dataset) and return False
    """
    accepted = device \
            and re.match(u'[\w /\-]+$', device) \
            and self.cbox.getContainer(device)
    if not accepted:
        self.log.warn("invalid device: %s" % device)
        self.dataset["Data.Warning"] = "InvalidDevice"
        return False
    self.log.debug("select device: %s" % device)
    return True
def __substituteGettext(self, languages, textDomain, hdf):
    """substitute all texts in the hdf dataset with their translated
    counterparts as returned by gettext

    the dataset is changed in place; nodes with a 'LINK' attribute stay untouched
    the given 'hdf' is returned in any case (translated or - if no
    translation was found - unchanged)
    """
    import gettext
    try:
        translator = gettext.translation(textDomain, languages=languages)
    except IOError as errMsg:
        ## no translation found
        self.cbox.log.warn("unable to load language file: %s" % errMsg)
        return hdf

    def is_link(node):
        for (key, value) in node.attrs():
            if key == 'LINK':
                return True
        return False

    def walk_tree(hdf_node):
        ## siblings are handled by the loop - children by recursion
        while hdf_node:
            if not is_link(hdf_node):
                hdf_node.setValue("", translator.ugettext(hdf_node.value()))
            walk_tree(hdf_node.child())
            hdf_node = hdf_node.next()

    walk_tree(hdf)
    return hdf
def __getLanguageData(self):
    """return the hdf dataset of the main interface and all plugins
    translations are done according to self.langOrder

    the result is cached - it is generated again, as soon as self.langOrder
    differs from the language order of the cached data
    """
    ## check if the language setting was changed - use cached data if possible
    cached = getattr(self, "cachedLanguageData", None)
    if cached and (cached["langOrder"] == self.langOrder):
        self.cbox.log.debug("using cached language data")
        return cached["hdf"]
    self.cbox.log.debug("generating language data")
    hdf = neo_util.HDF()
    hdf.readFile(os.path.join(self.prefs["Locations"]["TemplateDir"],"language.hdf"))
    self.__substituteGettext(self.langOrder, GETTEXT_DOMAIN, hdf)
    ## load the language data of all plugins
    for p in self.pluginList.getPlugins():
        ## skip empty entries - like every other loop over the plugin list
        if not p:
            continue
        plname = p.getName()
        pl_lang = p.getLanguageData()
        self.__substituteGettext(self.langOrder, "%s-feature-%s" % (GETTEXT_DOMAIN, plname), pl_lang)
        hdf.copy("Plugins.%s" % plname, pl_lang)
        self.cbox.log.debug("language data for plugin loaded: %s" % plname)
    ## cache result for later retrieval
    self.cachedLanguageData = {"langOrder": self.langOrder, "hdf": hdf}
    return hdf
def __render(self, renderInfo, plugin=None):
    '''renders from clearsilver templates and returns the resulting html

    renderInfo is the name of a template (without the ".cs" extension) or a
    dictionary containing "template" and (optional) "generator"
    plugin (optional) may override the template file and adds its status
    and its dataset

    this is a generator function: the html output is yielded - in one piece
    or (if a "generator" was given) line by line, with every line containing
    the content marker being filled with the next item of the generator
    '''
    ## is renderInfo a string (filename of the template) or a dictionary?
    if isinstance(renderInfo, dict):
        template = renderInfo["template"]
        generator = renderInfo.get("generator")
    else:
        (template, generator) = (renderInfo, None)
    ## load the language data
    hdf = neo_util.HDF()
    hdf.copy("Lang", self.__getLanguageData())
    ## first: assume, that the template file is in the global template directory
    template_dir = self.prefs["Locations"]["TemplateDir"]
    self.dataset["Settings.TemplateFile"] = os.path.abspath(os.path.join(template_dir, template + ".cs"))
    if plugin:
        ## check, if the plugin provides the template file -> overriding
        plugin_cs_file = plugin.getTemplateFileName(template)
        if plugin_cs_file:
            self.dataset["Settings.TemplateFile"] = plugin_cs_file
        ## add the current state of the plugins to the hdf dataset
        self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus()
        ## load the dataset of the plugin
        plugin.loadDataSet(hdf)
    self.log.info("rendering site: " + template)
    cs_path = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs"))
    if not os.access(cs_path, os.R_OK):
        ## the logger is an attribute of this object - there is no global 'log'
        self.log.error("Couldn't read clearsilver file: %s" % cs_path)
        yield "Couldn't read clearsilver file: %s" % cs_path
        return
    self.log.debug(self.dataset)
    for key in self.dataset.keys():
        hdf.setValue(key,str(self.dataset[key]))
    cs = neo_cs.CS(hdf)
    cs.parseFile(cs_path)
    ## is there a generator containing additional information?
    if not generator:
        ## all content in one flush
        yield cs.render()
    else:
        content_generate = generator()
        dummy_line = """<!-- CONTENT_DUMMY -->"""
        ## now we do it linewise - checking for the content marker
        for line in cs.render().splitlines():
            if line.find(dummy_line) != -1:
                yield line.replace(dummy_line, next(content_generate))
            else:
                yield line + "\n"