changed coding style according to pylint

This commit is contained in:
lars 2006-12-05 12:24:47 +00:00
parent f6295a4b2d
commit fe69eb38ae
66 changed files with 2355 additions and 1515 deletions

View file

@ -31,6 +31,8 @@ Syntax:
this script will always return with an exitcode 0 (true), if "check" is the only argument
"""
__revision__ = "$Id"
import os
import sys
import subprocess

View file

@ -24,16 +24,42 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import os, sys
import cryptobox.web.sites
from cryptobox.core.exceptions import *
from optparse import OptionParser
## check python version
(ver_major, ver_minor, ver_sub, ver_desc, ver_subsub) = sys.version_info
if (ver_major < 2) or ((ver_major == 2) and (ver_minor < 4)):
sys.stderr.write("You need a python version >= 2.4\n")
sys.stderr.write("Current version is: %s\n" % sys.version)
sys.exit(1)
## check cherrypy dependency
try:
import cherrypy
except:
print "Could not import the cherrypy module! Try 'apt-get install python-cherrypy'."
sys.stderr.write("Could not import the cherrypy module!\n")
sys.stderr.write("Try 'apt-get install python-cherrypy'.\n")
sys.exit(1)
## check clearsilver dependency
try:
import neo_cgi, neo_util
except:
sys.stderr.write("Could not import the clearsilver module!\n")
sys.stderr.write("Try 'apt-get install python-clearsilver'.\n")
sys.exit(1)
## check configobj dependency
try:
import configobj, validate
except:
sys.stderr.write("Could not import the configobj or validate module!\n")
sys.stderr.write("Try 'apt-get install python-configobj'.\n")
sys.exit(1)
@ -129,17 +155,22 @@ def close_open_files():
def write_pid_file(pid_file):
if os.path.exists(pid_file):
sys.stderr.write(
"Warning: pid file (%s) already exists - overwriting ...\n" % pid_file)
try:
pidf = open(pid_file,"w")
pidf.write(str(os.getpid()))
pidf.close()
except (IOError, OSError), errMsg:
sys.stderr.write("Warning: failed to write pid file (%s): %s\n" % (pid_file, errMsg))
sys.stderr.write(
"Warning: failed to write pid file (%s): %s\n" % (pid_file, errMsg))
## it is just a warning - no need to break
def parseOptions():
version = "%prog" + cryptobox.core.main.VERSION
import cryptobox
version = "%prog" + cryptobox.__version__
parser = OptionParser(version=version)
parser.set_defaults(conffile="/etc/cryptobox-server/cryptobox.conf",
pidfile="/var/run/cryptobox-server/webserver.pid",

33
bin/do_pylint.sh Executable file
View file

@ -0,0 +1,33 @@
#!/bin/sh
#
# set some environmental variables for pylint
#
PROJ_DIR=$(dirname "$0")/..
PROJ_DIR=$(cd "$PROJ_DIR"; pwd)
PYLINTRC=$PROJ_DIR/src/pylintrc
PYTHONPATH=$PROJ_DIR/src
function check_for_filename()
{
# maybe the argument is a file instead of a module name
if echo "$1" | grep -q "\.py$" && test -e "$1"
then local FILE_DIR=$(dirname "$1")
local MODULE=$(basename "${1%.py}")
ARGS="${ARGS} ${MODULE}"
PYTHONPATH="${PYTHONPATH}:${FILE_DIR}"
else ARGS="${ARGS} ${1}"
fi
}
while test $# -gt 0
do check_for_filename "$1"
shift
done
export PYTHONPATH
export PYLINTRC
pylint $ARGS

View file

@ -16,7 +16,6 @@
# Short-Description: start CryptoBox webserver
### END INIT INFO
# read the default setting file, if it exists
[ -r /etc/default/cryptobox-server ] && source /etc/default/cryptobox-server
@ -33,6 +32,7 @@ CONF_FILE=/etc/cryptobox-server/cryptobox.conf
[ "$NO_START" = "1" ] && exit 0
DAEMON=/usr/sbin/CryptoBoxWebserver
PYTHON_EXEC=/usr/bin/python
PIDFILE=/var/run/cryptobox-server/webserver.pid
DESC="CryptoBox Daemon (webinterface)"
OPTIONS="-B --pidfile=$PIDFILE --config=$CONF_FILE --logfile=$LOGFILE --host=$HOST --port=$PORT $SERVER_OPTS"
@ -45,11 +45,7 @@ test -e "$DAEMON" || exit 0
case "$1" in
start )
# TODO: mount config dir
# TODO: create certificate
# TODO: run stunnel
# the lines above should go into the live-cd scripts
## create the directory of the pid file if necessary
# create the directory of the pid file if necessary
PIDDIR=$(dirname "$PIDFILE")
if [ -d "$PIDDIR" ]
then mkdir -p "$PIDDIR"
@ -60,14 +56,21 @@ case "$1" in
if start-stop-daemon \
--chuid $RUNAS: --quiet --start \
--user $RUNAS --pidfile "$PIDFILE" \
--exec /usr/bin/python --startas "$DAEMON" -- $OPTIONS
--startas "$PYTHON_EXEC" -- "$DAEMON" $OPTIONS
then log_end_msg 0
else log_end_msg 1
fi
;;
stop )
log_daemon_msg "Stopping cryptobox webserver" "$DESC"
if start-stop-daemon --quiet --stop \
# if there is no pid file for some reason, then we try to find the process
if test ! -e "$PIDFILE"
then if start-stop-daemon --quiet --stop --user "$RUNAS" --exec "$PYTHON_EXEC"
then log_end_msg 0
else log_end_msg 1
fi
# there is a pid file - great!
elif start-stop-daemon --quiet --stop \
--pidfile "$PIDFILE" \
--user "$RUNAS"
then test -e "$PIDFILE" && rm "$PIDFILE"

View file

@ -18,29 +18,39 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Change date and time.
"""
__revision__ = "$Id"
import cryptobox.plugins.base
class date(cryptobox.plugins.base.CryptoBoxPlugin):
"""The date feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "preferences" ]
requestAuth = False
plugin_capabilities = [ "system" ]
plugin_visibility = [ "preferences" ]
request_auth = False
rank = 10
def doAction(self, store=None, year=0, month=0, day=0, hour=0, minute=0):
def do_action(self, store=None, year=0, month=0, day=0, hour=0, minute=0):
"""The action handler.
"""
import datetime
if store:
try:
year, month, day = int(year), int(month), int(day)
hour, minute = int(hour), int(minute)
new_date = datetime.datetime(year, month, day, hour, minute)
## check if the values are valid
datetime.datetime(year, month, day, hour, minute)
except ValueError:
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
self.__prepareFormData()
self.__prepare_form_data()
return "form_date"
date = "%02d%02d%02d%02d%d" % (month, day, hour, minute, year)
if self.__setDate(date):
if self.__set_date(date):
self.cbox.log.info("changed date to: %s" % date)
self.hdf["Data.Success"] = "Plugins.date.DateChanged"
return None
@ -48,33 +58,42 @@ class date(cryptobox.plugins.base.CryptoBoxPlugin):
## a failure should usually be an invalid date (we do not check it really)
self.cbox.log.info("failed to set date: %s" % date)
self.hdf["Data.Warning"] = "Plugins.date.InvalidDate"
self.__prepareFormData()
self.__prepare_form_data()
return "form_date"
else:
self.__prepareFormData()
self.__prepare_form_data()
return "form_date"
def getStatus(self):
now = self.__getCurrentDate()
return "%d/%d/%d/%d/%d/%d" % (now.year, now.month, now.day, now.hour, now.minute, now.second)
def get_status(self):
"""Retrieve the status of the feature.
"""
now = self.__get_current_date()
return "%d/%d/%d/%d/%d/%d" % \
(now.year, now.month, now.day, now.hour, now.minute, now.second)
def __prepareFormData(self):
date = self.__getCurrentDate()
self.hdf[self.hdf_prefix + "year"] = date.year
self.hdf[self.hdf_prefix + "month"] = date.month
self.hdf[self.hdf_prefix + "day"] = date.day
self.hdf[self.hdf_prefix + "hour"] = date.hour
self.hdf[self.hdf_prefix + "minute"] = date.minute
def __prepare_form_data(self):
"""Set some hdf values.
"""
cur_date = self.__get_current_date()
self.hdf[self.hdf_prefix + "year"] = cur_date.year
self.hdf[self.hdf_prefix + "month"] = cur_date.month
self.hdf[self.hdf_prefix + "day"] = cur_date.day
self.hdf[self.hdf_prefix + "hour"] = cur_date.hour
self.hdf[self.hdf_prefix + "minute"] = cur_date.minute
def __getCurrentDate(self):
def __get_current_date(self):
"""Retrieve the current date and time.
"""
import datetime
return datetime.datetime(2000,1,1).now()
return datetime.datetime(2000, 1, 1).now()
def __setDate(self, date):
def __set_date(self, date):
"""Set a new date and time.
"""
import subprocess
import os
proc = subprocess.Popen(
@ -83,7 +102,7 @@ class date(cryptobox.plugins.base.CryptoBoxPlugin):
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"plugin",
os.path.join(self.pluginDir, "root_action.py"),
os.path.join(self.plugin_dir, "root_action.py"),
date])
proc.wait()
return proc.returncode == 0

View file

@ -19,6 +19,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
## necessary: otherwise CryptoBoxRootActions.py will refuse to execute this script
PLUGIN_TYPE = "cryptobox"

View file

@ -18,18 +18,20 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_get_date(self):
"""retrieve the current date"""
date = self._getCurrentDate()
date = self._get_current_date()
def test_change_date(self):
"""set the date back and forth"""
now = self._getCurrentDate()
now = self._get_current_date()
## copy current time
new_date = dict(now)
## move three minutes forward (more is not nice because of screensavers)
@ -38,10 +40,10 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
new_date["hour"] = now["hour"] + ((now["minute"] + 3) / 60)
## move forward ...
self._setDate(new_date)
self.assertEquals(new_date, self._getCurrentDate())
self.assertEquals(new_date, self._get_current_date())
## ... and backward
self._setDate(now)
self.assertEquals(now, self._getCurrentDate())
self.assertEquals(now, self._get_current_date())
def test_try_broken_date(self):
@ -54,8 +56,8 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
self.cmd.find("invalid value for date")
def _getCurrentDate(self):
date_url = self.URL + "date"
def _get_current_date(self):
date_url = self.url + "date"
self.register_auth(date_url)
self.cmd.go(date_url)
self.cmd.find("Data.Status.Plugins.date=([0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+/[0-9]+)$", "m")
@ -73,7 +75,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def _setDate(self, date):
"""for now we have to use this function instead of the one below"""
date_url = self.URL + "date?weblang=en&store=1&year=%s&month=%s&day=%s&hour=%s&minute=%s"\
date_url = self.url + "date?weblang=en&store=1&year=%s&month=%s&day=%s&hour=%s&minute=%s"\
% (str(date["year"]), str(date["month"]), str(date["day"]),
str(date["hour"]), str(date["minute"]))
self.register_auth(date_url)
@ -83,7 +85,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def _setDateBroken(self, date):
"""this should work, but the parsing of twill seems to be broken
as soon as the twill bug is fixed, we should use this function"""
date_url = self.URL + "date"
date_url = self.url + "date"
self.register_auth(date_url)
self.cmd.go(date_url)
self.cmd.formvalue("set_date", "year", str(date["year"]))

View file

@ -18,21 +18,31 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The disks feature of the CryptoBox.
"""
__revision__ = "$Id"
import cryptobox.plugins.base
class disks(cryptobox.plugins.base.CryptoBoxPlugin):
"""The disk feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "menu" ]
requestAuth = False
plugin_capabilities = [ "system" ]
plugin_visibility = [ "menu" ]
request_auth = False
rank = 10
def doAction(self):
self.cbox.reReadContainerList()
def do_action(self):
"""The action handler.
"""
self.cbox.reread_container_list()
return "disks"
def getStatus(self):
return ":".join([e.getDevice() for e in self.cbox.getContainerList()])
def get_status(self):
"""Retrieve the current status of the feature.
"""
return ":".join([e.get_device() for e in self.cbox.get_container_list()])

View file

@ -18,24 +18,26 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_read_form(self):
'''display all devices'''
self.register_auth(self.URL)
self.cmd.go(self.URL + "disks?weblang=en")
self.register_auth(self.url)
self.cmd.go(self.url + "disks?weblang=en")
self.cmd.find("Available disks")
def test_is_device_in_list(self):
"""check if the device-under-test is in the device list"""
self.register_auth(self.URL)
self.cmd.go(self.URL + "disks?weblang=en")
self.register_auth(self.url)
self.cmd.go(self.url + "disks?weblang=en")
self.cmd.find("Available disks")
self.cmd.find(u'Data.Status.Plugins.disks=(.*)$', "m")
devices = self.locals["__match__"].split(":")
self.assertTrue(len(devices)>0)
self.assertTrue(("/dev/%s1" % self.device in devices) or ("/dev/%s2" % self.device in devices))
self.assertTrue("/dev/%s" % self.device in devices)

View file

@ -18,26 +18,34 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The help feature of the CryptoBox.
"""
__revision__ = "$Id"
import cryptobox.plugins.base
class help(cryptobox.plugins.base.CryptoBoxPlugin):
"""The help feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "menu" ]
requestAuth = False
plugin_capabilities = [ "system" ]
plugin_visibility = [ "menu" ]
request_auth = False
rank = 80
default_lang = 'en'
default_page = "CryptoBoxUser"
def doAction(self, page=""):
def do_action(self, page=""):
'''prints the offline wikipage
'''
import re,os
import re, os
## check for invalid characters and if the page exists in the default language
if page and \
not re.search(u'\W', page) and \
os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"], self.default_lang, page + '.html')):
os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
self.default_lang, page + '.html')):
## everything is ok
pass
else:
@ -49,20 +57,23 @@ class help(cryptobox.plugins.base.CryptoBoxPlugin):
## store the name of the page
self.hdf[self.hdf_prefix + "Page"] = page
## choose the right language
for l in self.site.langOrder:
if os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"], l, page + '.html')):
lang = l
for lang in self.site.lang_order:
if os.path.isfile(os.path.join(self.cbox.prefs["Locations"]["DocDir"],
lang, page + '.html')):
doc_lang = lang
break
else:
lang = self.default_lang
self.hdf[self.hdf_prefix + "Language"] = lang
doc_lang = self.default_lang
self.hdf[self.hdf_prefix + "Language"] = doc_lang
## store the current setting for a later "getStatus" call
self.current_lang = lang
self.current_lang = doc_lang
self.current_page = page
return "doc"
def getStatus(self):
def get_status(self):
"""Retrieve the current status of the feature.
"""
return "%s:%s" % (self.current_lang, self.current_page)

View file

@ -18,6 +18,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
from twill.errors import *
@ -27,7 +29,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
'''help pages should be available in different languages'''
## check english help pages
self.cmd.go(self.URL + "help?weblang=en")
self.cmd.go(self.url + "help?weblang=en")
self.cmd.find("Table of Contents")
self.cmd.find("Getting started")
(lang,page) = self._getHelpStatus()
@ -35,7 +37,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
self.assertTrue(page == "CryptoBoxUser")
## check german help pages
self.cmd.go(self.URL + "help?weblang=de")
self.cmd.go(self.url + "help?weblang=de")
self.cmd.find("Table of Contents")
self.cmd.find("Wie geht es los")
(lang,page) = self._getHelpStatus()
@ -43,7 +45,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
self.assertTrue(page == "CryptoBoxUser")
## check slovene help pages
self.cmd.go(self.URL + "help?weblang=sl")
self.cmd.go(self.url + "help?weblang=sl")
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
## add a slovene text here, as soon as the help is translated
(lang,page) = self._getHelpStatus()
@ -52,7 +54,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
self.assertTrue(page == "CryptoBoxUser")
## check french help pages
self.cmd.go(self.URL + "help?weblang=fr")
self.cmd.go(self.url + "help?weblang=fr")
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
## add a french text here, as soon as the help is translated
(lang,page) = self._getHelpStatus()
@ -61,7 +63,7 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
self.assertTrue(page == "CryptoBoxUser")
## test a random language - it should fall back to english
self.cmd.go(self.URL + "help?weblang=foobar")
self.cmd.go(self.url + "help?weblang=foobar")
self.assertRaises(TwillAssertionError, self.cmd.notfind, "Table of Contents")
(lang,page) = self._getHelpStatus()
self.assertTrue(lang == "en")
@ -70,18 +72,18 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_help_pages(self):
"""check invalid page requests"""
self.cmd.go(self.URL + "help?page=foobar")
self.cmd.go(self.url + "help?page=foobar")
(lang,page) = self._getHelpStatus()
self.assertTrue(page == "CryptoBoxUser")
self.cmd.go(self.URL + "help?page=CryptoBoxUser")
self.cmd.go(self.url + "help?page=CryptoBoxUser")
(lang,page) = self._getHelpStatus()
self.assertTrue(page == "CryptoBoxUser")
def test_help_default_languages(self):
"""check invalid page requests"""
self.cmd.go(self.URL + "help?weblang=foobar")
self.cmd.go(self.url + "help?weblang=foobar")
(lang,page) = self._getHelpStatus()
self.assertTrue(lang == "en")

View file

@ -18,20 +18,31 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The language_selection feature of the CryptoBox.
"""
__revision__ = "$Id"
import cryptobox.plugins.base
class language_selection(cryptobox.plugins.base.CryptoBoxPlugin):
"""The language_selection feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "menu", "preferences" ]
requestAuth = False
plugin_capabilities = [ "system" ]
plugin_visibility = [ "menu", "preferences" ]
request_auth = False
rank = 60
def doAction(self):
def do_action(self):
"""Show all available languages.
"""
return "language_selection"
def getStatus(self):
return ":".join(self.site.langOrder)
def get_status(self):
"""The current status of the feature is defined as the current language.
"""
return ":".join(self.site.lang_order)

View file

@ -18,19 +18,21 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_read_form(self):
url = self.URL + "language_selection?weblang=en"
url = self.url + "language_selection?weblang=en"
self.register_auth(url)
self.cmd.go(url)
self.cmd.find('hoose an interface language')
def test_check_language_list(self):
url = self.URL + "language_selection"
url = self.url + "language_selection"
self.register_auth(url)
self.cmd.go(url)
self.cmd.find(u'Data.Status.Plugins.language_selection=(.*)$', "m")

View file

@ -18,56 +18,76 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The logs feature of the CryptoBox.
"""
__revision__ = "$Id"
import cryptobox.plugins.base
import os
class logs(cryptobox.plugins.base.CryptoBoxPlugin):
"""The logs feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "preferences" ]
requestAuth = False
plugin_capabilities = [ "system" ]
plugin_visibility = [ "preferences" ]
request_auth = False
rank = 90
def doAction(self, lines=50, size=3000, pattern=None):
def do_action(self, lines=50, size=3000, pattern=None):
"""Show the latest part of the log file.
"""
import re
## filter input
try:
lines = int(lines)
if lines <= 0: raise(ValueError)
if lines <= 0:
raise(ValueError)
except ValueError:
lines = 50
try:
size = int(size)
if size <= 0: raise(ValueError)
if size <= 0:
raise(ValueError)
except ValueError:
size = 3000
if not pattern is None:
pattern = str(pattern)
if re.search(u'\W', pattern): pattern = None
self.hdf[self.hdf_prefix + "Content"] = self.__getLogContent(lines, size, pattern)
self.hdf[self.hdf_prefix + "StyleSheetFile"] = os.path.abspath(os.path.join(self.pluginDir, "logs.css"))
if re.search(u'\W', pattern):
pattern = None
self.hdf[self.hdf_prefix + "Content"] = self.__get_log_content(
lines, size, pattern)
self.hdf[self.hdf_prefix + "StyleSheetFile"] = os.path.abspath(os.path.join(
self.plugin_dir, "logs.css"))
return "show_log"
def getStatus(self):
def get_status(self):
"""The current status includes the log configuration details.
"""
return "%s:%s:%s" % (
self.cbox.prefs["Log"]["Level"],
self.cbox.prefs["Log"]["Destination"],
self.cbox.prefs["Log"]["Details"])
def __getLogContent(self, lines, maxSize, pattern):
import re
def __get_log_content(self, lines, max_size, pattern):
"""Filter, sort and shorten the log content.
"""
if pattern:
content = []
current_length = 0
for line in self.cbox.getLogData():
for line in self.cbox.get_log_data():
if line.find(pattern) != -1:
content.append(line)
current_length += len(line)
if lines and len(content) >=lines: break
if maxSize and current_length >=maxSize: break
if lines and len(content) >= lines:
break
if max_size and current_length >= max_size:
break
else:
content = self.cbox.getLogData(lines, maxSize)
content = self.cbox.get_log_data(lines, max_size)
return "<br/>".join(content)

View file

@ -18,12 +18,14 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_read_logs(self):
log_url = self.URL + "logs"
log_url = self.url + "logs"
self.register_auth(log_url)
self.cmd.go(log_url)
self.cmd.find('class="console"')
@ -31,13 +33,13 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
def test_write_logs(self):
log_text = "unittest - just a marker - please ignore"
self.cbox.log.error(log_text)
log_url = self.URL + "logs"
log_url = self.url + "logs"
self.register_auth(log_url)
self.cmd.go(log_url + "?pattern=ERROR")
self.cmd.find(log_text)
def test_invalid_args(self):
log_url = self.URL + "logs"
log_url = self.url + "logs"
self.cmd.go(log_url + "?lines=10")
self.cmd.find('class="console"')
self.cmd.go(log_url + "?lines=0")

View file

@ -18,23 +18,32 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The network feature of the CryptoBox.
"""
__revision__ = "$Id"
import subprocess
import os
import cryptobox.plugins.base
## specify (in seconds), how long we should wait before redirecting and ip change
REDIRECT_DELAY=20
CHANGE_IP_DELAY=1
REDIRECT_DELAY = 10
CHANGE_IP_DELAY = 2
class network(cryptobox.plugins.base.CryptoBoxPlugin):
"""The network feature of the CryptoBox.
"""
pluginCapabilities = [ "system" ]
pluginVisibility = [ "preferences" ]
requestAuth = True
plugin_capabilities = [ "system" ]
plugin_visibility = [ "preferences" ]
request_auth = True
rank = 30
def doAction(self, store=None, redirected="", ip1="", ip2="", ip3="", ip4=""):
def do_action(self, store=None, redirected="", ip1="", ip2="", ip3="", ip4=""):
"""Show a form containing the current IP - change it if requested.
"""
## if we were redirected, then we should display the default page
self.cbox.log.debug("executing network plugin")
if redirected == "1":
@ -46,58 +55,68 @@ class network(cryptobox.plugins.base.CryptoBoxPlugin):
try:
for ip_in in (ip1, ip2, ip3, ip4):
if (int(ip_in) < 0) or (int(ip_in) > 255):
self.cbox.log.info("invalid IP supplied: %s" % str((ip1,ip2,ip3,ip4)))
self.cbox.log.info("invalid IP supplied: %s" % \
str((ip1, ip2, ip3, ip4)))
raise ValueError
ip = "%d.%d.%d.%d" % (int(ip1), int(ip2), int(ip3), int(ip4))
new_ip = "%d.%d.%d.%d" % (int(ip1), int(ip2), int(ip3), int(ip4))
except ValueError:
self.hdf["Data.Warning"] = "Plugins.network.InvalidIP"
self.__prepareFormData()
self.__prepare_form_data()
return "form_network"
if self.__setIP(ip):
self.cbox.log.info("the IP was successfully changed: %s" % ip)
if self.__set_ip(new_ip):
self.cbox.log.info("the IP was successfully changed: %s" % new_ip)
self.hdf["Data.Success"] = "Plugins.network.IPChanged"
self.hdf["Data.Redirect.URL"] = self.__getRedirectDestination(ip)
self.hdf["Data.Redirect.URL"] = self.__get_redirect_destination(new_ip)
self.hdf["Data.Redirect.Delay"] = REDIRECT_DELAY
return None
else:
self.cbox.log.warn("failed to change IP address to: %s" % ip)
self.cbox.log.warn("failed to change IP address to: %s" % new_ip)
self.hdf["Data.Warning"] = "Plugins.network.InvalidIP"
self.__prepareFormData()
self.__prepare_form_data()
return "form_network"
else:
self.cbox.log.debug("network plugin: show form")
## just show the form
self.__prepareFormData()
self.__prepare_form_data()
return "form_network"
def getStatus(self):
return "%d.%d.%d.%d" % self.__getCurrentIP()
def get_status(self):
"""The current IP is the status of this feature.
"""
return "%d.%d.%d.%d" % self.__get_current_ip()
def __getRedirectDestination(self, ip):
def __get_redirect_destination(self, ip):
"""Put the new URL together.
"""
import cherrypy
req = cherrypy.request
base_parts = req.base.split(":")
dest = "%s:%s" % (base_parts[0], ip)
dest = "%s://%s" % (base_parts[0], ip)
if len(base_parts) == 3:
dest += ":%s" % base_parts[2]
return dest
def __prepareFormData(self):
(oc1, oc2, oc3, oc4) = self.__getCurrentIP()
def __prepare_form_data(self):
"""Set some hdf values.
"""
(oc1, oc2, oc3, oc4) = self.__get_current_ip()
self.hdf[self.hdf_prefix + "ip.oc1"] = oc1
self.hdf[self.hdf_prefix + "ip.oc2"] = oc2
self.hdf[self.hdf_prefix + "ip.oc3"] = oc3
self.hdf[self.hdf_prefix + "ip.oc4"] = oc4
def __getCurrentIP(self):
def __get_current_ip(self):
"""Retrieve the current IP.
"""
import re
import imp
## load some values from the root_action.py script
root_action_plug = imp.load_source("root_action", os.path.join(self.pluginDir, "root_action.py"))
root_action_plug = imp.load_source("root_action",
os.path.join(self.plugin_dir, "root_action.py"))
## get the current IP of the network interface
proc = subprocess.Popen(
shell = False,
@ -106,20 +125,25 @@ class network(cryptobox.plugins.base.CryptoBoxPlugin):
root_action_plug.IFCONFIG_BIN,
root_action_plug.IFACE])
(stdout, stderr) = proc.communicate()
if proc.returncode != 0: return (0,0,0,0)
if proc.returncode != 0:
return (0,0,0,0)
## this regex matches the four numbers of the IP
match = re.search(u'inet [\w]+:(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})\s', stdout)
if match:
## use the previously matched numbers
return tuple([int(e) for e in match.groups()])
else:
return (0,0,0,0)
return (0, 0, 0, 0)
def __setIP(self, ip):
def __set_ip(self, new_ip):
"""Change the IP.
"""
import threading
## call the root_action script after some seconds - so we can deliver the page before
def delayedIPchange():
def delayed_ip_change():
"""A threaded function to change the IP.
"""
import time
time.sleep(CHANGE_IP_DELAY)
proc = subprocess.Popen(
@ -129,19 +153,17 @@ class network(cryptobox.plugins.base.CryptoBoxPlugin):
self.cbox.prefs["Programs"]["super"],
self.cbox.prefs["Programs"]["CryptoBoxRootActions"],
"plugin",
os.path.join(self.pluginDir, "root_action.py"),
ip])
os.path.join(self.plugin_dir, "root_action.py"),
new_ip])
proc.wait()
if proc.returncode != 0:
self.cbox.log.warn("failed to change IP address: %s" % ip)
self.cbox.log.warn("failed to change IP address: %s" % new_ip)
self.cbox.log.warn("error output: %s" % str(proc.stderr.read()))
return
thread = threading.Thread()
thread.run = delayedIPchange
thread.run = delayed_ip_change
thread.setDaemon(True)
thread.start()
# TODO: how could we guess, if it failed?
return True

View file

@ -19,6 +19,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
#TODO: add netmask and gateway

View file

@ -18,6 +18,8 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
__revision__ = "$Id"
import cryptobox.web.testclass
from network import CHANGE_IP_DELAY
@ -28,43 +30,47 @@ class unittests(cryptobox.web.testclass.WebInterfaceTestClass):
'''change of network address'''
## the time module is necessary for the CHANGE_IP_DELAY
import time
self.register_auth(self.URL + "network")
self.cmd.go(self.URL + "network")
self.register_auth(self.url + "network")
## do not follow redirects - they would break the test otherwise
self.cmd.config("acknowledge_equiv_refresh", 0)
self.cmd.go(self.url + "network")
## extract the current IP from the network plugin output
def getCurrentIP():
self.cmd.go(self.URL + "network")
def get_current_ip():
self.register_auth(self.url + "network")
self.cmd.go(self.url + "network")
self.cmd.find(u'Data.Status.Plugins.network=([0-9\.]*)$', "m")
return self.locals["__match__"]
origIPtext = getCurrentIP()
origIPocts = origIPtext.split(".")
orig_ip_text = get_current_ip()
orig_ip_octs = orig_ip_text.split(".")
## check, if the original IP is valid (contains four octets)
self.assertEquals(4, len(origIPocts))
wrongIP = "192.168.123.321"
def setIP((ip1, ip2, ip3, ip4)):
self.cmd.go(self.URL + "network")
self.assertEquals(4, len(orig_ip_octs))
def set_ip((ip1, ip2, ip3, ip4)):
self.cmd.go(self.url + "network")
self.cmd.formvalue("set_ip", "ip1", str(ip1))
self.cmd.formvalue("set_ip", "ip2", str(ip2))
self.cmd.formvalue("set_ip", "ip3", str(ip3))
self.cmd.formvalue("set_ip", "ip4", str(ip4))
self.cmd.submit()
## sleep a little bit longer than the delay necessary for ip-change
time.sleep(CHANGE_IP_DELAY + 0.2)
setIP([1,-2,0,1])
self.assertEquals(origIPtext, getCurrentIP())
setIP([1,0,0,256])
self.assertEquals(origIPtext, getCurrentIP())
setIP([1,"foo",0,1])
self.assertEquals(origIPtext, getCurrentIP())
setIP([10,12,0,2])
self.assertEquals("10.12.0.2", getCurrentIP())
setIP(origIPocts)
self.assertEquals(origIPtext, getCurrentIP())
time.sleep(CHANGE_IP_DELAY + 3)
set_ip([1,-2,0,1])
self.assertEquals(orig_ip_text, get_current_ip())
set_ip([1,0,0,256])
self.assertEquals(orig_ip_text, get_current_ip())
set_ip([1,"foo",0,1])
self.assertEquals(orig_ip_text, get_current_ip())
new_ip = orig_ip_octs[:]
new_ip[3] = str((int(orig_ip_octs[3]) + 128) % 256)
set_ip(new_ip)
self.assertEquals(".".join(new_ip), get_current_ip())
set_ip(orig_ip_octs)
self.assertEquals(orig_ip_text, get_current_ip())
def test_inputs(self):
self.register_auth(self.URL + "network")
self.cmd.go(self.URL + "network" + "?redirected=1")
self.register_auth(self.url + "network")
self.cmd.go(self.url + "network" + "?redirected=1")
self.cmd.notfind("problem")
self.cmd.go(self.URL + "network" + "?store=1")
self.cmd.go(self.url + "network" + "?store=1")
self.cmd.find("invalid network address")

View file

</
@ -18,114 +18,141 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""The partition feature of the CryptoBox.
"""
__revision__ = "$Id"
import subprocess
import os
import logging
import cryptobox.core.tools as cbxTools
import cryptobox.plugins.base
class partition(cryptobox.plugins.base.CryptoBoxPlugin):
pluginCapabilities = [ "system" ]
pluginVisibility = [ "preferences" ]
requestAuth = True
PARTTYPES = {
"windows" : ["0xC", "vfat"],
"linux" : ["L", "ext3"]}
CONFIGPARTITION = {
"size" : 5, # size of configuration partition (if necessary) in MB
"type" : "L",
"fs" : "ext2"}
class partition(cryptobox.plugins.base.CryptoBoxPlugin):
"""The partition feature of the CryptoBox.
"""
plugin_capabilities = [ "system" ]
plugin_visibility = [ "preferences" ]
request_auth = True
rank = 80
PartTypes = {
"windows" : ["0xC", "vfat"],
"linux" : ["L", "ext3"]}
ConfigPartition = {
"size" : 5, # size of configuration partition (if necessary) in MB
"type" : "L",
"fs" : "ext2"}
def doAction(self, **args):
def do_action(self, **args):
"""Show the partitioning form and execute the requested action.
"""
## load default hdf values
self.__prepareDataset()
self.__prepare_dataset()
## retrieve some values from 'args' - defaults are empty
self.device = self.__getSelectedDevice(args)
self.withConfigPartition = self.__isWithConfigPartition()
self.device = self.__get_selected_device(args)
self.with_config_partition = self.__is_with_config_partition()
self.cbox.log.debug("partition plugin: selected device=%s" % str(self.device))
self.deviceSize = self.__getAvailableDeviceSize(self.device)
self.device_size = self.__get_available_device_size(self.device)
try:
step = args["step"]
del args["step"]
except KeyError:
step = "select_device"
try:
## this way of selecting the easy setup is necessary: see select_device.py for details
if args["easy"]: step = "easy"
## this way of selecting the easy setup is necessary:
## see select_device.cs for details (button values for ie)
if args["easy"]:
step = "easy"
except KeyError:
pass
## no (or invalid) device was supplied
if not self.device:
step = "select_device"
if step == "add_partition":
return self.__actionAddPartition(args)
return self.__action_add_partition(args)
elif step == "del_partition":
return self.__actionDelPartition(args)
return self.__action_del_partition(args)
elif step == "finish":
return self.__actionFinish(args)
return self.__action_finish(args)
elif step == "easy":
return self.__actionEasySetup(args)
return self.__action_easy_setup()
else: # for "select_device" and for invalid targets
return self.__actionSelectDevice(args)
return self.__action_select_device()
def getStatus(self):
return "%s / %s / %s" % (self.device, self.deviceSize, self.withConfigPartition)
def get_status(self):
"""The status of this plugin is the selected device and some information.
"""
return "%s / %s / %s" % (self.device, self.device_size,
self.with_config_partition)
def __prepareDataset(self):
self.hdf[self.hdf_prefix + "StyleSheetFile"] = os.path.join(self.pluginDir, "partition.css")
def __prepare_dataset(self):
"""Set some hdf values.
"""
self.hdf[self.hdf_prefix + "StyleSheetFile"] = \
os.path.join(self.plugin_dir, "partition.css")
def __getSelectedDevice(self, args):
def __get_selected_device(self, args):
"""Check the selected device (valid, not busy, ...).
"""
try:
device = args["block_device"]
except KeyError:
return None
if not self.__isDeviceValid(device):
if not self.__is_device_valid(device):
return None
if self.__isDeviceBusy(device):
if self.__is_device_busy(device):
self.hdf["Data.Warning"] = "Plugins.partition.DiskIsBusy"
return None
return device
def __isDeviceValid(self, device):
def __is_device_valid(self, device):
"""Check if the device is valid and allowed.
"""
if not device:
return False
if not self.cbox.isDeviceAllowed(device):
if not self.cbox.is_device_allowed(device):
return False
if not device in cbxTools.getParentBlockDevices():
if not device in cbxTools.get_parent_blockdevices():
return False
return True
def __isDeviceBusy(self, device):
"""check if the device (or one of its partitions) is mounted"""
def __is_device_busy(self, device):
"""check if the device (or one of its partitions) is mounted
"""
# the config partition is ignored, as it will get unmounted if necessary
import re
for c in self.cbox.getContainerList():
if re.match(device + "\d*$", c.getDevice()):
if c.isMounted(): return True
for cont in self.cbox.get_container_list():
if re.match(device + "\d*$", cont.get_device()):
if cont.is_mounted():
return True
return False
def __actionSelectDevice(self, args):
def __action_select_device(self):
"""Show a form to select the device for partitioning.
"""
block_devices = [e
for e in cbxTools.getParentBlockDevices()
if self.cbox.isDeviceAllowed(e)]
for e in cbxTools.get_parent_blockdevices()
if self.cbox.is_device_allowed(e)]
counter = 0
for a in block_devices:
self.hdf[self.hdf_prefix + "BlockDevices.%d.name" % counter] = a
self.hdf[self.hdf_prefix + "BlockDevices.%d.size" % counter] = cbxTools.getBlockDeviceSizeHumanly(a)
self.cbox.log.debug("found a suitable block device: %s" % a)
for dev in block_devices:
self.hdf[self.hdf_prefix + "BlockDevices.%d.name" % counter] = dev
self.hdf[self.hdf_prefix + "BlockDevices.%d.size" % counter] = \
cbxTools.get_blockdevice_size_humanly(dev)
self.cbox.log.debug("found a suitable block device: %s" % dev)
counter += 1
if self.withConfigPartition:
if self.with_config_partition:
self.hdf[self.hdf_prefix + "CreateConfigPartition"] = "1"
## there is no disk available
if not block_devices:
@ -133,59 +160,72 @@ class partition(cryptobox.plugins.base.CryptoBoxPlugin):
return "select_device"
def __actionAddPartition(self, args):
def __action_add_partition(self, args):
"""Add a selected partition to the currently proposed partition table.
"""
self.hdf[self.hdf_prefix + "Device"] = self.device
self.hdf[self.hdf_prefix + "Device.Size"] = self.deviceSize
parts = self.__getPartitionsFromArgs(args)
self.__setPartitionData(parts)
self.hdf[self.hdf_prefix + "Device.Size"] = self.device_size
parts = self.__get_partitions_from_args(args)
self.__set_partition_data(parts)
return "set_partitions"
def __actionDelPartition(self, args):
def __action_del_partition(self, args):
"""Remove a partition from the proposed partition table.
"""
try:
part_num = int(args["del_num"])
except (TypeError,KeyError):
return self.__actionAddPartition(args)
return self.__action_add_partition(args)
self.hdf[self.hdf_prefix + "Device"] = self.device
self.hdf[self.hdf_prefix + "Device.Size"] = self.deviceSize
parts = self.__getPartitionsFromArgs(args)
self.hdf[self.hdf_prefix + "Device.Size"] = self.device_size
parts = self.__get_partitions_from_args(args)
## valid partition number to be deleted?
if part_num < len(parts):
del parts[part_num]
else:
return self.__actionAddPartition(args)
self.__setPartitionData(parts)