1170 lines
44 KiB
Python
Executable file
1170 lines
44 KiB
Python
Executable file
#!/usr/bin/env python2.6
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
# the basedir is the parent dir of the location of this script
|
|
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))
|
|
# add the project directory to the python search path
|
|
import sys
|
|
sys.path.insert(0, os.path.join(BASE_DIR, "src"))
|
|
|
|
import tools
|
|
import bobo
|
|
import forms
|
|
import sqlobject
|
|
import genshi.filters
|
|
import genshi.input
|
|
import genshi.template
|
|
import genshi
|
|
import formencode
|
|
import webob
|
|
import csv
|
|
import ConfigParser
|
|
import datetime
|
|
import mimetypes
|
|
import uuid
|
|
import re
|
|
import smtplib
|
|
import email.mime.text
|
|
import email.utils
|
|
import hashlib
|
|
import twitter
|
|
import urllib2
|
|
|
|
CONFIG_FILE = os.path.join(BASE_DIR, "wortschlucker.conf")
|
|
|
|
|
|
""" *** Initialization *** """
|
|
config = ConfigParser.SafeConfigParser()
|
|
config.read(CONFIG_FILE)
|
|
db_uri = config.get("database", "uri")
|
|
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI(db_uri)
|
|
loader = genshi.template.TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=False)
|
|
|
|
BLOG_DIR = os.path.join(BASE_DIR, "blog")
|
|
|
|
|
|
BASE_DICT = {
|
|
"base_url": "/", # the trailing slash is necessary
|
|
"show_navbar": True,
|
|
"errors": {},
|
|
}
|
|
|
|
# used as the default setting for expose/close dates
|
|
DEFAULT_DAYS_AHEAD = 7
|
|
DATE_FORMAT = "%d.%m.%Y"
|
|
EXPORT_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
|
|
EXPORT_ENCODING = "utf-8"
|
|
EXPORT_CHARSET = "utf8"
|
|
EXPORT_FILENAME_TEMPLATE = "%%(prefix)s_%%(title)s_%s.csv" % datetime.datetime.now().strftime("%Y-%m-%d")
|
|
DEFAULT_DATE = datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
|
|
|
|
POLL_SETTINGS = {
|
|
"show_all_submissions": (bool, True),
|
|
"show_statistics": (bool, True),
|
|
"public": (bool, False),
|
|
"expose_date": (datetime.datetime, DEFAULT_DATE),
|
|
"close_date": (datetime.datetime, DEFAULT_DATE),
|
|
"vote_enabled": (bool, False),
|
|
"vote_closed": (bool, False),
|
|
}
|
|
|
|
POLL_SETTING_TEMPLATES = {
|
|
"brainstorm": {},
|
|
"cards": {"show_all_submissions": False},
|
|
"feedback": {"show_all_submissions": False},
|
|
"evaluation": {"show_all_submissions": False},
|
|
"notes": {"show_statistics": False},
|
|
"shopping": {"show_statistics": False},
|
|
"clipboard": {"show_statistics": False},
|
|
"namefinder": {},
|
|
}
|
|
|
|
|
|
class ContentSubmission(sqlobject.SQLObject):
|
|
submitter = sqlobject.UnicodeCol()
|
|
content = sqlobject.UnicodeCol()
|
|
poll_id = sqlobject.ForeignKey("Poll")
|
|
timestamp_creation = sqlobject.DateTimeCol()
|
|
|
|
def get_creation_time_string(self):
|
|
return str(self.timestamp_creation)
|
|
|
|
def get_markup_content(self):
|
|
mark_links = get_markup_with_links(self.content)
|
|
markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
|
|
# the markup is now marked as "safe" -> genshi will output it literally
|
|
return markup
|
|
|
|
def get_delete_url(self, absolute=False):
|
|
return self.poll_id.get_admin_url(absolute=absolute,
|
|
suffix="/delete/%s" % self.get_obfuscated_digest())
|
|
|
|
def get_obfuscated_digest(self):
|
|
return hashlib.md5(str(self.id)).hexdigest()
|
|
|
|
|
|
class Profile(sqlobject.SQLObject):
|
|
email = sqlobject.UnicodeCol(unique=True)
|
|
hash_key = sqlobject.StringCol(unique=True)
|
|
|
|
def get_user_polls(self, *args, **kwargs):
|
|
return self._get_generic_polls(False, *args, **kwargs)
|
|
|
|
def get_admin_polls(self, *args, **kwargs):
|
|
return self._get_generic_polls(True, *args, **kwargs)
|
|
|
|
def _get_generic_polls(self, is_admin, old_to_new=True):
|
|
polls = [user_poll.poll for user_poll in ProfilePolls.selectBy(
|
|
user=self.id, is_admin=is_admin)]
|
|
polls.sort(key=lambda poll: poll.timestamp_creation,
|
|
reverse=not old_to_new)
|
|
return polls
|
|
|
|
def get_url(self, absolute=False):
|
|
return get_url_string("%s%s/%s" % (BASE_DICT["base_url"], "profile", self.hash_key), absolute)
|
|
|
|
|
|
class ProfilePolls(sqlobject.SQLObject):
|
|
user = sqlobject.ForeignKey("Profile")
|
|
poll = sqlobject.ForeignKey("Poll")
|
|
is_admin = sqlobject.BoolCol()
|
|
|
|
|
|
class PollSetting(sqlobject.SQLObject):
|
|
poll_id = sqlobject.ForeignKey("Poll")
|
|
key = sqlobject.UnicodeCol()
|
|
value = sqlobject.UnicodeCol()
|
|
|
|
|
|
class PollRelation(sqlobject.SQLObject):
|
|
first = sqlobject.ForeignKey("Poll")
|
|
second = sqlobject.ForeignKey("Poll")
|
|
|
|
|
|
class Poll(sqlobject.SQLObject):
|
|
author = sqlobject.UnicodeCol()
|
|
hash_key = sqlobject.StringCol()
|
|
admin_hash_key = sqlobject.StringCol()
|
|
title = sqlobject.UnicodeCol()
|
|
description = sqlobject.UnicodeCol()
|
|
timestamp_creation = sqlobject.DateTimeCol()
|
|
|
|
def get_related_polls(self):
|
|
""" get all directly and indirectly connected polls up to a certain
|
|
distance
|
|
"""
|
|
return PollMesh(self).get_related_polls()
|
|
|
|
def get_related_polls_direct(self):
|
|
""" get all directly connected polls """
|
|
related = []
|
|
related.extend([poll.second for poll in PollRelation.selectBy(first=self.id)])
|
|
related.extend([poll.first for poll in PollRelation.selectBy(second=self.id)])
|
|
return related
|
|
|
|
def get_description_markup(self):
|
|
mark_links = get_markup_with_links(self.description)
|
|
markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
|
|
# the markup is now marked as "safe" -> genshi will output it literally
|
|
return markup
|
|
|
|
def get_settings(self):
|
|
current_dict = {}
|
|
for setting in PollSetting.selectBy(poll_id=self.id):
|
|
if setting.key in POLL_SETTINGS.keys():
|
|
current_dict[setting.key] = validate_poll_setting(setting.key, setting.value)
|
|
for key, meta_info in POLL_SETTINGS.items():
|
|
if not key in current_dict.keys():
|
|
current_dict[key] = meta_info[1]
|
|
return current_dict
|
|
|
|
def get_settings_strings(self):
|
|
settings = self.get_settings()
|
|
result = {}
|
|
for key, value in settings.items():
|
|
result[key] = get_poll_setting_string(key, value)
|
|
return result
|
|
|
|
def change_setting(self, key, value):
|
|
validated_value = validate_poll_setting(key, value)
|
|
if not validated_value is None:
|
|
validated_value = get_poll_setting_string(key, validated_value)
|
|
poll_setting = PollSetting.selectBy(poll_id=self.id, key=key)
|
|
if poll_setting.count() == 1:
|
|
poll_setting[0].value = validated_value
|
|
elif poll_setting.count() == 0:
|
|
PollSetting(poll_id=self.id, key=key, value=validated_value)
|
|
if (key == 'public') and (value > 0):
|
|
self.announce_via_twitter()
|
|
|
|
def announce_via_twitter(self):
|
|
complete_url = self.get_url(absolute=True)
|
|
try:
|
|
title = "%s %s %s" % (config.get('misc', 'twitter_alert_prefix'),
|
|
self.title[:79], complete_url)
|
|
# the following line is quick and dirty fix for the unicode bug twitter exception.
|
|
# of course it would be better to preserve the umlauts somehow...
|
|
title = title.encode('ascii','ignore')
|
|
twitter_key = config.get('misc', 'twitter_consumer_key')
|
|
twitter_secret = config.get('misc', 'twitter_consumer_secret')
|
|
twitter_access_key = config.get('misc', 'twitter_access_token_key')
|
|
twitter_access_secret = config.get('misc', 'twitter_access_token_secret')
|
|
except ConfigParser.Error:
|
|
# a config setting seems to be missing (e.g. in a dev environment)
|
|
return
|
|
publish_twitter_alert(title, twitter_key, twitter_secret, twitter_access_key, twitter_access_secret)
|
|
|
|
def get_num_of_submitters(self):
|
|
all_submitters = [submission.submitter for submission in ContentSubmission.selectBy(poll_id=self.id)]
|
|
unique_submitters = []
|
|
for submitter in all_submitters:
|
|
if not submitter in unique_submitters:
|
|
unique_submitters.append(submitter)
|
|
return len(unique_submitters)
|
|
|
|
def get_num_of_submissions(self):
|
|
return ContentSubmission.selectBy(poll_id=self.id).count()
|
|
|
|
def get_submissions(self):
|
|
return ContentSubmission.selectBy(poll_id=self.id).orderBy("timestamp_creation")
|
|
|
|
def delete_poll(self):
|
|
submissions = ContentSubmission.selectBy(poll_id=self.id)
|
|
settings = PollSetting.selectBy(poll_id=self.id)
|
|
for submission in submissions:
|
|
submission.destroySelf()
|
|
for setting in settings:
|
|
setting.destroySelf()
|
|
self.destroySelf()
|
|
|
|
def get_url(self, absolute=False):
|
|
return get_url_string("%s%s" % (BASE_DICT["base_url"], self.hash_key), absolute)
|
|
|
|
def get_submit_url(self, absolute=False):
|
|
return get_url_string("%s%s/submit" % (BASE_DICT["base_url"], self.hash_key), absolute)
|
|
|
|
def get_vote_url(self, absolute=False):
|
|
#TODO vote: Diese Funktion ist noch falsch, aber erstmal als Platzhalter noetig
|
|
return ""
|
|
|
|
def get_admin_url(self, absolute=False, suffix=""):
|
|
return get_url_string("%s%s%s" % (BASE_DICT["base_url"], self.admin_hash_key, suffix), absolute)
|
|
|
|
def get_edit_url(self, absolute=False):
|
|
return get_url_string("%s%s/admin" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)
|
|
|
|
def get_delete_url(self, absolute=False):
|
|
return get_url_string("%s%s/delete" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)
|
|
|
|
def get_vote_enable_url(self, absolute=False):
|
|
return get_url_string("%s%s/vote_enable" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)
|
|
|
|
def get_creation_time_string(self):
|
|
return str(self.timestamp_creation)
|
|
|
|
def is_vote_enabled(self):
|
|
return self.get_settings()["vote_enabled"]
|
|
|
|
def get_submissions_visibility(self):
|
|
settings = self.get_settings()
|
|
return bool(settings["show_all_submissions"] or (settings["expose_date"] and \
|
|
(settings["expose_date"].date() <= datetime.datetime.now().date())))
|
|
|
|
def is_closed(self):
|
|
return bool(self.get_settings()["close_date"] and \
|
|
(self.get_settings()["close_date"].date() < datetime.datetime.now().date()))
|
|
|
|
|
|
class PollMesh:
|
|
""" generate a mesh of directly or indirectly related polls
|
|
|
|
Basically this is just a recursive search for unique related polls that are
|
|
connected to the base poll with less than (e.g.) five nodes in between.
|
|
"""
|
|
|
|
def __init__(self, poll, depth=5):
|
|
self.related = []
|
|
# start to collect the related polls immediately
|
|
self.__collect_related_polls(poll, depth)
|
|
|
|
def __collect_related_polls(self, current_poll, current_depth):
|
|
""" recursive scanning for unique related polls up to a certain distance
|
|
"""
|
|
related = current_poll.get_related_polls_direct()
|
|
new_queue = [poll for poll in related if not poll in self.related]
|
|
self.related.extend(new_queue)
|
|
if current_depth > 0:
|
|
for poll in new_queue:
|
|
self.__collect_related_polls(poll, current_depth - 1)
|
|
|
|
def get_related_polls(self):
|
|
return self.related
|
|
|
|
|
|
def get_markup_with_links(text):
|
|
def get_link_markup(match):
|
|
prefix, url, suffix = match.groups()
|
|
# only take the TLD part of the url
|
|
short_name = url.split("/")[2]
|
|
return """%s<a href="%s" rel="nofollow">%s</a>%s""" % (prefix, url, short_name, suffix)
|
|
def expand_protocol(match):
|
|
prefix, url, suffix = match.groups()
|
|
if url.lower().startswith("www") or suffix.startswith("/"):
|
|
# just prepend "http://"
|
|
return prefix + "http://" + url + suffix
|
|
else:
|
|
return prefix + url + suffix
|
|
# surround all urls with html markup
|
|
text = genshi.escape(text)
|
|
text = re.sub(r"(\A|\s|\()([a-zA-Z_\-\.]+\.[a-zA-Z]{2,4})(/|\)|\s|\Z)",
|
|
expand_protocol, text)
|
|
text = re.sub(r"(\A|\s|\()(https?://[\w/\?\.\#=;,_\-\~&]*)(\)|\s|\Z)",
|
|
get_link_markup, text)
|
|
return get_markup_with_formatted_linebreaks(text, "<br />")
|
|
|
|
def get_markup_with_formatted_linebreaks(text, break_char):
|
|
text = text.replace("\r\n", "\n")
|
|
text = text.replace("\r", "\n")
|
|
text = text.replace("\n", break_char)
|
|
return text
|
|
|
|
def get_poll_setting_string(key, value):
|
|
if not key in POLL_SETTINGS.keys():
|
|
return ""
|
|
setting_type = POLL_SETTINGS[key][0]
|
|
if setting_type in (basestring, unicode, str):
|
|
return value
|
|
elif setting_type == bool:
|
|
return str(value)
|
|
elif setting_type == datetime.datetime:
|
|
# unset dates are None
|
|
if value is None:
|
|
return ""
|
|
elif value == "":
|
|
# value for "forever"
|
|
return ""
|
|
else:
|
|
return value.strftime(DATE_FORMAT)
|
|
else:
|
|
return str(value)
|
|
|
|
def validate_poll_setting(key, value):
|
|
if not key in POLL_SETTINGS.keys():
|
|
return None
|
|
setting_type = POLL_SETTINGS[key][0]
|
|
if setting_type in (basestring, unicode, str):
|
|
return value
|
|
elif setting_type == bool:
|
|
if value is None:
|
|
return False
|
|
elif isinstance(value, bool):
|
|
return value
|
|
else:
|
|
text = value.lower()
|
|
if text in ("0", "false", "no", "off", "disabled", ""):
|
|
return False
|
|
elif text in ("1", "true", "yes", "on", "enabled"):
|
|
return True
|
|
else:
|
|
return None
|
|
elif setting_type == datetime.datetime:
|
|
if value is None:
|
|
# default: one week later
|
|
value = datetime.date.today() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
|
|
elif type(value) == datetime.datetime:
|
|
pass
|
|
elif value == "":
|
|
# this is the value for "forever"
|
|
return ""
|
|
else:
|
|
try:
|
|
value = datetime.datetime.strptime(value, DATE_FORMAT)
|
|
except ValueError:
|
|
value = None
|
|
return value
|
|
elif setting_type == int:
|
|
if value is None:
|
|
value = 0
|
|
try:
|
|
value = int(value)
|
|
except ValueError:
|
|
value = None
|
|
return value
|
|
else:
|
|
# all other types (e.g. int, float, ...)
|
|
try:
|
|
return setting_type(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
def send_profile_mail(user):
|
|
data = {}
|
|
for key, default in (("host", "localhost"), ("port", 25),
|
|
("use_ssl", False), ("from", None),
|
|
("subject", None), ("body", None)):
|
|
try:
|
|
data[key] = config.get("mail", key, raw=True)
|
|
except ConfigParser.Error, err_msg:
|
|
if default is None:
|
|
# fail!
|
|
open("/tmp/test.out", "w").write(str(err_msg))
|
|
return False
|
|
data[key] = default
|
|
data["port"] = int(data["port"])
|
|
if data["use_ssl"]:
|
|
provider = smtplib.SMTP_SSL
|
|
else:
|
|
provider = smtplib.SMTP
|
|
input_data = {"profile_url": user.get_url(absolute=True)}
|
|
content = data["body"] % input_data
|
|
# empty lines can't be parsed via ConfigParser
|
|
# re.sub in Python 2.5 does not understand "flags" -> compile first
|
|
dot_line_regex = re.compile("^\.$", flags=re.MULTILINE)
|
|
content = dot_line_regex.sub("", content)
|
|
message = email.mime.text.MIMEText(content)
|
|
message["Subject"] = data["subject"]
|
|
message["From"] = data["from"]
|
|
message["To"] = user.email
|
|
message["Date"] = email.utils.formatdate()
|
|
try:
|
|
server = provider(data["host"], data["port"])
|
|
server.sendmail(data["from"], [user.email], message.as_string())
|
|
except smtplib.SMTPException:
|
|
return False
|
|
return True
|
|
|
|
def get_default_values(request, **kwargs):
|
|
value_dict = dict(BASE_DICT)
|
|
for key, value in kwargs.items():
|
|
value_dict[key] = value
|
|
# add configuration settings
|
|
try:
|
|
enable_users = config.get("users", "enabled")
|
|
except ConfigParser.Error:
|
|
enable_users = False
|
|
value_dict["enable_users"] = enable_users
|
|
try:
|
|
enable_voting = config.get("voting", "enabled")
|
|
except ConfigParser.Error:
|
|
enable_voting = False
|
|
value_dict["enable_voting"] = enable_voting
|
|
# read the user's hash key from the cookie
|
|
if enable_users and ("user_hash_key" in request.cookies):
|
|
value_dict["user_hash_key"] = str(request.cookies["user_hash_key"])
|
|
return value_dict
|
|
|
|
def get_url_string(url, absolute=False):
|
|
""" return the URL as it is or turn it into an absolute URL
|
|
|
|
@value url: the given URL
|
|
@type url: str
|
|
@value absolute: should the URL be absolute or relative?
|
|
@type absolute: bool
|
|
@returns: the relative or absolute URL
|
|
@rtype: str
|
|
"""
|
|
if absolute:
|
|
return config.get('hosting', 'full_base_url') + url
|
|
else:
|
|
return url
|
|
|
|
def render(filename, input_data=None, **values):
|
|
stream = loader.load(filename).generate(**values)
|
|
if not input_data is None:
|
|
stream |= genshi.filters.HTMLFormFiller(data=input_data)
|
|
#return stream.render("html", doctype="html")
|
|
response = webob.Response(content_type="text/html", charset="utf8")
|
|
if "user_hash_key" in values:
|
|
response.set_cookie("user_hash_key", values["user_hash_key"],
|
|
max_age=365, path="/")
|
|
response.body = stream.render("html", doctype="html")
|
|
return response
|
|
|
|
def get_poll_id(hash_key):
|
|
if isinstance(hash_key, unicode):
|
|
try:
|
|
hash_key = str(hash_key)
|
|
except UnicodeEncodeError:
|
|
return None
|
|
polls = Poll.selectBy(hash_key=hash_key)
|
|
if polls.count() == 1:
|
|
return polls[0].id
|
|
else:
|
|
return None
|
|
|
|
def get_poll_admin_id(hash_key):
|
|
if isinstance(hash_key, unicode):
|
|
try:
|
|
hash_key = str(hash_key)
|
|
except UnicodeEncodeError:
|
|
return None
|
|
polls = Poll.selectBy(admin_hash_key=hash_key)
|
|
if polls.count() == 1:
|
|
return polls[0].id
|
|
else:
|
|
return None
|
|
|
|
def extract_poll_id(text, admin=False):
|
|
""" The text may be an admin hash or the admin link of a poll """
|
|
if admin:
|
|
get_func = get_poll_admin_id
|
|
else:
|
|
get_func = get_poll_id
|
|
result = get_func(text)
|
|
if result is None:
|
|
extracted_text = re.findall(r"[a-z0-9]+", text)
|
|
# we assume that the hash is at the end of the string
|
|
extracted_text.reverse()
|
|
for found in extracted_text:
|
|
guess = get_func(found)
|
|
if not guess is None:
|
|
return guess
|
|
return None
|
|
else:
|
|
return result
|
|
|
|
def get_new_hash_key(length=16, charset=None):
|
|
""" returns a quite random hash key with the specified length """
|
|
if charset is None:
|
|
charset = "0123456789abcdefghijklmnopqrstuvwxyz"
|
|
def get_hash_string(length):
|
|
base = uuid.uuid4().int
|
|
result = []
|
|
while len(result) < length:
|
|
value = base % len(charset)
|
|
base //= len(charset)
|
|
result.append(charset[value])
|
|
return "".join(result)
|
|
# repeat the hash generation until a new value is found
|
|
hash_key = get_hash_string(length)
|
|
while (not get_poll_id(hash_key) is None) or (not get_poll_admin_id(hash_key) is None):
|
|
hash_key = get_hash_string(length)
|
|
return hash_key
|
|
|
|
def publish_twitter_alert(text, key, secret,access_key,access_secret):
|
|
api = twitter.Api(consumer_key= key, consumer_secret = secret, access_token_key=access_key, access_token_secret= access_secret)
|
|
try:
|
|
api.PostUpdate(text)
|
|
except urllib2.HTTPError,e:
|
|
# twitter error, most likely because of a duplicate message
|
|
# or maybe an authentication failure
|
|
print e.code
|
|
except urllib2.URLError, e:
|
|
print e.reason
|
|
|
|
def check_spam_submitter_name(name):
|
|
lower_text = re.sub("[^a-z]", "", name)
|
|
upper_text = re.sub("[^A-Z]", "", name)
|
|
return (len(lower_text) + len(upper_text) == len(name)) and \
|
|
(len(lower_text) > 3) and (len(upper_text) > 3) and \
|
|
(len(name) >= 8) and (not name.startswith(upper_text))
|
|
|
|
def check_spam_content(text):
|
|
return bool(re.search(r"(<a\s|\shref=|</a>)", text.lower()))
|
|
|
|
@bobo.query('/profile/logout')
|
|
def user_logout(bobo_request):
|
|
# default start page
|
|
response = show_frontpage(bobo_request)
|
|
# clear the cookie
|
|
response.delete_cookie("user_hash_key")
|
|
return response
|
|
|
|
@bobo.query('/profile/resend')
|
|
def resend_user_key(bobo_request, email=None, submit=None, email_sent=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
if not value_dict["enable_users"]:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
value_dict["email_sent"] = False
|
|
value_dict["user_new"] = False
|
|
data = {"email": email}
|
|
if not submit:
|
|
return render("user_key.html", input_data=data, **value_dict)
|
|
else:
|
|
# find the user's data (if it exists)
|
|
try:
|
|
data = forms.ProfileForm.to_python(data)
|
|
except formencode.Invalid, errors_packed:
|
|
errors = errors_packed.unpack_errors()
|
|
users = Profile.selectBy(email=email)
|
|
if users.count():
|
|
user = users[0]
|
|
value_dict["user"] = user
|
|
value_dict["email_sent"] = True
|
|
value_dict["email_ok"] = send_profile_mail(user)
|
|
return render("user_key.html", **value_dict)
|
|
else:
|
|
errors = {}
|
|
errors["email"] = u"Diese E-Mail-Adresse ist nicht registriert. Möchtest du ein neues Profil anlegen?"
|
|
value_dict["user_new"] = True
|
|
value_dict["errors"] = errors
|
|
return render("user_key.html", input_data={"email": email},
|
|
**value_dict)
|
|
|
|
def manage_link_in_profile(user, link, add, value_dict):
|
|
poll = None
|
|
for is_admin in (False, True):
|
|
poll_id = extract_poll_id(link, admin=is_admin)
|
|
if poll_id:
|
|
poll = Poll.get(poll_id)
|
|
break
|
|
if poll:
|
|
connected = ProfilePolls.selectBy(poll=poll, user=user,
|
|
is_admin=is_admin)
|
|
if is_admin:
|
|
value_dict["profile_manage_poll_hash"] = poll.admin_hash_key
|
|
else:
|
|
value_dict["profile_manage_poll_hash"] = poll.hash_key
|
|
if add and not connected.count():
|
|
ProfilePolls(poll=poll, user=user, is_admin=is_admin)
|
|
value_dict["poll_remember"] = poll
|
|
return True
|
|
elif not add and connected.count():
|
|
connected[0].destroySelf()
|
|
value_dict["poll_forget"] = poll
|
|
return True
|
|
else:
|
|
return False
|
|
return False
|
|
|
|
@bobo.query('/profile/new')
|
|
def user_create(bobo_request, email=None, submit=None, add_link=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
if not value_dict["enable_users"]:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
value_dict["user_new"] = True
|
|
data = {"email": email, "add_link": add_link}
|
|
if not submit:
|
|
return render("user_key.html", input_data=data, **value_dict)
|
|
else:
|
|
# create a new user (if email is valid)
|
|
try:
|
|
data = forms.ProfileForm.to_python(data)
|
|
except formencode.Invalid, errors_packed:
|
|
errors = errors_packed.unpack_errors()
|
|
users = Profile.selectBy(email=email)
|
|
if users.count():
|
|
# the mail address is already registered
|
|
errors = {}
|
|
errors["email"] = u"Diese E-Mail-Adresse ist bereits registriert. Sollen die Zugangsdaten erneut an die Adresse versandt werden?"
|
|
value_dict["errors"] = errors
|
|
value_dict["user_new"] = False
|
|
return render("user_key.html", input_data=data, **value_dict)
|
|
else:
|
|
# create a new user
|
|
hash_key = get_new_hash_key()
|
|
new_user = Profile(hash_key=hash_key, email=email)
|
|
value_dict["user"] = new_user
|
|
value_dict["email_sent"] = True
|
|
value_dict["email_ok"] = send_profile_mail(new_user)
|
|
return render("user_details.html", **value_dict)
|
|
|
|
@bobo.query('/profile')
|
|
@bobo.query('/profile/:hash_key')
|
|
@bobo.query('/:add_link/remember')
|
|
@bobo.query('/:del_link/forget')
|
|
def show_user(bobo_request, hash_key=None, add_link=None, del_link=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
if not value_dict["enable_users"]:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
if hash_key:
|
|
# overwrite the cookie-based value, if a profile is explicitely given
|
|
user_hash_key = hash_key
|
|
elif "user_hash_key" in value_dict:
|
|
user_hash_key = value_dict["user_hash_key"]
|
|
else:
|
|
user_hash_key = None
|
|
if user_hash_key:
|
|
# sqlobject fails to handle unicode values -> convert to string
|
|
user_hash_key = str(user_hash_key)
|
|
users = Profile.selectBy(hash_key=user_hash_key)
|
|
if not user_hash_key or not users.count():
|
|
# TODO: store add/del link -> render instead of redirect
|
|
return bobo.redirect(BASE_DICT["base_url"] + "profile/new")
|
|
user = users[0]
|
|
value_dict["user"] = user
|
|
value_dict["user_hash_key"] = user_hash_key
|
|
if add_link:
|
|
manage_link_in_profile(user, add_link, True, value_dict)
|
|
elif del_link:
|
|
manage_link_in_profile(user, del_link, False, value_dict)
|
|
else:
|
|
pass
|
|
return render("user_details.html", **value_dict)
|
|
|
|
@bobo.query('/new')
|
|
@bobo.query('/new/:template')
|
|
@bobo.query('/new/:author/:title/:description')
|
|
def new_poll(bobo_request, submit=None, cancel=None, author=None, title=None,
|
|
description=None, template=None, hide_errors=False):
|
|
value_dict = get_default_values(bobo_request)
|
|
data = {"author": author, "title": title, "description": description,
|
|
"template": template}
|
|
if cancel:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
elif not submit:
|
|
# show the "new poll" form
|
|
return render("poll_new.html", input_data=data, **value_dict)
|
|
else:
|
|
# create the new poll (if it is valid)
|
|
errors = {}
|
|
try:
|
|
data = forms.PollForm.to_python(data)
|
|
except formencode.Invalid, errors_packed:
|
|
errors = errors_packed.unpack_errors()
|
|
if errors:
|
|
if not hide_errors:
|
|
value_dict["errors"] = errors
|
|
return render("poll_new.html", input_data=data, **value_dict)
|
|
else:
|
|
# get the template settings
|
|
poll_settings = POLL_SETTINGS.copy()
|
|
if template in POLL_SETTING_TEMPLATES.keys():
|
|
template_settings = POLL_SETTING_TEMPLATES[template].copy()
|
|
else:
|
|
template_settings = POLL_SETTING_TEMPLATES["brainstorm"]
|
|
for key, value in template_settings.items():
|
|
poll_settings[key] = value
|
|
# create the new poll
|
|
hash_key = get_new_hash_key()
|
|
admin_hash_key = get_new_hash_key()
|
|
now = datetime.datetime.now()
|
|
new_poll = Poll(hash_key=hash_key, admin_hash_key=admin_hash_key,
|
|
timestamp_creation=now, author=data["author"],
|
|
title=data["title"], description=data["description"])
|
|
# apply the template settings
|
|
for key, value in template_settings.items():
|
|
new_poll.change_setting(key, value)
|
|
return bobo.redirect(new_poll.get_admin_url())
|
|
|
|
@bobo.query('/:hash_key/submit')
|
|
def submit_content(bobo_request, hash_key=None, submitter=None, content=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
value_dict["errors"] = {}
|
|
data = {}
|
|
if content and check_spam_content(content):
|
|
value_dict["errors"]["content"] = \
|
|
"Spam-Verdacht: Inhalt darf keine HTML-Tags enthalten"
|
|
else:
|
|
data["content"] = content
|
|
if submitter and check_spam_submitter_name(submitter):
|
|
value_dict["errors"]["submitter"] = \
|
|
"Spam-Verdacht: bitte den Namen korrigieren"
|
|
else:
|
|
data["submitter"] = submitter
|
|
poll_id = get_poll_id(hash_key)
|
|
if not poll_id is None:
|
|
poll = Poll.get(poll_id)
|
|
value_dict["poll"] = poll
|
|
try:
|
|
data = forms.SubmitForm.to_python(data)
|
|
except formencode.Invalid, errors_packed:
|
|
# merge errors with previous ones - but never overwrite existing ones
|
|
errors = errors_packed.unpack_errors()
|
|
errors.update(value_dict["errors"])
|
|
value_dict["errors"] = errors
|
|
if value_dict["errors"] or poll.is_closed():
|
|
return render("poll_details.html", input_data=data, **value_dict)
|
|
else:
|
|
# create the new submission content
|
|
data["timestamp_creation"] = datetime.datetime.now()
|
|
data["poll_id"] = poll.id
|
|
ContentSubmission(**data)
|
|
# remove "content" for the next input
|
|
del data["content"]
|
|
return render("poll_details.html", input_data=data, **value_dict)
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/:admin_hash_key/delete')
|
|
def delete_poll(bobo_request, admin_hash_key=None):
|
|
admin_poll_id = get_poll_admin_id(admin_hash_key)
|
|
if not admin_poll_id is None:
|
|
poll = Poll.get(admin_poll_id)
|
|
poll.delete_poll()
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/:admin_hash_key/vote_enable')
|
|
def vote_enable_poll(bobo_request, admin_hash_key=None, vote_select=None):
|
|
# vote_select: on/off
|
|
admin_poll_id = get_poll_admin_id(admin_hash_key)
|
|
if not admin_poll_id is None:
|
|
poll = Poll.get(admin_poll_id)
|
|
poll.change_setting("vote_enabled", vote_select == "on")
|
|
return bobo.redirect(poll.get_admin_url())
|
|
else:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/:admin_hash_key/delete/:submission_id_digest')
|
|
def delete_submission(bobo_request, admin_hash_key=None, submission_id_digest=None):
|
|
admin_poll_id = get_poll_admin_id(admin_hash_key)
|
|
if (not admin_poll_id is None) and (not submission_id_digest is None):
|
|
poll = Poll.get(admin_poll_id)
|
|
# This loop is slightly expensive, but it does not expose the overall
|
|
# count of submissions (via the id).
|
|
for submission in ContentSubmission.selectBy(poll_id=poll.id):
|
|
if submission.get_obfuscated_digest() == submission_id_digest:
|
|
submission.destroySelf()
|
|
break
|
|
return bobo.redirect(poll.get_admin_url())
|
|
|
|
@bobo.query('/:admin_hash_key/export')
|
|
def export_poll(bobo_request, admin_hash_key=None):
|
|
""" Return a download file in csv format (date, author, text).
|
|
"""
|
|
value_dict = get_default_values(bobo_request)
|
|
poll_id = get_poll_admin_id(admin_hash_key)
|
|
if poll_id is None:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
poll = Poll.get(poll_id)
|
|
response = webob.Response(content_type="text/csv", charset=EXPORT_CHARSET)
|
|
# common (excel-based) CSV format with semicolons instead of commas
|
|
writer = csv.writer(response.body_file)
|
|
submissions = list(poll.get_submissions())
|
|
for submission in submissions:
|
|
timestamp = submission.timestamp_creation.strftime(EXPORT_DATE_FORMAT)
|
|
author = submission.submitter.encode(EXPORT_ENCODING, "ignore")
|
|
content = get_markup_with_formatted_linebreaks(submission.content, os.linesep)
|
|
items = [timestamp, author, content]
|
|
for index in range(len(items)):
|
|
try:
|
|
items[index] = items[index].encode(EXPORT_ENCODING, "ignore")
|
|
except UnicodeDecodeError:
|
|
# any other conversion error
|
|
pass
|
|
writer.writerow(items)
|
|
try:
|
|
export_prefix = config.get("misc", "export_prefix")
|
|
except ConfigParser.Error:
|
|
export_prefix = "wortschlucker"
|
|
filename = EXPORT_FILENAME_TEMPLATE % {"title": poll.title, "prefix": export_prefix}
|
|
filename = filename.replace(" ", "_")
|
|
filename = re.sub(r"[^a-zA-Z0-9_\-\.]", "", filename)
|
|
response.content_disposition = 'attachment; filename=%s' % filename
|
|
return response
|
|
|
|
@bobo.query('/:admin_hash_key/admin')
|
|
def admin_poll(bobo_request, cancel=False, submit=None, admin_hash_key=None, author=None,
|
|
title=None, description=None, settings=None,
|
|
setting_expose_date=None, setting_expose_date_forever=None,
|
|
setting_close_date=None, setting_close_date_forever=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
data = {"author": author, "title": title, "description": description, "template": ""}
|
|
poll_id = get_poll_admin_id(admin_hash_key)
|
|
if poll_id is None:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
poll = Poll.get(poll_id)
|
|
# cancel: return to (non-edit) admin page
|
|
if cancel:
|
|
return bobo.redirect(poll.get_admin_url())
|
|
if author is None:
|
|
data["author"] = poll.author
|
|
if title is None:
|
|
data["title"] = poll.title
|
|
if description is None:
|
|
data["description"] = poll.description
|
|
if setting_expose_date_forever is None:
|
|
if setting_expose_date is None:
|
|
setting_expose_date = poll.get_settings_strings()["expose_date"]
|
|
else:
|
|
setting_expose_date = ""
|
|
if setting_close_date_forever is None:
|
|
if setting_close_date is None:
|
|
setting_close_date = poll.get_settings_strings()["close_date"]
|
|
else:
|
|
setting_close_date = ""
|
|
poll_settings = poll.get_settings()
|
|
# update the settings only after a submit (otherwise we clear all current settings)
|
|
if submit:
|
|
# override with the given settings (taken from the form input with the prefix "setting_")
|
|
if settings is None:
|
|
settings = []
|
|
elif not isinstance(settings, list):
|
|
settings = [settings]
|
|
else:
|
|
# it is a list - everything is fine
|
|
pass
|
|
for setting_key in poll_settings.keys():
|
|
poll_settings[setting_key] = setting_key in settings
|
|
# collect all errors
|
|
errors = {}
|
|
# add boolean "settings" after forms validation - since there is no destination type
|
|
data["settings"] = [key for key, value in poll_settings.items() if value is True]
|
|
for key, value in (("expose_date", setting_expose_date), ("close_date", setting_close_date)):
|
|
validated_value = validate_poll_setting(key, value)
|
|
if value == "":
|
|
data["setting_%s_forever" % key] = "yes"
|
|
data["setting_%s" % key] = value
|
|
else:
|
|
data["setting_%s_forever" % key] = "no"
|
|
if validated_value is None:
|
|
# keep the entered value and report an error
|
|
errors[key] = u"Ungültiges Datum"
|
|
data["setting_%s" % key] = value
|
|
else:
|
|
data["setting_%s" % key] = get_poll_setting_string(key, validated_value)
|
|
# use the validator to check for possible errors
|
|
if submit:
|
|
# check for errors only if the content is submitted (not just rendered)
|
|
try:
|
|
data = forms.PollSettingsForm.to_python(data)
|
|
except formencode.Invalid, errors_packed:
|
|
errors = errors_packed.unpack_errors()
|
|
# store the new settings
|
|
if errors:
|
|
value_dict["errors"] = errors
|
|
return render("poll_admin_edit.html", input_data=data, **value_dict)
|
|
else:
|
|
if submit:
|
|
# update core attributes of the existing poll
|
|
poll.author = data["author"]
|
|
poll.title = data["title"]
|
|
poll.description = data["description"]
|
|
current_settings = poll.get_settings()
|
|
# update settings
|
|
for key, value in poll_settings.items():
|
|
if (POLL_SETTINGS[key][0] == bool) and (current_settings[key] != value):
|
|
poll.change_setting(key, value)
|
|
poll.change_setting("expose_date", data["setting_expose_date"])
|
|
poll.change_setting("close_date", data["setting_close_date"])
|
|
return bobo.redirect(poll.get_admin_url())
|
|
else:
|
|
return render("poll_admin_edit.html", input_data=data, **value_dict)
|
|
|
|
def update_blog_entry_values(bobo_request, blog_id, value_dict):
|
|
blog_info = get_blog_info(bobo_request, blog_id)
|
|
if blog_info is None:
|
|
return False
|
|
else:
|
|
value_dict["title"] = blog_info[0]
|
|
value_dict["date"] = blog_info[1]
|
|
value_dict["link"] = blog_info[2]
|
|
value_dict["body"] = blog_info[3]
|
|
return True
|
|
|
|
def get_blog_info(bobo_request, blog_id):
|
|
blog_file = os.path.join(BLOG_DIR, blog_id)
|
|
try:
|
|
input = open(blog_file)
|
|
title = input.readline()
|
|
body = input.read()
|
|
input.close()
|
|
except IOError:
|
|
return None
|
|
date = "%s.%s.%s %s:%s" % (blog_id[6:8], blog_id[4:6], blog_id[0:4],
|
|
blog_id[8:10], blog_id[10:12])
|
|
link = "%sblog/%s" % (get_default_values(bobo_request)["base_url"], blog_id)
|
|
body = tools.creole2html(body.decode("utf-8"))
|
|
return title, date, link, body
|
|
|
|
def get_blog_ids():
|
|
def add_files_to_list(file_list, dirname, fnames):
|
|
for fname in fnames:
|
|
if re.match(r"^[0-9]{12}$", fname) \
|
|
and os.path.isfile(os.path.join(dirname, fname)):
|
|
file_list.append(fname)
|
|
# remove all entries (e.g. sub-directories)
|
|
while len(fnames) > 0:
|
|
del fnames[0]
|
|
file_list = []
|
|
os.path.walk(BLOG_DIR, add_files_to_list, file_list)
|
|
file_list.sort(reverse = True)
|
|
return file_list
|
|
|
|
@bobo.query('/blog')
|
|
@bobo.query('/blog/')
|
|
@bobo.query('/blog/:blog_id')
|
|
@bobo.query('/blog/:blog_id/')
|
|
def serve_blog(bobo_request, blog_id=None):
|
|
value_dict = get_default_values(bobo_request)
|
|
if blog_id and re.match(r"^[0-9]{12}$", blog_id):
|
|
# the blog_id should consist of 12 digits
|
|
if update_blog_entry_values(bobo_request, blog_id, value_dict):
|
|
return render("blog_entry.html", **value_dict)
|
|
# if anything fails: render the blog list
|
|
blog_list = []
|
|
# add all valid blog infos to the list
|
|
for blog_id in get_blog_ids():
|
|
blog_info = get_blog_info(bobo_request, blog_id)
|
|
if not blog_info is None:
|
|
blog_list.append(blog_info)
|
|
value_dict["blog_list"] = blog_list
|
|
return render("blog_list.html", **value_dict)
|
|
|
|
def show_poll_list(bobo_request, render_file, page_size, page=None, filter_private=True):
|
|
value_dict = get_default_values(bobo_request)
|
|
polls = Poll.select().orderBy("-timestamp_creation")
|
|
# TODO: speed the filtering up by using SQL statements (see sqlobject "filter")
|
|
if filter_private:
|
|
polls = [poll for poll in polls if poll.get_settings()["public"]]
|
|
else:
|
|
# convert the sql query into a list (probably an expensive operation)
|
|
polls = [poll for poll in polls]
|
|
if page is None:
|
|
page = 1
|
|
else:
|
|
try:
|
|
page = int(page)
|
|
except ValueError:
|
|
page = 1
|
|
# "page" should at least be 1 - zero shows an empty list
|
|
page = max(1, page)
|
|
start = (page - 1) * page_size
|
|
if start >= len(polls):
|
|
start = len(polls) - page_size
|
|
page = (len(polls) + page_size - 1) / page_size
|
|
end = start + page_size - 1
|
|
value_dict["polls"] = polls[start : end + 1]
|
|
# show a link for the next page, if more polls are available
|
|
value_dict["show_next_link"] = (end + 1 < len(polls))
|
|
value_dict["show_previous_link"] = (start > 0)
|
|
value_dict["page"] = page
|
|
return render(render_file, **value_dict)
|
|
|
|
def render_poll_admin(bobo_request, poll, add_related, del_related, count, page, show_delete):
|
|
value_dict = get_default_values(bobo_request)
|
|
errors = {}
|
|
if not add_related is None:
|
|
other_poll_id = extract_poll_id(add_related, admin=True)
|
|
if other_poll_id == poll.id:
|
|
errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
|
|
elif other_poll_id is None:
|
|
errors["related"] = u"Wortschlucker nicht gefunden"
|
|
else:
|
|
related_polls = poll.get_related_polls()
|
|
if other_poll_id in [one_poll.id for one_poll in related_polls]:
|
|
errors["related"] = u"Dieser Wortschlucker wurde bereits verknüpft"
|
|
else:
|
|
PollRelation(first=poll.id, second=other_poll_id)
|
|
if not del_related is None:
|
|
other_poll_id = extract_poll_id(del_related, admin=True)
|
|
if other_poll_id is None:
|
|
errors["related"] = u"Wortschlucker nicht gefunden"
|
|
else:
|
|
related_polls = poll.get_related_polls()
|
|
if not other_poll_id in [one_poll.id for one_poll in related_polls]:
|
|
errors["related"] = u"Dieser Wortschlucker war nicht verknüpft"
|
|
else:
|
|
# delete all relations between these two polls
|
|
PollRelation.deleteBy(first=poll.id, second=other_poll_id)
|
|
PollRelation.deleteBy(first=other_poll_id, second=poll.id)
|
|
value_dict["poll"] = poll
|
|
value_dict["page"] = page
|
|
value_dict["count"] = count
|
|
value_dict["show_delete"] = show_delete
|
|
value_dict["errors"] = errors
|
|
return render("poll_admin_details.html", **value_dict)
|
|
|
|
@bobo.query('/admin')
|
|
@bobo.query('/admin/')
|
|
@bobo.query('/admin/page/:page')
|
|
def show_admin_page(bobo_request, page=None, page_size=20):
|
|
try:
|
|
page_size = int(page_size)
|
|
except ValueError:
|
|
page_size = 30
|
|
return show_poll_list(bobo_request, "admin.html", page_size, page, filter_private=False)
|
|
|
|
@bobo.query('/')
|
|
@bobo.query('/public')
|
|
@bobo.query('/public/')
|
|
@bobo.query('/public/page/:page')
|
|
def show_frontpage(bobo_request, page=None):
|
|
return show_poll_list(bobo_request, "frontpage.html", 20, page)
|
|
|
|
@bobo.query('')
|
|
def base():
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/:poll_hash')
|
|
@bobo.query('/:poll_hash/')
|
|
@bobo.query('/admin/poll/:poll_hash_for_admin')
|
|
def show_one_poll(bobo_request, poll_hash_for_admin=None, poll_hash=None,
|
|
add_related=None, del_related=None, delete_multiple=None,
|
|
delete_submission_id_list=None, count=50, page=1, show_delete=False):
|
|
if poll_hash_for_admin:
|
|
# shortcut for admins
|
|
admin_poll_id = get_poll_id(poll_hash_for_admin)
|
|
if admin_poll_id:
|
|
poll_hash = Poll.get(admin_poll_id).admin_hash_key
|
|
try:
|
|
count = int(count)
|
|
except ValueError:
|
|
count = 50
|
|
count = max(1, min(count, 1000))
|
|
try:
|
|
page = int(page)
|
|
except ValueError:
|
|
page = 1
|
|
page = max(1, page)
|
|
value_dict = get_default_values(bobo_request)
|
|
poll_id = get_poll_id(poll_hash)
|
|
if not poll_id is None:
|
|
poll = Poll.get(poll_id)
|
|
page = min(page, 1 + (poll.get_num_of_submissions() - 1) / count)
|
|
value_dict["poll"] = poll
|
|
value_dict["page"] = page
|
|
value_dict["count"] = count
|
|
return render("poll_details.html", **value_dict)
|
|
else:
|
|
admin_poll_id = get_poll_admin_id(poll_hash)
|
|
if not admin_poll_id is None:
|
|
if delete_multiple and delete_submission_id_list:
|
|
if not isinstance(delete_submission_id_list, list):
|
|
delete_submission_id_list = [delete_submission_id_list]
|
|
for delete_id in delete_submission_id_list:
|
|
ContentSubmission.get(int(delete_id)).destroySelf()
|
|
return render_poll_admin(bobo_request, Poll.get(admin_poll_id), add_related, del_related, count, page, bool(show_delete))
|
|
else:
|
|
return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/node/:pagename')
|
|
def show_static_nodes(bobo_request, pagename=None):
|
|
""" meant for serving hand-changed, automatically styled content. """
|
|
value_dict = get_default_values(bobo_request)
|
|
return render("nodes/"+pagename, **value_dict)
|
|
|
|
def get_static_file(filename):
|
|
""" deliver a static file - this function is used internally """
|
|
response = webob.Response()
|
|
content_type = mimetypes.guess_type(filename)[0]
|
|
if content_type is not None:
|
|
response.content_type = content_type
|
|
try:
|
|
response.body = open(filename).read()
|
|
except IOError:
|
|
raise bobo.NotFound
|
|
return response
|
|
|
|
@bobo.query('/media/:p1')
|
|
@bobo.query('/media/:p1/:p2')
|
|
@bobo.query('/media/:p1/:p2/:p3')
|
|
def static_files(p1=None, p2=None, p3=None):
|
|
""" deliver files up to three levels below /media/ """
|
|
pathlist = [p1, p2, p3]
|
|
pathname = os.path.join(BASE_DIR, "templates", "media")
|
|
for subdir in pathlist:
|
|
if not subdir is None:
|
|
pathname = os.path.join(pathname, subdir)
|
|
return get_static_file(pathname)
|
|
|
|
|
|
for table in (Poll, ContentSubmission, PollSetting, PollRelation, Profile, ProfilePolls):
|
|
#table.dropTable()
|
|
if not table.tableExists():
|
|
table.createTable()
|
|
|
|
# how to add new columns to the database:
|
|
# * uncomment the line below and change it according to your needs
|
|
# * run this script once (manually)
|
|
# * disable the line below
|
|
# * add the column specification to the object definition
|
|
# -> done!
|
|
#Poll.sqlmeta.addColumn(sqlobject.BoolCol("vote_open", default=False), changeSchema=True)
|
|
|
|
|
|
# this line allows to use wortschlucker with mod_wsgi
|
|
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
|
|
# see: http://blog.dscpl.com.au/2009/08/using-bobo-on-top-of-modwsgi.html
|
|
application = bobo.Application(bobo_resources=__name__)
|
|
|