codekasten/umfrage_kirchenplatz2012/src/umfrage.py

409 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# umfrage - einfache web-basierte Meinungsabfragen
# Copyright (C) 2012 - Lars Kruse <devel@sumpfralle.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
# the basedir is the parent dir of the location of this script
BASE_DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, os.path.pardir)))
# add the project directory to the python search path
import sys
sys.path.insert(0, os.path.join(BASE_DIR, "src"))
from dataset import DEFINITION_OPTIONS, DEFINITION_QUALITY_RANGES, \
DEFINITION_QUESTIONS
import ConfigParser
import uuid
import re
import smtplib
import email.mime.text
import email.utils
import sqlobject
import bobo
import genshi
import genshi.template
import genshi.filters
CONFIG_FILE = os.path.join(BASE_DIR, "umfrage.conf")
MAIL_ADDRESS_REGEX = r"^([\w\-\.]+@(\w[\w\-]+\.)+[\w\-]+)$"
# Set the INIT_DB_UMFRAGE environment setting in order to clean up the
# database during the first initial run. All tables are removed and
# initialized again based on the content of src/dataset.py
# e.g. run the following:
# INIT_DB_UMFRAGE=1 python src/umfrage.py
RUN_FIRST_DB_INIT = "INIT_DB_UMFRAGE" in os.environ
config = ConfigParser.SafeConfigParser()
config.read(CONFIG_FILE)
db_uri = config.get("database", "uri")
sqlobject.sqlhub.processConnection = sqlobject.connectionForURI(db_uri)
loader = genshi.template.TemplateLoader(os.path.join(BASE_DIR, 'templates'), auto_reload=False)
class Session(sqlobject.SQLObject):
created = sqlobject.DateTimeCol(default=sqlobject.DateTimeCol.now)
name = sqlobject.UnicodeCol(default=lambda: uuid.uuid4().hex)
class Question(sqlobject.SQLObject):
text = sqlobject.UnicodeCol()
weight = sqlobject.IntCol()
quality_levels = sqlobject.PickleCol()
class Option(sqlobject.SQLObject):
title = sqlobject.UnicodeCol()
text = sqlobject.UnicodeCol()
image = sqlobject.UnicodeCol()
weight = sqlobject.IntCol()
class Answer(sqlobject.SQLObject):
text = sqlobject.UnicodeCol(default="")
quality = sqlobject.UnicodeCol(default="")
question = sqlobject.ForeignKey("Question")
option = sqlobject.ForeignKey("Option")
session = sqlobject.ForeignKey("Session")
class Submission(sqlobject.SQLObject):
session = sqlobject.ForeignKey("Session")
timestamp = sqlobject.DateTimeCol(default=sqlobject.DateTimeCol.now)
to_default_destination = sqlobject.BoolCol()
title = sqlobject.UnicodeCol()
text = sqlobject.UnicodeCol()
def get_default_values(**kwargs):
value_dict = {}
# we always need "options" (e.g. on frontpage)
value_dict["options"] = list(Option.select())
value_dict["options"].sort(key=lambda item: item.weight)
# useful values for different pages
value_dict["base_url"] = config.get("hosting", "full_url")
value_dict["to_address"] = config.get("content", "to_address")
value_dict["errors"] = []
# the base_url is expected to end with a slash
if not value_dict["base_url"].endswith("/"):
value_dict["base_url"] += "/"
for key, value in kwargs.items():
value_dict[key] = value
return value_dict
def render(filename, input_data=None, **values):
stream = loader.load(filename).generate(**values)
if not input_data is None:
stream |= genshi.filters.HTMLFormFiller(data=input_data)
return stream.render("html", doctype="html")
def get_previous_question(question):
questions = Question.select().orderBy("-weight")
for one_q in questions:
if one_q.weight < question.weight:
return one_q
return None
def get_next_question(question):
questions = Question.select().orderBy("weight")
for one_q in questions:
if one_q.weight > question.weight:
return one_q
return None
def get_last_question():
questions = list(Question.select().orderBy("weight"))
if questions:
return questions[-1]
else:
return None
def get_session(session_id):
session = None
if session_id:
sessions = Session.selectBy(name=session_id)
if sessions.count() > 0:
session = sessions[0]
return session
def send_mail(to_address, from_address, subject, text, sender=None):
msg = email.mime.text.MIMEText(unicode(text), _charset="utf-8")
msg["Subject"] = unicode(subject)
msg["From"] = from_address
msg["To"] = to_address
if sender:
msg["Sender"] = sender
msg["Date"] = email.utils.formatdate()
use_ssl = config.get("mail", "use_ssl", "no")
use_ssl = use_ssl.lower() in ("1", "true", "yes", "on", "enabled")
host = config.get("mail", "host", "localhost")
if use_ssl:
# see http://bugs.python.org/issue11927 (Python 2.7 uses port 25 instead of 465 by default)
s = smtplib.SMTP_SSL(host, port=465)
else:
s = smtplib.SMTP(host)
s.sendmail(from_address, [to_address], msg.as_string())
s.quit()
@bobo.query('/submit')
def do_submit(session_id=None, subject=None, from_address=None,
to_address=None, summary_text=None, go_backward=None):
session = get_session(session_id)
if go_backward:
return render_question(session, get_last_question())
input_data = {}
params = get_default_values()
if not session:
return bobo.redirect(params["base_url"])
params["session"] = session
if not subject:
params["errors"].append(u"Bitte tragen Sie einen Email-Titel (Betreff) ein!")
else:
input_data["subject"] = subject
if from_address:
# even with invalid input: store it for another attempt
input_data["from_address"] = from_address
if from_address and re.match(MAIL_ADDRESS_REGEX, from_address):
pass
elif from_address:
params["errors"].append(u"Die Absende-Emailadresse scheint ungültig zu sein. Bitte korrigieren Sie dies!")
else:
params["errors"].append(u"Bitte tragen Sie eine Email-Adresse als Absender ein!")
if to_address:
# even with invalid input: store it for another attempt
input_data["to_address"] = to_address
if to_address and re.match(MAIL_ADDRESS_REGEX, to_address):
pass
elif to_address:
params["errors"].append(u"Die Ziel-Emailadresse scheint ungültig zu sein. Bitte korrigieren Sie dies!")
else:
params["errors"].append(u"Bitte tragen Sie eine Ziel-Adresse für die Email " + \
"ein (Vorgabe: %s)!" % params["to_address"])
if not summary_text:
params["errors"].append(u"Die Email darf nicht leer sein. Kehren Sie " + \
"zurück zum Fragebogen und klicken Sie erneut auf " + \
"'Abschließen' um den Vorgabetext wiederherzustellen!")
else:
input_data["summary_text"] = summary_text
if params["errors"]:
return render("summary.html", input_data=input_data, **params)
else:
admin_address = config.get("mail", "admin_address", "")
try:
send_mail(to_address, admin_address or from_address, subject, summary_text, sender=from_address)
except smtplib.SMTPException, err_msg:
params["errors"] = "Der Versand der Mail schlug fehl: %s" % err_msg
return render("summary.html", input_data=input_data, **params)
try:
if admin_address:
send_mail(from_address, admin_address, subject, summary_text)
except smtplib.SMTPException, err_msg:
pass
submission = Submission(session=session,
to_default_destination=(to_address == params["to_address"]),
title=subject, text=summary_text)
return render("submitted.html", **params)
def get_quality_text(question, quality):
for key, text in question.quality_levels:
if key == quality:
return text
return None
def get_answer_lines(answer):
is_empty = True
lines = []
lines.append("")
lines.append("## %s" % answer.option.title)
if answer.quality:
is_empty = False
lines.append("Bewertung: %s (%s)" % (answer.quality,
get_quality_text(answer.question, answer.quality)))
if answer.text.strip():
is_empty = False
lines.append("Kommentar:")
for line in answer.text.splitlines():
if line.strip():
lines.append("* %s" % line.strip())
if is_empty:
return []
else:
return lines
def get_summary_text(session):
lines = []
prefix = config.get("content", "text_prefix", "")
if prefix:
lines.append(prefix)
lines.append("")
questions = Question.select().orderBy("weight")
for question in questions:
answers = list(Answer.selectBy(session=session, question=question))
if not answers:
# no answers: skip this question
continue
answers.sort(key=lambda item: item.option.weight)
lines.append("")
lines.append("# %s" % question.text)
for answer in answers:
lines.extend(get_answer_lines(answer))
lines.append("")
lines.append("")
return os.linesep.join(lines)
def show_summary(session):
params = get_default_values()
params["session"] = session
input_data = {}
input_data["subject"] = config.get("content", "subject")
input_data["to_address"] = params["to_address"]
input_data["summary_text"] = get_summary_text(session)
return render("summary.html", input_data=input_data, **params)
def render_question(session, question):
input_data = {}
for option in Option.select():
for key in ("text", "quality"):
dict_key = "option_%s_%s" % (option.id, key)
answers = list(Answer.selectBy(session=session, question=question,
option=option))
if answers:
input_data[dict_key] = getattr(answers[0], key)
else:
# this seems to be necessary for the input filter of genshi
input_data[dict_key] = ""
for answer in Answer.selectBy(session=session, question=question):
for key in ("text", "quality"):
input_data["option_%s_%s" % (answer.option.id, key)] = \
getattr(answer, key)
params = get_default_values()
params.update({"session": session,
"question": question,
"next_question": get_next_question(question),
"previous_question": get_previous_question(question),
})
return render("question.html", input_data=input_data, **params)
@bobo.query('/db')
def show_db():
params = get_default_values()
params["Session"] = Session
params["Question"] = Question
params["Option"] = Option
params["Answer"] = Answer
params["Submission"] = Submission
return render("db.html", **params)
@bobo.query('/')
def update_question(bobo_request, session_id=None, question_id=None,
go_backward=False):
kwargs = bobo_request.params
session = get_session(session_id)
if not session:
session = Session()
params = get_default_values()
params["session"] = session
return render("start.html", **params)
question = None
if question_id:
try:
question = Question.get(int(question_id))
except ValueError:
pass
# any new input values? (update "Answer" objects)
target_question = None
if question:
for option in Option.select():
opt_params = {}
for key in ("text", "quality"):
dict_key = "option_%s_%s" % (option.id, key)
if (dict_key in kwargs) and kwargs[dict_key]:
opt_params[key] = kwargs[dict_key]
answers = Answer.selectBy(session=session, question=question,
option=option)
# at least one item was found
if opt_params:
if answers.count() > 0:
answer = answers[0]
else:
# create new one
answer = Answer(session=session, question=question,
option=option)
# update one value
for key in ("text", "quality"):
setattr(answer, key, opt_params.get(key, ""))
else:
# everything is empty -> delete answer
if ("option_%s_%s" % (option.id, "text") in kwargs) and \
(answers.count() > 0):
answers[0].destroySelf()
if go_backward:
target_question = get_previous_question(question)
else:
target_question = get_next_question(question)
if not target_question:
# special case: we are finished
return show_summary(session)
if not target_question:
# start with the first question
target_question = Question.select().orderBy("weight")[0]
return render_question(session, target_question)
@bobo.query('')
def redirect_startpage():
return bobo.redirect(get_default_values()["base_url"])
# initialize the tables (do it only once!)
if RUN_FIRST_DB_INIT:
tables = (Session, Question, Option, Answer, Submission)
# drop all
drop_tables = list(tables)
drop_tables.reverse()
for table in drop_tables:
if table.tableExists():
table.dropTable()
for table in tables:
table.createTable()
for index, (title, image_url, lines) in enumerate(DEFINITION_OPTIONS):
Option(title=title, image=image_url,
text=os.linesep.join(lines), weight=index)
for index, (text, quality_levels) in enumerate(DEFINITION_QUESTIONS):
Question(text=text, weight=index, quality_levels=DEFINITION_QUALITY_RANGES[quality_levels])
# this application should be usable with mod_wsgi
application = bobo.Application(bobo_resources=__name__)