#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
"""
Wortschlucker - a small bobo web application for polls.

A poll has a public hash key (for viewing and submitting content) and a
separate admin hash key (for editing, relating and deleting the poll).
Persistent data is stored via SQLObject, pages are rendered with genshi.
"""

import os

# the basedir is the parent dir of the location of this script
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))

# add the project directory to the python search path
# (this must happen before the project-local "forms" module is imported)
import sys
sys.path.insert(0, os.path.join(BASE_DIR, "src"))

import bobo
import forms
import sqlobject
import genshi.filters
import genshi.input
import genshi.template
import genshi
import formencode
import webob
import ConfigParser
import datetime
import mimetypes
import uuid
import re

CONFIG_FILE = os.path.join(BASE_DIR, "wortschlucker.conf")

# *** Initialization ***
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
db_uri = config.get("database", "uri")
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI(db_uri)
loader = genshi.template.TemplateLoader(os.path.join(BASE_DIR, 'templates'),
        auto_reload=False)

# values that are passed to every template (see "get_default_values")
BASE_DICT = {
    "base_url": "/",  # the trailing slash is necessary
    "show_navbar": True,
    "errors": {},
}

# used as the default setting for expose/close dates
DEFAULT_DAYS_AHEAD = 7
DATE_FORMAT = "%d.%m.%Y"
# NOTE(review): this default is evaluated only once (at import time), thus it
# does not move forward while the process keeps running - confirm that this
# is intended.
DEFAULT_DATE = datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)

# setting name -> (type of the setting, default value)
POLL_SETTINGS = {
    "show_all_submissions": (bool, True),
    "show_statistics": (bool, True),
    "public": (bool, False),
    "expose_date": (datetime.datetime, DEFAULT_DATE),
    "close_date": (datetime.datetime, DEFAULT_DATE),
}

# template name -> settings that are applied to a new poll of this template
POLL_SETTING_TEMPLATES = {
    "brainstorm": {},
    "cards": {"show_all_submissions": False},
    "feedback": {"show_all_submissions": False},
    "evaluation": {"show_all_submissions": False},
    "notes": {"show_statistics": False},
    "shopping": {"show_statistics": False},
    "clipboard": {"show_statistics": False},
    "namefinder": {},
}

# An URL surrounded by whitespace (or the boundaries of the text).
# The trailing whitespace is only a lookahead: it must not be consumed,
# otherwise the second one of two adjacent URLs would never match.
URL_REGEX = re.compile(r"(\A|\s)(https?://[\w/\?\.\#=;,]*)(?=\s|\Z)")


class ContentSubmission(sqlobject.SQLObject):
    """ a single piece of content submitted to a poll """

    submitter = sqlobject.UnicodeCol()
    content = sqlobject.UnicodeCol()
    poll_id = sqlobject.ForeignKey("Poll")
    timestamp_creation = sqlobject.DateTimeCol()

    def get_creation_time_string(self):
        """ return the creation timestamp as a string """
        return str(self.timestamp_creation)

    def get_markup_content(self):
        """ return the content as a sanitized genshi markup stream

        All URLs within the content are turned into links. The link text
        is the host part of the URL.
        """
        def get_link_markup(match):
            prefix, url = match.groups()
            # only take the host part of the url ("http://HOST/...")
            # (the regex guarantees at least two slashes)
            short_name = url.split("/")[2] or url
            return '%s<a href="%s">%s</a>' % (prefix, url, short_name)
        # surround all urls with html markup
        mark_links = URL_REGEX.sub(get_link_markup, self.content)
        markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
        # the markup is now marked as "safe" -> genshi will output it literally
        return markup


class PollSetting(sqlobject.SQLObject):
    """ one key/value setting of a poll (the value is stored as a string) """

    poll_id = sqlobject.ForeignKey("Poll")
    key = sqlobject.UnicodeCol()
    value = sqlobject.UnicodeCol()


class PollRelation(sqlobject.SQLObject):
    """ an (undirected) connection between two polls """

    first = sqlobject.ForeignKey("Poll")
    second = sqlobject.ForeignKey("Poll")


class Poll(sqlobject.SQLObject):
    """ a poll including its public and its admin hash key """

    author = sqlobject.UnicodeCol()
    hash_key = sqlobject.StringCol()
    admin_hash_key = sqlobject.StringCol()
    title = sqlobject.UnicodeCol()
    description = sqlobject.UnicodeCol()
    timestamp_creation = sqlobject.DateTimeCol()

    def get_related_polls(self):
        """ get all directly and indirectly connected polls up to a certain distance """
        return PollMesh(self).get_related_polls()

    def get_related_polls_direct(self):
        """ get all directly connected polls """
        related = [relation.second
                for relation in PollRelation.selectBy(first=self.id)]
        related.extend([relation.first
                for relation in PollRelation.selectBy(second=self.id)])
        return related

    def get_settings(self):
        """ return a dict of all settings of this poll

        Stored settings are validated (see "validate_poll_setting").
        Settings without a stored value get the default of POLL_SETTINGS.
        """
        current_dict = {}
        for setting in PollSetting.selectBy(poll_id=self.id):
            if setting.key in POLL_SETTINGS:
                current_dict[setting.key] = validate_poll_setting(setting.key,
                        setting.value)
        for key, meta_info in POLL_SETTINGS.items():
            if key not in current_dict:
                current_dict[key] = meta_info[1]
        return current_dict

    def get_settings_strings(self):
        """ return the settings dict with all values converted to strings """
        result = {}
        for key, value in self.get_settings().items():
            result[key] = get_poll_setting_string(key, value)
        return result

    def change_setting(self, key, value):
        """ store a new value for a setting

        Invalid values (or unknown keys) are silently ignored.
        """
        validated_value = validate_poll_setting(key, value)
        if validated_value is None:
            return
        validated_value = get_poll_setting_string(key, validated_value)
        poll_setting = PollSetting.selectBy(poll_id=self.id, key=key)
        count = poll_setting.count()
        if count == 1:
            poll_setting[0].value = validated_value
        elif count == 0:
            PollSetting(poll_id=self.id, key=key, value=validated_value)
        # more than one stored row for a key: nothing is changed

    def get_num_of_submitters(self):
        """ return the number of distinct submitter names of this poll """
        return len(set(submission.submitter
                for submission in ContentSubmission.selectBy(poll_id=self.id)))

    def get_num_of_submissions(self):
        """ return the number of submissions of this poll """
        return ContentSubmission.selectBy(poll_id=self.id).count()

    def get_submissions(self):
        """ return the query result of all submissions of this poll """
        return ContentSubmission.selectBy(poll_id=self.id)

    def delete_poll(self):
        """ remove the poll and all rows that refer to it """
        # materialize the query results before deleting the rows
        for submission in list(ContentSubmission.selectBy(poll_id=self.id)):
            submission.destroySelf()
        for setting in list(PollSetting.selectBy(poll_id=self.id)):
            setting.destroySelf()
        # remove all relations - otherwise other polls would keep a
        # reference to a poll that does not exist anymore
        PollRelation.deleteBy(first=self.id)
        PollRelation.deleteBy(second=self.id)
        self.destroySelf()

    def get_url(self):
        """ return the public URL of the poll """
        return "%s%s" % (BASE_DICT["base_url"], self.hash_key)

    def get_submit_url(self):
        """ return the URL for submitting content to the poll """
        return "%s%s/submit" % (BASE_DICT["base_url"], self.hash_key)

    def get_admin_url(self):
        """ return the URL of the admin page of the poll """
        return "%s%s" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_edit_url(self):
        """ return the URL of the settings editor of the poll """
        return "%s%s/admin" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_delete_url(self):
        """ return the URL that deletes the poll """
        return "%s%s/delete" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_creation_time_string(self):
        """ return the creation timestamp as a string """
        return str(self.timestamp_creation)


class PollMesh(object):
    """ generate a mesh of directly or indirectly related polls

    This is a breadth-first search for unique related polls. The direct
    neighbours are followed for "depth" additional levels. The base poll
    itself is never part of the result.
    """

    def __init__(self, poll, depth=5):
        self.related = []
        # start to collect the related polls immediately
        self._collect_related_polls(poll, depth)

    def _collect_related_polls(self, base_poll, depth):
        """ level-wise scanning for unique related polls up to a certain distance """
        # the base poll is marked as "seen": a poll is not related to itself
        seen_ids = set([base_poll.id])
        current_level = [base_poll]
        # the direct neighbours are always collected (even for depth=0)
        for _level in range(max(depth, 0) + 1):
            next_level = []
            for current_poll in current_level:
                for candidate in current_poll.get_related_polls_direct():
                    if candidate.id not in seen_ids:
                        seen_ids.add(candidate.id)
                        next_level.append(candidate)
            if not next_level:
                break
            self.related.extend(next_level)
            current_level = next_level

    def get_related_polls(self):
        """ return the list of collected polls (nearest polls first) """
        return self.related


def get_poll_setting_string(key, value):
    """ convert the value of a setting to its string representation

    Unknown keys result in an empty string. Dates are formatted with
    DATE_FORMAT. Unset dates (None) and "forever" (an empty string) are
    both represented by an empty string.
    """
    if key not in POLL_SETTINGS:
        return ""
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        return value
    elif setting_type == bool:
        return str(value)
    elif setting_type == datetime.datetime:
        if value is None:
            # unset dates are None
            return ""
        elif value == "":
            # value for "forever"
            return ""
        else:
            return value.strftime(DATE_FORMAT)
    else:
        return str(value)


def validate_poll_setting(key, value):
    """ convert a raw value to the type of the given setting

    The value may be a string (e.g. form input or a stored setting) or
    already an instance of the target type.
    None is returned for unknown keys and for invalid values. An empty
    string is a valid date: it means "forever".
    """
    if key not in POLL_SETTINGS:
        return None
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        return value
    elif setting_type == bool:
        if value is None:
            return False
        elif isinstance(value, bool):
            return value
        try:
            text = value.lower()
        except AttributeError:
            # neither a boolean nor a string
            return None
        if text in ("0", "false", "no", "off", "disabled", ""):
            return False
        elif text in ("1", "true", "yes", "on", "enabled"):
            return True
        else:
            return None
    elif setting_type == datetime.datetime:
        if value is None:
            # default: one week later
            # (a "datetime" - a "date" could not be compared with the others)
            return datetime.datetime.now() + datetime.timedelta(days=DEFAULT_DAYS_AHEAD)
        elif isinstance(value, datetime.datetime):
            return value
        elif value == "":
            # this is the value for "forever"
            return ""
        try:
            return datetime.datetime.strptime(value, DATE_FORMAT)
        except (ValueError, TypeError):
            return None
    elif setting_type == int:
        if value is None:
            value = 0
        try:
            return int(value)
        except (ValueError, TypeError):
            return None
    else:
        # all other types (e.g. float, ...)
        try:
            return setting_type(value)
        except (ValueError, TypeError):
            return None


def get_default_values(**kwargs):
    """ return a copy of BASE_DICT (for templates) updated with the given values """
    value_dict = dict(BASE_DICT)
    # never share the mutable "errors" dict between requests
    value_dict["errors"] = dict(BASE_DICT["errors"])
    value_dict.update(kwargs)
    return value_dict


def render(filename, input_data=None, **values):
    """ render a template as html

    @param filename: name of the template (relative to the template directory)
    @param input_data: optional dict of values for filling html forms
    @param values: the variables that are available within the template
    """
    stream = loader.load(filename).generate(**values)
    if input_data is not None:
        stream |= genshi.filters.HTMLFormFiller(data=input_data)
    return stream.render("html", doctype="html")


def _find_poll_id(column, hash_key):
    """ return the id of the unique poll with the given value in a hash column

    None is returned if the key contains non-ascii characters or if there
    is not exactly one matching poll.
    """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            return None
    polls = Poll.selectBy(**{column: hash_key})
    if polls.count() == 1:
        return polls[0].id
    else:
        return None


def get_poll_id(hash_key):
    """ return the id of the poll with the given public hash key (or None) """
    return _find_poll_id("hash_key", hash_key)


def get_poll_admin_id(hash_key):
    """ return the id of the poll with the given admin hash key (or None) """
    return _find_poll_id("admin_hash_key", hash_key)


def extract_poll_admin_id(text):
    """ The text may be an admin hash or the admin link of a poll

    The id of the poll is returned (None if no poll was found).
    """
    result = get_poll_admin_id(text)
    if result is not None:
        return result
    # we assume that the hash is at the end of the string
    for found in reversed(re.findall(r"[a-z0-9]+", text)):
        guess = get_poll_admin_id(found)
        if guess is not None:
            return guess
    return None


def get_new_hash_key(length=16, charset=None):
    """ returns a quite random hash key with the specified length

    The returned key is neither in use as a public nor as an admin hash key.
    """
    if charset is None:
        charset = "0123456789abcdefghijklmnopqrstuvwxyz"

    def get_hash_string(length):
        base = uuid.uuid4().int
        result = []
        while len(result) < length:
            if base == 0:
                # the random source is exhausted (very long keys): refill it
                base = uuid.uuid4().int
            base, value = divmod(base, len(charset))
            result.append(charset[value])
        return "".join(result)

    # repeat the hash generation until a new value is found
    hash_key = get_hash_string(length)
    while (get_poll_id(hash_key) is not None) \
            or (get_poll_admin_id(hash_key) is not None):
        hash_key = get_hash_string(length)
    return hash_key


@bobo.query('/new')
@bobo.query('/new/:template')
@bobo.query('/new/:author/:title/:description')
def new_poll(submit=None, cancel=None, author=None, title=None,
        description=None, template=None, hide_errors=False):
    """ show the "new poll" form or create a poll from the submitted form

    After a successful creation the browser is redirected to the admin page
    of the new poll.
    """
    value_dict = get_default_values()
    data = {"author": author, "title": title, "description": description,
            "template": template}
    if cancel:
        return bobo.redirect(BASE_DICT["base_url"])
    if not submit:
        # show the "new poll" form
        return render("poll_new.html", input_data=data, **value_dict)
    # create the new poll (if it is valid)
    errors = {}
    try:
        data = forms.PollForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
    if errors:
        if not hide_errors:
            value_dict["errors"] = errors
        return render("poll_new.html", input_data=data, **value_dict)
    # get the template settings (unknown templates are treated as "brainstorm")
    if isinstance(template, basestring) and (template in POLL_SETTING_TEMPLATES):
        template_settings = POLL_SETTING_TEMPLATES[template]
    else:
        template_settings = POLL_SETTING_TEMPLATES["brainstorm"]
    # create the new poll
    poll = Poll(hash_key=get_new_hash_key(),
            admin_hash_key=get_new_hash_key(),
            timestamp_creation=datetime.datetime.now(),
            author=data["author"], title=data["title"],
            description=data["description"])
    # apply the template settings
    for key, value in template_settings.items():
        poll.change_setting(key, value)
    return bobo.redirect(poll.get_admin_url())


@bobo.query('/:hash_key/submit')
def submit_content(hash_key=None, submitter=None, content=None):
    """ add a submission to a poll and show the poll again

    Unknown hash keys cause a redirect to the front page.
    """
    value_dict = get_default_values()
    data = {"submitter": submitter, "content": content}
    poll_id = get_poll_id(hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    value_dict["poll"] = poll
    errors = {}
    try:
        data = forms.SubmitForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
    if errors:
        value_dict["errors"] = errors
    else:
        # create the new submission content
        data["timestamp_creation"] = datetime.datetime.now()
        data["poll_id"] = poll.id
        ContentSubmission(**data)
        # remove "content" for the next input
        del data["content"]
    return render("poll_details.html", input_data=data, **value_dict)


@bobo.query('/:admin_hash_key/delete')
def delete_poll(admin_hash_key=None):
    """ delete the poll with the given admin hash key and show the front page """
    admin_poll_id = get_poll_admin_id(admin_hash_key)
    if admin_poll_id is not None:
        Poll.get(admin_poll_id).delete_poll()
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/:admin_hash_key/admin')
def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None,
        title=None, description=None, settings=None, setting_expose_date=None,
        setting_expose_date_forever=None, setting_close_date=None,
        setting_close_date_forever=None):
    """ show the settings editor of a poll or store the submitted settings

    @param settings: the names of all enabled boolean settings (a single
        name or a list of names)
    @param setting_expose_date: the date as a string (see DATE_FORMAT)
    @param setting_expose_date_forever: if given, the date is "forever"
    (the "close_date" parameters work the same way)
    """
    value_dict = get_default_values()
    data = {"author": author, "title": title, "description": description,
            "template": ""}
    poll_id = get_poll_admin_id(admin_hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    # cancel: return to (non-edit) admin page
    if cancel:
        return bobo.redirect(poll.get_admin_url())
    # missing values are taken from the current state of the poll
    if author is None:
        data["author"] = poll.author
    if title is None:
        data["title"] = poll.title
    if description is None:
        data["description"] = poll.description
    if setting_expose_date_forever is None:
        if setting_expose_date is None:
            setting_expose_date = poll.get_settings_strings()["expose_date"]
    else:
        # an empty string is the value for "forever"
        setting_expose_date = ""
    if setting_close_date_forever is None:
        if setting_close_date is None:
            setting_close_date = poll.get_settings_strings()["close_date"]
    else:
        setting_close_date = ""
    poll_settings = poll.get_settings()
    # update the settings only after a submit (otherwise we clear all current settings)
    if submit:
        # override with the given settings (taken from the form input with the prefix "setting_")
        if settings is None:
            settings = []
        elif not isinstance(settings, list):
            settings = [settings]
        for setting_key in poll_settings.keys():
            poll_settings[setting_key] = setting_key in settings
    # collect all errors
    errors = {}
    # add boolean "settings" after forms validation - since there is no destination type
    data["settings"] = [key for key, value in poll_settings.items()
            if value is True]
    for key, value in (("expose_date", setting_expose_date),
            ("close_date", setting_close_date)):
        validated_value = validate_poll_setting(key, value)
        if value == "":
            data["setting_%s_forever" % key] = "yes"
            data["setting_%s" % key] = value
        else:
            data["setting_%s_forever" % key] = "no"
            if validated_value is None:
                # keep the entered value and report an error
                errors[key] = u"Ungültiges Datum"
                data["setting_%s" % key] = value
            else:
                data["setting_%s" % key] = get_poll_setting_string(key,
                        validated_value)
    # use the validator to check for possible errors
    if submit:
        # check for errors only if the content is submitted (not just rendered)
        try:
            data = forms.PollSettingsForm.to_python(data)
        except formencode.Invalid as errors_packed:
            # keep the date errors collected above
            errors.update(errors_packed.unpack_errors())
    if errors:
        value_dict["errors"] = errors
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    if not submit:
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    # store the new settings:
    # update core attributes of the existing poll
    poll.author = data["author"]
    poll.title = data["title"]
    poll.description = data["description"]
    current_settings = poll.get_settings()
    # update settings
    for key, value in poll_settings.items():
        if (POLL_SETTINGS[key][0] == bool) and (current_settings[key] != value):
            poll.change_setting(key, value)
    poll.change_setting("expose_date", data["setting_expose_date"])
    poll.change_setting("close_date", data["setting_close_date"])
    return bobo.redirect(poll.get_admin_url())


@bobo.query('')
def base():
    """ redirect the empty path to the base url """
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/')
def show_frontpage(page=None):
    """ show a page of the list of public polls """
    return show_poll_list("frontpage.html", 10, page)


@bobo.query('/admin')
@bobo.query('/admin/')
def show_admin_page(page=None, page_size=20):
    """ show a page of the list of all polls (including non-public ones) """
    try:
        page_size = int(page_size)
    except (ValueError, TypeError):
        page_size = 20
    if page_size < 1:
        # a page without items is useless
        page_size = 20
    return show_poll_list("admin.html", page_size, page, filter_private=False)


def show_poll_list(render_file, page_size, page=None, filter_private=True):
    """ render one page of the list of polls (newest polls first)

    @param render_file: name of the template
    @param page_size: number of polls per page
    @param page: number of the page (starting with 1); invalid or too
        high numbers select the first page
    @param filter_private: show only polls with the setting "public"
    """
    value_dict = get_default_values()
    polls = Poll.select().orderBy("-timestamp_creation")
    # TODO: speed the filtering up by using SQL statements (see sqlobject "filter")
    if filter_private:
        polls = [poll for poll in polls if poll.get_settings()["public"]]
    else:
        # convert the sql query into a list (probably an expensive operation)
        polls = list(polls)
    if page is None:
        page = 1
    else:
        try:
            page = int(page)
        except (ValueError, TypeError):
            page = 1
    # negative offsets would slice from the end of the list
    page_size = max(page_size, 1)
    page = max(page, 1)
    start = (page - 1) * page_size
    if start >= len(polls):
        start = 0
        page = 1
    end = start + page_size
    value_dict["polls"] = polls[start:end]
    # show a link for the next page, if more polls are available
    value_dict["show_next_link"] = (end < len(polls))
    value_dict["show_previous_link"] = (start > 0)
    value_dict["page"] = page
    return render(render_file, **value_dict)


def render_poll_admin(poll, add_related, del_related):
    """ render the admin page of a poll

    @param add_related: admin hash (or admin link) of a poll that should be
        connected with this poll (or None)
    @param del_related: admin hash (or admin link) of a poll that should be
        disconnected from this poll (or None)
    """
    value_dict = get_default_values()
    errors = {}
    if add_related is not None:
        other_poll_id = extract_poll_admin_id(add_related)
        if other_poll_id == poll.id:
            errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
        elif other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        else:
            related_ids = [one_poll.id for one_poll in poll.get_related_polls()]
            if other_poll_id in related_ids:
                errors["related"] = u"Dieser Wortschlucker wurde bereits verknüpft"
            else:
                PollRelation(first=poll.id, second=other_poll_id)
    if del_related is not None:
        other_poll_id = extract_poll_admin_id(del_related)
        if other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        else:
            related_ids = [one_poll.id for one_poll in poll.get_related_polls()]
            if other_poll_id not in related_ids:
                errors["related"] = u"Dieser Wortschlucker war nicht verknüpft"
            else:
                # delete all relations between these two polls
                PollRelation.deleteBy(first=poll.id, second=other_poll_id)
                PollRelation.deleteBy(first=other_poll_id, second=poll.id)
    value_dict["poll"] = poll
    value_dict["errors"] = errors
    return render("poll_admin_details.html", **value_dict)


@bobo.query('/:poll_hash')
def show_one_poll(poll_hash=None, add_related=None, del_related=None):
    """ show a poll

    A public hash key shows the details page, an admin hash key shows the
    admin page. Unknown keys cause a redirect to the front page.
    """
    value_dict = get_default_values()
    poll_id = get_poll_id(poll_hash)
    if poll_id is not None:
        value_dict["poll"] = Poll.get(poll_id)
        return render("poll_details.html", **value_dict)
    admin_poll_id = get_poll_admin_id(poll_hash)
    if admin_poll_id is not None:
        return render_poll_admin(Poll.get(admin_poll_id), add_related,
                del_related)
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/node/:pagename')
def show_static_nodes(pagename=None):
    """ meant for serving hand-changed, automatically styled content.

    The page is a template below "nodes/" in the template directory.
    Unknown pages result in "not found".
    """
    # the name must be a plain file name (no way out of the "nodes" directory)
    if (not pagename) or (pagename in (os.curdir, os.pardir)) \
            or (os.path.basename(pagename) != pagename):
        raise bobo.NotFound
    value_dict = get_default_values()
    try:
        return render("nodes/" + pagename, **value_dict)
    except genshi.template.TemplateNotFound:
        raise bobo.NotFound


def get_static_file(filename):
    """ deliver a static file - this function is used internally

    The content type is guessed from the name of the file. Unreadable
    files result in "not found".
    """
    response = webob.Response()
    content_type = mimetypes.guess_type(filename)[0]
    if content_type is not None:
        response.content_type = content_type
    try:
        # binary mode: images and similar files must not be changed
        with open(filename, "rb") as static_file:
            response.body = static_file.read()
    except IOError:
        raise bobo.NotFound
    return response


@bobo.query('/media/:p1')
@bobo.query('/media/:p1/:p2')
@bobo.query('/media/:p1/:p2/:p3')
def static_files(p1=None, p2=None, p3=None):
    """ deliver files up to three levels below /media/ """
    media_root = os.path.abspath(os.path.join(BASE_DIR, "templates", "media"))
    subdirs = [subdir for subdir in (p1, p2, p3) if subdir is not None]
    pathname = os.path.abspath(os.path.join(media_root, *subdirs))
    # never deliver anything outside of the media directory (e.g. via "..")
    if not pathname.startswith(media_root + os.sep):
        raise bobo.NotFound
    return get_static_file(pathname)


# create all missing tables
for table in (Poll, ContentSubmission, PollSetting, PollRelation):
    #table.dropTable()
    if not table.tableExists():
        table.createTable()

# this line allows to use wortschlucker with mod_wsgi
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
#application = bobo.Application(bobo_resources=__name__)