#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
"""Wortschlucker: a small bobo web application for collecting submissions.

A poll ("Wortschlucker") has a public hash (used for viewing and submitting
content) and an admin hash (used for editing, relating and deleting it).
Data is stored via SQLObject in an SQLite file below BASE_DIR; pages are
rendered with Genshi templates from BASE_DIR/templates.
"""

import os

# the basedir is the parent dir of the location of this script
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))

# add the project directory to the python search path
# (this must happen before "forms" is imported below)
import sys
sys.path.insert(0, os.path.join(BASE_DIR, "src"))

import bobo
import forms
import sqlobject
import cherrypy
from genshi.template import TemplateLoader
import genshi.filters
import genshi.input
import genshi
import formencode
import datetime
import webob
import mimetypes
import uuid
import re

db_filename = os.path.join(BASE_DIR, "database.sqlite")
database = sqlobject.connectionForURI("sqlite://" + db_filename)
sqlobject.sqlhub.processConnection = database
loader = TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=True)

# values that are handed to every template
BASE_DICT = {
        "base_url": "/", # the trailing slash is necessary
        "show_navbar": True,
        "errors": {},
}

# all known poll settings: key -> (type, default value)
POLL_SETTINGS = {
        "show_all_submissions": (bool, True),
        "show_statistics": (bool, True),
        "public": (bool, False),
        "expose_date": (int, 0),
        "close_date": (int, 0),
}

# settings that differ from the defaults for each poll template
POLL_SETTING_TEMPLATES = {
        "brainstorm": {"expose_date": 20},
        "cards": {"show_all_submissions": False, "close_date": 30},
        "feedback": {"show_all_submissions": False},
        "evaluation": {"show_all_submissions": False},
        "notes": {"show_statistics": False},
        "shopping": {"show_statistics": False},
        "clipboard": {"show_statistics": False},
        "namefinder": {},
}

# a URL that is delimited by whitespace or by the start/end of the text
_URL_REGEX = re.compile(r"(\A|\s)(https?://[\w/\?\.\#=;,]*)(\s|\Z)")


class ContentSubmission(sqlobject.SQLObject):
    """ One piece of content that was submitted to a poll. """

    submitter = sqlobject.UnicodeCol()
    content = sqlobject.UnicodeCol()
    poll_id = sqlobject.ForeignKey("Poll")
    timestamp_creation = sqlobject.DateTimeCol()

    def get_creation_time_string(self):
        """ Return the creation timestamp as a string. """
        return str(self.timestamp_creation)

    def get_markup_content(self):
        """ Return the content as a sanitized genshi markup stream.

        URLs in the content are turned into links that are labelled with
        the host part of the URL.
        """
        def get_link_markup(match):
            prefix, url, suffix = match.groups()
            # the third "/"-separated item is the host part of the url
            # (it always exists, since the pattern requires "://")
            short_name = url.split("/")[2]
            # four values need four placeholders: the previous format string
            # had only three and raised a TypeError for every matched URL
            return """%s<a href="%s">%s</a>%s""" % (prefix, url, short_name, suffix)
        # surround all urls with html markup
        mark_links = _URL_REGEX.sub(get_link_markup, self.content)
        # the sanitizer strips unsafe markup from the user-supplied content
        markup = genshi.input.HTML(mark_links) | genshi.filters.HTMLSanitizer()
        # the markup is now marked as "safe" -> genshi will output it literally
        return markup


class PollSetting(sqlobject.SQLObject):
    """ A single key/value setting of a poll (the value is stored as text). """

    poll_id = sqlobject.ForeignKey("Poll")
    key = sqlobject.UnicodeCol()
    value = sqlobject.UnicodeCol()


class PollRelation(sqlobject.SQLObject):
    """ An undirected link between two polls. """

    first = sqlobject.ForeignKey("Poll")
    second = sqlobject.ForeignKey("Poll")


class Poll(sqlobject.SQLObject):
    """ A poll with its public hash, its admin hash and its description. """

    author = sqlobject.UnicodeCol()
    hash_key = sqlobject.StringCol()
    admin_hash_key = sqlobject.StringCol()
    title = sqlobject.UnicodeCol()
    description = sqlobject.UnicodeCol()
    timestamp_creation = sqlobject.DateTimeCol()

    def get_related_polls(self):
        """ Return all polls that are linked to this one (either direction). """
        related = [relation.second
                for relation in PollRelation.selectBy(first=self.id)]
        related.extend([relation.first
                for relation in PollRelation.selectBy(second=self.id)])
        return related

    def get_settings(self):
        """ Return a dict with a value for every key of POLL_SETTINGS.

        Stored values are validated; missing or unparsable values are
        replaced with the default of the setting.
        """
        current_dict = {}
        for setting in PollSetting.selectBy(poll_id=self.id):
            if setting.key in POLL_SETTINGS:
                validated_value = validate_poll_setting(setting.key, setting.value)
                # ignore garbage in the database - the default is used instead
                if validated_value is not None:
                    current_dict[setting.key] = validated_value
        for key, meta_info in POLL_SETTINGS.items():
            if key not in current_dict:
                current_dict[key] = meta_info[1]
        return current_dict

    def change_setting(self, key, value):
        """ Store a new value for the setting "key".

        Unknown keys and values that fail the validation are ignored.
        Nothing is changed if more than one row exists for the key.
        """
        validated_value = validate_poll_setting(key, value)
        if validated_value is None:
            return
        poll_setting = PollSetting.selectBy(poll_id=self.id, key=key)
        row_count = poll_setting.count()
        if row_count == 1:
            poll_setting[0].value = str(validated_value)
        elif row_count == 0:
            PollSetting(poll_id=self.id, key=key, value=str(validated_value))

    def get_num_of_submitters(self):
        """ Return the number of distinct submitter names of this poll. """
        submissions = ContentSubmission.selectBy(poll_id=self.id)
        return len(set(submission.submitter for submission in submissions))

    def get_num_of_submissions(self):
        """ Return the number of submissions of this poll. """
        return ContentSubmission.selectBy(poll_id=self.id).count()

    def get_submissions(self):
        """ Return the query result with all submissions of this poll. """
        return ContentSubmission.selectBy(poll_id=self.id)

    def delete_poll(self):
        """ Delete the poll and every row that refers to it. """
        # the results are turned into lists before rows are destroyed
        for submission in list(ContentSubmission.selectBy(poll_id=self.id)):
            submission.destroySelf()
        for setting in list(PollSetting.selectBy(poll_id=self.id)):
            setting.destroySelf()
        # relations need to go, too - otherwise the related polls would
        # still point to a poll that does not exist anymore
        relations = list(PollRelation.selectBy(first=self.id))
        relations.extend(PollRelation.selectBy(second=self.id))
        for relation in relations:
            relation.destroySelf()
        self.destroySelf()

    def get_url(self):
        """ Return the public URL of the poll. """
        return "%s%s" % (BASE_DICT["base_url"], self.hash_key)

    def get_submit_url(self):
        """ Return the URL for submitting content to the poll. """
        return "%s%s/submit" % (BASE_DICT["base_url"], self.hash_key)

    def get_admin_url(self):
        """ Return the URL of the admin page of the poll. """
        return "%s%s" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_edit_url(self):
        """ Return the URL of the edit form of the poll. """
        return "%s%s/admin" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_delete_url(self):
        """ Return the URL that deletes the poll. """
        return "%s%s/delete" % (BASE_DICT["base_url"], self.admin_hash_key)

    def get_creation_time_string(self):
        """ Return the creation timestamp as a string. """
        return str(self.timestamp_creation)


def validate_poll_setting(key, value):
    """ Convert "value" to the type of the setting "key".

    Returns None for unknown keys and for values that cannot be converted.
    A value of None counts as False (bool settings) or 0 (int settings).
    """
    if key not in POLL_SETTINGS:
        return None
    setting_type = POLL_SETTINGS[key][0]
    if setting_type in (basestring, unicode, str):
        return value
    elif setting_type == bool:
        if value is None:
            return False
        if isinstance(value, bool):
            return value
        try:
            text = value.lower()
        except AttributeError:
            # neither a boolean nor a string
            return None
        if text in ("0", "false", "no", "off", "disabled", ""):
            return False
        elif text in ("1", "true", "yes", "on", "enabled"):
            return True
        else:
            return None
    elif setting_type == int:
        if value is None:
            return 0
        try:
            return int(value)
        except (TypeError, ValueError):
            return None
    else:
        # all other types (e.g. float, ...)
        try:
            return setting_type(value)
        except (TypeError, ValueError):
            return None


def get_default_values(**kwargs):
    """ Return a copy of BASE_DICT that is updated with the given values. """
    value_dict = dict(BASE_DICT)
    # do not share the mutable "errors" dict between requests
    value_dict["errors"] = dict(BASE_DICT["errors"])
    value_dict.update(kwargs)
    return value_dict


def render(filename, input_data=None, **values):
    """ Render a template as HTML.

    If "input_data" is given, the forms of the page are filled with it.
    """
    stream = loader.load(filename).generate(**values)
    if input_data is not None:
        stream |= genshi.filters.HTMLFormFiller(data=input_data)
    return stream.render("html", doctype="html")


def _lookup_poll_id(column_name, hash_key):
    """ Return the id of the only poll whose column equals the hash (or None). """
    if isinstance(hash_key, unicode):
        try:
            hash_key = str(hash_key)
        except UnicodeEncodeError:
            # hashes are plain ascii - this cannot be a valid one
            return None
    polls = Poll.selectBy(**{column_name: hash_key})
    if polls.count() == 1:
        return polls[0].id
    return None


def get_poll_id(hash_key):
    """ Return the id of the poll with the given public hash (or None). """
    return _lookup_poll_id("hash_key", hash_key)


def get_poll_admin_id(hash_key):
    """ Return the id of the poll with the given admin hash (or None). """
    return _lookup_poll_id("admin_hash_key", hash_key)


def extract_poll_admin_id(text):
    """ Return the id of the poll that "text" refers to (or None).

    The text may be an admin hash or the admin link of a poll.
    """
    result = get_poll_admin_id(text)
    if result is not None:
        return result
    # we assume that the hash is at the end of the string
    for found in reversed(re.findall(r"[a-z0-9]+", text)):
        guess = get_poll_admin_id(found)
        if guess is not None:
            return guess
    return None


def get_new_hash_key(length=16, charset=None):
    """ returns a quite random hash key with the specified length

    The key is neither used as a public hash nor as an admin hash of any
    existing poll.
    """
    if charset is None:
        charset = "0123456789abcdefghijklmnopqrstuvwxyz"
    def get_hash_string(length):
        base = uuid.uuid4().int
        result = []
        while len(result) < length:
            if base == 0:
                # the 128 bit of one uuid are used up (long keys): get more
                # instead of padding the key with the first character
                base = uuid.uuid4().int
            base, value = divmod(base, len(charset))
            result.append(charset[value])
        return "".join(result)
    # repeat the hash generation until a new value is found
    hash_key = get_hash_string(length)
    while (get_poll_id(hash_key) is not None) \
            or (get_poll_admin_id(hash_key) is not None):
        hash_key = get_hash_string(length)
    return hash_key


@bobo.query('/new')
@bobo.query('/new/:template')
@bobo.query('/new/:author/:title/:description')
def new_poll(submit=None, cancel=None, author=None, title=None,
        description=None, template=None, hide_errors=False):
    """ Show the "new poll" form or create a poll from the submitted form.

    After a successful creation the browser is redirected to the admin
    page of the new poll. Unknown templates are handled like "brainstorm".
    """
    value_dict = get_default_values()
    data = {"author": author, "title": title, "description": description,
            "template": template}
    if cancel:
        return bobo.redirect(BASE_DICT["base_url"])
    if not submit:
        # show the "new poll" form
        return render("poll_new.html", input_data=data, **value_dict)
    # create the new poll (if it is valid)
    errors = {}
    try:
        data = forms.PollForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
    if errors:
        if not hide_errors:
            value_dict["errors"] = errors
        return render("poll_new.html", input_data=data, **value_dict)
    # get the template settings
    if template in POLL_SETTING_TEMPLATES:
        template_settings = POLL_SETTING_TEMPLATES[template]
    else:
        template_settings = POLL_SETTING_TEMPLATES["brainstorm"]
    # create the new poll
    hash_key = get_new_hash_key()
    admin_hash_key = get_new_hash_key()
    now = datetime.datetime.now()
    poll = Poll(hash_key=hash_key, admin_hash_key=admin_hash_key,
            timestamp_creation=now, author=data["author"],
            title=data["title"], description=data["description"])
    # apply the template settings
    for key, value in template_settings.items():
        poll.change_setting(key, value)
    return bobo.redirect(poll.get_admin_url())


@bobo.query('/:hash_key/submit')
def submit_content(hash_key=None, submitter=None, content=None):
    """ Store a submission for a poll and show the poll page again.

    Unknown hashes are redirected to the frontpage.
    """
    poll_id = get_poll_id(hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    value_dict = get_default_values()
    data = {"submitter": submitter, "content": content}
    poll = Poll.get(poll_id)
    value_dict["poll"] = poll
    errors = {}
    try:
        data = forms.SubmitForm.to_python(data)
    except formencode.Invalid as errors_packed:
        errors = errors_packed.unpack_errors()
    if errors:
        value_dict["errors"] = errors
    else:
        # create the new submission content
        data["timestamp_creation"] = datetime.datetime.now()
        data["poll_id"] = poll.id
        ContentSubmission(**data)
        # remove "content" for the next input
        del data["content"]
    return render("poll_details.html", input_data=data, **value_dict)


@bobo.query('/:admin_hash_key/delete')
def delete_poll(admin_hash_key=None):
    """ Delete the poll with the given admin hash; go to the frontpage. """
    admin_poll_id = get_poll_admin_id(admin_hash_key)
    if admin_poll_id is not None:
        Poll.get(admin_poll_id).delete_poll()
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/:admin_hash_key/admin')
def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None,
        title=None, description=None, settings=None,
        setting_expose_date=None, setting_close_date=None):
    """ Show the edit form of a poll or store the submitted changes.

    "settings" contains the keys of all enabled boolean settings (a single
    string or a list). The two date settings are taken from the
    "setting_*" parameters; a missing one keeps the current value and an
    invalid one is not stored.
    """
    value_dict = get_default_values()
    data = {"author": author, "title": title, "description": description,
            "template": ""}
    poll_id = get_poll_admin_id(admin_hash_key)
    if poll_id is None:
        return bobo.redirect(BASE_DICT["base_url"])
    poll = Poll.get(poll_id)
    # cancel: return to (non-edit) admin page
    if cancel:
        return bobo.redirect(poll.get_admin_url())
    # missing values are taken from the current state of the poll
    if author is None:
        data["author"] = poll.author
    if title is None:
        data["title"] = poll.title
    if description is None:
        data["description"] = poll.description
    poll_settings = poll.get_settings()
    errors = {}
    # update the settings only after a submit (otherwise we clear all current settings)
    if submit:
        # override with the given settings (taken from the form input)
        if settings is None:
            settings = []
        elif not isinstance(settings, list):
            settings = [settings]
        # only boolean settings are part of the "settings" list - the
        # other ones must keep their current value
        for setting_key in poll_settings.keys():
            if POLL_SETTINGS[setting_key][0] == bool:
                poll_settings[setting_key] = setting_key in settings
        # check for errors only if the content is submitted (not just rendered)
        try:
            data = forms.PollForm.to_python(data)
        except formencode.Invalid as errors_packed:
            errors = errors_packed.unpack_errors()
    # add boolean "settings" after forms validation - since there is no destination type
    data["settings"] = [key for key, value in poll_settings.items()
            if value is True]
    for date_key, given_value in (("expose_date", setting_expose_date),
            ("close_date", setting_close_date)):
        if given_value is None:
            # nothing was sent: show/keep the current value
            given_value = poll_settings[date_key]
        validated_value = validate_poll_setting(date_key, given_value)
        if validated_value is not None:
            data["setting_" + date_key] = validated_value
    if errors:
        value_dict["errors"] = errors
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    if not submit:
        return render("poll_admin_edit.html", input_data=data, **value_dict)
    # update core attributes of the existing poll
    poll.author = data["author"]
    poll.title = data["title"]
    poll.description = data["description"]
    # update settings
    current_settings = poll.get_settings()
    for key, value in poll_settings.items():
        if current_settings[key] != value:
            poll.change_setting(key, value)
    for date_key in ("expose_date", "close_date"):
        form_key = "setting_" + date_key
        # the key is missing if the submitted value was invalid
        if form_key in data:
            poll.change_setting(date_key, data[form_key])
    return bobo.redirect(poll.get_admin_url())


@bobo.query('')
def base():
    """ Redirect the empty path to the base URL. """
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/')
def show_frontpage(page=None):
    """ Show one page (ten items) of the list of public polls. """
    return show_poll_list("frontpage.html", 10, page)


@bobo.query('/admin')
@bobo.query('/admin/')
def show_admin_page(page=None, page_size=20):
    """ Show one page of the list of all polls (including private ones). """
    try:
        page_size = int(page_size)
    except (TypeError, ValueError):
        page_size = 20
    if page_size < 1:
        # zero or negative sizes would break the paging
        page_size = 20
    return show_poll_list("admin.html", page_size, page, filter_private=False)


def show_poll_list(render_file, page_size, page=None, filter_private=True):
    """ Render one page of the poll list (newest polls first).

    "page" starts at 1; invalid or out-of-range pages show the first page.
    Polls without the "public" setting are skipped if "filter_private"
    is set.
    """
    value_dict = get_default_values()
    polls = Poll.select().orderBy("-timestamp_creation")
    # TODO: speed the filtering up by using SQL statements (see sqlobject "filter")
    if filter_private:
        polls = [poll for poll in polls if poll.get_settings()["public"]]
    else:
        # convert the sql query into a list (probably an expensive operation)
        polls = list(polls)
    if page is None:
        page = 1
    else:
        try:
            page = int(page)
        except (TypeError, ValueError):
            page = 1
    if page < 1:
        # avoid negative slice indices
        page = 1
    start = (page - 1) * page_size
    if start >= len(polls):
        start = 0
        page = 1
    # "end" is the index after the last item of the page
    end = start + page_size
    value_dict["polls"] = polls[start:end]
    # NOTE(review): debug output for every request - probably a leftover
    print(value_dict["polls"])
    # show a link for the next page, if more polls are available
    value_dict["show_next_link"] = (end < len(polls))
    value_dict["show_previous_link"] = (start > 0)
    value_dict["page"] = page
    return render(render_file, **value_dict)


def render_poll_admin(poll, add_related, del_related):
    """ Render the admin page of a poll.

    "add_related" and "del_related" may contain the admin hash (or the
    admin link) of another poll that should be linked to / unlinked from
    this one. Problems are shown as the error "related".
    """
    value_dict = get_default_values()
    errors = {}
    if add_related is not None:
        other_poll_id = extract_poll_admin_id(add_related)
        if other_poll_id == poll.id:
            errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
        elif other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        else:
            related_ids = [one_poll.id for one_poll in poll.get_related_polls()]
            if other_poll_id in related_ids:
                errors["related"] = u"Dieser Wortschlucker wurde bereits verknüpft"
            else:
                PollRelation(first=poll.id, second=other_poll_id)
    if del_related is not None:
        other_poll_id = extract_poll_admin_id(del_related)
        if other_poll_id is None:
            errors["related"] = u"Wortschlucker nicht gefunden"
        else:
            related_ids = [one_poll.id for one_poll in poll.get_related_polls()]
            if other_poll_id not in related_ids:
                errors["related"] = u"Dieser Wortschlucker war nicht verknüpft"
            else:
                # delete all relations between these two polls
                PollRelation.deleteBy(first=poll.id, second=other_poll_id)
                PollRelation.deleteBy(first=other_poll_id, second=poll.id)
    value_dict["poll"] = poll
    value_dict["errors"] = errors
    return render("poll_admin_details.html", **value_dict)


@bobo.query('/:poll_hash')
def show_one_poll(poll_hash=None, add_related=None, del_related=None):
    """ Show the public page or the admin page that belongs to a hash.

    Unknown hashes are redirected to the frontpage.
    """
    poll_id = get_poll_id(poll_hash)
    if poll_id is not None:
        value_dict = get_default_values()
        value_dict["poll"] = Poll.get(poll_id)
        return render("poll_details.html", **value_dict)
    admin_poll_id = get_poll_admin_id(poll_hash)
    if admin_poll_id is not None:
        return render_poll_admin(Poll.get(admin_poll_id), add_related,
                del_related)
    return bobo.redirect(BASE_DICT["base_url"])


@bobo.query('/node/:pagename')
def show_static_nodes(pagename=None):
    """ meant for serving hand-changed, automatically styled content.

    Raises bobo.NotFound if there is no template with this name.
    """
    value_dict = get_default_values()
    try:
        return render("nodes/"+pagename, **value_dict)
    except genshi.template.TemplateNotFound:
        # unknown pages are "not found" instead of a server error
        raise bobo.NotFound


def get_static_file(filename):
    """ deliver a static file - this function is used internally

    Raises bobo.NotFound if the file cannot be read.
    """
    response = webob.Response()
    content_type = mimetypes.guess_type(filename)[0]
    if content_type is not None:
        response.content_type = content_type
    try:
        # binary mode: deliver the bytes untouched; "with" closes the file
        with open(filename, "rb") as static_file:
            response.body = static_file.read()
    except IOError:
        raise bobo.NotFound
    return response


@bobo.query('/media/:p1')
@bobo.query('/media/:p1/:p2')
@bobo.query('/media/:p1/:p2/:p3')
def static_files(p1=None, p2=None, p3=None):
    """ deliver files up to three levels below /media/

    Raises bobo.NotFound for paths that point outside of the media
    directory (e.g. by using "..").
    """
    media_root = os.path.join(BASE_DIR, "templates", "media")
    pathname = media_root
    for subdir in (p1, p2, p3):
        if subdir is not None:
            pathname = os.path.join(pathname, subdir)
    # never deliver anything outside of the media directory - otherwise
    # "/media/../../database.sqlite" would expose the whole database
    allowed_prefix = os.path.abspath(media_root) + os.sep
    if not os.path.abspath(pathname).startswith(allowed_prefix):
        raise bobo.NotFound
    return get_static_file(pathname)


# create all missing tables during startup
for table in (Poll, ContentSubmission, PollSetting, PollRelation):
    #Poll.dropTable()
    if not table.tableExists():
        table.createTable()

# NOTE(review): prints every poll during startup - probably debug output
for poll in Poll.select():
    print(poll)

# this line allows to use wortschlucker with mod_wsgi
# see: http://groups.google.com/group/bobo-web/msg/2ba55fc381658cd1
#application = bobo.Application(bobo_resources=__name__)