lars
fa37629d2c
use efficient sql SELECT for frontpage poll list added link counter for spam detection improve efficiency of post deletion by using sqlobject's "cascade" attribute
1324 lines
51 KiB
Python
Executable file
1324 lines
51 KiB
Python
Executable file
#!/usr/bin/env python2.6
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
# the basedir is the parent dir of the location of this script
|
|
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))
|
|
# add the project directory to the python search path
|
|
import sys
|
|
sys.path.insert(0, os.path.join(BASE_DIR, "src"))
|
|
|
|
import tools
|
|
import bobo
|
|
import forms
|
|
import sqlobject
|
|
import genshi.filters
|
|
import genshi.input
|
|
import genshi.template
|
|
import genshi
|
|
import formencode
|
|
import webob
|
|
import csv
|
|
import ConfigParser
|
|
import datetime
|
|
import mimetypes
|
|
import uuid
|
|
import re
|
|
import smtplib
|
|
import email.mime.text
|
|
import email.utils
|
|
import hashlib
|
|
import twitter
|
|
import urllib2
|
|
|
|
# absolute location of the configuration file (directly below BASE_DIR)
CONFIG_FILE = os.path.join(BASE_DIR, "wortschlucker.conf")


# *** Initialization ***
# These statements run once at import time: the configuration is parsed, the
# database connection of sqlobject is set up and the template loader is created.
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
db_uri = config.get("database", "uri")
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI(db_uri)
loader = genshi.template.TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=False)

BLOG_DIR = os.path.join(BASE_DIR, "blog")

# base values that are copied into the value dictionary of every template
BASE_DICT = dict(
    # the trailing slash is necessary
    base_url="/",
    show_navbar=True,
    errors={},
    # authentication level of the template - one of:
    #   "public" (e.g. frontpage), "poll_public" (e.g. poll_details),
    #   "poll_admin" (poll_admin_details), "admin" (admin interface)
    authlevel="public",
)


# used as the default setting for expose/close dates
DEFAULT_DAYS_AHEAD = 7
DATE_FORMAT = "%d.%m.%Y"
EXPORT_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
EXPORT_ENCODING = "utf-8"
EXPORT_CHARSET = "utf8"
# NOTE(review): the current date is inserted when the module is imported - a
# long-running process keeps using the date of its start.
EXPORT_FILENAME_TEMPLATE = "%%(prefix)s_%%(title)s_%s.csv" % datetime.datetime.now().strftime("%Y-%m-%d")
# NOTE(review): this is evaluated only once (at import time), too.
DEFAULT_DATE = datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)

# name of each poll setting -> (type of the value, default value)
POLL_SETTINGS = dict(
    show_all_submissions=(bool, True),
    show_statistics=(bool, True),
    public=(bool, False),
    expose_date=(datetime.datetime, DEFAULT_DATE),
    close_date=(datetime.datetime, DEFAULT_DATE),
    vote_enabled=(bool, False),
    vote_closed=(bool, False),
)

# name of each poll template -> the settings that differ from the defaults
POLL_SETTING_TEMPLATES = dict(
    brainstorm={},
    cards={"show_all_submissions": False},
    feedback={"show_all_submissions": False},
    evaluation={"show_all_submissions": False},
    notes={"show_statistics": False},
    shopping={"show_statistics": False},
    clipboard={"show_statistics": False},
    namefinder={},
)
|
|
|
|
|
|
class ContentSubmission(sqlobject.SQLObject):
    """ a single contribution (author name and text) that belongs to a poll """

    submitter = sqlobject.UnicodeCol()
    content = sqlobject.UnicodeCol()
    # cascade: a submission is removed together with its poll
    poll_id = sqlobject.ForeignKey("Poll", cascade=True)
    timestamp_creation = sqlobject.DateTimeCol()

    def get_creation_time_string(self):
        """ return the creation timestamp converted via "str" """
        return str(self.timestamp_creation)

    def get_markup_content(self):
        """ return the content as sanitized markup with clickable links """
        linked_text = get_markup_with_links(self.content)
        html_stream = genshi.input.HTML(linked_text)
        # the markup is now marked as "safe" -> genshi will output it literally
        return html_stream | genshi.filters.HTMLSanitizer()

    def get_delete_url(self, absolute=False):
        """ return the admin URL that deletes this submission

        @value absolute: should the URL be absolute or relative?
        @type absolute: bool
        """
        delete_suffix = "/delete/%s" % self.get_obfuscated_digest()
        return self.poll_id.get_admin_url(absolute=absolute, suffix=delete_suffix)

    def get_obfuscated_digest(self):
        """ return the hexadecimal md5 digest of the submission's id

        The digest is used in URLs and forms instead of the plain id.
        """
        id_text = str(self.id)
        return hashlib.md5(id_text).hexdigest()
|
|
|
|
|
|
class VoteOrder(sqlobject.SQLObject):
    """ the rank that one voter assigned to one submission of a poll """

    # name of the voter
    submitter = sqlobject.UnicodeCol()
    # cascade: votes are removed together with the submission they refer to
    # (all other foreign keys of this module cascade as well). Without it,
    # deleting a submission or a poll leaves orphaned vote rows behind.
    content_submission_id = sqlobject.ForeignKey("ContentSubmission", cascade=True)
    # position of the submission within the voter's order (sorted ascending)
    priority = sqlobject.IntCol()
|
|
|
|
|
|
class Profile(sqlobject.SQLObject):
    """ a user profile: a mail address combined with a secret hash key """

    email = sqlobject.UnicodeCol(unique=True)
    hash_key = sqlobject.StringCol(unique=True)

    def get_user_polls(self, *args, **kwargs):
        """ return the polls linked to this profile without the admin flag

        All arguments are forwarded to "_get_generic_polls".
        """
        return self._get_generic_polls(False, *args, **kwargs)

    def get_admin_polls(self, *args, **kwargs):
        """ return the polls linked to this profile with the admin flag

        All arguments are forwarded to "_get_generic_polls".
        """
        return self._get_generic_polls(True, *args, **kwargs)

    def _get_generic_polls(self, is_admin, old_to_new=True):
        """ return the linked polls sorted by their creation time

        @value is_admin: select links with (or without) the admin flag
        @type is_admin: bool
        @value old_to_new: oldest poll first (True) or newest poll first?
        @type old_to_new: bool
        @rtype: list of Poll
        """
        links = ProfilePolls.selectBy(user=self.id, is_admin=is_admin)
        return sorted((link.poll for link in links),
                key=lambda poll: poll.timestamp_creation,
                reverse=not old_to_new)

    def get_url(self, absolute=False):
        """ return the URL of the profile page (it contains the hash key) """
        profile_path = "%s%s/%s" % (BASE_DICT["base_url"], "profile", self.hash_key)
        return get_url_string(profile_path, absolute)
|
|
|
|
|
|
class ProfilePolls(sqlobject.SQLObject):
    """ the link between a profile and a poll that the user wants to remember

    Both foreign keys cascade: the link vanishes as soon as the profile or
    the poll is deleted.
    """

    user = sqlobject.ForeignKey("Profile", cascade=True)
    poll = sqlobject.ForeignKey("Poll", cascade=True)
    # True: the link was created via the admin hash of the poll
    is_admin = sqlobject.BoolCol()
|
|
|
|
|
|
class PollSetting(sqlobject.SQLObject):
    """ one key/value setting of a poll (the value is stored as text) """

    # cascade: the settings are removed together with their poll
    poll_id = sqlobject.ForeignKey("Poll", cascade=True)
    key = sqlobject.UnicodeCol()
    value = sqlobject.UnicodeCol()
|
|
|
|
|
|
class PollRelation(sqlobject.SQLObject):
    """ a connection between two polls

    The relation is evaluated in both directions (see
    "Poll.get_related_polls_direct"). It is removed as soon as one of the
    two polls is deleted.
    """

    first = sqlobject.ForeignKey("Poll", cascade=True)
    second = sqlobject.ForeignKey("Poll", cascade=True)
|
|
|
|
|
|
class Poll(sqlobject.SQLObject):
    """ a poll: it collects content submissions and (optionally) ranked votes

    The public URLs of a poll are based on "hash_key", the administrative
    URLs are based on "admin_hash_key".
    """

    author = sqlobject.UnicodeCol()
    hash_key = sqlobject.StringCol()
    admin_hash_key = sqlobject.StringCol()
    title = sqlobject.UnicodeCol()
    description = sqlobject.UnicodeCol()
    timestamp_creation = sqlobject.DateTimeCol()

    def get_related_polls(self):
        """ get all directly and indirectly connected polls up to a certain
        distance
        """
        return PollMesh(self).get_related_polls()

    def get_related_polls_direct(self):
        """ get all directly connected polls """
        related = [relation.second
                for relation in PollRelation.selectBy(first=self.id)]
        related.extend([relation.first
                for relation in PollRelation.selectBy(second=self.id)])
        return related

    def get_description_markup(self):
        """ return the description as sanitized markup with clickable links """
        mark_links = get_markup_with_links(self.description)
        markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
        # the markup is now marked as "safe" -> genshi will output it literally
        return markup

    def get_settings(self):
        """ return all settings of the poll as a dictionary of typed values

        Stored settings are parsed via "validate_poll_setting". Every key of
        POLL_SETTINGS that is not stored for this poll gets its default value.
        Stored keys that are not part of POLL_SETTINGS are ignored.
        """
        current_dict = {}
        for setting in PollSetting.selectBy(poll_id=self.id):
            if setting.key in POLL_SETTINGS:
                current_dict[setting.key] = validate_poll_setting(setting.key, setting.value)
        for key, meta_info in POLL_SETTINGS.items():
            if key not in current_dict:
                current_dict[key] = meta_info[1]
        return current_dict

    def get_settings_strings(self):
        """ return all settings of the poll as a dictionary of strings """
        return dict((key, get_poll_setting_string(key, value))
                for key, value in self.get_settings().items())

    def change_setting(self, key, value):
        """ validate and store a single setting of the poll

        Unknown keys and values that cannot be parsed are ignored - the
        stored setting stays untouched.

        @value key: name of the setting (see POLL_SETTINGS)
        @value value: the new value (typed or as a string, e.g. from a form)
        """
        validated_value = validate_poll_setting(key, value)
        if validated_value is None:
            return
        stored_text = get_poll_setting_string(key, validated_value)
        matches = list(PollSetting.selectBy(poll_id=self.id, key=key))
        if len(matches) == 1:
            matches[0].value = stored_text
        elif not matches:
            PollSetting(poll_id=self.id, key=key, value=stored_text)
        # Look at the validated flag instead of the raw input: under
        # Python 2 every string (even "no" or "0") is "greater than zero".
        if (key == 'public') and (validated_value is True):
            self.announce_via_twitter()

    def get_ordered_submissions(self, submitter):
        """ return the submissions of the poll in the order that was chosen
        by the given voter (empty if there is no vote of this voter)
        """
        votes = []
        for submission in ContentSubmission.selectBy(poll_id=self.id):
            votes.extend(VoteOrder.selectBy(content_submission_id=submission,
                    submitter=submitter))
        votes.sort(key=lambda item: item.priority)
        return [order.content_submission_id for order in votes]

    def announce_via_twitter(self):
        """ publish the title and the URL of the poll via twitter

        Nothing happens if one of the required configuration settings is
        missing.
        """
        try:
            # the absolute URL depends on a configuration setting, too
            complete_url = self.get_url(absolute=True)
            alert_prefix = config.get('misc', 'twitter_alert_prefix')
            twitter_key = config.get('misc', 'twitter_consumer_key')
            twitter_secret = config.get('misc', 'twitter_consumer_secret')
            twitter_access_key = config.get('misc', 'twitter_access_token_key')
            twitter_access_secret = config.get('misc', 'twitter_access_token_secret')
        except ConfigParser.Error:
            # a config setting seems to be missing (e.g. in a dev environment)
            return
        title = "%s %s %s" % (alert_prefix, self.title[:79], complete_url)
        # the following line is quick and dirty fix for the unicode bug twitter exception.
        # of course it would be better to preserve the umlauts somehow...
        title = title.encode('ascii', 'ignore')
        publish_twitter_alert(title, twitter_key, twitter_secret, twitter_access_key, twitter_access_secret)

    def get_num_of_votes(self):
        """ return the number of different voter names of this poll """
        voters = set()
        for submission in ContentSubmission.selectBy(poll_id=self.id):
            for vote in VoteOrder.selectBy(content_submission_id=submission):
                voters.add(vote.submitter)
        return len(voters)

    def get_num_of_submitters(self):
        """ return the number of different submitter names of this poll """
        return len(set(submission.submitter
                for submission in ContentSubmission.selectBy(poll_id=self.id)))

    def get_num_of_submissions(self):
        """ return the number of submissions of this poll """
        return ContentSubmission.selectBy(poll_id=self.id).count()

    def get_submissions(self):
        """ return the submissions of this poll ordered by creation time """
        return ContentSubmission.selectBy(poll_id=self.id).orderBy("timestamp_creation")

    def get_url(self, absolute=False):
        """ return the public URL of the poll """
        return get_url_string("%s%s" % (BASE_DICT["base_url"], self.hash_key), absolute)

    def get_submit_url(self, absolute=False):
        """ return the URL for adding a new submission """
        return get_url_string("%s%s/submit" % (BASE_DICT["base_url"], self.hash_key), absolute)

    def get_vote_url(self, absolute=False):
        """ return the URL for voting """
        return get_url_string("%s%s/vote" % (BASE_DICT["base_url"], self.hash_key), absolute)

    def get_admin_url(self, absolute=False, suffix=""):
        """ return the administrative URL of the poll

        @value suffix: text to be appended to the URL (e.g. "/delete")
        @type suffix: str
        """
        return get_url_string("%s%s%s" % (BASE_DICT["base_url"], self.admin_hash_key, suffix), absolute)

    def get_edit_url(self, absolute=False):
        """ return the URL for editing the poll """
        return self.get_admin_url(absolute=absolute, suffix="/admin")

    def get_delete_url(self, absolute=False):
        """ return the URL for deleting the poll """
        return self.get_admin_url(absolute=absolute, suffix="/delete")

    def get_vote_enable_url(self, absolute=False):
        """ return the URL for switching the voting on or off """
        return self.get_admin_url(absolute=absolute, suffix="/vote_enable")

    def get_creation_time_string(self):
        """ return the creation timestamp converted via "str" """
        return str(self.timestamp_creation)

    def is_vote_enabled(self):
        """ return the value of the setting "vote_enabled" """
        return self.get_settings()["vote_enabled"]

    def is_vote_finished(self, submitter):
        """ is there a stored vote for the given voter name? """
        return len(self.get_ordered_submissions(submitter)) > 0

    def get_submissions_visibility(self):
        """ are the submissions visible for everyone?

        This is the case if "show_all_submissions" is set or if the expose
        date is today or in the past.
        """
        settings = self.get_settings()
        expose_date = settings["expose_date"]
        today = datetime.datetime.now().date()
        return bool(settings["show_all_submissions"]
                or (expose_date and (expose_date.date() <= today)))

    def is_closed(self):
        """ is the close date of the poll in the past?

        An empty close date ("forever") never closes the poll.
        """
        # load the settings only once (every call queries the database)
        close_date = self.get_settings()["close_date"]
        return bool(close_date
                and (close_date.date() < datetime.datetime.now().date()))
|
|
|
|
|
|
class PollMesh:
    """ generate a mesh of directly or indirectly related polls

    Basically this is just a recursive search for unique related polls that are
    connected to the base poll with less than (e.g.) five nodes in between.
    The base poll itself is never part of the result.
    """

    def __init__(self, poll, depth=5):
        """ collect the related polls of the given poll

        @value poll: the base poll (it needs to offer "get_related_polls_direct")
        @value depth: number of additional recursion levels below the
            direct relations
        @type depth: int
        """
        self.related = []
        self._base_poll = poll
        # start to collect the related polls immediately
        self.__collect_related_polls(poll, depth)

    def __collect_related_polls(self, current_poll, current_depth):
        """ recursive scanning for unique related polls up to a certain distance
        """
        new_queue = []
        for poll in current_poll.get_related_polls_direct():
            # Relations work in both directions: the base poll shows up as
            # a relative of its own neighbours. Parallel relations would
            # deliver the same poll more than once.
            if (poll == self._base_poll) or (poll in self.related) \
                    or (poll in new_queue):
                continue
            new_queue.append(poll)
        self.related.extend(new_queue)
        if current_depth > 0:
            for poll in new_queue:
                self.__collect_related_polls(poll, current_depth - 1)

    def get_related_polls(self):
        """ return the list of collected polls (in the order of discovery) """
        return self.related
|
|
|
|
|
|
def get_markup_with_links(text):
    """ escape the text and turn the URLs within the text into HTML links

    Host names starting with "www" or followed by a slash get the prefix
    "http://". Every URL is replaced with a link labelled with the host name
    of the URL. Finally all line breaks are replaced with "<br />".

    @value text: the plain text
    @returns: HTML markup (the original text is escaped)
    """
    def get_link_markup(match):
        prefix, url = match.group(1), match.group(2)
        # only take the host part of the url ("http://host/path" -> "host")
        short_name = url.split("/")[2]
        return """%s<a href="%s" rel="nofollow">%s</a>""" % (prefix, url, short_name)
    def expand_protocol(match):
        prefix, url, suffix = match.groups()
        if url.lower().startswith("www") or suffix.startswith("/"):
            # just prepend "http://"
            return prefix + "http://" + url
        else:
            return prefix + url
    # surround all urls with html markup
    text = genshi.escape(text)
    # The character following an URL is only inspected via a lookahead.
    # Otherwise it would be consumed and the whitespace between two URLs
    # could not serve as the prefix of the second URL anymore.
    text = re.sub(r"(\A|\s|\()([a-zA-Z_\-\.]+\.[a-zA-Z]{2,4})(?=(/|\)|\s|\Z))",
            expand_protocol, text)
    text = re.sub(r"(\A|\s|\()(https?://[\w/\?\.\#=;,_\-\~&]*)(?=\)|\s|\Z)",
            get_link_markup, text)
    return get_markup_with_formatted_linebreaks(text, "<br />")
|
|
|
|
def get_markup_with_formatted_linebreaks(text, break_char):
    """ replace every line break (Windows, old Mac or Unix style) of the
    text with the given string

    @value text: the text to be processed
    @value break_char: the replacement for each line break (e.g. "<br />")
    @returns: the text without any line break characters
    """
    # normalize to Unix line breaks first ("\r\n" needs to be handled
    # before "\r" - otherwise it would count twice)
    for line_ending in ("\r\n", "\r"):
        text = text.replace(line_ending, "\n")
    return text.replace("\n", break_char)
|
|
|
|
def get_poll_setting_string(key, value):
    """ convert the typed value of a poll setting into its string form

    @value key: name of the setting (see POLL_SETTINGS)
    @value value: the typed value
    @returns: the string form; an empty string for unknown keys and for
        unset or empty ("forever") dates
    """
    if key not in POLL_SETTINGS:
        return ""
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        # text settings are used as they are
        return value
    if setting_type == datetime.datetime:
        # unset dates are None - the empty string is the value for "forever"
        if (value is None) or (value == ""):
            return ""
        return value.strftime(DATE_FORMAT)
    # booleans and all other types
    return str(value)
|
|
|
|
def validate_poll_setting(key, value):
    """ parse the value of a poll setting according to the setting's type

    @value key: name of the setting (see POLL_SETTINGS)
    @value value: the typed value or its string form (e.g. from a form or
        from the database)
    @returns: the typed value or None (unknown key or invalid value).
        Date settings return an empty string for the value "forever".
    """
    if key not in POLL_SETTINGS:
        return None
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        return value
    elif setting_type == bool:
        if value is None:
            return False
        elif isinstance(value, bool):
            return value
        else:
            if not isinstance(value, basestring):
                # e.g. numbers: use their string form instead of failing
                value = str(value)
            text = value.strip().lower()
            if text in ("0", "false", "no", "off", "disabled", ""):
                return False
            elif text in ("1", "true", "yes", "on", "enabled"):
                return True
            else:
                return None
    elif setting_type == datetime.datetime:
        if value is None:
            # default: one week later
            # This needs to be a "datetime" (not a "date") object: the
            # users of date settings call its "date" method.
            return datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
        elif isinstance(value, datetime.datetime):
            return value
        elif value == "":
            # this is the value for "forever"
            return ""
        else:
            try:
                return datetime.datetime.strptime(value, DATE_FORMAT)
            except (ValueError, TypeError):
                return None
    elif setting_type == int:
        if value is None:
            value = 0
        try:
            return int(value)
        except (ValueError, TypeError):
            return None
    else:
        # all other types (e.g. float, ...)
        try:
            return setting_type(value)
        except (ValueError, TypeError):
            return None
|
|
|
|
def send_profile_mail(user):
    """ send a mail containing the profile URL to the address of the user

    The section "mail" of the configuration supplies the settings "host",
    "port" and "use_ssl" (optional) as well as "from", "subject" and "body"
    (mandatory). The body may refer to "%(profile_url)s". A line consisting
    of a single dot is turned into an empty line.

    @value user: the profile of the recipient
    @type user: Profile
    @returns: True if the mail was handed over to the mail server
    @rtype: bool
    """
    data = {}
    for key, default in (("host", "localhost"), ("port", 25),
            ("use_ssl", False), ("from", None),
            ("subject", None), ("body", None)):
        try:
            data[key] = config.get("mail", key, raw=True)
        except ConfigParser.Error as err_msg:
            if default is None:
                # fail: a mandatory setting is missing
                sys.stderr.write("send_profile_mail: %s%s" % (err_msg, os.linesep))
                return False
            data[key] = default
    data["port"] = int(data["port"])
    # The configuration delivers strings: "False" or "no" would count as
    # true without an explicit conversion.
    use_ssl = data["use_ssl"]
    if not isinstance(use_ssl, bool):
        use_ssl = str(use_ssl).strip().lower() in ("1", "true", "yes", "on", "enabled")
    if use_ssl:
        provider = smtplib.SMTP_SSL
    else:
        provider = smtplib.SMTP
    input_data = {"profile_url": user.get_url(absolute=True)}
    content = data["body"] % input_data
    # empty lines can't be parsed via ConfigParser
    # re.sub in Python 2.5 does not understand "flags" -> compile first
    dot_line_regex = re.compile(r"^\.$", flags=re.MULTILINE)
    content = dot_line_regex.sub("", content)
    message = email.mime.text.MIMEText(content)
    message["Subject"] = data["subject"]
    message["From"] = data["from"]
    message["To"] = user.email
    message["Date"] = email.utils.formatdate()
    try:
        server = provider(data["host"], data["port"])
        try:
            server.sendmail(data["from"], [user.email], message.as_string())
        finally:
            # always release the connection to the mail server
            server.quit()
    except (smtplib.SMTPException, IOError):
        # IOError covers socket errors, e.g. an unreachable mail server
        return False
    return True
|
|
|
|
def _get_config_flag(section, option="enabled"):
    """ read a boolean configuration setting (a missing setting is False) """
    try:
        return config.getboolean(section, option)
    except ConfigParser.Error:
        return False
    except ValueError:
        # not a known boolean word: stick to the former behaviour (every
        # non-empty value enables the feature)
        return bool(config.get(section, option))


def get_default_values(request, **kwargs):
    """ assemble the basic value dictionary for the templates

    The dictionary consists of a copy of BASE_DICT, the given keyword
    arguments, the feature switches "enable_users" and "enable_voting" and
    (if users are enabled and the cookie exists) the "user_hash_key".

    @value request: the current request (its cookies are evaluated)
    @returns: the value dictionary
    @rtype: dict
    """
    value_dict = dict(BASE_DICT)
    # never share the mutable "errors" dictionary between requests
    value_dict["errors"] = {}
    value_dict.update(kwargs)
    # add configuration settings
    # (parsed as booleans: a plain string like "no" would count as true)
    enable_users = _get_config_flag("users")
    value_dict["enable_users"] = enable_users
    value_dict["enable_voting"] = _get_config_flag("voting")
    # read the user's hash key from the cookie
    if enable_users and ("user_hash_key" in request.cookies):
        try:
            value_dict["user_hash_key"] = str(request.cookies["user_hash_key"])
        except UnicodeEncodeError:
            # a cookie containing non-ascii characters is not a valid key
            pass
    return value_dict
|
|
|
|
def get_url_string(url, absolute=False):
    """ hand back the given URL - optionally prefixed with the configured
    base URL of the site (setting "full_base_url" in section "hosting")

    @value url: the given URL
    @type url: str
    @value absolute: should the URL be absolute or relative?
    @type absolute: bool
    @returns: the relative or absolute URL
    @rtype: str
    """
    if not absolute:
        return url
    site_prefix = config.get('hosting', 'full_base_url')
    return site_prefix + url
|
|
|
|
def render(filename, input_data=None, **values):
    """ render a template and wrap the result in an HTML response

    @value filename: name of the template file
    @value input_data: optional values for filling the forms of the page
    @type input_data: dict
    @value values: the variables of the template; a "user_hash_key" is
        additionally stored in a cookie
    @returns: the response object
    @rtype: webob.Response
    """
    template = loader.load(filename)
    stream = template.generate(**values)
    if input_data is not None:
        stream = stream | genshi.filters.HTMLFormFiller(data=input_data)
    response = webob.Response(content_type="text/html", charset="utf8")
    if "user_hash_key" in values:
        # NOTE(review): webob interprets "max_age" as seconds - 365 looks
        # like it was meant to be a number of days. Please verify.
        response.set_cookie("user_hash_key", values["user_hash_key"],
                max_age=365, path="/")
    response.body = stream.render("html", doctype="html")
    return response
|
|
|
|
def get_poll_id(hash_key):
    """ return the id of the poll with the given public hash key

    @returns: the id or None (no unique match or a non-ascii key)
    """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            return None
    matches = Poll.selectBy(hash_key=hash_key)
    if matches.count() != 1:
        return None
    return matches[0].id
|
|
|
|
def get_poll_admin_id(hash_key):
    """ return the id of the poll with the given admin hash key

    @returns: the id or None (no unique match or a non-ascii key)
    """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            return None
    matches = Poll.selectBy(admin_hash_key=hash_key)
    if matches.count() != 1:
        return None
    return matches[0].id
|
|
|
|
def extract_poll_id(text, admin=False):
    """ The text may be a hash or the link of a poll

    @value text: a hash key or a text containing a hash key
    @value admin: look for admin hashes (True) or for public hashes?
    @type admin: bool
    @returns: the id of the poll or None
    """
    lookup = get_poll_admin_id if admin else get_poll_id
    direct_hit = lookup(text)
    if direct_hit is not None:
        return direct_hit
    # we assume that the hash is at the end of the string
    for candidate in reversed(re.findall(r"[a-z0-9]+", text)):
        poll_id = lookup(candidate)
        if poll_id is not None:
            return poll_id
    return None
|
|
|
|
def get_new_hash_key(length=16, charset=None):
    """ returns a quite random hash key with the specified length

    The key is guaranteed to be unused: it is neither the (admin) hash of
    a poll nor the hash of a profile.

    @value length: number of characters of the key
    @type length: int
    @value charset: the allowed characters (default: digits and lower
        case letters)
    @type charset: str
    @rtype: str
    """
    if charset is None:
        charset = "0123456789abcdefghijklmnopqrstuvwxyz"
    base_size = len(charset)

    def get_hash_string(length):
        result = []
        entropy = 0
        while len(result) < length:
            if entropy < base_size:
                # A single uuid lasts for roughly 24 characters of the
                # default charset: refill instead of padding the key with
                # the first character of the charset.
                entropy = uuid.uuid4().int
            entropy, index = divmod(entropy, base_size)
            result.append(charset[index])
        return "".join(result)

    def is_in_use(candidate):
        # profiles draw their (unique) keys from this function, too
        return (get_poll_id(candidate) is not None) \
                or (get_poll_admin_id(candidate) is not None) \
                or (Profile.selectBy(hash_key=candidate).count() > 0)

    # repeat the hash generation until a new value is found
    hash_key = get_hash_string(length)
    while is_in_use(hash_key):
        hash_key = get_hash_string(length)
    return hash_key
|
|
|
|
def publish_twitter_alert(text, key, secret,access_key,access_secret):
|
|
api = twitter.Api(consumer_key= key, consumer_secret = secret, access_token_key=access_key, access_token_secret= access_secret)
|
|
try:
|
|
api.PostUpdate(text)
|
|
except urllib2.HTTPError,e:
|
|
# twitter error, most likely because of a duplicate message
|
|
# or maybe an authentication failure
|
|
print e.code
|
|
except urllib2.URLError, e:
|
|
print e.reason
|
|
|
|
def is_spam_submitter_name(name, errors_dict):
    """ check if the name looks like a randomly generated spam name

    A name is suspicious if it consists solely of ascii letters, contains
    more than three lower case and more than three upper case letters, is at
    least eight characters long and does not start with the complete set of
    its upper case letters.

    @value name: the name to be checked
    @value errors_dict: the key "submitter" receives a message for spam names
    @type errors_dict: dict
    @returns: True for suspicious names
    @rtype: bool
    """
    lower_letters = "".join(re.findall("[a-z]", name))
    upper_letters = "".join(re.findall("[A-Z]", name))
    only_letters = (len(lower_letters) + len(upper_letters) == len(name))
    mixed_case = (len(lower_letters) > 3) and (len(upper_letters) > 3)
    if not (only_letters and mixed_case and (len(name) >= 8)):
        return False
    if name.startswith(upper_letters):
        return False
    errors_dict["submitter"] = "Spam-Verdacht: bitte den Namen korrigieren"
    return True
|
|
|
|
def count_urls(text, errors_dict=None):
    """ count the URL-like parts of the text ("http://", "https://" or a
    "www." following a whitespace character)

    @value text: the text to be scanned
    @value errors_dict: unused; it is optional, since "check_spam_content"
        calls this function with the text only
    @returns: the number of hits
    @rtype: int
    """
    hits = re.findall(r"(\swww\.|http://|https://)", text)
    return len(hits)
|
|
|
|
def check_spam_content(text):
    """ check if the text looks like spam: it contains HTML link markup or
    more than two URLs

    @value text: the submitted text
    @returns: True for suspicious texts
    @rtype: bool
    """
    if re.search(r"(<a\s|\shref=|</a>)", text.lower()):
        return True
    # "count_urls" is declared with two parameters: calling it with the
    # text only would fail. The second one is not needed here.
    if count_urls(text, {}) > 2:
        return True
    return False
|
|
|
|
@bobo.query('/profile/logout')
def user_logout(bobo_request):
    """ show the frontpage and remove the cookie "user_hash_key" """
    # default start page
    frontpage_response = show_frontpage(bobo_request)
    # clear the cookie
    frontpage_response.delete_cookie("user_hash_key")
    return frontpage_response
|
|
|
|
@bobo.query('/profile/resend')
def resend_user_key(bobo_request, email=None, submit=None, email_sent=None):
    """ send the profile link to a registered mail address again

    Without "submit" the form is displayed. A submitted address that is
    not registered leads to an error message and the offer to create a
    new profile.
    """
    value_dict = get_default_values(bobo_request)
    if not value_dict["enable_users"]:
        return bobo.redirect(BASE_DICT["base_url"])
    value_dict["email_sent"] = False
    value_dict["user_new"] = False
    form_data = {"email": email}
    if not submit:
        return render("user_key.html", input_data=form_data, **value_dict)
    # The outcome of the form validation is not evaluated: an invalid
    # address is simply not found by the lookup below.
    try:
        forms.ProfileForm.to_python(form_data)
    except formencode.Invalid:
        pass
    # find the user's data (if it exists)
    users = Profile.selectBy(email=email)
    if not users.count():
        value_dict["user_new"] = True
        value_dict["errors"] = {
            "email": u"Diese E-Mail-Adresse ist nicht registriert. Möchtest du ein neues Profil anlegen?",
        }
        return render("user_key.html", input_data={"email": email},
                **value_dict)
    user = users[0]
    value_dict["user"] = user
    value_dict["email_sent"] = True
    value_dict["email_ok"] = send_profile_mail(user)
    return render("user_key.html", **value_dict)
|
|
|
|
def manage_link_in_profile(user, link, add, value_dict):
    """ add a poll to the profile of a user or remove it from the profile

    @value user: the profile
    @value link: hash key or URL of the poll; public hashes are tried
        before admin hashes
    @value add: add the poll (True) or remove it (False)?
    @type add: bool
    @value value_dict: it receives "profile_manage_poll_hash" and (after a
        change) "poll_remember" or "poll_forget"
    @type value_dict: dict
    @returns: True if a link was created or removed
    @rtype: bool
    """
    poll = None
    is_admin = False
    for admin_flag in (False, True):
        found_id = extract_poll_id(link, admin=admin_flag)
        if found_id:
            poll, is_admin = Poll.get(found_id), admin_flag
            break
    if not poll:
        return False
    connected = ProfilePolls.selectBy(poll=poll, user=user,
            is_admin=is_admin)
    value_dict["profile_manage_poll_hash"] = \
            poll.admin_hash_key if is_admin else poll.hash_key
    is_linked = bool(connected.count())
    if add and not is_linked:
        ProfilePolls(poll=poll, user=user, is_admin=is_admin)
        value_dict["poll_remember"] = poll
        return True
    if is_linked and not add:
        connected[0].destroySelf()
        value_dict["poll_forget"] = poll
        return True
    # nothing to do: already linked or not linked at all
    return False
|
|
|
|
@bobo.query('/profile/new')
def user_create(bobo_request, email=None, submit=None, add_link=None):
    """ create a new profile for a mail address and send the profile link

    Without "submit" the form is displayed. An invalid address or an
    address that is already registered leads back to the form (including
    an error message).
    """
    value_dict = get_default_values(bobo_request)
    if not value_dict["enable_users"]:
        return bobo.redirect(BASE_DICT["base_url"])
    value_dict["user_new"] = True
    data = {"email": email, "add_link": add_link}
    if not submit:
        return render("user_key.html", input_data=data, **value_dict)
    # create a new user (if email is valid)
    try:
        forms.ProfileForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
        # The validation result used to be ignored: a profile was created
        # even for a malformed address. Only complaints regarding the
        # mail address are honoured - the handling of all other fields
        # stays as it was.
        if isinstance(errors, dict) and ("email" in errors):
            value_dict["errors"] = errors
            return render("user_key.html", input_data=data, **value_dict)
    users = Profile.selectBy(email=email)
    if users.count():
        # the mail address is already registered
        errors = {}
        errors["email"] = u"Diese E-Mail-Adresse ist bereits registriert. Sollen die Zugangsdaten erneut an die Adresse versandt werden?"
        value_dict["errors"] = errors
        value_dict["user_new"] = False
        return render("user_key.html", input_data=data, **value_dict)
    # create a new user
    hash_key = get_new_hash_key()
    new_user = Profile(hash_key=hash_key, email=email)
    value_dict["user"] = new_user
    value_dict["email_sent"] = True
    value_dict["email_ok"] = send_profile_mail(new_user)
    return render("user_details.html", **value_dict)
|
|
|
|
@bobo.query('/profile')
@bobo.query('/profile/:hash_key')
@bobo.query('/:add_link/remember')
@bobo.query('/:del_link/forget')
def show_user(bobo_request, hash_key=None, add_link=None, del_link=None):
    """ show the profile page; optionally add a poll to the profile
    ("add_link") or remove a poll from the profile ("del_link")

    The profile is determined by the hash key of the URL or (as a fallback)
    by the cookie. Requests without a valid profile are redirected to the
    form for new profiles.
    """
    value_dict = get_default_values(bobo_request)
    if not value_dict["enable_users"]:
        return bobo.redirect(BASE_DICT["base_url"])
    if hash_key:
        # overwrite the cookie-based value, if a profile is explicitely given
        user_hash_key = hash_key
    else:
        user_hash_key = value_dict.get("user_hash_key")
    users = None
    if user_hash_key:
        # sqlobject fails to handle unicode values -> convert to string
        try:
            user_hash_key = str(user_hash_key)
        except UnicodeEncodeError:
            # non-ascii characters: this cannot be one of our keys
            user_hash_key = None
        else:
            users = Profile.selectBy(hash_key=user_hash_key)
    if not user_hash_key or not users.count():
        # TODO: store add/del link -> render instead of redirect
        return bobo.redirect(BASE_DICT["base_url"] + "profile/new")
    user = users[0]
    value_dict["user"] = user
    value_dict["user_hash_key"] = user_hash_key
    if add_link:
        manage_link_in_profile(user, add_link, True, value_dict)
    elif del_link:
        manage_link_in_profile(user, del_link, False, value_dict)
    return render("user_details.html", **value_dict)
|
|
|
|
@bobo.query('/new')
@bobo.query('/new/:template')
@bobo.query('/new/:author/:title/:description')
def new_poll(bobo_request, submit=None, cancel=None, author=None, title=None,
        description=None, template=None, hide_errors=False):
    """ show the form for new polls or create a poll from the form data

    "cancel" leads to the start page, a request without "submit" displays
    the (prefilled) form. After the successful creation the browser is
    redirected to the admin page of the new poll.

    @value template: name of the settings template (see
        POLL_SETTING_TEMPLATES); unknown names fall back to "brainstorm"
    @value hide_errors: display the form again in case of invalid input,
        but without the error messages
    """
    value_dict = get_default_values(bobo_request)
    data = {"author": author, "title": title, "description": description,
            "template": template}
    if cancel:
        return bobo.redirect(BASE_DICT["base_url"])
    if not submit:
        # show the "new poll" form
        return render("poll_new.html", input_data=data, **value_dict)
    # create the new poll (if it is valid)
    errors = {}
    try:
        data = forms.PollForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
    if errors:
        if not hide_errors:
            value_dict["errors"] = errors
        return render("poll_new.html", input_data=data, **value_dict)
    # get the template settings
    if template in POLL_SETTING_TEMPLATES:
        template_settings = POLL_SETTING_TEMPLATES[template].copy()
    else:
        template_settings = POLL_SETTING_TEMPLATES["brainstorm"]
    # create the new poll
    hash_key = get_new_hash_key()
    admin_hash_key = get_new_hash_key()
    now = datetime.datetime.now()
    created_poll = Poll(hash_key=hash_key, admin_hash_key=admin_hash_key,
            timestamp_creation=now, author=data["author"],
            title=data["title"], description=data["description"])
    # apply the template settings
    for key, value in template_settings.items():
        created_poll.change_setting(key, value)
    return bobo.redirect(created_poll.get_admin_url())
|
|
|
|
|
|
@bobo.query('/login')
@bobo.query('/login/')
def show_login(bobo_request):
    """ render the template "login.html" (a form for logging in and
    creating an account) with the default values
    """
    template_values = get_default_values(bobo_request)
    return render("login.html", **template_values)
|
|
|
|
|
|
@bobo.query('/newprofile')
@bobo.query('/newprofile/')
def show_newprofile(bobo_request):
    """ render the template "profile.html" (polls and account data of the
    user, changing the password) with the default values
    """
    template_values = get_default_values(bobo_request)
    return render("profile.html", **template_values)
|
|
|
|
@bobo.query('/:hash_key/vote_result.csv')
def vote_result(bobo_request, hash_key=None):
    """ export the votes of a poll: one line per voter containing the
    numbers of the submissions in the order chosen by the voter

    The numbers of the well-known submissions are fixed by the list
    "preorder". All other submissions are numbered consecutively after
    these.

    @returns: the lines joined with the line separator of the platform -
        or a redirect to the start page for an unknown poll
    """
    poll_id = get_poll_id(hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    preorder = ["Paintball", "Billard", "Eislaufen", "Kegeln", "Kneipengang", "Karaoke", "Tischtennis", "Bowling", "Geek-out", "Go-Kart", "Dart"]
    numbers = dict((content, position + 1)
            for position, content in enumerate(preorder))
    voters = []
    known_voters = set()
    for submission in ContentSubmission.selectBy(poll_id=poll.id):
        # A submission missing in the fixed list used to break the export
        # (ValueError of "list.index"): assign the next free number.
        if submission.content not in numbers:
            numbers[submission.content] = len(numbers) + 1
        for vote in VoteOrder.selectBy(content_submission_id=submission):
            if vote.submitter not in known_voters:
                known_voters.add(vote.submitter)
                voters.append(vote.submitter)
    lines = []
    for voter in voters:
        sorting = poll.get_ordered_submissions(voter)
        items = [str(numbers[submission.content]) for submission in sorting]
        lines.append(" ".join(items))
    return os.linesep.join(lines)
|
|
|
|
@bobo.query('/:hash_key/vote')
def vote_submission_order(bobo_request, hash_key=None, submitter=None,
        vote_order=None):
    """ store the order of submissions chosen by a voter

    @value submitter: name of the voter
    @value vote_order: the digests of the submissions (see
        "ContentSubmission.get_obfuscated_digest") separated by whitespace -
        the most important one first
    @returns: the poll page - or a redirect to the start page for an
        unknown poll
    """
    value_dict = get_default_values(bobo_request)
    value_dict["errors"] = {}
    value_dict["authlevel"] = "poll_public"
    data = {}
    if submitter and not is_spam_submitter_name(submitter, value_dict["errors"]):
        data["submitter"] = submitter.strip()
    if not vote_order:
        vote_order = ""
    data["vote_order"] = vote_order
    poll_id = get_poll_id(hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    value_dict["poll"] = poll
    digest_dict = {}
    for submission in ContentSubmission.selectBy(poll_id=poll.id):
        digest_dict[submission.get_obfuscated_digest()] = submission
    digests = vote_order.split()
    # populate the "vote_order" list (to keep the order intact)
    # Unknown digests (e.g. manipulated input or a submission that was
    # deleted in the meantime) are skipped instead of raising a KeyError.
    value_dict["vote_order"] = [digest_dict[content_hash]
            for content_hash in digests if content_hash in digest_dict]
    try:
        data = forms.VoteSubmissionOrderForm.to_python(data)
    except formencode.Invalid as errors_packed:
        # merge errors with previous ones - but never overwrite existing ones
        errors = errors_packed.unpack_errors()
        errors.update(value_dict["errors"])
        value_dict["errors"] = errors
    if value_dict["errors"] or poll.is_closed() or \
            not poll.is_vote_enabled():
        # ignore silently
        pass
    elif poll.is_vote_finished(submitter):
        # there is already a vote stored for that submitter name
        value_dict["errors"]["submit"] = \
                "Fehler: deine Wahl wurde bereits gespeichert."
    else:
        # store this order of items
        for index, digest in enumerate(digests):
            if digest in digest_dict:
                VoteOrder(submitter=submitter, priority=index,
                        content_submission_id=digest_dict[digest])
    # NOTE(review): the indentation of this statement got lost in the
    # reviewed copy - it is assumed to be executed for every outcome.
    value_dict["submitter"] = submitter
    return render("poll_details.html", input_data=data, **value_dict)
|
|
|
|
@bobo.query('/:hash_key/submit')
def submit_content(bobo_request, hash_key=None, submitter=None, content=None):
    """ add a new submission to a poll

    Suspicious content or names and invalid form data lead to error
    messages instead of a new submission. The same applies to closed
    polls (without a message).

    @returns: the poll page - or a redirect to the start page for an
        unknown poll
    """
    value_dict = get_default_values(bobo_request)
    value_dict["errors"] = {}
    value_dict["authlevel"] = "poll_public"
    data = {}
    if content and check_spam_content(content):
        value_dict["errors"]["content"] = (
                "Spam-Verdacht: Inhalt darf keine HTML-Tags und nicht "
                "zuviele Links enthalten")
    else:
        data["content"] = content
    if submitter and (not is_spam_submitter_name(submitter, value_dict["errors"])):
        data["submitter"] = submitter
    poll_id = get_poll_id(hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    value_dict["poll"] = poll
    try:
        data = forms.SubmitForm.to_python(data)
    except formencode.Invalid as errors_packed:
        # merge errors with previous ones - but never overwrite existing ones
        merged_errors = errors_packed.unpack_errors()
        merged_errors.update(value_dict["errors"])
        value_dict["errors"] = merged_errors
    if not (value_dict["errors"] or poll.is_closed()):
        # create the new submission content
        data["timestamp_creation"] = datetime.datetime.now()
        data["poll_id"] = poll.id
        ContentSubmission(**data)
        # remove "content" for the next input
        del data["content"]
    return render("poll_details.html", input_data=data, **value_dict)
|
|
|
|
@bobo.query('/:admin_hash_key/delete')
def delete_poll(bobo_request, admin_hash_key=None):
    """ Destroy the poll that belongs to the given admin hash.

    The visitor is sent back to the frontpage afterwards. An unknown admin
    hash deletes nothing.
    """
    doomed_poll_id = get_poll_admin_id(admin_hash_key)
    if doomed_poll_id is not None:
        Poll.get(doomed_poll_id).destroySelf()
    return bobo.redirect(BASE_DICT["base_url"])
|
|
|
|
@bobo.query('/:admin_hash_key/vote_enable')
def vote_enable_poll(bobo_request, admin_hash_key=None, vote_select=None):
    """ Switch the "vote_enabled" setting of a poll on or off.

    Only the value "on" of "vote_select" enables voting - anything else
    disables it. An unknown admin hash leads to the frontpage.
    """
    admin_poll_id = get_poll_admin_id(admin_hash_key)
    if admin_poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(admin_poll_id)
    enable_voting = (vote_select == "on")
    poll.change_setting("vote_enabled", enable_voting)
    return bobo.redirect(poll.get_admin_url())
|
|
|
|
@bobo.query('/:admin_hash_key/delete/:submission_id_digest')
def delete_submission(bobo_request, admin_hash_key=None, submission_id_digest=None):
    """ Delete one submission of a poll (identified by its obfuscated digest).

    The visitor returns to the admin page of the poll afterwards. An unknown
    admin hash leads to the frontpage (like the other admin handlers do) - there
    is no poll whose admin page could be shown.
    """
    admin_poll_id = get_poll_admin_id(admin_hash_key)
    if admin_poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(admin_poll_id)
    if submission_id_digest is not None:
        # This loop is slightly expensive, but it does not expose the overall
        # count of submissions (via the id).
        for submission in ContentSubmission.selectBy(poll_id=poll.id):
            if submission.get_obfuscated_digest() == submission_id_digest:
                submission.destroySelf()
                break
    return bobo.redirect(poll.get_admin_url())
|
|
|
|
@bobo.query('/:admin_hash_key/export')
def export_poll(bobo_request, admin_hash_key=None):
    """ Return all submissions of a poll as a csv download (date, author, text).

    An unknown admin hash leads to a redirect to the frontpage.
    """
    # NOTE(review): the returned dictionary is not used by this handler; the call
    # is kept in case it has side effects.
    get_default_values(bobo_request)
    poll_id = get_poll_admin_id(admin_hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    response = webob.Response(content_type="text/csv", charset=EXPORT_CHARSET)
    # the writer uses the default dialect of the csv module
    writer = csv.writer(response.body_file)
    for submission in list(poll.get_submissions()):
        row = [
            submission.timestamp_creation.strftime(EXPORT_DATE_FORMAT),
            submission.submitter.encode(EXPORT_ENCODING, "ignore"),
            get_markup_with_formatted_linebreaks(submission.content, os.linesep),
        ]
        encoded_row = []
        for cell in row:
            try:
                cell = cell.encode(EXPORT_ENCODING, "ignore")
            except UnicodeDecodeError:
                # the cell could not be converted - keep it unchanged
                pass
            encoded_row.append(cell)
        writer.writerow(encoded_row)
    try:
        export_prefix = config.get("misc", "export_prefix")
    except ConfigParser.Error:
        export_prefix = "wortschlucker"
    filename = EXPORT_FILENAME_TEMPLATE % {"title": poll.title, "prefix": export_prefix}
    # spaces turn into underscores; everything outside of a small safe set is dropped
    filename = re.sub(r"[^a-zA-Z0-9_\-\.]", "", filename.replace(" ", "_"))
    response.content_disposition = 'attachment; filename=%s' % filename
    return response
|
|
|
|
@bobo.query('/:admin_hash_key/admin')
def admin_poll(bobo_request, cancel=False, submit=None, admin_hash_key=None, author=None,
        title=None, description=None, settings=None,
        setting_expose_date=None, setting_expose_date_forever=None,
        setting_close_date=None, setting_close_date_forever=None):
    """ Show or process the edit form of a poll (admin view).

    Without "submit" the form is rendered with the current values of the poll.
    With "submit" the input is validated: errors cause the form to be rendered
    again (including the messages), otherwise the poll is updated and the
    visitor is sent to the admin page of the poll.
    "cancel" returns to the admin page without any change. An unknown admin
    hash leads to the frontpage.
    """
    value_dict = get_default_values(bobo_request)
    data = {"author": author, "title": title, "description": description, "template": ""}
    poll_id = get_poll_admin_id(admin_hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    value_dict["poll"] = poll
    value_dict["authlevel"] = "poll_admin"
    # cancel: return to (non-edit) admin page
    if cancel:
        return bobo.redirect(poll.get_admin_url())
    # missing form values default to the current state of the poll
    if author is None:
        data["author"] = poll.author
    if title is None:
        data["title"] = poll.title
    if description is None:
        data["description"] = poll.description
    # a given "forever" flag is turned into an empty date string
    if setting_expose_date_forever is None:
        if setting_expose_date is None:
            setting_expose_date = poll.get_settings_strings()["expose_date"]
    else:
        setting_expose_date = ""
    if setting_close_date_forever is None:
        if setting_close_date is None:
            setting_close_date = poll.get_settings_strings()["close_date"]
    else:
        setting_close_date = ""
    poll_settings = poll.get_settings()
    # update the settings only after a submit (otherwise we clear all current settings)
    if submit:
        # override with the given settings (taken from the form input)
        if settings is None:
            settings = []
        elif not isinstance(settings, list):
            # a single value was given
            settings = [settings]
        for setting_key in poll_settings.keys():
            poll_settings[setting_key] = setting_key in settings
    # collect all errors
    errors = {}
    # add boolean "settings" after forms validation - since there is no destination type
    data["settings"] = [key for key, value in poll_settings.items() if value is True]
    for key, value in (("expose_date", setting_expose_date), ("close_date", setting_close_date)):
        validated_value = validate_poll_setting(key, value)
        if value == "":
            data["setting_%s_forever" % key] = "yes"
            data["setting_%s" % key] = value
        else:
            data["setting_%s_forever" % key] = "no"
            if validated_value is None:
                # keep the entered value and report an error
                errors[key] = u"Ungültiges Datum"
                data["setting_%s" % key] = value
            else:
                data["setting_%s" % key] = get_poll_setting_string(key, validated_value)
    if submit:
        # check for errors only if the content is submitted (not just rendered)
        try:
            data = forms.PollSettingsForm.to_python(data)
        except formencode.Invalid as errors_packed:
            # merge with the date errors found above - but never overwrite them
            # (otherwise they would get lost and show up only after the next submit)
            form_errors = errors_packed.unpack_errors()
            form_errors.update(errors)
            errors = form_errors
    if errors:
        value_dict["errors"] = errors
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    if not submit:
        # nothing to be stored: just show the form
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    # store the new settings: update core attributes of the existing poll
    poll.author = data["author"]
    poll.title = data["title"]
    poll.description = data["description"]
    current_settings = poll.get_settings()
    # only changed boolean settings are written
    for key, value in poll_settings.items():
        if (POLL_SETTINGS[key][0] == bool) and (current_settings[key] != value):
            poll.change_setting(key, value)
    poll.change_setting("expose_date", data["setting_expose_date"])
    poll.change_setting("close_date", data["setting_close_date"])
    return bobo.redirect(poll.get_admin_url())
|
|
|
|
def update_blog_entry_values(bobo_request, blog_id, value_dict):
    """ Copy title, date, link and body of a blog entry into "value_dict".

    Returns False (and leaves "value_dict" untouched) if no blog info is
    available for the given id - otherwise True.
    """
    blog_info = get_blog_info(bobo_request, blog_id)
    if blog_info is None:
        return False
    # the keys are listed in the order of the items of the blog info
    for position, target_key in enumerate(("title", "date", "link", "body")):
        value_dict[target_key] = blog_info[position]
    return True
|
|
|
|
def get_blog_info(bobo_request, blog_id):
    """ Read one blog entry from the blog directory.

    The first line of the file is the title, the remainder is the body
    (creole markup, utf-8 encoded). Date and time are taken from the digits
    of the blog id (year, month, day, hour, minute).

    Returns a tuple (title, date, link, body) or None if the file could not
    be read.
    """
    blog_file = os.path.join(BLOG_DIR, blog_id)
    try:
        # the "with" statement closes the file even if reading fails
        with open(blog_file) as blog_handle:
            title = blog_handle.readline()
            body = blog_handle.read()
    except IOError:
        return None
    date = "%s.%s.%s %s:%s" % (blog_id[6:8], blog_id[4:6], blog_id[0:4],
            blog_id[8:10], blog_id[10:12])
    link = "%sblog/%s" % (get_default_values(bobo_request)["base_url"], blog_id)
    body = tools.creole2html(body.decode("utf-8"))
    return title, date, link, body
|
|
|
|
def get_blog_ids():
    """ Return the ids of all blog entries - the newest one comes first.

    A blog entry is a regular file directly below the blog directory with a
    name consisting of twelve digits. Sub-directories are not visited.
    A missing or unreadable blog directory results in an empty list.
    """
    try:
        candidates = os.listdir(BLOG_DIR)
    except OSError:
        return []
    blog_ids = [name for name in candidates
            if re.match(r"^[0-9]{12}$", name)
            and os.path.isfile(os.path.join(BLOG_DIR, name))]
    # the ids start with the date - thus the reverse order puts the newest first
    blog_ids.sort(reverse=True)
    return blog_ids
|
|
|
|
@bobo.query('/blog')
@bobo.query('/blog/')
@bobo.query('/blog/:blog_id')
@bobo.query('/blog/:blog_id/')
def serve_blog(bobo_request, blog_id=None):
    """ Render a single blog entry or (as a fallback) the list of all entries. """
    value_dict = get_default_values(bobo_request)
    # the blog_id should consist of 12 digits
    looks_valid = blog_id and re.match(r"^[0-9]{12}$", blog_id)
    if looks_valid and update_blog_entry_values(bobo_request, blog_id, value_dict):
        return render("blog_entry.html", **value_dict)
    # no (usable) blog id was given: collect all valid blog infos for the list
    all_infos = (get_blog_info(bobo_request, one_id) for one_id in get_blog_ids())
    value_dict["blog_list"] = [info for info in all_infos if info is not None]
    return render("blog_list.html", **value_dict)
|
|
|
|
def show_poll_list(bobo_request, render_file, page_size, page=None,
        filter_private=True, value_dict=None):
    """ Render one page of the list of polls (the newest poll comes first).

    bobo_request: the current request (used for the default template values)
    render_file: name of the template to be rendered
    page_size: number of polls per page (values below one are raised to one)
    page: number of the requested page (starting with one); invalid values
        select the first page
    filter_private: show only polls with the setting "public"
    value_dict: optional template values - the default values are merged into it

    A page beyond the end of the list shows the last "page_size" polls.
    """
    if value_dict is None:
        value_dict = {}
    value_dict.update(get_default_values(bobo_request))
    # a page size of zero (or below) would cause a division by zero or bogus slices
    page_size = max(1, page_size)
    true_string = get_poll_setting_string("public", True)
    if filter_private:
        polls = Poll.select(sqlobject.sqlbuilder.AND(
                PollSetting.q.key == "public",
                PollSetting.q.value == true_string,
                PollSetting.q.poll_id == Poll.q.id))
    else:
        polls = Poll.select()
    polls = polls.orderBy("-timestamp_creation")
    poll_count = polls.count()
    try:
        page = 1 if page is None else int(page)
    except (TypeError, ValueError):
        page = 1
    # "page" should at least be 1 - zero shows an empty list
    page = max(1, page)
    start = (page - 1) * page_size
    if start >= poll_count:
        # Beyond the end: show the last polls instead. Neither the start index
        # nor the page number may drop below their minimum (this happened with
        # less than "page_size" polls) - a negative index would be handled by
        # loading the complete result into a list.
        start = max(0, poll_count - page_size)
        page = max(1, (poll_count + page_size - 1) // page_size)
    end = start + page_size - 1
    value_dict["polls"] = polls[start : end + 1]
    # show a link for the next page, if more polls are available
    value_dict["show_next_link"] = (end + 1 < poll_count)
    value_dict["show_previous_link"] = (start > 0)
    value_dict["page"] = page
    value_dict["page_size"] = page_size
    return render(render_file, **value_dict)
|
|
|
|
def render_poll_admin(bobo_request, poll, add_related, del_related, count, page, show_delete):
    """ Render the admin page of a poll.

    "add_related" and "del_related" may reference another poll: the relation
    between both polls is created or removed before rendering. Problems are
    reported to the template via the error key "related".
    """
    value_dict = get_default_values(bobo_request)
    errors = {}
    if add_related is not None:
        other_poll_id = extract_poll_id(add_related, admin=True)
        if other_poll_id == poll.id:
            errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
        elif other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        elif other_poll_id in [one_poll.id for one_poll in poll.get_related_polls()]:
            errors["related"] = u"Dieser Wortschlucker wurde bereits verknüpft"
        else:
            PollRelation(first=poll.id, second=other_poll_id)
    if del_related is not None:
        other_poll_id = extract_poll_id(del_related, admin=True)
        if other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        elif other_poll_id not in [one_poll.id for one_poll in poll.get_related_polls()]:
            errors["related"] = u"Dieser Wortschlucker war nicht verknüpft"
        else:
            # the relation is removed in both directions
            PollRelation.deleteBy(first=poll.id, second=other_poll_id)
            PollRelation.deleteBy(first=other_poll_id, second=poll.id)
    value_dict.update({
        "poll": poll,
        "page": page,
        "count": count,
        "show_delete": show_delete,
        "errors": errors,
        "authlevel": "poll_admin",
    })
    return render("poll_admin_details.html", **value_dict)
|
|
|
|
@bobo.query('/admin/maintenance')
def admin_maintenance(age_days=60, keyword="viagra", keyword_submission="viagra", method=None):
    """ Run one of the cleanup actions of the admin interface.

    method "Remove old": delete polls older than "age_days" without submissions
    method "Remove poll by keyword": delete polls with "keyword" in title or
        description (case-insensitive)
    method "Remove submission by keyword": delete submissions containing
        "keyword_submission" (case-insensitive)

    Empty keywords and an invalid "age_days" are ignored. The admin overview
    is shown afterwards in any case.
    """
    if method == "Remove old":
        try:
            age_days = int(age_days)
        except (TypeError, ValueError):
            # not a number: do not guess an age - skip the cleanup
            age_days = None
        if age_days is not None:
            oldest = datetime.datetime.now() - datetime.timedelta(days=age_days)
            for poll in Poll.select(Poll.q.timestamp_creation < oldest):
                if poll.get_num_of_submissions() == 0:
                    poll.destroySelf()
    elif method == "Remove poll by keyword" and keyword:
        needle = keyword.lower()
        for poll in Poll.select():
            if (needle in poll.title.lower()) or (needle in poll.description.lower()):
                poll.destroySelf()
    elif method == "Remove submission by keyword" and keyword_submission:
        # The keyword for submissions must be checked here (not the keyword for
        # polls): an empty string is contained in every text - thus an empty
        # keyword would remove all submissions.
        needle = keyword_submission.lower()
        for submission in ContentSubmission.select():
            if needle in submission.content.lower():
                submission.destroySelf()
    return bobo.redirect("../admin")
|
|
|
|
@bobo.query('/admin')
@bobo.query('/admin/')
@bobo.query('/admin/page/:page')
def show_admin_page(bobo_request, page=None, page_size=50):
    """ Render the admin overview: a paged list of all polls (including private ones).

    An invalid or non-positive "page_size" falls back to the default of 50.
    """
    try:
        page_size = int(page_size)
    except (TypeError, ValueError):
        page_size = 50
    if page_size < 1:
        # zero or negative sizes cannot be used for paging
        page_size = 50
    value_dict = {
        "poll_table": Poll,
        "submission_table": ContentSubmission,
    }
    return show_poll_list(bobo_request, "admin.html", page_size, page=page,
            filter_private=False, value_dict=value_dict)
|
|
|
|
@bobo.query('/')
@bobo.query('/public')
@bobo.query('/public/')
@bobo.query('/public/page/:page')
def show_frontpage(bobo_request, page=None):
    """ Render the frontpage: a paged list of polls with 20 items per page. """
    polls_per_page = 20
    return show_poll_list(bobo_request, "frontpage.html", polls_per_page, page=page)
|
|
|
|
@bobo.query('')
def base():
    """ Redirect requests for the empty path to the base URL. """
    target_url = BASE_DICT["base_url"]
    return bobo.redirect(target_url)
|
|
|
|
@bobo.query('/:poll_hash')
@bobo.query('/:poll_hash/')
@bobo.query('/admin/poll/:poll_hash_for_admin')
def show_one_poll(bobo_request, poll_hash_for_admin=None, poll_hash=None,
        add_related=None, del_related=None, delete_multiple=None,
        delete_submission_id_list=None, count=50, page=1, show_delete=False):
    """ Render the public page or the admin page of a poll.

    A public hash shows the poll details. An admin hash shows the admin page
    of the poll - optionally after deleting the submissions listed in
    "delete_submission_id_list" (only if "delete_multiple" is set).
    "poll_hash_for_admin" is a public hash that is translated to the admin
    hash of the same poll. An unknown hash leads to the frontpage.

    "count" (submissions per page) is limited to 1..1000, "page" starts at one.
    """
    if poll_hash_for_admin:
        # shortcut for admins
        admin_poll_id = get_poll_id(poll_hash_for_admin)
        if admin_poll_id:
            poll_hash = Poll.get(admin_poll_id).admin_hash_key
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 50
    count = max(1, min(count, 1000))
    try:
        page = int(page)
    except (TypeError, ValueError):
        page = 1
    page = max(1, page)
    value_dict = get_default_values(bobo_request)
    poll_id = get_poll_id(poll_hash)
    if poll_id is not None:
        poll = Poll.get(poll_id)
        # a poll without submissions still has one (empty) page - never page zero
        last_page = max(1, 1 + (poll.get_num_of_submissions() - 1) // count)
        page = min(page, last_page)
        value_dict["poll"] = poll
        value_dict["page"] = page
        value_dict["count"] = count
        value_dict["authlevel"] = "poll_public"
        return render("poll_details.html", **value_dict)
    admin_poll_id = get_poll_admin_id(poll_hash)
    if admin_poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    if delete_multiple and delete_submission_id_list:
        if not isinstance(delete_submission_id_list, list):
            # a single value was given
            delete_submission_id_list = [delete_submission_id_list]
        wanted_ids = set()
        for delete_id in delete_submission_id_list:
            try:
                wanted_ids.add(int(delete_id))
            except (TypeError, ValueError):
                # ignore malformed ids
                continue
        # Only submissions of this poll may be removed: the ids are supplied by
        # the client and could reference submissions of other polls.
        doomed = [submission
                for submission in ContentSubmission.selectBy(poll_id=admin_poll_id)
                if submission.id in wanted_ids]
        for submission in doomed:
            submission.destroySelf()
    return render_poll_admin(bobo_request, Poll.get(admin_poll_id), add_related,
            del_related, count, page, bool(show_delete))
|
|
|
|
|
|
@bobo.query('/node/:pagename')
def show_static_nodes(bobo_request, pagename=None):
    """ Render the template "nodes/<pagename>" with the default values.

    Meant for serving hand-changed, automatically styled content.
    """
    template_name = "nodes/" + pagename
    default_values = get_default_values(bobo_request)
    return render(template_name, **default_values)
|
|
|
|
def get_static_file(filename):
    """ Deliver a static file - this function is used internally.

    The content type is guessed from the filename (if possible).
    Raises bobo.NotFound if the file cannot be read.
    """
    response = webob.Response()
    content_type = mimetypes.guess_type(filename)[0]
    if content_type is not None:
        response.content_type = content_type
    try:
        # binary mode: the content is delivered unchanged; "with" closes the file
        with open(filename, "rb") as static_file:
            content = static_file.read()
    except IOError:
        raise bobo.NotFound
    response.body = content
    return response
|
|
|
|
@bobo.query('/media/:p1')
@bobo.query('/media/:p1/:p2')
@bobo.query('/media/:p1/:p2/:p3')
def static_files(p1=None, p2=None, p3=None):
    """ Deliver files up to three levels below /media/.

    Raises bobo.NotFound for paths pointing outside of the media directory
    (e.g. via ".." components).
    """
    media_root = os.path.abspath(os.path.join(BASE_DIR, "templates", "media"))
    segments = [subdir for subdir in (p1, p2, p3) if subdir is not None]
    # "abspath" resolves ".." components - the result must stay below the media root
    pathname = os.path.abspath(os.path.join(media_root, *segments))
    if not pathname.startswith(media_root + os.sep):
        raise bobo.NotFound
    return get_static_file(pathname)
|
|
|
|
|
|
# create all missing database tables (existing tables are left untouched)
for table in (Poll, ContentSubmission, VoteOrder, PollSetting, PollRelation, Profile, ProfilePolls):
    #table.dropTable()
    if table.tableExists():
        continue
    table.createTable()
|
|
|
|
# how to add new columns to the database:
|
|
# * uncomment the line below and change it according to your needs
|
|
# * run this script once (manually)
|
|
# * disable the line below
|
|
# * add the column specification to the object definition
|
|
# -> done!
|
|
#Poll.sqlmeta.addColumn(sqlobject.BoolCol("vote_open", default=False), changeSchema=True)
|
|
|
|
|
|
# this line allows wortschlucker to be used with mod_wsgi
|
|
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
|
|
# see: http://blog.dscpl.com.au/2009/08/using-bobo-on-top-of-modwsgi.html
|
|
# WSGI entry point: a bobo application that takes its resources from this module
# (the module name is passed via "bobo_resources")
application = bobo.Application(bobo_resources=__name__)
|
|
|