codekasten/wortschlucker/wortschlucker.py

398 lines
14 KiB
Python
Raw Normal View History

2010-04-14 20:03:44 +02:00
#!/usr/bin/env python2.6
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# add the project directory to the python search path
2010-04-14 20:03:44 +02:00
import sys
sys.path.insert(0, BASE_DIR)
2010-04-14 20:03:44 +02:00
import forms
import sqlobject
import bobo
from genshi.template import TemplateLoader
2010-04-15 12:32:45 +02:00
import genshi.filters
import genshi.input
2010-04-14 20:03:44 +02:00
import genshi
import formencode
import datetime
import webob
import mimetypes
import uuid
import re
2010-04-14 20:03:44 +02:00
db_filename = os.path.join(BASE_DIR, "database.sqlite")
2010-04-14 20:03:44 +02:00
database = sqlobject.connectionForURI("sqlite://" + db_filename)
sqlobject.sqlhub.processConnection = database
loader = TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=True)
2010-04-14 20:03:44 +02:00
BASE_DICT = {
"base_url": "/", # the trailing slash is necessary
"errors": {},
}
2010-04-15 12:32:45 +02:00
POLL_SETTINGS = {
"show_all_submissions": (bool, True),
2010-04-15 12:32:45 +02:00
"show_statistics": (bool, True),
}
class ContentSubmission(sqlobject.SQLObject):
2010-04-14 20:03:44 +02:00
submitter = sqlobject.StringCol()
content = sqlobject.StringCol()
poll_id = sqlobject.ForeignKey("Poll")
timestamp_creation = sqlobject.DateTimeCol()
def get_creation_time_string(self):
return str(self.timestamp_creation)
def get_markup_content(self):
def get_link_markup(match):
prefix, url, suffix = match.groups()
# only take the TLD part of the url
short_name = url.split("/")[2]
return """%s<a href="%s">%s</a>%s""" % (prefix, url, short_name, suffix)
# surround all urls with html markup
mark_links = re.sub(r"(\A|\s)(https?://[\w/\?\.\#=;,]*)(\s|\Z)", get_link_markup, self.content)
markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
# the markup is now marked as "safe" -> genshi will output it literally
return markup
2010-04-14 20:03:44 +02:00
2010-04-15 12:32:45 +02:00
class PollSetting(sqlobject.SQLObject):
poll_id = sqlobject.ForeignKey("Poll")
key = sqlobject.StringCol()
value = sqlobject.StringCol()
2010-04-14 20:03:44 +02:00
2010-04-14 20:03:44 +02:00
class Poll(sqlobject.SQLObject):
author = sqlobject.StringCol()
hash_key = sqlobject.StringCol()
2010-04-15 12:32:45 +02:00
admin_hash_key = sqlobject.StringCol()
2010-04-14 20:03:44 +02:00
title = sqlobject.StringCol()
description = sqlobject.StringCol()
timestamp_creation = sqlobject.DateTimeCol()
2010-04-15 12:32:45 +02:00
def get_settings(self):
current_dict = {}
for setting in PollSetting.selectBy(poll_id=self.id):
if setting.key in POLL_SETTINGS.keys():
current_dict[setting.key] = self.validate_poll_setting(setting.key, setting.value)
for key, meta_info in POLL_SETTINGS.items():
if not key in current_dict.keys():
current_dict[key] = meta_info[1]
return current_dict
def change_setting(self, key, value):
validated_value = self.validate_poll_setting(key, value)
if not validated_value is None:
poll_setting = PollSetting.selectBy(poll_id=self.id, key=key)
if poll_setting.count() == 1:
poll_setting.value = str(validated_value)
2010-04-14 20:03:44 +02:00
def get_num_of_submitters(self):
all_submitters = [submission.submitter for submission in ContentSubmission.selectBy(poll_id=self.id)]
2010-04-14 20:03:44 +02:00
unique_submitters = []
for submitter in all_submitters:
if not submitter in unique_submitters:
unique_submitters.append(submitter)
return len(unique_submitters)
def get_num_of_submissions(self):
return ContentSubmission.selectBy(poll_id=self.id).count()
def get_submissions(self):
return ContentSubmission.selectBy(poll_id=self.id)
2010-04-14 20:03:44 +02:00
2010-04-15 12:58:36 +02:00
def delete_poll(self):
submissions = ContentSubmission.selectBy(poll_id=self.id)
2010-04-15 12:58:36 +02:00
settings = PollSetting.selectBy(poll_id=self.id)
for submission in submissions:
submission.destroySelf()
for setting in settings:
setting.destroySelf()
self.destroySelf()
2010-04-14 20:03:44 +02:00
def get_url(self):
return "%spolls/%s" % (BASE_DICT["base_url"], self.hash_key)
def get_submit_url(self):
return "%spolls/%s/submit" % (BASE_DICT["base_url"], self.hash_key)
2010-04-15 12:32:45 +02:00
def get_admin_url(self):
return "%spolls/%s" % (BASE_DICT["base_url"], self.admin_hash_key)
def get_edit_url(self):
return "%spolls/%s/admin" % (BASE_DICT["base_url"], self.admin_hash_key)
2010-04-15 12:58:36 +02:00
def get_delete_url(self):
return "%spolls/%s/delete" % (BASE_DICT["base_url"], self.admin_hash_key)
2010-04-14 20:03:44 +02:00
def get_creation_time_string(self):
return str(self.timestamp_creation)
2010-04-15 12:32:45 +02:00
def validate_poll_setting(key, value):
if not key in POLL_SETTINGS.keys():
return None
setting_type = POLL_SETTINGS[key][0]
if setting_type in (basestring, unicode, str):
return value
elif setting_type == bool:
text = value.tolower()
if text in ("0", "false", "no", "off", "disabled", ""):
2010-04-15 12:32:45 +02:00
return False
elif text in ("1", "true", "yes", "on", "enabled"):
return True
else:
return None
else:
# all other types (e.g. int, float, ...)
try:
return setting_type(value)
except ValueError:
return None
2010-04-14 20:03:44 +02:00
def get_default_values(**kwargs):
value_dict = dict(BASE_DICT)
for key, value in kwargs.items():
value_dict[key] = value
return value_dict
2010-04-15 12:32:45 +02:00
def render(filename, input_data=None, **values):
stream = loader.load(filename).generate(**values)
if not input_data is None:
stream |= genshi.filters.HTMLFormFiller(data=input_data)
return stream.render("html", doctype="html")
2010-04-14 20:03:44 +02:00
def get_poll_id(hash_key):
polls = Poll.selectBy(hash_key=hash_key)
if polls.count() == 1:
return polls[0].id
else:
return None
2010-04-15 12:32:45 +02:00
def get_poll_admin_id(hash_key):
polls = Poll.selectBy(admin_hash_key=hash_key)
if polls.count() == 1:
return polls[0].id
else:
return None
2010-04-14 20:03:44 +02:00
def get_new_hash_key():
hash_key = uuid.uuid4().get_hex()
2010-04-15 12:32:45 +02:00
while (not get_poll_id(hash_key) is None) or (not get_poll_admin_id(hash_key) is None):
2010-04-14 20:03:44 +02:00
hash_key = uuid.uuid4().get_hex()
return hash_key
@bobo.query('/polls/new')
@bobo.query('/polls/new/:setting_defaults')
2010-04-14 20:03:44 +02:00
@bobo.query('/polls/new/:author/:title/:description')
def new_poll(submit=None, cancel=None, setting_defaults=None, author=None, title=None, description=None):
# TODO: implement "setting_defaults" for different (pre-defined) categories of polls
2010-04-14 20:03:44 +02:00
value_dict = get_default_values()
2010-04-15 12:32:45 +02:00
data = {"author": author, "title": title, "description": description}
2010-04-14 20:03:44 +02:00
if cancel:
return bobo.redirect(BASE_DICT["base_url"])
2010-04-15 12:32:45 +02:00
elif not submit:
# show the "new poll" form
return render("poll_new.html", input_data=data, **value_dict)
2010-04-14 20:03:44 +02:00
else:
2010-04-15 12:32:45 +02:00
# create the new poll (if it is valid)
2010-04-14 20:03:44 +02:00
errors = {}
try:
data = forms.PollForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
if errors:
value_dict["errors"] = errors
return render("poll_new.html", input_data=data, **value_dict)
2010-04-14 20:03:44 +02:00
else:
# create the new poll
hash_key = get_new_hash_key()
2010-04-15 12:32:45 +02:00
admin_hash_key = get_new_hash_key()
2010-04-14 20:03:44 +02:00
now = datetime.datetime.now()
2010-04-15 12:32:45 +02:00
new_poll = Poll(hash_key=hash_key, admin_hash_key=admin_hash_key, timestamp_creation=now, **data)
return bobo.redirect(new_poll.get_admin_url())
@bobo.query('/polls/:hash_key/submit')
def submit_content(hash_key=None, submitter=None, content=None):
value_dict = get_default_values()
data = {"submitter": submitter, "content": content}
poll_id = get_poll_id(hash_key)
if not poll_id is None:
poll = Poll.get(poll_id)
value_dict["poll"] = poll
errors = {}
try:
data = forms.SubmitForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
if errors:
value_dict["errors"] = errors
return render("poll_details.html", input_data=data, **value_dict)
else:
# create the new submission content
data["timestamp_creation"] = datetime.datetime.now()
data["poll_id"] = poll.id
ContentSubmission(**data)
# remove "content" for the next input
del data["content"]
return render("poll_details.html", input_data=data, **value_dict)
return bobo.redirect(BASE_DICT["base_url"])
2010-04-15 12:58:36 +02:00
@bobo.query('/polls/:admin_hash_key/delete')
def delete_poll(admin_hash_key=None):
admin_poll_id = get_poll_admin_id(admin_hash_key)
if not admin_poll_id is None:
poll = Poll.get(admin_poll_id)
poll.delete_poll()
return bobo.redirect(BASE_DICT["base_url"])
2010-04-15 12:32:45 +02:00
@bobo.query('/polls/:admin_hash_key/admin')
def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None, title=None, description=None, **kwargs):
value_dict = get_default_values()
data = {"author": author, "title": title, "description": description}
poll_id = get_poll_admin_id(admin_hash_key)
if poll_id is None:
return bobo.redirect(BASE_DICT["base_url"])
poll = Poll.get(poll_id)
# cancel: return to (non-edit) admin page
if cancel:
return bobo.redirect(poll.get_admin_url())
if author is None:
data["author"] = poll.author
if title is None:
data["title"] = poll.title
if description is None:
data["description"] = poll.description
settings = poll.get_settings()
# override with the given settings (taken from the form input with the prefix "setting_")
for setting_key, setting_value in kwargs.items():
if setting_key.startswith("setting_"):
real_key = setting_key[len("setting_"):]
if real_key in POLL_SETTINGS.keys():
valid_value = validate_poll_setting(real_key, setting_value)
if not valid_value is None:
settings[real_key] = valid_value
# store the new settings or create the new poll
errors = {}
if submit:
# check for errors only if the content is submitted (not just rendered)
try:
data = forms.PollForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
# add "settings" after forms validation - since there is no destination type
data["settings"] = settings
# the admin hash should also not be validated - thus we may not add it before
if errors:
value_dict["errors"] = errors
return render("poll_admin.html", input_data=data, **value_dict)
else:
if submit:
# update core attributes of the existing poll
poll.author = data["author"]
poll.title = data["title"]
poll.description = data["description"]
current_settings = poll.get_settings()
# update settings
for key, value in settings.items():
if current_settings[key] != value:
poll.change_setting(key, value)
return bobo.redirect(poll.get_admin_url())
else:
return render("poll_admin.html", input_data=data, **value_dict)
2010-04-14 20:03:44 +02:00
@bobo.query('')
2010-04-15 12:32:45 +02:00
def base():
return bobo.redirect(BASE_DICT["base_url"])
2010-04-14 20:03:44 +02:00
@bobo.query('/polls')
2010-04-15 12:32:45 +02:00
def base_polls():
return bobo.redirect(BASE_DICT["base_url"] + "polls/")
@bobo.query('/')
def show_frontpage():
value_dict = get_default_values()
value_dict["polls"] = Poll.select()
return render("frontpage.html", **value_dict)
2010-04-14 20:03:44 +02:00
@bobo.query('/polls/')
def show_polls():
value_dict = get_default_values()
value_dict["polls"] = Poll.select()
return render("polls.html", **value_dict)
@bobo.query('/polls/:poll_hash')
def show_one_poll(poll_hash=None):
2010-04-15 12:32:45 +02:00
value_dict = get_default_values()
2010-04-14 20:03:44 +02:00
poll_id = get_poll_id(poll_hash)
if not poll_id is None:
value_dict["poll"] = Poll.get(poll_id)
return render("poll_details.html", **value_dict)
else:
2010-04-15 12:32:45 +02:00
admin_poll_id = get_poll_admin_id(poll_hash)
if not admin_poll_id is None:
value_dict["poll"] = Poll.get(admin_poll_id)
return render("poll_admin_details.html", **value_dict)
else:
return bobo.redirect(BASE_DICT["base_url"])
2010-04-15 14:43:23 +02:00
@bobo.query('/static/:pagename')
def show_static(pagename=None):
value_dict = get_default_values()
return render(pagename, **value_dict)
2010-04-14 20:03:44 +02:00
@bobo.query('/node/:pagename')
2010-04-17 13:46:51 +02:00
def show_static_nodes(pagename=None):
""" meant for serving hand-changed, automatically styled content. """
value_dict = get_default_values()
return render("nodes/"+pagename, **value_dict)
def get_static_file(filename):
""" deliver a static file - this function is used internally """
response = webob.Response()
content_type = mimetypes.guess_type(filename)[0]
if content_type is not None:
response.content_type = content_type
try:
response.body = open(filename).read()
except IOError:
raise bobo.NotFound
return response
2010-04-14 20:03:44 +02:00
@bobo.query('/media/:p1')
@bobo.query('/media/:p1/:p2')
@bobo.query('/media/:p1/:p2/:p3')
def static_files(p1=None, p2=None, p3=None):
""" deliver files up to three levels below /media/ """
pathlist = [p1, p2, p3]
pathname = os.path.join(BASE_DIR, "templates", "media")
for subdir in pathlist:
if not subdir is None:
pathname = os.path.join(pathname, subdir)
return get_static_file(pathname)
@bobo.query("/:anypath")
def custom_static_files(anypath=None):
""" deliver static files directly below the directory /static/ """
if anypath is None:
bobo.redirect(BASE_DICT["base_url"])
basedir = os.path.join(BASE_DIR, "static")
pathname = os.path.abspath(os.path.join(basedir, anypath))
if pathname.startswith(basedir) and os.path.isfile(pathname):
return get_static_file(pathname)
bobo.redirect(BASE_DICT["base_url"])
for table in (Poll, ContentSubmission, PollSetting):
#Poll.dropTable()
if not table.tableExists():
table.createTable()
for poll in Poll.select():
print poll
2010-04-15 12:32:45 +02:00
# this line allows to use wortschlucker with mod_wsgi
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
#application = bobo.Application(bobo_resources=__name__)