change identation from tabs to spaces
This commit is contained in:
parent
c8e28f5241
commit
3224d59dfe
|
@ -31,86 +31,86 @@ import cryptobox.plugins.base
|
|||
|
||||
|
||||
class date(cryptobox.plugins.base.CryptoBoxPlugin):
|
||||
"""The date feature of the CryptoBox.
|
||||
"""
|
||||
"""The date feature of the CryptoBox.
|
||||
"""
|
||||
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "preferences" ]
|
||||
request_auth = False
|
||||
rank = 10
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "preferences" ]
|
||||
request_auth = False
|
||||
rank = 10
|
||||
|
||||
def do_action(self, store=None, year=0, month=0, day=0, hour=0, minute=0):
|
||||
"""The action handler.
|
||||
"""
|
||||
import datetime
|
||||
if store:
|
||||
try:
|
||||
year, month, day = int(year), int(month), int(day)
|
||||
hour, minute = int(hour), int(minute)
|
||||
## check if the values are valid
|
||||
datetime.datetime(year, month, day, hour, minute)
|
||||
except ValueError:
|
||||
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
|
||||
else:
|
||||
new_date = "%02d%02d%02d%02d%d" % (month, day, hour, minute, year)
|
||||
if self.__set_date(new_date):
|
||||
self.cbox.log.info("changed date to: %s" % new_date)
|
||||
self.hdf["Data.Success"] = "Plugins.date.DateChanged"
|
||||
else:
|
||||
## a failure should usually be an invalid date (we do not check it really)
|
||||
self.cbox.log.info("failed to set date: %s" % new_date)
|
||||
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
|
||||
self.__prepare_form_data()
|
||||
return "form_date"
|
||||
def do_action(self, store=None, year=0, month=0, day=0, hour=0, minute=0):
|
||||
"""The action handler.
|
||||
"""
|
||||
import datetime
|
||||
if store:
|
||||
try:
|
||||
year, month, day = int(year), int(month), int(day)
|
||||
hour, minute = int(hour), int(minute)
|
||||
## check if the values are valid
|
||||
datetime.datetime(year, month, day, hour, minute)
|
||||
except ValueError:
|
||||
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
|
||||
else:
|
||||
new_date = "%02d%02d%02d%02d%d" % (month, day, hour, minute, year)
|
||||
if self.__set_date(new_date):
|
||||
self.cbox.log.info("changed date to: %s" % new_date)
|
||||
self.hdf["Data.Success"] = "Plugins.date.DateChanged"
|
||||
else:
|
||||
## a failure should usually be an invalid date (we do not check it really)
|
||||
self.cbox.log.info("failed to set date: %s" % new_date)
|
||||
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
|
||||
self.__prepare_form_data()
|
||||
return "form_date"
|
||||
|
||||
|
||||
def get_status(self):
|
||||
"""Retrieve the status of the feature.
|
||||
"""
|
||||
now = self.__get_current_date()
|
||||
return "%d/%d/%d/%d/%d/%d" % \
|
||||
(now.year, now.month, now.day, now.hour, now.minute, now.second)
|
||||
def get_status(self):
|
||||
"""Retrieve the status of the feature.
|
||||
"""
|
||||
now = self.__get_current_date()
|
||||
return "%d/%d/%d/%d/%d/%d" % \
|
||||
(now.year, now.month, now.day, now.hour, now.minute, now.second)
|
||||
|
||||
|
||||
def get_warnings(self):
|
||||
import os
|
||||
warnings = []
|
||||
if not os.path.isfile(self.root_action.DATE_BIN):
|
||||
warnings.append((48, "Plugins.%s.MissingProgramDate" % self.get_name()))
|
||||
return warnings
|
||||
def get_warnings(self):
|
||||
import os
|
||||
warnings = []
|
||||
if not os.path.isfile(self.root_action.DATE_BIN):
|
||||
warnings.append((48, "Plugins.%s.MissingProgramDate" % self.get_name()))
|
||||
return warnings
|
||||
|
||||
|
||||
def __prepare_form_data(self):
|
||||
"""Set some hdf values.
|
||||
"""
|
||||
cur_date = self.__get_current_date()
|
||||
self.hdf[self.hdf_prefix + "year"] = cur_date.year
|
||||
self.hdf[self.hdf_prefix + "month"] = cur_date.month
|
||||
self.hdf[self.hdf_prefix + "day"] = cur_date.day
|
||||
self.hdf[self.hdf_prefix + "hour"] = cur_date.hour
|
||||
self.hdf[self.hdf_prefix + "minute"] = cur_date.minute
|
||||
def __prepare_form_data(self):
|
||||
"""Set some hdf values.
|
||||
"""
|
||||
cur_date = self.__get_current_date()
|
||||
self.hdf[self.hdf_prefix + "year"] = cur_date.year
|
||||
self.hdf[self.hdf_prefix + "month"] = cur_date.month
|
||||
self.hdf[self.hdf_prefix + "day"] = cur_date.day
|
||||
self.hdf[self.hdf_prefix + "hour"] = cur_date.hour
|
||||
self.hdf[self.hdf_prefix + "minute"] = cur_date.minute
|
||||
|
||||
|
||||
def __get_current_date(self):
|
||||
"""Retrieve the current date and time.
|
||||
"""
|
||||
import datetime
|
||||
return datetime.datetime(2000, 1, 1).now()
|
||||
|
||||
def __get_current_date(self):
|
||||
"""Retrieve the current date and time.
|
||||
"""
|
||||
import datetime
|
||||
return datetime.datetime(2000, 1, 1).now()
|
||||
|
||||
|
||||
def __set_date(self, new_date):
|
||||
"""Set a new date and time.
|
||||
"""
|
||||
import subprocess
|
||||
import os
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
args = [
|
||||
self.cbox.prefs["Programs"]["super"],
|
||||
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
|
||||
"plugin",
|
||||
os.path.join(self.plugin_dir, "root_action.py"),
|
||||
new_date])
|
||||
proc.wait()
|
||||
return proc.returncode == 0
|
||||
def __set_date(self, new_date):
|
||||
"""Set a new date and time.
|
||||
"""
|
||||
import subprocess
|
||||
import os
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
args = [
|
||||
self.cbox.prefs["Programs"]["super"],
|
||||
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
|
||||
"plugin",
|
||||
os.path.join(self.plugin_dir, "root_action.py"),
|
||||
new_date])
|
||||
proc.wait()
|
||||
return proc.returncode == 0
|
||||
|
||||
|
|
|
@ -33,26 +33,26 @@ import sys
|
|||
import os
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = sys.argv[1:]
|
||||
args = sys.argv[1:]
|
||||
|
||||
self_bin = sys.argv[0]
|
||||
|
||||
if len(args) > 1:
|
||||
sys.stderr.write("%s: too many arguments (%s)\n" % (self_bin, args))
|
||||
sys.exit(1)
|
||||
|
||||
if len(args) == 0:
|
||||
sys.stderr.write("%s: no argument supplied\n" % self_bin)
|
||||
sys.exit(1)
|
||||
|
||||
if re.search(r'\D', args[0]):
|
||||
sys.stderr.write("%s: illegal argument (%s)\n" % (self_bin, args[0]))
|
||||
sys.exit(1)
|
||||
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
stdout = subprocess.PIPE,
|
||||
args = [DATE_BIN, args[0]])
|
||||
proc.wait()
|
||||
sys.exit(proc.returncode)
|
||||
self_bin = sys.argv[0]
|
||||
|
||||
if len(args) > 1:
|
||||
sys.stderr.write("%s: too many arguments (%s)\n" % (self_bin, args))
|
||||
sys.exit(1)
|
||||
|
||||
if len(args) == 0:
|
||||
sys.stderr.write("%s: no argument supplied\n" % self_bin)
|
||||
sys.exit(1)
|
||||
|
||||
if re.search(r'\D', args[0]):
|
||||
sys.stderr.write("%s: illegal argument (%s)\n" % (self_bin, args[0]))
|
||||
sys.exit(1)
|
||||
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
stdout = subprocess.PIPE,
|
||||
args = [DATE_BIN, args[0]])
|
||||
proc.wait()
|
||||
sys.exit(proc.returncode)
|
||||
|
||||
|
|
|
@ -23,75 +23,75 @@ __revision__ = "$Id$"
|
|||
from cryptobox.tests.base import WebInterfaceTestClass
|
||||
|
||||
class unittests(WebInterfaceTestClass):
|
||||
|
||||
def test_get_date(self):
|
||||
"""retrieve the current date"""
|
||||
date = self._get_current_date()
|
||||
|
||||
|
||||
def test_get_date(self):
|
||||
"""retrieve the current date"""
|
||||
date = self._get_current_date()
|
||||
|
||||
|
||||
def test_change_date(self):
|
||||
"""set the date back and forth"""
|
||||
now = self._get_current_date()
|
||||
## copy current time
|
||||
new_date = dict(now)
|
||||
## move three minutes forward (more is not nice because of screensavers)
|
||||
new_date["minute"] = (now["minute"] + 3) % 60
|
||||
## in case of minute-overflow we also have to move the hour a little bit forward
|
||||
new_date["hour"] = now["hour"] + ((now["minute"] + 3) / 60)
|
||||
## move forward ...
|
||||
self._setDate(new_date)
|
||||
self.assertEquals(new_date, self._get_current_date())
|
||||
## ... and backward
|
||||
self._setDate(now)
|
||||
self.assertEquals(now, self._get_current_date())
|
||||
|
||||
def test_change_date(self):
|
||||
"""set the date back and forth"""
|
||||
now = self._get_current_date()
|
||||
## copy current time
|
||||
new_date = dict(now)
|
||||
## move three minutes forward (more is not nice because of screensavers)
|
||||
new_date["minute"] = (now["minute"] + 3) % 60
|
||||
## in case of minute-overflow we also have to move the hour a little bit forward
|
||||
new_date["hour"] = now["hour"] + ((now["minute"] + 3) / 60)
|
||||
## move forward ...
|
||||
self._setDate(new_date)
|
||||
self.assertEquals(new_date, self._get_current_date())
|
||||
## ... and backward
|
||||
self._setDate(now)
|
||||
self.assertEquals(now, self._get_current_date())
|
||||
|
||||
|
||||
def test_try_broken_date(self):
|
||||
"""expect error messages for invalid dates"""
|
||||
self._setDate({"hour":12, "minute":40, "year":2004, "month":7, "day":0})
|
||||
self.cmd.find("invalid value for date")
|
||||
self._setDate({"hour":12, "minute":40, "year":"x", "month":7, "day":2})
|
||||
self.cmd.find("invalid value for date")
|
||||
self._setDate({"hour":12, "minute":40, "year":2004, "month":2, "day":31})
|
||||
self.cmd.find("invalid value for date")
|
||||
def test_try_broken_date(self):
|
||||
"""expect error messages for invalid dates"""
|
||||
self._setDate({"hour":12, "minute":40, "year":2004, "month":7, "day":0})
|
||||
self.cmd.find("invalid value for date")
|
||||
self._setDate({"hour":12, "minute":40, "year":"x", "month":7, "day":2})
|
||||
self.cmd.find("invalid value for date")
|
||||
self._setDate({"hour":12, "minute":40, "year":2004, "month":2, "day":31})
|
||||
self.cmd.find("invalid value for date")
|
||||
|
||||
|
||||
def _get_current_date(self):
|
||||
date_url = self.url + "date"
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
self.cmd.find("Data.Status.Plugins.date=([0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+)$", "m")
|
||||
dateNumbers = self.locals["__match__"].split("/")
|
||||
self.assertEquals(len(dateNumbers), 6)
|
||||
## we ignore seconds
|
||||
dateField = {
|
||||
"year" : int(dateNumbers[0]),
|
||||
"month" : int(dateNumbers[1]),
|
||||
"day" : int(dateNumbers[2]),
|
||||
"hour" : int(dateNumbers[3]),
|
||||
"minute" : int(dateNumbers[4])}
|
||||
return dateField
|
||||
|
||||
def _get_current_date(self):
|
||||
date_url = self.url + "date"
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
self.cmd.find("Data.Status.Plugins.date=([0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+)$", "m")
|
||||
dateNumbers = self.locals["__match__"].split("/")
|
||||
self.assertEquals(len(dateNumbers), 6)
|
||||
## we ignore seconds
|
||||
dateField = {
|
||||
"year" : int(dateNumbers[0]),
|
||||
"month" : int(dateNumbers[1]),
|
||||
"day" : int(dateNumbers[2]),
|
||||
"hour" : int(dateNumbers[3]),
|
||||
"minute" : int(dateNumbers[4])}
|
||||
return dateField
|
||||
|
||||
|
||||
def _setDate(self, date):
|
||||
"""for now we have to use this function instead of the one below"""
|
||||
date_url = self.url + "date?weblang=en&store=1&year=%s&month=%s&day=%s&hour=%s&minute=%s"\
|
||||
% (str(date["year"]), str(date["month"]), str(date["day"]),
|
||||
str(date["hour"]), str(date["minute"]))
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
def _setDate(self, date):
|
||||
"""for now we have to use this function instead of the one below"""
|
||||
date_url = self.url + "date?weblang=en&store=1&year=%s&month=%s&day=%s&hour=%s&minute=%s"\
|
||||
% (str(date["year"]), str(date["month"]), str(date["day"]),
|
||||
str(date["hour"]), str(date["minute"]))
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
|
||||
|
||||
def _setDateBroken(self, date):
|
||||
"""this should work, but the parsing of twill seems to be broken
|
||||
as soon as the twill bug is fixed, we should use this function"""
|
||||
date_url = self.url + "date"
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
self.cmd.formvalue("set_date", "year", str(date["year"]))
|
||||
self.cmd.formvalue("set_date", "month", str(date["month"]))
|
||||
self.cmd.formvalue("set_date", "day", str(date["day"]))
|
||||
self.cmd.formvalue("set_date", "hour", str(date["hour"]))
|
||||
self.cmd.formvalue("set_date", "minute", str(date["minute"]))
|
||||
self.cmd.submit()
|
||||
def _setDateBroken(self, date):
|
||||
"""this should work, but the parsing of twill seems to be broken
|
||||
as soon as the twill bug is fixed, we should use this function"""
|
||||
date_url = self.url + "date"
|
||||
self.register_auth(date_url)
|
||||
self.cmd.go(date_url)
|
||||
self.cmd.formvalue("set_date", "year", str(date["year"]))
|
||||
self.cmd.formvalue("set_date", "month", str(date["month"]))
|
||||
self.cmd.formvalue("set_date", "day", str(date["day"]))
|
||||
self.cmd.formvalue("set_date", "hour", str(date["hour"]))
|
||||
self.cmd.formvalue("set_date", "minute", str(date["minute"]))
|
||||
self.cmd.submit()
|
||||
|
||||
|
|
|
@ -26,23 +26,23 @@ __revision__ = "$Id$"
|
|||
import cryptobox.plugins.base
|
||||
|
||||
class disks(cryptobox.plugins.base.CryptoBoxPlugin):
|
||||
"""The disk feature of the CryptoBox.
|
||||
"""
|
||||
"""The disk feature of the CryptoBox.
|
||||
"""
|
||||
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "menu" ]
|
||||
request_auth = False
|
||||
rank = 10
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "menu" ]
|
||||
request_auth = False
|
||||
rank = 10
|
||||
|
||||
def do_action(self):
|
||||
"""The action handler.
|
||||
"""
|
||||
self.cbox.reread_container_list()
|
||||
return "disks"
|
||||
def do_action(self):
|
||||
"""The action handler.
|
||||
"""
|
||||
self.cbox.reread_container_list()
|
||||
return "disks"
|
||||
|
||||
|
||||
def get_status(self):
|
||||
"""Retrieve the current status of the feature.
|
||||
"""
|
||||
return ":".join([e.get_device() for e in self.cbox.get_container_list()])
|
||||
def get_status(self):
|
||||
"""Retrieve the current status of the feature.
|
||||
"""
|
||||
return ":".join([e.get_device() for e in self.cbox.get_container_list()])
|
||||
|
||||
|
|
|
@ -24,20 +24,20 @@ from cryptobox.tests.base import WebInterfaceTestClass
|
|||
|
||||
class unittests(WebInterfaceTestClass):
|
||||
|
||||
def test_read_form(self):
|
||||
'''display all devices'''
|
||||
self.register_auth(self.url)
|
||||
self.cmd.go(self.url + "disks?weblang=en")
|
||||
self.cmd.find("Available disks")
|
||||
def test_read_form(self):
|
||||
'''display all devices'''
|
||||
self.register_auth(self.url)
|
||||
self.cmd.go(self.url + "disks?weblang=en")
|
||||
self.cmd.find("Available disks")
|
||||
|
||||
|
||||
def test_is_device_in_list(self):
|
||||
"""check if the device-under-test is in the device list"""
|
||||
self.register_auth(self.url)
|
||||
self.cmd.go(self.url + "disks?weblang=en")
|
||||
self.cmd.find("Available disks")
|
||||
self.cmd.find(r'Data.Status.Plugins.disks=(.*)$', "m")
|
||||
devices = self.locals["__match__"].split(":")
|
||||
self.assertTrue(len(devices)>0)
|
||||
self.assertTrue(self.device in devices)
|
||||
def test_is_device_in_list(self):
|
||||
"""check if the device-under-test is in the device list"""
|
||||
self.register_auth(self.url)
|
||||
self.cmd.go(self.url + "disks?weblang=en")
|
||||
self.cmd.find("Available disks")
|
||||
self.cmd.find(r'Data.Status.Plugins.disks=(.*)$', "m")
|
||||
devices = self.locals["__match__"].split(":")
|
||||
self.assertTrue(len(devices)>0)
|
||||
self.assertTrue(self.device in devices)
|
||||
|
||||
|
|
|
@ -35,232 +35,232 @@ import cherrypy
|
|||
CERT_FILENAME = 'cryptobox-ssl-certificate.pem'
|
||||
KEY_BITS = 1024
|
||||
ISSUER_INFOS = {
|
||||
"ST": "SomeIssuerState",
|
||||
"L": "SomeIssuerLocality",
|
||||
"O": "SomeIssuerOrganization",
|
||||
"OU": "CryptoBox-ServerIssuer",
|
||||
"CN": "cryptoboxIssuer",
|
||||
"emailAddress": "infoIssuer@cryptobox.org"}
|
||||
"ST": "SomeIssuerState",
|
||||
"L": "SomeIssuerLocality",
|
||||
"O": "SomeIssuerOrganization",
|
||||
"OU": "CryptoBox-ServerIssuer",
|
||||
"CN": "cryptoboxIssuer",
|
||||
"emailAddress": "infoIssuer@cryptobox.org"}
|
||||
CERT_INFOS = {
|
||||
"ST": "SomeState",
|
||||
"L": "SomeLocality",
|
||||
"O": "SomeOrganization",
|
||||
"OU": "CryptoBox-Server",
|
||||
"CN": "*",
|
||||
"emailAddress": "info@cryptobox.org"}
|
||||
"ST": "SomeState",
|
||||
"L": "SomeLocality",
|
||||
"O": "SomeOrganization",
|
||||
"OU": "CryptoBox-Server",
|
||||
"CN": "*",
|
||||
"emailAddress": "info@cryptobox.org"}
|
||||
EXPIRE_TIME = 60*60*24*365*20 # 20 years forward and backward
|
||||
SIGN_DIGEST = "md5"
|
||||
PID_FILE = os.path.join("/tmp/cryptobox-stunnel.pid")
|
||||
|
||||
|
||||
class encrypted_webinterface(cryptobox.plugins.base.CryptoBoxPlugin):
|
||||
"""Provide an encrypted webinterface connection via stunnel
|
||||
"""
|
||||
"""Provide an encrypted webinterface connection via stunnel
|
||||
"""
|
||||
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = []
|
||||
request_auth = True
|
||||
rank = 80
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = []
|
||||
request_auth = True
|
||||
rank = 80
|
||||
|
||||
def do_action(self):
|
||||
"""The action handler.
|
||||
"""
|
||||
return None
|
||||
def do_action(self):
|
||||
"""The action handler.
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
def get_status(self):
|
||||
"""Retrieve the current state of the webinterface connection
|
||||
"""
|
||||
if self.__is_encrypted():
|
||||
return "1"
|
||||
else:
|
||||
return "0"
|
||||
def get_status(self):
|
||||
"""Retrieve the current state of the webinterface connection
|
||||
"""
|
||||
if self.__is_encrypted():
|
||||
return "1"
|
||||
else:
|
||||
return "0"
|
||||
|
||||
|
||||
def get_warnings(self):
|
||||
"""check if the connection is encrypted
|
||||
"""
|
||||
warnings = []
|
||||
## check if m2crypto is available
|
||||
try:
|
||||
import M2Crypto
|
||||
except ImportError:
|
||||
warnings.append((45, "Plugins.%s.MissingModuleM2Crypto" % self.get_name()))
|
||||
if not os.path.isfile(self.root_action.STUNNEL_BIN):
|
||||
warnings.append((44, "Plugins.%s.MissingProgramStunnel" % self.get_name()))
|
||||
if not self.__is_encrypted():
|
||||
## plaintext connection -> "heavy security risk" (priority=20..39)
|
||||
warnings.append((25, "Plugins.%s.NoSSL" % self.get_name()))
|
||||
return warnings
|
||||
|
||||
def get_warnings(self):
|
||||
"""check if the connection is encrypted
|
||||
"""
|
||||
warnings = []
|
||||
## check if m2crypto is available
|
||||
try:
|
||||
import M2Crypto
|
||||
except ImportError:
|
||||
warnings.append((45, "Plugins.%s.MissingModuleM2Crypto" % self.get_name()))
|
||||
if not os.path.isfile(self.root_action.STUNNEL_BIN):
|
||||
warnings.append((44, "Plugins.%s.MissingProgramStunnel" % self.get_name()))
|
||||
if not self.__is_encrypted():
|
||||
## plaintext connection -> "heavy security risk" (priority=20..39)
|
||||
warnings.append((25, "Plugins.%s.NoSSL" % self.get_name()))
|
||||
return warnings
|
||||
|
||||
|
||||
def __is_encrypted(self):
|
||||
"""perform some checks for encrypted connections
|
||||
"""
|
||||
if cherrypy.request.scheme == "https":
|
||||
return True
|
||||
## check an environment setting - this is quite common behind proxies
|
||||
if os.environ.has_key("HTTPS"):
|
||||
return True
|
||||
## check if it is a local connection (or via stunnel)
|
||||
if cherrypy.request.headers.has_key("Remote-Host") \
|
||||
and (cherrypy.request.headers["Remote-Host"] == "127.0.0.1"):
|
||||
return True
|
||||
## the arbitrarily chosen header is documented in README.proxy
|
||||
if cherrypy.request.headers.has_key("X-SSL-Request") \
|
||||
and (cherrypy.request.headers["X-SSL-Request"] == "1"):
|
||||
return True
|
||||
## it looks like a plain connection
|
||||
return False
|
||||
|
||||
def __is_encrypted(self):
|
||||
"""perform some checks for encrypted connections
|
||||
"""
|
||||
if cherrypy.request.scheme == "https":
|
||||
return True
|
||||
## check an environment setting - this is quite common behind proxies
|
||||
if os.environ.has_key("HTTPS"):
|
||||
return True
|
||||
## check if it is a local connection (or via stunnel)
|
||||
if cherrypy.request.headers.has_key("Remote-Host") \
|
||||
and (cherrypy.request.headers["Remote-Host"] == "127.0.0.1"):
|
||||
return True
|
||||
## the arbitrarily chosen header is documented in README.proxy
|
||||
if cherrypy.request.headers.has_key("X-SSL-Request") \
|
||||
and (cherrypy.request.headers["X-SSL-Request"] == "1"):
|
||||
return True
|
||||
## it looks like a plain connection
|
||||
return False
|
||||
|
||||
|
||||
def handle_event(self, event, event_info=None):
|
||||
"""Create a certificate during startup (if it does not exist) and run stunnel
|
||||
"""
|
||||
if event == "bootup":
|
||||
cert_abs_name = self.cbox.prefs.get_misc_config_filename(CERT_FILENAME)
|
||||
if not os.path.isfile(cert_abs_name):
|
||||
cert = self.__get_certificate()
|
||||
if cert is None:
|
||||
## failed to create a certificate?
|
||||
self.cbox.log.warn("Failed to import M2Crypto python module" \
|
||||
+ " required for SSL certificate generation")
|
||||
return
|
||||
try:
|
||||
self.cbox.prefs.create_misc_config_file(CERT_FILENAME, cert)
|
||||
self.cbox.log.info("Created new SSL certificate: %s" % \
|
||||
cert_abs_name)
|
||||
## make it non-readable for other users
|
||||
try:
|
||||
os.chmod(cert_abs_name, 0600)
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to change permissions of secret " \
|
||||
+ "certificate file (%s): %s" % \
|
||||
(cert_abs_name, err_msg))
|
||||
except IOError, err_msg:
|
||||
## do not run stunnel without a certificate
|
||||
self.cbox.log.warn("Failed to create new SSL certificate (%s): %s" \
|
||||
% (cert_abs_name, err_msg))
|
||||
return
|
||||
self.__run_stunnel(cert_abs_name)
|
||||
elif event == "shutdown":
|
||||
self.__kill_stunnel()
|
||||
|
||||
def handle_event(self, event, event_info=None):
|
||||
"""Create a certificate during startup (if it does not exist) and run stunnel
|
||||
"""
|
||||
if event == "bootup":
|
||||
cert_abs_name = self.cbox.prefs.get_misc_config_filename(CERT_FILENAME)
|
||||
if not os.path.isfile(cert_abs_name):
|
||||
cert = self.__get_certificate()
|
||||
if cert is None:
|
||||
## failed to create a certificate?
|
||||
self.cbox.log.warn("Failed to import M2Crypto python module" \
|
||||
+ " required for SSL certificate generation")
|
||||
return
|
||||
try:
|
||||
self.cbox.prefs.create_misc_config_file(CERT_FILENAME, cert)
|
||||
self.cbox.log.info("Created new SSL certificate: %s" % \
|
||||
cert_abs_name)
|
||||
## make it non-readable for other users
|
||||
try:
|
||||
os.chmod(cert_abs_name, 0600)
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to change permissions of secret " \
|
||||
+ "certificate file (%s): %s" % \
|
||||
(cert_abs_name, err_msg))
|
||||
except IOError, err_msg:
|
||||
## do not run stunnel without a certificate
|
||||
self.cbox.log.warn("Failed to create new SSL certificate (%s): %s" \
|
||||
% (cert_abs_name, err_msg))
|
||||
return
|
||||
self.__run_stunnel(cert_abs_name)
|
||||
elif event == "shutdown":
|
||||
self.__kill_stunnel()
|
||||
|
||||
|
||||
def __kill_stunnel(self):
|
||||
"""try to kill a running stunnel daemon
|
||||
"""
|
||||
if not os.path.isfile(PID_FILE):
|
||||
self.cbox.log.warn("Could not find the pid file of a running stunnel " \
|
||||
+ "daemon: %s" % PID_FILE)
|
||||
return
|
||||
try:
|
||||
pfile = open(PID_FILE, "r")
|
||||
try:
|
||||
pid = pfile.read().strip()
|
||||
except IOError, err_msg:
|
||||
self.cbox.log.warn("Failed to read the pid file (%s): %s" % (PID_FILE, err_msg))
|
||||
pfile.close()
|
||||
return
|
||||
pfile.close()
|
||||
except IOError, err_msg:
|
||||
self.cbox.log.warn("Failed to open the pid file (%s): %s" % (PID_FILE, err_msg))
|
||||
return
|
||||
if pid.isdigit():
|
||||
pid = int(pid)
|
||||
else:
|
||||
return
|
||||
try:
|
||||
## SIGTERM = 15
|
||||
os.kill(pid, 15)
|
||||
self.cbox.log.info("Successfully stopped stunnel")
|
||||
try:
|
||||
os.remove(PID_FILE)
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to remove the pid file (%s) of stunnel: %s" \
|
||||
% (PID_FILE, err_msg))
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to kill stunnel process (PID: %d): %s" % \
|
||||
(pid, err_msg))
|
||||
def __kill_stunnel(self):
|
||||
"""try to kill a running stunnel daemon
|
||||
"""
|
||||
if not os.path.isfile(PID_FILE):
|
||||
self.cbox.log.warn("Could not find the pid file of a running stunnel " \
|
||||
+ "daemon: %s" % PID_FILE)
|
||||
return
|
||||
try:
|
||||
pfile = open(PID_FILE, "r")
|
||||
try:
|
||||
pid = pfile.read().strip()
|
||||
except IOError, err_msg:
|
||||
self.cbox.log.warn("Failed to read the pid file (%s): %s" % (PID_FILE, err_msg))
|
||||
pfile.close()
|
||||
return
|
||||
pfile.close()
|
||||
except IOError, err_msg:
|
||||
self.cbox.log.warn("Failed to open the pid file (%s): %s" % (PID_FILE, err_msg))
|
||||
return
|
||||
if pid.isdigit():
|
||||
pid = int(pid)
|
||||
else:
|
||||
return
|
||||
try:
|
||||
## SIGTERM = 15
|
||||
os.kill(pid, 15)
|
||||
self.cbox.log.info("Successfully stopped stunnel")
|
||||
try:
|
||||
os.remove(PID_FILE)
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to remove the pid file (%s) of stunnel: %s" \
|
||||
% (PID_FILE, err_msg))
|
||||
except OSError, err_msg:
|
||||
self.cbox.log.warn("Failed to kill stunnel process (PID: %d): %s" % \
|
||||
(pid, err_msg))
|
||||
|
||||
|
||||
def __run_stunnel(self, cert_name, dest_port=443):
|
||||
## retrieve currently requested port (not necessarily the port served
|
||||
## by cherrypy - e.g. in a proxy setup)
|
||||
request_port = cherrypy.config.get("server.socket_port", 80)
|
||||
self.cbox.log.debug("[encrypted_webinterface] starting " \
|
||||
+ "%s on port %s for %s" % \
|
||||
(self.root_action.STUNNEL_BIN, dest_port, request_port))
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
stdout = subprocess.PIPE,
|
||||
stderr = subprocess.PIPE,
|
||||
args = [
|
||||
self.cbox.prefs["Programs"]["super"],
|
||||
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
|
||||
"plugin", os.path.join(self.plugin_dir, "root_action.py"),
|
||||
cert_name,
|
||||
str(request_port),
|
||||
str(dest_port),
|
||||
PID_FILE ])
|
||||
(output, error) = proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
self.cbox.log.info("Successfully started 'stunnel'")
|
||||
return True
|
||||
else:
|
||||
self.cbox.log.warn("Failed to run 'stunnel': %s" % error)
|
||||
return False
|
||||
def __run_stunnel(self, cert_name, dest_port=443):
|
||||
## retrieve currently requested port (not necessarily the port served
|
||||
## by cherrypy - e.g. in a proxy setup)
|
||||
request_port = cherrypy.config.get("server.socket_port", 80)
|
||||
self.cbox.log.debug("[encrypted_webinterface] starting " \
|
||||
+ "%s on port %s for %s" % \
|
||||
(self.root_action.STUNNEL_BIN, dest_port, request_port))
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
stdout = subprocess.PIPE,
|
||||
stderr = subprocess.PIPE,
|
||||
args = [
|
||||
self.cbox.prefs["Programs"]["super"],
|
||||
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
|
||||
"plugin", os.path.join(self.plugin_dir, "root_action.py"),
|
||||
cert_name,
|
||||
str(request_port),
|
||||
str(dest_port),
|
||||
PID_FILE ])
|
||||
(output, error) = proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
self.cbox.log.info("Successfully started 'stunnel'")
|
||||
return True
|
||||
else:
|
||||
self.cbox.log.warn("Failed to run 'stunnel': %s" % error)
|
||||
return False
|
||||
|
||||
|
||||
def __get_certificate(self):
|
||||
"""Create a self-signed certificate and return its pem content
|
||||
|
||||
The code is mainly inspired by:
|
||||
https://dev.tribler.org/browser/m2crypto/trunk/contrib/SimpleX509create.py
|
||||
"""
|
||||
## check if m2crypto is available
|
||||
try:
|
||||
import M2Crypto
|
||||
except ImportError:
|
||||
## failed to import the module
|
||||
return None
|
||||
import time
|
||||
string_type = 0x1000 | 1 # see http://www.koders.com/python/..
|
||||
# ../fid07A99E089F55187896A06CD4E0B6F21B9B8F5B0B.aspx?s=bavaria
|
||||
key_gen_number = 0x10001 # commonly used for key generation: 65537
|
||||
rsa_key = M2Crypto.RSA.gen_key(KEY_BITS, key_gen_number, callback=lambda: None)
|
||||
rsa_key2 = M2Crypto.RSA.gen_key(KEY_BITS, key_gen_number, callback=lambda: None)
|
||||
pkey = M2Crypto.EVP.PKey(md=SIGN_DIGEST)
|
||||
pkey.assign_rsa(rsa_key)
|
||||
sign_key = M2Crypto.EVP.PKey(md=SIGN_DIGEST)
|
||||
sign_key.assign_rsa(rsa_key2)
|
||||
subject = M2Crypto.X509.X509_Name()
|
||||
for (key, value) in CERT_INFOS.items():
|
||||
subject.add_entry_by_txt(key, string_type, value, -1, -1, 0)
|
||||
issuer = M2Crypto.X509.X509_Name(M2Crypto.m2.x509_name_new())
|
||||
for (key, value) in ISSUER_INFOS.items():
|
||||
issuer.add_entry_by_txt(key, string_type, value, -1, -1, 0)
|
||||
## time object
|
||||
asn_time1 = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
asn_time1.set_time(long(time.time()) - EXPIRE_TIME)
|
||||
asn_time2 = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
asn_time2.set_time(long(time.time()) + EXPIRE_TIME)
|
||||
request = M2Crypto.X509.Request()
|
||||
request.set_subject_name(subject)
|
||||
request.set_pubkey(pkey)
|
||||
request.sign(pkey=pkey, md=SIGN_DIGEST)
|
||||
cert = M2Crypto.X509.X509()
|
||||
## always create a unique version number
|
||||
cert.set_version(0)
|
||||
cert.set_serial_number(long(time.time()))
|
||||
cert.set_pubkey(pkey)
|
||||
cert.set_not_before(asn_time1)
|
||||
cert.set_not_after(asn_time2)
|
||||
cert.set_subject_name(request.get_subject())
|
||||
cert.set_issuer_name(issuer)
|
||||
cert.sign(pkey, SIGN_DIGEST)
|
||||
result = ""
|
||||
result += cert.as_pem()
|
||||
result += pkey.as_pem(cipher=None)
|
||||
return result
|
||||
def __get_certificate(self):
|
||||
"""Create a self-signed certificate and return its pem content
|
||||
|
||||
The code is mainly inspired by:
|
||||
https://dev.tribler.org/browser/m2crypto/trunk/contrib/SimpleX509create.py
|
||||
"""
|
||||
## check if m2crypto is available
|
||||
try:
|
||||
import M2Crypto
|
||||
except ImportError:
|
||||
## failed to import the module
|
||||
return None
|
||||
import time
|
||||
string_type = 0x1000 | 1 # see http://www.koders.com/python/..
|
||||
# ../fid07A99E089F55187896A06CD4E0B6F21B9B8F5B0B.aspx?s=bavaria
|
||||
key_gen_number = 0x10001 # commonly used for key generation: 65537
|
||||
rsa_key = M2Crypto.RSA.gen_key(KEY_BITS, key_gen_number, callback=lambda: None)
|
||||
rsa_key2 = M2Crypto.RSA.gen_key(KEY_BITS, key_gen_number, callback=lambda: None)
|
||||
pkey = M2Crypto.EVP.PKey(md=SIGN_DIGEST)
|
||||
pkey.assign_rsa(rsa_key)
|
||||
sign_key = M2Crypto.EVP.PKey(md=SIGN_DIGEST)
|
||||
sign_key.assign_rsa(rsa_key2)
|
||||
subject = M2Crypto.X509.X509_Name()
|
||||
for (key, value) in CERT_INFOS.items():
|
||||
subject.add_entry_by_txt(key, string_type, value, -1, -1, 0)
|
||||
issuer = M2Crypto.X509.X509_Name(M2Crypto.m2.x509_name_new())
|
||||
for (key, value) in ISSUER_INFOS.items():
|
||||
issuer.add_entry_by_txt(key, string_type, value, -1, -1, 0)
|
||||
## time object
|
||||
asn_time1 = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
asn_time1.set_time(long(time.time()) - EXPIRE_TIME)
|
||||
asn_time2 = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
asn_time2.set_time(long(time.time()) + EXPIRE_TIME)
|
||||
request = M2Crypto.X509.Request()
|
||||
request.set_subject_name(subject)
|
||||
request.set_pubkey(pkey)
|
||||
request.sign(pkey=pkey, md=SIGN_DIGEST)
|
||||
cert = M2Crypto.X509.X509()
|
||||
## always create a unique version number
|
||||
cert.set_version(0)
|
||||
cert.set_serial_number(long(time.time()))
|
||||
cert.set_pubkey(pkey)
|
||||
cert.set_not_before(asn_time1)
|
||||
cert.set_not_after(asn_time2)
|
||||
cert.set_subject_name(request.get_subject())
|
||||
cert.set_issuer_name(issuer)
|
||||
cert.sign(pkey, SIGN_DIGEST)
|
||||
result = ""
|
||||
result += cert.as_pem()
|
||||
result += pkey.as_pem(cipher=None)
|
||||
return result
|
||||
|
||||
|
|
|
@ -32,61 +32,61 @@ import os
|
|||
|
||||
|
||||
def _get_username():
|
||||
if ("SUPERCMD" in os.environ) and ("ORIG_USER" in os.environ):
|
||||
return os.environ["ORIG_USER"]
|
||||
elif "USER" in os.environ:
|
||||
return os.environ["USER"]
|
||||
else:
|
||||
return "cryptobox"
|
||||
if ("SUPERCMD" in os.environ) and ("ORIG_USER" in os.environ):
|
||||
return os.environ["ORIG_USER"]
|
||||
elif "USER" in os.environ:
|
||||
return os.environ["USER"]
|
||||
else:
|
||||
return "cryptobox"
|
||||
|
||||
|
||||
def run_stunnel(cert_file, src_port, dst_port, pid_file):
|
||||
import subprocess
|
||||
if not src_port.isdigit():
|
||||
sys.stderr.write("Source port is not a number: %s" % src_port)
|
||||
return False
|
||||
if not dst_port.isdigit():
|
||||
sys.stderr.write("Destination port is not a number: %s" % dst_port)
|
||||
return False
|
||||
if not os.path.isfile(cert_file):
|
||||
sys.stderr.write("The certificate file (%s) does not exist!" % cert_file)
|
||||
return False
|
||||
username = _get_username()
|
||||
if not username:
|
||||
sys.stderr.write("Could not retrieve the username with uid=%d." % os.getuid())
|
||||
return False
|
||||
## the environment (especially PATH) should be clean, as 'stunnel' cares about
|
||||
## this in a setuid situation
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
env = {},
|
||||
stdin = subprocess.PIPE,
|
||||
args = [ STUNNEL_BIN,
|
||||
"-fd",
|
||||
"0"])
|
||||
proc.stdin.write("setuid = %s\n" % username)
|
||||
proc.stdin.write("pid = %s\n" % pid_file)
|
||||
proc.stdin.write("[cryptobox-server]\n")
|
||||
proc.stdin.write("connect = %s\n" % src_port)
|
||||
proc.stdin.write("accept = %s\n" % dst_port)
|
||||
proc.stdin.write("cert = %s\n" % cert_file)
|
||||
(output, error) = proc.communicate()
|
||||
return proc.returncode == 0
|
||||
|
||||
import subprocess
|
||||
if not src_port.isdigit():
|
||||
sys.stderr.write("Source port is not a number: %s" % src_port)
|
||||
return False
|
||||
if not dst_port.isdigit():
|
||||
sys.stderr.write("Destination port is not a number: %s" % dst_port)
|
||||
return False
|
||||
if not os.path.isfile(cert_file):
|
||||
sys.stderr.write("The certificate file (%s) does not exist!" % cert_file)
|
||||
return False
|
||||
username = _get_username()
|
||||
if not username:
|
||||
sys.stderr.write("Could not retrieve the username with uid=%d." % os.getuid())
|
||||
return False
|
||||
## the environment (especially PATH) should be clean, as 'stunnel' cares about
|
||||
## this in a setuid situation
|
||||
proc = subprocess.Popen(
|
||||
shell = False,
|
||||
env = {},
|
||||
stdin = subprocess.PIPE,
|
||||
args = [ STUNNEL_BIN,
|
||||
"-fd",
|
||||
"0"])
|
||||
proc.stdin.write("setuid = %s\n" % username)
|
||||
proc.stdin.write("pid = %s\n" % pid_file)
|
||||
proc.stdin.write("[cryptobox-server]\n")
|
||||
proc.stdin.write("connect = %s\n" % src_port)
|
||||
proc.stdin.write("accept = %s\n" % dst_port)
|
||||
proc.stdin.write("cert = %s\n" % cert_file)
|
||||
(output, error) = proc.communicate()
|
||||
return proc.returncode == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = sys.argv[1:]
|
||||
args = sys.argv[1:]
|
||||
|
||||
self_bin = sys.argv[0]
|
||||
|
||||
if len(args) != 4:
|
||||
sys.stderr.write("%s: invalid number of arguments (%d instead of %d))\n" % \
|
||||
(self_bin, len(args), 4))
|
||||
sys.exit(1)
|
||||
|
||||
if not run_stunnel(args[0], args[1], args[2], args[3]):
|
||||
sys.stderr.write("%s: failed to run 'stunnel'!" % self_bin)
|
||||
sys.exit(100)
|
||||
|
||||
sys.exit(0)
|
||||
self_bin = sys.argv[0]
|
||||
|
||||
if len(args) != 4:
|
||||
sys.stderr.write("%s: invalid number of arguments (%d instead of %d))\n" % \
|
||||
(self_bin, len(args), 4))
|
||||
sys.exit(1)
|
||||
|
||||
if not run_stunnel(args[0], args[1], args[2], args[3]):
|
||||
sys.stderr.write("%s: failed to run 'stunnel'!" % self_bin)
|
||||
sys.exit(100)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
|
|
@ -23,12 +23,12 @@ __revision__ = "$Id$"
|
|||
from cryptobox.tests.base import WebInterfaceTestClass
|
||||
|
||||
class unittests(WebInterfaceTestClass):
|
||||
|
||||
def test_get_cert_form(self):
|
||||
"""retrieve the default form of the certificate manager"""
|
||||
url = self.url + "encrypted_webinterface"
|
||||
self.register_auth(url)
|
||||
self.cmd.go(url)
|
||||
## TODO: enable it, as soon as the plugin is enabled by default
|
||||
#self.cmd.find("Data.Status.Plugins.encrypted_webinterface")
|
||||
|
||||
def test_get_cert_form(self):
|
||||
"""retrieve the default form of the certificate manager"""
|
||||
url = self.url + "encrypted_webinterface"
|
||||
self.register_auth(url)
|
||||
self.cmd.go(url)
|
||||
## TODO: enable it, as soon as the plugin is enabled by default
|
||||
#self.cmd.find("Data.Status.Plugins.encrypted_webinterface")
|
||||
|
||||
|
|
|
@ -26,56 +26,56 @@ __revision__ = "$Id$"
|
|||
import cryptobox.plugins.base
|
||||
|
||||
class help(cryptobox.plugins.base.CryptoBoxPlugin):
|
||||
"""The help feature of the CryptoBox.
|
||||
"""
|
||||
"""The help feature of the CryptoBox.
|
||||
"""
|
||||
|
||||
#plugin_capabilities = [ "system" ]
|
||||
#TODO: enable this plugin as soon as the user documentation is ready again
|
||||
plugin_capabilities = [ ]
|
||||
plugin_visibility = [ "menu" ]
|
||||
request_auth = False
|
||||
rank = 80
|
||||
#plugin_capabilities = [ "system" ]
|
||||
#TODO: enable this plugin as soon as the user documentation is ready again
|
||||
plugin_capabilities = [ ]
|
||||
plugin_visibility = [ "menu" ]
|
||||
request_auth = False
|
||||
rank = 80
|
||||
|
||||
default_lang = 'en'
|
||||
default_page = "CryptoBoxUser"
|
||||
default_lang = 'en'
|
||||
default_page = "CryptoBoxUser"
|
||||
|
||||
def do_action(self, page=""):
|
||||
'''prints the offline wikipage
|
||||
'''
|
||||
import re, os
|
||||
## check for invalid characters and if the page exists in the default language
|
||||
if page and \
|
||||
not re.search(r'\W', page) and \
|
||||
os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
|
||||
self.default_lang, page + '.html')):
|
||||
## everything is ok
|
||||
pass
|
||||
else:
|
||||
## display this page as default help page
|
||||
page = self.default_page
|
||||
if page:
|
||||
## issue warning
|
||||
self.cbox.log.info("could not find the selected documentation page: %s" % str(page))
|
||||
## store the name of the page
|
||||
self.hdf[self.hdf_prefix + "Page"] = page
|
||||
## choose the right language
|
||||
for lang in self.site.lang_order:
|
||||
if os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
|
||||
lang, page + '.html')):
|
||||
doc_lang = lang
|
||||
break
|
||||
else:
|
||||
doc_lang = self.default_lang
|
||||
self.hdf[self.hdf_prefix + "Language"] = doc_lang
|
||||
## store the current setting for a later "getStatus" call
|
||||
self.current_lang = doc_lang
|
||||
self.current_page = page
|
||||
return "doc"
|
||||
def do_action(self, page=""):
|
||||
'''prints the offline wikipage
|
||||
'''
|
||||
import re, os
|
||||
## check for invalid characters and if the page exists in the default language
|
||||
if page and \
|
||||
not re.search(r'\W', page) and \
|
||||
os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
|
||||
self.default_lang, page + '.html')):
|
||||
## everything is ok
|
||||
pass
|
||||
else:
|
||||
## display this page as default help page
|
||||
page = self.default_page
|
||||
if page:
|
||||
## issue warning
|
||||
self.cbox.log.info("could not find the selected documentation page: %s" % str(page))
|
||||
## store the name of the page
|
||||
self.hdf[self.hdf_prefix + "Page"] = page
|
||||
## choose the right language
|
||||
for lang in self.site.lang_order:
|
||||
if os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
|
||||
lang, page + '.html')):
|
||||
doc_lang = lang
|
||||
break
|
||||
else:
|
||||
doc_lang = self.default_lang
|
||||
self.hdf[self.hdf_prefix + "Language"] = doc_lang
|
||||
## store the current setting for a later "getStatus" call
|
||||
self.current_lang = doc_lang
|
||||
self.current_page = page
|
||||
return "doc"
|
||||
|
||||
|
||||
def get_status(self):
|
||||
"""Retrieve the current status of the feature.
|
||||
"""
|
||||
return "%s:%s" % (self.current_lang, self.current_page)
|
||||
def get_status(self):
|
||||
"""Retrieve the current status of the feature.
|
||||
"""
|
||||
return "%s:%s" % (self.current_lang, self.current_page)
|
||||
|
||||
|
||||
|
|
|
@ -25,70 +25,70 @@ from twill.errors import *
|
|||
|
||||
class unittests(WebInterfaceTestClass):
|
||||
|
||||
def test_help_language_texts(self):
|
||||
'''help pages should be available in different languages'''
|
||||
def test_help_language_texts(self):
|
||||
'''help pages should be available in different languages'''
|
||||
|
||||
## check english help pages
|
||||
self.cmd.go(self.url + "help?weblang=en")
|
||||
self.cmd.find("Table of Contents")
|
||||
self.cmd.find("Getting started")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
## check english help pages
|
||||
self.cmd.go(self.url + "help?weblang=en")
|
||||
self.cmd.find("Table of Contents")
|
||||
self.cmd.find("Getting started")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
## check german help pages
|
||||
self.cmd.go(self.url + "help?weblang=de")
|
||||
self.cmd.find("Table of Contents")
|
||||
self.cmd.find("Wie geht es los")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "de")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
## check german help pages
|
||||
self.cmd.go(self.url + "help?weblang=de")
|
||||
self.cmd.find("Table of Contents")
|
||||
self.cmd.find("Wie geht es los")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "de")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
## check slovene help pages
|
||||
self.cmd.go(self.url + "help?weblang=sl")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
## add a slovene text here, as soon as the help is translated
|
||||
(lang,page) = self._getHelpStatus()
|
||||
## change this to "sl" as soon as the help is translated
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
## check slovene help pages
|
||||
self.cmd.go(self.url + "help?weblang=sl")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
## add a slovene text here, as soon as the help is translated
|
||||
(lang,page) = self._getHelpStatus()
|
||||
## change this to "sl" as soon as the help is translated
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
## check french help pages
|
||||
self.cmd.go(self.url + "help?weblang=fr")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
## add a french text here, as soon as the help is translated
|
||||
(lang,page) = self._getHelpStatus()
|
||||
## change this to "fr" as soon as the help is translated
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
## check french help pages
|
||||
self.cmd.go(self.url + "help?weblang=fr")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
## add a french text here, as soon as the help is translated
|
||||
(lang,page) = self._getHelpStatus()
|
||||
## change this to "fr" as soon as the help is translated
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
## test a random language - it should fall back to english
|
||||
self.cmd.go(self.url + "help?weblang=foobar")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
## test a random language - it should fall back to english
|
||||
self.cmd.go(self.url + "help?weblang=foobar")
|
||||
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
|
||||
def test_help_pages(self):
|
||||
"""check invalid page requests"""
|
||||
self.cmd.go(self.url + "help?page=foobar")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
def test_help_pages(self):
|
||||
"""check invalid page requests"""
|
||||
self.cmd.go(self.url + "help?page=foobar")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
self.cmd.go(self.url + "help?page=CryptoBoxUser")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
self.cmd.go(self.url + "help?page=CryptoBoxUser")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(page == "CryptoBoxUser")
|
||||
|
||||
|
||||
def test_help_default_languages(self):
|
||||
"""check invalid page requests"""
|
||||
self.cmd.go(self.url + "help?weblang=foobar")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
def test_help_default_languages(self):
|
||||
"""check invalid page requests"""
|
||||
self.cmd.go(self.url + "help?weblang=foobar")
|
||||
(lang,page) = self._getHelpStatus()
|
||||
self.assertTrue(lang == "en")
|
||||
|
||||
|
||||
def _getHelpStatus(self):
|
||||
self.cmd.find(r'Data.Status.Plugins.help=(.*)$', "m")
|
||||
return tuple(self.locals["__match__"].split(":"))
|
||||
def _getHelpStatus(self):
|
||||
self.cmd.find(r'Data.Status.Plugins.help=(.*)$', "m")
|
||||
return tuple(self.locals["__match__"].split(":"))
|
||||
|
||||
|
|
|
@ -27,22 +27,22 @@ import cryptobox.plugins.base
|
|||
|
||||
|
||||
class language_selection(cryptobox.plugins.base.CryptoBoxPlugin):
|
||||
"""The language_selection feature of the CryptoBox.
|
||||
"""
|
||||
"""The language_selection feature of the CryptoBox.
|
||||
"""
|
||||
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "menu", "preferences" ]
|
||||
request_auth = False
|
||||
rank = 60
|
||||
plugin_capabilities = [ "system" ]
|
||||
plugin_visibility = [ "menu", "preferences" ]
|
||||
request_auth = False
|
||||
rank = 60
|
||||
|
||||
def do_action(self):
|
||||
"""Show all available languages.
|
||||
"""
|
||||
return "language_selection"
|
||||
def do_action(self):
|
||||
"""Show all available languages.
|
||||
"""
|
||||
return "language_selection"
|
||||
|
||||
|
||||
def get_status(self):
|
||||
"""The current status of the feature is defined as the current language.
|
||||
"""
|
||||
return ":".join(self.site.lang_order)
|
||||
def get_status(self):
|
||||
"""The current status of the feature is defined as the current language.
|
||||
"""
|
||||
return ":".join(self.site.lang_order)
|
||||
|
||||
|
|
|
@ -24,23 +24,23 @@ from cryptobox.tests.base import WebInterfaceTestClass
|
|||
|
||||
class unittests(WebInterfaceTestClass):
|
||||
|
||||
def test_read_form(self):
|
||||
"""Check if the 'language_selection' plugin works.
|
||||
"""
|
||||
url = self.url + "language_selection?weblang=en"
|
||||
self.register_auth(url)
|
||||
self.cmd.go(url)
|
||||
self.cmd.find('Choose your tongue')
|
||||
def test_read_form(self):
|
||||
"""Check if the 'language_selection' plugin works.
|
||||
"""
|
||||
url = |