codekasten/wortschlucker/src/wortschlucker.py

761 lines
27 KiB
Python
Executable file

#!/usr/bin/env python2.5
# -*- coding: utf-8 -*-
import os
# the basedir is the parent dir of the location of this script
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))
# add the project directory to the python search path
import sys
sys.path.insert(0, os.path.join(BASE_DIR, "src"))
import tools
import bobo
import forms
import sqlobject
import genshi.filters
import genshi.input
import genshi.template
import genshi
import formencode
import webob
import ConfigParser
import datetime
import mimetypes
import uuid
import re
import twitter
# the configuration file is read from the project's base directory; the
# sections "database", "hosting" and "misc" are queried in this module
CONFIG_FILE = os.path.join(BASE_DIR, "wortschlucker.conf")
""" *** Initialization *** """
# parse the configuration and connect sqlobject to the configured database
# (both happen as a side effect of importing this module)
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
db_uri = config.get("database", "uri")
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI(db_uri)
# all genshi templates are loaded from the "templates" directory
loader = genshi.template.TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=False)
# directory containing the blog entries (one file per entry)
BLOG_DIR = os.path.join(BASE_DIR, "blog")
# basic values that are copied into the value dictionary of every rendered page
BASE_DICT = {
"base_url": "/", # the trailing slash is necessary
"show_navbar": True,
"errors": {},
}
# used as the default setting for expose/close dates
DEFAULT_DAYS_AHEAD = 7
# format used for parsing and printing the date settings
DATE_FORMAT = "%d.%m.%Y"
# NOTE(review): this default is calculated only once (when the module is
# imported) - it does not move forward while the process keeps running
DEFAULT_DATE = datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
# all known poll settings: name -> (type, default value)
POLL_SETTINGS = {
"show_all_submissions": (bool, True),
"show_statistics": (bool, True),
"public": (bool, False),
"expose_date": (datetime.datetime, DEFAULT_DATE),
"close_date": (datetime.datetime, DEFAULT_DATE),
}
# settings that are explicitly stored for a new poll depending on the chosen
# template; unknown template names are treated like "brainstorm"
POLL_SETTING_TEMPLATES = {
"brainstorm": {},
"cards": {"show_all_submissions": False},
"feedback": {"show_all_submissions": False},
"evaluation": {"show_all_submissions": False},
"notes": {"show_statistics": False},
"shopping": {"show_statistics": False},
"clipboard": {"show_statistics": False},
"namefinder": {},
}
class ContentSubmission(sqlobject.SQLObject):
    """ a single piece of content that was submitted to a poll """
    submitter = sqlobject.UnicodeCol()
    content = sqlobject.UnicodeCol()
    poll_id = sqlobject.ForeignKey("Poll")
    timestamp_creation = sqlobject.DateTimeCol()

    def get_creation_time_string(self):
        """ return the creation timestamp converted to a string """
        return str(self.timestamp_creation)

    def get_markup_content(self):
        """ return the content as sanitized markup with clickable links

        Every http/https URL that is surrounded by whitespace, parentheses or
        the text boundaries is turned into a link labeled with its host name.
        The result is passed through genshi's HTMLSanitizer.
        """
        def get_link_markup(match):
            prefix, url = match.groups()
            # the url always starts with "http(s)://" - thus the third item
            # of the split is the host name
            short_name = url.split("/")[2]
            return """%s<a href="%s">%s</a>""" % (prefix, url, short_name)
        # surround all urls with html markup
        # The trailing delimiter is only checked via a lookahead (it is not
        # consumed). Otherwise the whitespace between two consecutive URLs
        # would be swallowed by the first match and the second URL would lack
        # its required leading delimiter.
        mark_links = re.sub(r"(\A|\s|\()(https?://[\w/\?\.\#=;,_\-\~]*)(?=\)|\s|\Z)",
                get_link_markup, self.content)
        markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
        # the markup is now marked as "safe" -> genshi will output it literally
        return markup
class PollSetting(sqlobject.SQLObject):
    """ one explicitly stored setting (key/value pair) of a poll

    The value is always stored as text; see "validate_poll_setting" and
    "get_poll_setting_string" for the conversion from and to typed values.
    """

    # the poll that owns this setting
    poll_id = sqlobject.ForeignKey("Poll")
    # name of the setting
    key = sqlobject.UnicodeCol()
    # text representation of the setting's value
    value = sqlobject.UnicodeCol()
class PollRelation(sqlobject.SQLObject):
    """ a link between two polls

    Relations are evaluated in both directions (see
    "Poll.get_related_polls_direct").
    """

    # the two connected polls
    first = sqlobject.ForeignKey("Poll")
    second = sqlobject.ForeignKey("Poll")
class Poll(sqlobject.SQLObject):
    """ a poll that collects content submissions

    "hash_key" is part of the public URLs of a poll, "admin_hash_key" is part
    of its administrative URLs (see the get_*_url methods below).
    """
    author = sqlobject.UnicodeCol()
    hash_key = sqlobject.StringCol()
    admin_hash_key = sqlobject.StringCol()
    title = sqlobject.UnicodeCol()
    description = sqlobject.UnicodeCol()
    timestamp_creation = sqlobject.DateTimeCol()

    def get_related_polls(self):
        """ get all directly and indirectly connected polls up to a certain
        distance
        """
        return PollMesh(self).get_related_polls()

    def get_related_polls_direct(self):
        """ get all directly connected polls """
        related = []
        related.extend([poll.second for poll in PollRelation.selectBy(first=self.id)])
        related.extend([poll.first for poll in PollRelation.selectBy(second=self.id)])
        return related

    def get_settings(self):
        """ return a dictionary of all settings (typed values)

        Stored settings are validated; all missing settings are filled with
        the defaults defined in POLL_SETTINGS.
        """
        current_dict = {}
        for setting in PollSetting.selectBy(poll_id=self.id):
            if setting.key in POLL_SETTINGS:
                current_dict[setting.key] = validate_poll_setting(setting.key, setting.value)
        for key, meta_info in POLL_SETTINGS.items():
            if not key in current_dict:
                current_dict[key] = meta_info[1]
        return current_dict

    def get_settings_strings(self):
        """ return a dictionary of all settings (converted to strings) """
        result = {}
        for key, value in self.get_settings().items():
            result[key] = get_poll_setting_string(key, value)
        return result

    def change_setting(self, key, value):
        """ validate and store a setting of this poll

        The poll is announced via twitter whenever the setting "public" is
        stored with a validated value of True.
        """
        validated_value = validate_poll_setting(key, value)
        # Decide based on the validated value: the raw input may be a string
        # (e.g. "no") - comparing it with a number is meaningless.
        announce = (key == 'public') and (validated_value is True)
        if validated_value is None:
            stored_value = None
        else:
            stored_value = get_poll_setting_string(key, validated_value)
        poll_setting = PollSetting.selectBy(poll_id=self.id, key=key)
        setting_count = poll_setting.count()
        if setting_count == 1:
            poll_setting[0].value = stored_value
        elif setting_count == 0:
            PollSetting(poll_id=self.id, key=key, value=stored_value)
        if announce:
            self.announce_via_twitter()

    def announce_via_twitter(self):
        """ publish the title and the absolute URL of this poll via twitter

        The prefix and the credentials are taken from the section "misc" of
        the configuration.
        """
        complete_url = self.get_url(absolute=True)
        title = "%s %s %s" % (config.get('misc', 'twitter_alert_prefix'),
                self.title[:79], complete_url)
        username = config.get('misc', 'twitter_alert_user')
        password = config.get('misc', 'twitter_alert_password')
        publish_twitter_alert(title, username, password)

    def get_num_of_submitters(self):
        """ return the number of different submitter names of this poll """
        submissions = ContentSubmission.selectBy(poll_id=self.id)
        return len(set([submission.submitter for submission in submissions]))

    def get_num_of_submissions(self):
        """ return the number of submissions of this poll """
        return ContentSubmission.selectBy(poll_id=self.id).count()

    def get_submissions(self):
        """ return the query result containing all submissions of this poll """
        return ContentSubmission.selectBy(poll_id=self.id)

    def delete_poll(self):
        """ remove this poll including its submissions, settings and relations
        """
        submissions = ContentSubmission.selectBy(poll_id=self.id)
        settings = PollSetting.selectBy(poll_id=self.id)
        for submission in submissions:
            submission.destroySelf()
        for setting in settings:
            setting.destroySelf()
        # Remove the relations in both directions - otherwise the related
        # polls would keep references to a non-existing poll.
        PollRelation.deleteBy(first=self.id)
        PollRelation.deleteBy(second=self.id)
        self.destroySelf()

    def get_url(self, absolute=False):
        """ return the public URL of this poll """
        return get_url_string("%s%s" % (BASE_DICT["base_url"], self.hash_key), absolute)

    def get_submit_url(self, absolute=False):
        """ return the URL for submitting content to this poll """
        return get_url_string("%s%s/submit" % (BASE_DICT["base_url"], self.hash_key), absolute)

    def get_admin_url(self, absolute=False):
        """ return the URL of the administrative overview of this poll """
        return get_url_string("%s%s" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)

    def get_edit_url(self, absolute=False):
        """ return the URL of the administrative edit form of this poll """
        return get_url_string("%s%s/admin" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)

    def get_delete_url(self, absolute=False):
        """ return the URL that deletes this poll """
        return get_url_string("%s%s/delete" % (BASE_DICT["base_url"], self.admin_hash_key), absolute)

    def get_creation_time_string(self):
        """ return the creation timestamp converted to a string """
        return str(self.timestamp_creation)
class PollMesh:
    """ generate a mesh of directly or indirectly related polls

    Basically this is just a recursive search for unique related polls that are
    connected to the base poll with less than (e.g.) five nodes in between.
    The base poll itself is never part of the result.
    """

    def __init__(self, poll, depth=5):
        """ collect the polls related to the given base poll

        @value poll: the base poll (it needs to provide "id" and
            "get_related_polls_direct")
        @value depth: number of additional levels to be followed beyond the
            directly related polls
        @type depth: int
        """
        self.related = []
        # The ids of all polls that were already visited. The base poll is
        # registered right from the start: every relation is bidirectional,
        # thus the base poll would show up as its own relative otherwise.
        self._known_ids = set([poll.id])
        # start to collect the related polls immediately
        self.__collect_related_polls(poll, depth)

    def __collect_related_polls(self, current_poll, current_depth):
        """ recursive scanning for unique related polls up to a certain distance
        """
        new_queue = []
        for poll in current_poll.get_related_polls_direct():
            # skip polls that are already known (this also removes duplicates
            # caused by multiple relations between the same two polls)
            if not poll.id in self._known_ids:
                self._known_ids.add(poll.id)
                new_queue.append(poll)
        self.related.extend(new_queue)
        if current_depth > 0:
            for poll in new_queue:
                self.__collect_related_polls(poll, current_depth - 1)

    def get_related_polls(self):
        """ return the list of collected polls (in order of discovery) """
        return self.related
def get_poll_setting_string(key, value):
    """ convert the typed value of a poll setting into its text representation

    Unknown settings result in an empty string. Dates are formatted according
    to DATE_FORMAT; unset dates (None) and the "forever" marker (an empty
    string) are both represented by an empty string.
    """
    if not key in POLL_SETTINGS.keys():
        return ""
    setting_type = POLL_SETTINGS[key][0]
    # text settings do not need to be converted
    if setting_type in (basestring, unicode, str):
        return value
    if setting_type == datetime.datetime:
        # None: unset date / empty string: the value for "forever"
        if (value is None) or (value == ""):
            return ""
        return value.strftime(DATE_FORMAT)
    # booleans and all other types: use the generic string conversion
    return str(value)
def validate_poll_setting(key, value):
    """ convert the given raw value into the type of the poll setting

    @value key: name of the setting (see POLL_SETTINGS)
    @value value: the raw value (usually a string taken from a form or from
        the database), an already typed value or None
    @returns: the typed value or None if the setting is unknown or the value
        is invalid. Date settings may also return an empty string - this is
        the marker for "forever".
    """
    if not key in POLL_SETTINGS:
        return None
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        return value
    elif setting_type == bool:
        if value is None:
            return False
        elif isinstance(value, bool):
            return value
        elif not isinstance(value, basestring):
            # anything else (e.g. numbers or lists) cannot be interpreted
            return None
        text = value.lower()
        if text in ("0", "false", "no", "off", "disabled", ""):
            return False
        elif text in ("1", "true", "yes", "on", "enabled"):
            return True
        else:
            return None
    elif setting_type == datetime.datetime:
        if value is None:
            # default: one week later
            # (a datetime object - just like the default in POLL_SETTINGS)
            return datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
        elif isinstance(value, datetime.datetime):
            return value
        elif value == "":
            # this is the value for "forever"
            return ""
        try:
            return datetime.datetime.strptime(value, DATE_FORMAT)
        except (TypeError, ValueError):
            # not a string or not a valid date
            return None
    else:
        # all other types (e.g. int, float, ...)
        if (setting_type == int) and (value is None):
            value = 0
        try:
            return setting_type(value)
        except (TypeError, ValueError):
            return None
def get_default_values(**kwargs):
    """ return a fresh value dictionary for rendering a template

    The dictionary is a (shallow) copy of BASE_DICT updated with the given
    keyword arguments.
    """
    value_dict = BASE_DICT.copy()
    value_dict.update(kwargs)
    return value_dict
def get_url_string(url, absolute=False):
    """ return the URL as it is or turn it into an absolute URL

    The absolute URL is prefixed with the setting "full_base_url" of the
    section "hosting" of the configuration.

    @value url: the given URL
    @type url: str
    @value absolute: should the URL be absolute or relative?
    @type absolute: bool
    @returns: the relative or absolute URL
    @rtype: str
    """
    if not absolute:
        return url
    return config.get('hosting', 'full_base_url') + url
def render(filename, input_data=None, **values):
    """ render a template as an HTML string

    @value filename: name of the template (relative to the template directory)
    @value input_data: optional dictionary used for pre-filling form fields
    @value values: the values that are available within the template
    """
    template = loader.load(filename)
    stream = template.generate(**values)
    if input_data is not None:
        # fill the form elements of the page with the given data
        stream = stream | genshi.filters.HTMLFormFiller(data=input_data)
    return stream.render("html", doctype="html")
def get_poll_id(hash_key):
    """ return the id of the poll with the given (public) hash key

    None is returned if the key cannot be converted to a plain string or if
    there is not exactly one matching poll.
    """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            # valid keys never contain non-ascii characters
            return None
    matches = Poll.selectBy(hash_key=hash_key)
    if matches.count() != 1:
        return None
    return matches[0].id
def get_poll_admin_id(hash_key):
    """ return the id of the poll with the given admin hash key

    None is returned if the key cannot be converted to a plain string or if
    there is not exactly one matching poll.
    """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            # valid keys never contain non-ascii characters
            return None
    matches = Poll.selectBy(admin_hash_key=hash_key)
    if matches.count() != 1:
        return None
    return matches[0].id
def extract_poll_admin_id(text):
    """ The text may be an admin hash or the admin link of a poll

    @returns: the id of the referenced poll or None
    """
    direct_hit = get_poll_admin_id(text)
    if direct_hit is not None:
        return direct_hit
    # Try all lowercase alphanumeric fragments of the text. We assume that the
    # hash is at the end of the string - thus we start with the last one.
    for candidate in reversed(re.findall(r"[a-z0-9]+", text)):
        poll_id = get_poll_admin_id(candidate)
        if poll_id is not None:
            return poll_id
    return None
def get_new_hash_key(length=16, charset=None):
    """ returns a random hash key with the specified length

    The key is guaranteed to be unused: it is neither the public nor the
    admin hash key of an existing poll.

    @value length: number of characters of the key
    @type length: int
    @value charset: the allowed characters (default: digits and lowercase
        ascii letters)
    @type charset: str
    """
    # local import: "random" is not part of the imports of this module
    import random
    if charset is None:
        charset = "0123456789abcdefghijklmnopqrstuvwxyz"
    # Every character is drawn independently from the random source of the
    # operating system. Previously all characters were derived from a single
    # uuid - thus long keys ended with a run of identical characters as soon
    # as the bits of the uuid were used up.
    generator = random.SystemRandom()
    def get_hash_string(length):
        return "".join([generator.choice(charset) for index in range(length)])
    # repeat the hash generation until a new value is found
    hash_key = get_hash_string(length)
    while (not get_poll_id(hash_key) is None) or (not get_poll_admin_id(hash_key) is None):
        hash_key = get_hash_string(length)
    return hash_key
def publish_twitter_alert(text, user, passwd):
    """ post a status update via twitter (best effort)

    HTTP errors are silently ignored.

    @value text: the text of the status update
    @value user: name of the twitter account
    @value passwd: password of the twitter account
    """
    # local import: "HTTPError" was referenced here without ever being
    # imported - any failure resulted in a NameError instead of being ignored
    # NOTE(review): presumably the twitter module reports failed requests as
    # urllib2.HTTPError - verify against the installed version
    import urllib2
    api = twitter.Api(username=user, password=passwd)
    try:
        api.PostUpdate(text)
    except urllib2.HTTPError:
        # twitter error, most likely because of a duplicate message
        # or maybe an authentication failure
        pass
@bobo.query('/new')
@bobo.query('/new/:template')
@bobo.query('/new/:author/:title/:description')
def new_poll(submit=None, cancel=None, author=None, title=None,
description=None, template=None, hide_errors=False):
value_dict = get_default_values()
data = {"author": author, "title": title, "description": description,
"template": template}
if cancel:
return bobo.redirect(BASE_DICT["base_url"])
elif not submit:
# show the "new poll" form
return render("poll_new.html", input_data=data, **value_dict)
else:
# create the new poll (if it is valid)
errors = {}
try:
data = forms.PollForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
if errors:
if not hide_errors:
value_dict["errors"] = errors
return render("poll_new.html", input_data=data, **value_dict)
else:
# get the template settings
poll_settings = POLL_SETTINGS.copy()
if template in POLL_SETTING_TEMPLATES.keys():
template_settings = POLL_SETTING_TEMPLATES[template].copy()
else:
template_settings = POLL_SETTING_TEMPLATES["brainstorm"]
for key, value in template_settings.items():
poll_settings[key] = value
# create the new poll
hash_key = get_new_hash_key()
admin_hash_key = get_new_hash_key()
now = datetime.datetime.now()
new_poll = Poll(hash_key=hash_key, admin_hash_key=admin_hash_key,
timestamp_creation=now, author=data["author"],
title=data["title"], description=data["description"])
# apply the template settings
for key, value in template_settings.items():
new_poll.change_setting(key, value)
return bobo.redirect(new_poll.get_admin_url())
@bobo.query('/:hash_key/submit')
def submit_content(hash_key=None, submitter=None, content=None):
value_dict = get_default_values()
data = {"submitter": submitter, "content": content}
poll_id = get_poll_id(hash_key)
if not poll_id is None:
poll = Poll.get(poll_id)
value_dict["poll"] = poll
errors = {}
try:
data = forms.SubmitForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
if errors:
value_dict["errors"] = errors
return render("poll_details.html", input_data=data, **value_dict)
else:
# create the new submission content
data["timestamp_creation"] = datetime.datetime.now()
data["poll_id"] = poll.id
ContentSubmission(**data)
# remove "content" for the next input
del data["content"]
return render("poll_details.html", input_data=data, **value_dict)
return bobo.redirect(BASE_DICT["base_url"])
@bobo.query('/:admin_hash_key/delete')
def delete_poll(admin_hash_key=None):
    """ delete the poll with the given admin hash key (if it exists)

    The browser is redirected to the base URL in any case.
    """
    poll_id = get_poll_admin_id(admin_hash_key)
    if poll_id is not None:
        Poll.get(poll_id).delete_poll()
    return bobo.redirect(BASE_DICT["base_url"])
@bobo.query('/:admin_hash_key/admin')
def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None,
title=None, description=None, settings=None,
setting_expose_date=None, setting_expose_date_forever=None,
setting_close_date=None, setting_close_date_forever=None):
value_dict = get_default_values()
data = {"author": author, "title": title, "description": description, "template": ""}
poll_id = get_poll_admin_id(admin_hash_key)
if poll_id is None:
return bobo.redirect(BASE_DICT["base_url"])
poll = Poll.get(poll_id)
# cancel: return to (non-edit) admin page
if cancel:
return bobo.redirect(poll.get_admin_url())
if author is None:
data["author"] = poll.author
if title is None:
data["title"] = poll.title
if description is None:
data["description"] = poll.description
if setting_expose_date_forever is None:
if setting_expose_date is None:
setting_expose_date = poll.get_settings_strings()["expose_date"]
else:
setting_expose_date = ""
if setting_close_date_forever is None:
if setting_close_date is None:
setting_close_date = poll.get_settings_strings()["close_date"]
else:
setting_close_date = ""
poll_settings = poll.get_settings()
# update the settings only after a submit (otherwise we clear all current settings)
if submit:
# override with the given settings (taken from the form input with the prefix "setting_")
if settings is None:
settings = []
elif not isinstance(settings, list):
settings = [settings]
else:
# it is a list - everything is fine
pass
for setting_key in poll_settings.keys():
poll_settings[setting_key] = setting_key in settings
# collect all errors
errors = {}
# add boolean "settings" after forms validation - since there is no destination type
data["settings"] = [key for key, value in poll_settings.items() if value is True]
for key, value in (("expose_date", setting_expose_date), ("close_date", setting_close_date)):
validated_value = validate_poll_setting(key, value)
if value == "":
data["setting_%s_forever" % key] = "yes"
data["setting_%s" % key] = value
else:
data["setting_%s_forever" % key] = "no"
if validated_value is None:
# keep the entered value and report an error
errors[key] = u"Ungültiges Datum"
data["setting_%s" % key] = value
else:
data["setting_%s" % key] = get_poll_setting_string(key, validated_value)
# use the validator to check for possible errors
if submit:
# check for errors only if the content is submitted (not just rendered)
try:
data = forms.PollSettingsForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
# store the new settings
if errors:
value_dict["errors"] = errors
return render("poll_admin_edit.html", input_data=data, **value_dict)
else:
if submit:
# update core attributes of the existing poll
poll.author = data["author"]
poll.title = data["title"]
poll.description = data["description"]
current_settings = poll.get_settings()
# update settings
for key, value in poll_settings.items():
if (POLL_SETTINGS[key][0] == bool) and (current_settings[key] != value):
poll.change_setting(key, value)
poll.change_setting("expose_date", data["setting_expose_date"])
poll.change_setting("close_date", data["setting_close_date"])
return bobo.redirect(poll.get_admin_url())
else:
return render("poll_admin_edit.html", input_data=data, **value_dict)
def render_blog_entry(blog_id):
    """ render the page of a single blog entry

    @returns: the rendered page or None if the entry could not be read
    """
    blog_info = get_blog_info(blog_id)
    if blog_info is None:
        return None
    value_dict = get_default_values()
    # the blog info is a sequence: title, date, link and body
    for index, name in enumerate(("title", "date", "link", "body")):
        value_dict[name] = blog_info[index]
    return render("blog_entry.html", **value_dict)
def get_blog_info(blog_id):
    """ read a blog entry from the blog directory

    The first line of the file is the title of the entry. The remaining
    content is the body - it is converted to html via "tools.creole2html".
    Date and time of the entry are derived from the digits of the blog id
    (year, month, day, hour and minute).

    @value blog_id: the file name of the blog entry
    @returns: a tuple (title, date, link, body) or None if the file could not
        be read
    """
    blog_file = os.path.join(BLOG_DIR, blog_id)
    try:
        blog_handle = open(blog_file)
        try:
            title = blog_handle.readline()
            body = blog_handle.read()
        finally:
            # release the file even if reading fails
            blog_handle.close()
    except IOError:
        return None
    date = "%s.%s.%s %s:%s" % (blog_id[6:8], blog_id[4:6], blog_id[0:4],
            blog_id[8:10], blog_id[10:12])
    link = "%sblog/%s" % (get_default_values()["base_url"], blog_id)
    # decode the title just like the body (non-ascii characters would break
    # the output otherwise) and remove the trailing line break
    title = title.decode("utf-8").rstrip("\r\n")
    body = tools.creole2html(body.decode("utf-8"))
    return title, date, link, body
def get_blog_ids():
    """ return the ids of all blog entries (newest first)

    A blog entry is a regular file directly below BLOG_DIR with a name
    consisting of exactly twelve digits. Sub-directories are not scanned. An
    unreadable or missing blog directory results in an empty list.
    """
    # "os.listdir" replaces the former "os.path.walk" callback: only the top
    # level directory was evaluated anyway and "os.path.walk" is deprecated.
    try:
        fnames = os.listdir(BLOG_DIR)
    except OSError:
        return []
    file_list = [fname for fname in fnames
            if re.match(r"^[0-9]{12}$", fname)
            and os.path.isfile(os.path.join(BLOG_DIR, fname))]
    file_list.sort(reverse=True)
    return file_list
@bobo.query('/blog')
@bobo.query('/blog/')
@bobo.query('/blog/:blog_id')
@bobo.query('/blog/:blog_id/')
def serve_blog(blog_id=None):
    """ render a single blog entry or the list of all blog entries

    The list is rendered if no (valid) blog id is given or if the requested
    entry cannot be rendered.
    """
    value_dict = get_default_values()
    # the blog_id should consist of 12 digits
    if blog_id and re.match(r"^[0-9]{12}$", blog_id):
        single_entry = render_blog_entry(blog_id)
        if single_entry is not None:
            return single_entry
    # if anything fails: render the blog list
    all_infos = [get_blog_info(entry_id) for entry_id in get_blog_ids()]
    # add all valid blog infos to the list
    value_dict["blog_list"] = [info for info in all_infos if info is not None]
    return render("blog_list.html", **value_dict)
@bobo.query('')
def base():
    """ redirect requests for the empty path to the base URL """
    start_url = BASE_DICT["base_url"]
    return bobo.redirect(start_url)
@bobo.query('/')
@bobo.query('/public')
@bobo.query('/public/')
@bobo.query('/public/page/:page')
def show_frontpage(page=None):
    """ render the front page: a list of public polls (20 polls per page) """
    return show_poll_list("frontpage.html", page_size=20, page=page)
@bobo.query('/admin')
@bobo.query('/admin/')
@bobo.query('/admin/page/:page')
def show_admin_page(page=None, page_size=20):
    """ render the list of all polls (including the non-public ones)

    @value page: the number of the requested page (starting with 1)
    @value page_size: number of polls per page; invalid or non-positive values
        are replaced with the default (20)
    """
    try:
        page_size = int(page_size)
    except (TypeError, ValueError):
        page_size = 20
    # zero or negative sizes would break the calculation of the page ranges
    if page_size < 1:
        page_size = 20
    return show_poll_list("admin.html", page_size, page, filter_private=False)
def show_poll_list(render_file, page_size, page=None, filter_private=True):
    """ render one page of the list of polls (newest first)

    @value render_file: name of the template
    @value page_size: number of polls per page
    @value page: the number of the requested page (starting with 1); invalid
        numbers and pages beyond the end of the list show the first page
    @value filter_private: show only polls with the setting "public"
    """
    value_dict = get_default_values()
    all_polls = Poll.select().orderBy("-timestamp_creation")
    # TODO: speed the filtering up by using SQL statements (see sqlobject "filter")
    if filter_private:
        polls = [poll for poll in all_polls if poll.get_settings()["public"]]
    else:
        # convert the sql query into a list (probably an expensive operation)
        polls = [poll for poll in all_polls]
    if page is None:
        page = 1
    else:
        try:
            page = int(page)
        except ValueError:
            page = 1
    # "page" should at least be 1 - zero shows an empty list
    page = max(1, page)
    first = (page - 1) * page_size
    if first >= len(polls):
        # beyond the end of the list: fall back to the first page
        first, page = 0, 1
    # index of the first poll of the following page
    stop = first + page_size
    value_dict["polls"] = polls[first:stop]
    # show a link for the next page, if more polls are available
    value_dict["show_next_link"] = (stop < len(polls))
    value_dict["show_previous_link"] = (first > 0)
    value_dict["page"] = page
    return render(render_file, **value_dict)
def render_poll_admin(poll, add_related, del_related):
    """ render the admin page of a poll and add/remove a relation on demand

    @value poll: the poll to be displayed
    @value add_related: admin hash (or admin link) of a poll to be connected
        with this poll (or None)
    @value del_related: admin hash (or admin link) of a poll to be
        disconnected from this poll (or None)
    """
    value_dict = get_default_values()
    errors = {}
    if add_related is not None:
        target_id = extract_poll_admin_id(add_related)
        if target_id == poll.id:
            errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
        elif target_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        elif target_id in [one_poll.id for one_poll in poll.get_related_polls()]:
            errors["related"] = u"Dieser Wortschlucker wurde bereits verknüpft"
        else:
            PollRelation(first=poll.id, second=target_id)
    if del_related is not None:
        target_id = extract_poll_admin_id(del_related)
        if target_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        elif not target_id in [one_poll.id for one_poll in poll.get_related_polls()]:
            errors["related"] = u"Dieser Wortschlucker war nicht verknüpft"
        else:
            # delete all relations between these two polls
            PollRelation.deleteBy(first=poll.id, second=target_id)
            PollRelation.deleteBy(first=target_id, second=poll.id)
    value_dict["poll"] = poll
    value_dict["errors"] = errors
    return render("poll_admin_details.html", **value_dict)
@bobo.query('/:poll_hash')
@bobo.query('/:poll_hash/')
def show_one_poll(poll_hash=None, add_related=None, del_related=None):
    """ render the public or the admin page of a poll

    The hash is looked up as a public hash key first and as an admin hash key
    afterwards. Unknown hashes cause a redirect to the base URL.
    """
    value_dict = get_default_values()
    poll_id = get_poll_id(poll_hash)
    if poll_id is not None:
        value_dict["poll"] = Poll.get(poll_id)
        return render("poll_details.html", **value_dict)
    admin_poll_id = get_poll_admin_id(poll_hash)
    if admin_poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    return render_poll_admin(Poll.get(admin_poll_id), add_related, del_related)
@bobo.query('/node/:pagename')
def show_static_nodes(pagename=None):
    """ meant for serving hand-changed, automatically styled content.

    The page is a template below "nodes/" in the template directory. Invalid
    names and missing templates result in "not found".
    """
    # The name is taken from the URL: accept only plain file names, so that
    # nothing outside of the "nodes" directory can be requested.
    if (not pagename) or pagename.startswith(".") \
            or ("/" in pagename) or ("\\" in pagename):
        raise bobo.NotFound
    value_dict = get_default_values()
    try:
        return render("nodes/"+pagename, **value_dict)
    except genshi.template.TemplateNotFound:
        # an unknown page is not a server error
        raise bobo.NotFound
def get_static_file(filename):
    """ deliver a static file - this function is used internally

    The content type is guessed based on the file name.

    @value filename: path of the file
    @returns: a response object containing the content of the file
    @raises bobo.NotFound: the file could not be read
    """
    response = webob.Response()
    content_type = mimetypes.guess_type(filename)[0]
    if content_type is not None:
        response.content_type = content_type
    try:
        # binary mode: media files (e.g. images) must not be modified by any
        # line ending conversion
        static_file = open(filename, "rb")
        try:
            response.body = static_file.read()
        finally:
            static_file.close()
    except IOError:
        raise bobo.NotFound
    return response
@bobo.query('/media/:p1')
@bobo.query('/media/:p1/:p2')
@bobo.query('/media/:p1/:p2/:p3')
def static_files(p1=None, p2=None, p3=None):
    """ deliver files up to three levels below /media/

    The files are located below "templates/media" in the base directory.
    Path items that could leave this directory result in "not found".
    """
    pathname = os.path.join(BASE_DIR, "templates", "media")
    for subdir in (p1, p2, p3):
        if subdir is None:
            continue
        # The path items are taken from the URL. Items like ".." would allow
        # requests for arbitrary files outside of the media directory (e.g.
        # the configuration file).
        if subdir in ("", os.path.curdir, os.path.pardir) \
                or ("/" in subdir) or ("\\" in subdir):
            raise bobo.NotFound
        pathname = os.path.join(pathname, subdir)
    return get_static_file(pathname)
# create all missing database tables when the module is loaded
for table in (Poll, ContentSubmission, PollSetting, PollRelation):
    if not table.tableExists():
        table.createTable()

# this line allows to use wortschlucker with mod_wsgi
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
# see: http://blog.dscpl.com.au/2009/08/using-bobo-on-top-of-modwsgi.html
application = bobo.Application(bobo_resources=__name__)