cryptonas-branches/pythonrewrite/bin/WebInterfaceSites.py
lars 0fe6d426ed added mounting and unmounting of config partition
moved config partition handling to CryptoBoxSettings
implemented environment checks (writeable config, https (off for now))
chown mounted directory after mount to the cryptobox user
2006-11-03 14:27:19 +00:00

409 lines
13 KiB
Python
Executable file

import CryptoBox
import WebInterfaceDataset
import re
import Plugins
from CryptoBoxExceptions import *
import cherrypy
import types
import os
## clearsilver is required for rendering the templates: fail early and with
## a helpful message if its python bindings are not installed
try:
    import neo_cgi, neo_util, neo_cs
except ImportError:
    import logging
    import sys
    errorMsg = "Could not import clearsilver module. Try 'apt-get install python-clearsilver'."
    ## there is no instance (and thus no 'self.log') at import time - use the
    ## logger of the CryptoBox directly
    logging.getLogger("CryptoBox").error(errorMsg)
    sys.stderr.write(errorMsg)
    raise ImportError(errorMsg)
class WebInterfacePlugins:
    """Publish every enabled plugin as an attribute of this object.

    For each enabled plugin returned by 'plugins.getPlugins()' an attribute
    named like the plugin is created. Its value is the result of
    'handler_func(plugin)', which additionally gets the attribute 'exposed'
    set to True. Every published plugin is reported via 'log.info'.
    """

    def __init__(self, log, plugins, handler_func):
        for candidate in plugins.getPlugins():
            ## ignore empty entries and disabled plugins
            if not candidate or not candidate.isEnabled():
                continue
            name = candidate.getName()
            log.info("Plugin '%s' loaded" % name)
            ## this should be the "easiest" way to expose all plugins as URLs
            action = handler_func(candidate)
            setattr(self, name, action)
            action.exposed = True
            # TODO: check if the "stream_response" feature can be enabled
            # for the handlers - for now it seems to be broken
class IconHandler:
    """Root of the icon URLs.

    The attribute 'plugins' holds a PluginIconHandler (flagged as "exposed")
    that serves the icons of the given plugins.
    """

    def __init__(self, plugins):
        icon_root = PluginIconHandler(plugins)
        icon_root.exposed = True
        self.plugins = icon_root
class PluginIconHandler:
    """Offer the 'getIcon' function of every plugin as an attribute.

    The attributes are named like the plugins; empty entries of
    'plugins.getPlugins()' are skipped.
    """

    def __init__(self, plugins):
        for entry in plugins.getPlugins():
            if entry:
                ## expose the getIcon function of this plugin
                setattr(self, entry.getName(), entry.getIcon)
class WebInterfaceSites:
    """Root object of the CryptoBox web interface (to be served by cherrypy).

    The exposed methods are the pages of the interface. The enabled plugins
    are published below the attribute 'plugins' and the plugin icons below
    the attribute 'icons'. All pages are rendered from clearsilver templates.
    """

    ## this template is used under strange circumstances
    defaultTemplate = "empty"

    def __init__(self):
        import logging
        self.cbox = CryptoBox.CryptoBoxProps()
        self.log = logging.getLogger("CryptoBox")
        self.prefs = self.cbox.prefs
        self.__resetDataset()

    def __resetDataset(self):
        """this method has to be called at the beginning of every "site" action

        important: only at the beginning of an action (to not lose information)
        important: for _every_ "site" action (cherrypy is stateful)
        also take care for the plugins, as they also contain datasets
        """
        self.pluginList = Plugins.PluginManager(self.cbox, self.prefs["Locations"]["PluginDir"])
        self.plugins = WebInterfacePlugins(self.log, self.pluginList, self.return_plugin_action)
        self.dataset = WebInterfaceDataset.WebInterfaceDataset(self.cbox, self.prefs, self.pluginList.getPlugins())
        ## publish plugin icons
        self.icons = IconHandler(self.pluginList)

    def __requestAuth(self=None):
        """return a function decorator that checks the authentication

        It has to be defined before any page definition requiring
        authentication. The argument 'self' is optional and unused: this way
        the decorator factory can be called as a bound method
        ("self.__requestAuth()(handler)") as well as a plain function inside
        of the class body ("@__requestAuth()").
        """
        def check_credentials(site):
            def _inner_wrapper(self, *args, **kargs):
                import base64
                user, password = None, None
                try:
                    resp = cherrypy.request.headers["Authorization"][6:]  # ignore "Basic "
                    (user, password) = base64.b64decode(resp).split(":", 1)
                except KeyError:
                    ## no "authorization" header was sent
                    pass
                except (TypeError, ValueError):
                    ## invalid base64 string or missing ":" separator
                    pass
                except AttributeError:
                    ## no cherrypy response header defined
                    pass
                authDict = self.cbox.prefs.userDB["admins"]
                if user in authDict.keys():
                    if self.cbox.prefs.userDB.getDigest(password) == authDict[user]:
                        ## ok: return the chosen page
                        self.cbox.log.info("access granted for: %s" % user)
                        return site(self, *args, **kargs)
                    else:
                        self.cbox.log.info("wrong password supplied for: %s" % user)
                else:
                    self.cbox.log.info("unknown user: %s" % str(user))
                ## wrong credentials: return "access denied"
                cherrypy.response.headers["WWW-Authenticate"] = '''Basic realm="CryptoBox"'''
                cherrypy.response.status = 401
                return self.__render("access_denied")
            return _inner_wrapper
        return check_credentials

    ######################################################################
    ## put real sites down here and don't forget to expose them at the end

    @cherrypy.expose
    def index(self, weblang=""):
        """the start page: it shows the "disks" plugin"""
        self.__resetDataset()
        self.__setWebLang(weblang)
        self.__checkEnvironment()
        ## do not forget the language!
        param_dict = {"weblang": weblang}
        ## render "disks" plugin by default
        return self.return_plugin_action(self.pluginList.getPlugin("disks"))(**param_dict)

    def return_plugin_action(self, plugin):
        """return a function (accepting only keyword arguments) that executes
        the action of the given plugin and renders the resulting page

        The returned function requests authentication, if the plugin
        demands it ('plugin.isAuthRequired()').
        """
        def handler(self, **args):
            self.__resetDataset()
            self.__checkEnvironment()
            try:
                self.__setWebLang(args["weblang"])
                del args["weblang"]
            except KeyError:
                self.__setWebLang("")
            ## check the device argument of volume plugins
            if "volume" in plugin.pluginCapabilities:
                try:
                    ## initialize the dataset of the selected device if necessary
                    if self.__setDevice(args["device"]):
                        plugin.device = args["device"]
                        self.dataset.setCurrentDiskState(plugin.device)
                    else:
                        return self.__render(self.defaultTemplate)
                except KeyError:
                    return self.__render(self.defaultTemplate)
                else:
                    ## the parameter 'device' exists - we have to remove it
                    del args["device"]
            ## call the plugin handler
            nextTemplate = plugin.doAction(**args)
            ## for 'volume' plugins: reread the dataset of the current disk
            ## additionally: set the default template for plugins
            if "volume" in plugin.pluginCapabilities:
                self.dataset.setCurrentDiskState(plugin.device)
                if not nextTemplate:
                    nextTemplate = {"plugin": "volume_mount", "values": {"device": plugin.device}}
            else:
                self.dataset.setPluginData()
                if not nextTemplate:
                    nextTemplate = {"plugin": "disks", "values": {}}
            ## if another plugin was chosen for 'nextTemplate', then do it!
            if isinstance(nextTemplate, dict) \
                    and "plugin" in nextTemplate \
                    and "values" in nextTemplate \
                    and self.pluginList.getPlugin(nextTemplate["plugin"]):
                valueDict = dict(nextTemplate["values"])
                ## force the current weblang attribute - otherwise it gets lost
                valueDict["weblang"] = self.dataset["Settings.Language"]
                new_plugin = self.pluginList.getPlugin(nextTemplate["plugin"])
                return self.return_plugin_action(new_plugin)(**valueDict)
            ## save the currently active plugin name
            self.dataset["Data.ActivePlugin"] = plugin.getName()
            return self.__render(nextTemplate, plugin)
        ## apply authentication?
        if plugin.isAuthRequired():
            return lambda **args: self.__requestAuth()(handler)(self, **args)
        else:
            return lambda **args: handler(self, **args)

    ## test authentication
    ## '__requestAuth' is a decorator factory: it must be called to get the
    ## real decorator
    @cherrypy.expose
    @__requestAuth()
    def test(self, weblang=""):
        """a page requiring authentication - just for testing purposes"""
        self.__resetDataset()
        self.__setWebLang(weblang)
        self.__checkEnvironment()
        return "test passed"

    @cherrypy.expose
    def test_stream(self):
        """just for testing purposes - to check if the "stream_response" feature
        actually works - for now (September 02006) it does not seem to be ok"""
        import time
        yield "<html><head><title>neu</title></head><body><p><ul>"
        for a in range(10):
            yield "<li>yes: %d - %s</li>" % (a, str(time.time()))
            time.sleep(1)
        yield "</ul></p></html>"

    ##################### input checker ##########################

    def __checkEnvironment(self):
        """here we should place all interesting checks to inform the user of problems

        examples are: non-https, readonly-config, ...
        The result is stored in the dataset ("Data.EnvironmentWarning").
        """
        if not self.cbox.prefs.isWriteable():
            self.dataset["Data.EnvironmentWarning"] = "ReadOnlyConfig"
        # TODO: turn this on soon (add "not") - for now it is annoying
        if self.__checkHTTPS():
            self.dataset["Data.EnvironmentWarning"] = "NoSSL"

    def __checkHTTPS(self):
        """return True if the current request seems to be encrypted"""
        ## check the request scheme
        if cherrypy.request.scheme == "https":
            return True
        ## check an environment setting - this is quite common behind proxies
        if os.environ.get("HTTPS"):
            return True
        ## check http header TODO (check pound for the name)
        try:
            if cherrypy.request.headers["TODO"]:
                return True
        except KeyError:
            pass
        ## the connection seems to be unencrypted
        return False

    def __setWebLang(self, value):
        """store the language of the interface in the dataset

        The fallback order is: the given value, the preferred language of
        the browser, the configured language, "en" and finally the first
        available language.
        """
        guess = value
        availLangs = self.cbox.getAvailableLanguages()
        ## no language specified: check browser language
        if not guess:
            guess = self.__getPreferredBrowserLanguage(availLangs)
        ## no preferred language or invalid language?
        if not guess \
                or not guess in availLangs \
                or re.search(r'\W', guess):
            ## warn only for invalid languages
            if not guess is None:
                self.cbox.log.info("invalid language choosen: %s" % guess)
            guess = self.prefs["WebSettings"]["Language"]
            ## maybe the language is still not valid
            if not guess in availLangs:
                self.log.warn("the configured language is invalid: %s" % guess)
                guess = "en"
                ## maybe there is no english dataset???
                if not guess in availLangs:
                    self.log.warn("couldn't find the english dataset")
                    ## stay with "en" if there is no language at all
                    if availLangs:
                        guess = availLangs[0]
        self.dataset["Settings.Language"] = guess
        ## we only have to save it, if it was specified correctly and explicitly
        if value == guess:
            self.dataset["Settings.LinkAttrs.weblang"] = guess

    def __getPreferredBrowserLanguage(self, availLangs):
        """guess the preferred language of the user (as sent by the browser)

        take the first language, that is part of 'availLangs'
        returns None, if no suitable language was found
        """
        try:
            pref_lang_header = cherrypy.request.headers["Accept-Language"]
        except KeyError:
            ## no language header was specified
            return None
        ## this could be a typical 'Accept-Language' header:
        ## de-de,de;q=0.8,en-us;q=0.5,en;q=0.3
        regex = re.compile(r"\w+(-\w+)?(;q=[\d\.]+)?$")
        pref_langs = []
        for entry in pref_lang_header.split(","):
            ## whitespace is allowed after the separating commas
            entry = entry.strip()
            if regex.match(entry):
                pref_langs.append(entry.split(";", 1)[0])
        ## is one of these preferred languages available?
        for lang in pref_langs:
            if lang in availLangs:
                return lang
        ## we try to be nice: also look for "de" if "de-de" was specified ...
        for lang in pref_langs:
            ## use only the first part of the language
            short_lang = lang.split("-", 1)[0]
            if short_lang in availLangs:
                return short_lang
        ## we give up
        return None

    def __setDevice(self, device):
        """select the given device, if it is valid

        returns True for a valid device - otherwise a warning is stored in
        the dataset and False is returned
        """
        if device and re.match(r'[\w /\-]+$', device) and self.cbox.getContainer(device):
            self.log.debug("select device: %s" % device)
            self.dataset.setCurrentDiskState(device)
            return True
        else:
            self.log.warn("invalid device: %s" % device)
            self.dataset["Data.Warning"] = "InvalidDevice"
            return False

    def __checkVolumeName(self, name):
        """return True, if the name consists of valid characters only"""
        if name and re.match(r'[\w \-]+$', name):
            return True
        else:
            return False

    def __getLanguageValue(self, value):
        """return a single value of the current language data (default: "")"""
        hdf = self.__getLanguageData(self.dataset["Settings.Language"])
        return hdf.getValue(value, "")

    def __getLanguageData(self, web_lang="en"):
        """read the language files into a clearsilver dataset (hdf)

        The files are read in the following order:
        default language (en), configured language, 'web_lang'
        Unreadable files are skipped with a warning.
        """
        default_lang = "en"
        conf_lang = self.prefs["WebSettings"]["Language"]
        hdf = neo_util.HDF()
        langDir = os.path.abspath(self.prefs["Locations"]["LangDir"])
        langFiles = []
        ## first: read default language (en)
        if (default_lang != conf_lang) and (default_lang != web_lang):
            langFiles.append(os.path.join(langDir, default_lang + ".hdf"))
        ## second: read language as defined in the config file
        if (conf_lang != web_lang):
            langFiles.append(os.path.join(langDir, conf_lang + ".hdf"))
        ## third: read language as configured via web interface
        langFiles.append(os.path.join(langDir, web_lang + ".hdf"))
        for langFile in langFiles:
            if os.access(langFile, os.R_OK):
                hdf.readFile(langFile)
            else:
                self.log.warn("Couldn't read language file: %s" % langFile)
        return hdf

    def __render(self, renderInfo, plugin=None):
        '''renders from clearsilver templates and returns the resulting html

        renderInfo: the name of a template or a dictionary containing the
            key "template" and optionally the key "generator" (a function
            returning an iterator, that supplies the content replacing the
            content markers of the rendered page)
        plugin: the plugin, that may provide the template, language data
            and its own dataset
        This is a generator: the html output is yielded in pieces.
        '''
        ## is renderInfo a string (filename of the template) or a dictionary?
        if isinstance(renderInfo, dict):
            template = renderInfo["template"]
            ## a missing generator means: render everything at once
            generator = renderInfo.get("generator") or None
        else:
            (template, generator) = (renderInfo, None)
        ## load the language data
        hdf = neo_util.HDF()
        hdf.copy("Lang", self.__getLanguageData(self.dataset["Settings.Language"]))
        ## first: assume, that the template file is in the global template directory
        self.dataset["Settings.TemplateFile"] = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], template + ".cs"))
        if plugin:
            ## check, if the plugin provides the template file -> overriding
            plugin_cs_file = plugin.getTemplateFileName(template)
            if plugin_cs_file:
                self.dataset["Settings.TemplateFile"] = plugin_cs_file
            ## add the current state of the plugins to the hdf dataset
            self.dataset["Data.Status.Plugins.%s" % plugin.getName()] = plugin.getStatus()
            ## load the language data
            pl_lang = plugin.getLanguageData(self.dataset["Settings.Language"])
            if pl_lang:
                hdf.copy("Lang.Plugins.%s" % plugin.getName(), pl_lang)
            ## load the dataset of the plugin
            plugin.loadDataSet(hdf)
        self.log.info("rendering site: " + template)
        cs_path = os.path.abspath(os.path.join(self.prefs["Locations"]["TemplateDir"], "main.cs"))
        if not os.access(cs_path, os.R_OK):
            self.log.error("Couldn't read clearsilver file: %s" % cs_path)
            yield "Couldn't read clearsilver file: %s" % cs_path
            return
        ## update the container hdf-dataset (necessary if a plugin changed the state of a container)
        self.dataset.setContainersState()
        self.log.debug(self.dataset)
        for key in self.dataset.keys():
            hdf.setValue(key, str(self.dataset[key]))
        cs = neo_cs.CS(hdf)
        cs.parseFile(cs_path)
        ## is there a generator containing additional information?
        if generator is None:
            ## all content in one flush
            yield cs.render()
        else:
            content_generate = generator()
            dummy_line = """<!-- CONTENT_DUMMY -->"""
            ## now we do it linewise - checking for the content marker
            for line in cs.render().splitlines():
                if line.find(dummy_line) != -1:
                    yield line.replace(dummy_line, content_generate.next())
                else:
                    yield line + "\n"