added user account management (disabled by default)

This commit is contained in:
lars 2011-12-19 03:21:07 +00:00
parent ffbdaa4351
commit e4e2e416db
2 changed files with 255 additions and 48 deletions

View file

@ -17,3 +17,6 @@ class SubmitForm(formencode.Schema):
submitter = formencode.validators.UnicodeString(strip=True, not_empty=True)
content = formencode.validators.UnicodeString(strip=True, not_empty=True)
class ProfileForm(formencode.Schema):
email = formencode.validators.Email(resolve_domain=True, strip=True, not_empty=True)

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2.5
#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
import os
@ -24,6 +24,9 @@ import datetime
import mimetypes
import uuid
import re
import smtplib
import email.mime.text
import email.utils
import hashlib
import twitter
import urllib2
@ -99,6 +102,33 @@ class ContentSubmission(sqlobject.SQLObject):
return hashlib.md5(str(self.id)).hexdigest()
class Profile(sqlobject.SQLObject):
email = sqlobject.UnicodeCol(unique=True)
hash_key = sqlobject.StringCol(unique=True)
def get_user_polls(self, *args, **kwargs):
return self._get_generic_polls(False, *args, **kwargs)
def get_admin_polls(self, *args, **kwargs):
return self._get_generic_polls(True, *args, **kwargs)
def _get_generic_polls(self, is_admin, old_to_new=True):
polls = [user_poll.poll for user_poll in ProfilePolls.selectBy(
user=self.id, is_admin=is_admin)]
polls.sort(key=lambda poll: poll.timestamp_creation,
reverse=not old_to_new)
return polls
def get_url(self, absolute=False):
return get_url_string("%s%s/%s" % (BASE_DICT["base_url"], "profile", self.hash_key), absolute)
class ProfilePolls(sqlobject.SQLObject):
user = sqlobject.ForeignKey("Profile")
poll = sqlobject.ForeignKey("Poll")
is_admin = sqlobject.BoolCol()
class PollSetting(sqlobject.SQLObject):
poll_id = sqlobject.ForeignKey("Poll")
key = sqlobject.UnicodeCol()
@ -346,10 +376,55 @@ def validate_poll_setting(key, value):
except ValueError:
return None
def get_default_values(**kwargs):
def send_profile_mail(user):
data = {}
for key, default in (("host", "localhost"), ("port", 25),
("use_ssl", False), ("from", None),
("subject", None), ("body", None)):
try:
data[key] = config.get("mail", key, raw=True)
except ConfigParser.Error, err_msg:
if default is None:
# fail!
open("/tmp/test.out", "w").write(str(err_msg))
return False
data[key] = default
data["port"] = int(data["port"])
if data["use_ssl"]:
provider = smtplib.SMTP_SSL
else:
provider = smtplib.SMTP
input_data = {"profile_url": user.get_url(absolute=True)}
content = data["body"] % input_data
# empty lines can't be parsed via ConfigParser
# re.sub in Python 2.5 does not understand "flags" -> compile first
dot_line_regex = re.compile("^\.$", flags=re.MULTILINE)
content = dot_line_regex.sub("", content)
message = email.mime.text.MIMEText(content)
message["Subject"] = data["subject"]
message["From"] = data["from"]
message["To"] = user.email
message["Date"] = email.utils.formatdate()
try:
server = provider(data["host"], data["port"])
server.sendmail(data["from"], [user.email], message.as_string())
except smtplib.SMTPException:
return False
return True
def get_default_values(request, **kwargs):
value_dict = dict(BASE_DICT)
for key, value in kwargs.items():
value_dict[key] = value
# add configuration settings
try:
enable_users = config.get("users", "enabled")
except ConfigParser.Error:
enable_users = False
value_dict["enable_users"] = enable_users
# read the user's hash key from the cookie
if enable_users and ("user_hash_key" in request.cookies):
value_dict["user_hash_key"] = str(request.cookies["user_hash_key"])
return value_dict
def get_url_string(url, absolute=False):
@ -371,7 +446,13 @@ def render(filename, input_data=None, **values):
stream = loader.load(filename).generate(**values)
if not input_data is None:
stream |= genshi.filters.HTMLFormFiller(data=input_data)
return stream.render("html", doctype="html")
#return stream.render("html", doctype="html")
response = webob.Response(content_type="text/html", charset="utf8")
if "user_hash_key" in values:
response.set_cookie("user_hash_key", values["user_hash_key"],
max_age=365, path="/")
response.body = stream.render("html", doctype="html")
return response
def get_poll_id(hash_key):
if isinstance(hash_key, unicode):
@ -397,15 +478,19 @@ def get_poll_admin_id(hash_key):
else:
return None
def extract_poll_admin_id(text):
""" The text may be an admin hash or a the admin link of a poll """
result = get_poll_admin_id(text)
def extract_poll_id(text, admin=False):
""" The text may be an admin hash or the admin link of a poll """
if admin:
get_func = get_poll_admin_id
else:
get_func = get_poll_id
result = get_func(text)
if result is None:
extracted_text = re.findall(r"[a-z0-9]+", text)
# we assume that the hash is at the end of the string
extracted_text.reverse()
for found in extracted_text:
guess = get_poll_admin_id(found)
guess = get_func(found)
if not guess is None:
return guess
return None
@ -430,7 +515,6 @@ def get_new_hash_key(length=16, charset=None):
hash_key = get_hash_string(length)
return hash_key
def publish_twitter_alert(text, key, secret,access_key,access_secret):
api = twitter.Api(consumer_key= key, consumer_secret = secret, access_token_key=access_key, access_token_secret= access_secret)
try:
@ -442,13 +526,136 @@ def publish_twitter_alert(text, key, secret,access_key,access_secret):
except urllib2.URLError, e:
print e.reason
@bobo.query('/profile/logout')
def user_logout(bobo_request):
# default start page
response = show_frontpage(bobo_request)
# clear the cookie
response.delete_cookie("user_hash_key")
return response
@bobo.query('/profile/resend')
def resend_user_key(bobo_request, email=None, submit=None, email_sent=None):
value_dict = get_default_values(bobo_request)
value_dict["email_sent"] = False
value_dict["user_new"] = False
data = {"email": email}
if not submit:
return render("user_key.html", input_data=data, **value_dict)
else:
# find the user's data (if it exists)
try:
data = forms.ProfileForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
users = Profile.selectBy(email=email)
if users.count():
user = users[0]
value_dict["user"] = user
value_dict["email_sent"] = True
value_dict["email_ok"] = send_profile_mail(user)
return render("user_key.html", **value_dict)
else:
errors = {}
errors["email"] = u"Diese E-Mail-Adresse ist nicht registriert. Möchtest du ein neues Profil anlegen?"
value_dict["user_new"] = True
value_dict["errors"] = errors
return render("user_key.html", input_data={"email": email},
**value_dict)
def manage_link_in_profile(user, link, add, value_dict):
poll = None
for is_admin in (False, True):
poll_id = extract_poll_id(link, admin=is_admin)
if poll_id:
poll = Poll.get(poll_id)
break
if poll:
connected = ProfilePolls.selectBy(poll=poll, user=user,
is_admin=is_admin)
if is_admin:
value_dict["profile_manage_poll_hash"] = poll.admin_hash_key
else:
value_dict["profile_manage_poll_hash"] = poll.hash_key
if add and not connected.count():
ProfilePolls(poll=poll, user=user, is_admin=is_admin)
value_dict["poll_remember"] = poll
return True
elif not add and connected.count():
connected[0].destroySelf()
value_dict["poll_forget"] = poll
return True
else:
return False
return False
@bobo.query('/profile/new')
def user_create(bobo_request, email=None, submit=None, add_link=None):
value_dict = get_default_values(bobo_request)
value_dict["user_new"] = True
data = {"email": email, "add_link": add_link}
if not submit:
return render("user_key.html", input_data=data, **value_dict)
else:
# create a new user (if email is valid)
try:
data = forms.ProfileForm.to_python(data)
except formencode.Invalid, errors_packed:
errors = errors_packed.unpack_errors()
users = Profile.selectBy(email=email)
if users.count():
# the mail address is already registered
errors = {}
errors["email"] = u"Diese E-Mail-Adresse ist bereits registriert. Sollen die Zugangsdaten erneut an die Adresse versandt werden?"
value_dict["errors"] = errors
value_dict["user_new"] = False
return render("user_key.html", input_data=data, **value_dict)
else:
# create a new user
hash_key = get_new_hash_key()
new_user = Profile(hash_key=hash_key, email=email)
value_dict["user"] = new_user
value_dict["email_sent"] = True
value_dict["email_ok"] = send_profile_mail(new_user)
return render("user_details.html", **value_dict)
@bobo.query('/profile')
@bobo.query('/profile/:hash_key')
@bobo.query('/:add_link/remember')
@bobo.query('/:del_link/forget')
def show_user(bobo_request, hash_key=None, add_link=None, del_link=None):
value_dict = get_default_values(bobo_request)
if hash_key:
# overwrite the cookie-based value, if a profile is explicitely given
user_hash_key = hash_key
elif "user_hash_key" in value_dict:
user_hash_key = value_dict["user_hash_key"]
else:
user_hash_key = None
if user_hash_key:
# sqlobject fails to handle unicode values -> convert to string
user_hash_key = str(user_hash_key)
users = Profile.selectBy(hash_key=user_hash_key)
if not user_hash_key or not users.count():
# TODO: store add/del link -> render instead of redirect
return bobo.redirect(BASE_DICT["base_url"] + "profile/new")
user = users[0]
value_dict["user"] = user
value_dict["user_hash_key"] = user_hash_key
if add_link:
manage_link_in_profile(user, add_link, True, value_dict)
elif del_link:
manage_link_in_profile(user, del_link, False, value_dict)
else:
pass
return render("user_details.html", **value_dict)
@bobo.query('/new')
@bobo.query('/new/:template')
@bobo.query('/new/:author/:title/:description')
def new_poll(submit=None, cancel=None, author=None, title=None,
def new_poll(bobo_request, submit=None, cancel=None, author=None, title=None,
description=None, template=None, hide_errors=False):
value_dict = get_default_values()
value_dict = get_default_values(bobo_request)
data = {"author": author, "title": title, "description": description,
"template": template}
if cancel:
@ -489,8 +696,8 @@ def new_poll(submit=None, cancel=None, author=None, title=None,
return bobo.redirect(new_poll.get_admin_url())
@bobo.query('/:hash_key/submit')
def submit_content(hash_key=None, submitter=None, content=None):
value_dict = get_default_values()
def submit_content(bobo_request, hash_key=None, submitter=None, content=None):
value_dict = get_default_values(bobo_request)
data = {"submitter": submitter, "content": content}
poll_id = get_poll_id(hash_key)
if not poll_id is None:
@ -515,7 +722,7 @@ def submit_content(hash_key=None, submitter=None, content=None):
return bobo.redirect(BASE_DICT["base_url"])
@bobo.query('/:admin_hash_key/delete')
def delete_poll(admin_hash_key=None):
def delete_poll(bobo_request, admin_hash_key=None):
admin_poll_id = get_poll_admin_id(admin_hash_key)
if not admin_poll_id is None:
poll = Poll.get(admin_poll_id)
@ -523,7 +730,7 @@ def delete_poll(admin_hash_key=None):
return bobo.redirect(BASE_DICT["base_url"])
@bobo.query('/:admin_hash_key/delete/:submission_id_digest')
def delete_submission(admin_hash_key=None, submission_id_digest=None):
def delete_submission(bobo_request, admin_hash_key=None, submission_id_digest=None):
admin_poll_id = get_poll_admin_id(admin_hash_key)
if (not admin_poll_id is None) and (not submission_id_digest is None):
poll = Poll.get(admin_poll_id)
@ -536,10 +743,10 @@ def delete_submission(admin_hash_key=None, submission_id_digest=None):
return bobo.redirect(poll.get_admin_url())
@bobo.query('/:admin_hash_key/export')
def export_poll(admin_hash_key=None):
def export_poll(bobo_request, admin_hash_key=None):
""" Return a download file in csv format (date, author, text).
"""
value_dict = get_default_values()
value_dict = get_default_values(bobo_request)
poll_id = get_poll_admin_id(admin_hash_key)
if poll_id is None:
return bobo.redirect(BASE_DICT["base_url"])
@ -567,11 +774,11 @@ def export_poll(admin_hash_key=None):
return response
@bobo.query('/:admin_hash_key/admin')
def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None,
def admin_poll(bobo_request, cancel=False, submit=None, admin_hash_key=None, author=None,
title=None, description=None, settings=None,
setting_expose_date=None, setting_expose_date_forever=None,
setting_close_date=None, setting_close_date_forever=None):
value_dict = get_default_values()
value_dict = get_default_values(bobo_request)
data = {"author": author, "title": title, "description": description, "template": ""}
poll_id = get_poll_admin_id(admin_hash_key)
if poll_id is None:
@ -654,19 +861,18 @@ def admin_poll(cancel=False, submit=None, admin_hash_key=None, author=None,
else:
return render("poll_admin_edit.html", input_data=data, **value_dict)
def render_blog_entry(blog_id):
blog_info = get_blog_info(blog_id)
def update_blog_entry_values(bobo_request, blog_id, value_dict):
blog_info = get_blog_info(bobo_request, blog_id)
if blog_info is None:
return None
return False
else:
value_dict = get_default_values()
value_dict["title"] = blog_info[0]
value_dict["date"] = blog_info[1]
value_dict["link"] = blog_info[2]
value_dict["body"] = blog_info[3]
return render("blog_entry.html", **value_dict)
return True
def get_blog_info(blog_id):
def get_blog_info(bobo_request, blog_id):
blog_file = os.path.join(BLOG_DIR, blog_id)
try:
input = open(blog_file)
@ -677,7 +883,7 @@ def get_blog_info(blog_id):
return None
date = "%s.%s.%s %s:%s" % (blog_id[6:8], blog_id[4:6], blog_id[0:4],
blog_id[8:10], blog_id[10:12])
link = "%sblog/%s" % (get_default_values()["base_url"], blog_id)
link = "%sblog/%s" % (get_default_values(bobo_request)["base_url"], blog_id)
body = tools.creole2html(body.decode("utf-8"))
return title, date, link, body
@ -699,18 +905,17 @@ def get_blog_ids():
@bobo.query('/blog/')
@bobo.query('/blog/:blog_id')
@bobo.query('/blog/:blog_id/')
def serve_blog(blog_id=None):
value_dict = get_default_values()
def serve_blog(bobo_request, blog_id=None):
value_dict = get_default_values(bobo_request)
if blog_id and re.match(r"^[0-9]{12}$", blog_id):
# the blog_id should consist of 12 digits
result = render_blog_entry(blog_id)
if not result is None:
return result
if update_blog_entry_values(bobo_request, blog_id, value_dict):
return render("blog_entry.html", **value_dict)
# if anything fails: render the blog list
blog_list = []
# add all valid blog infos to the list
for blog_id in get_blog_ids():
blog_info = get_blog_info(blog_id)
blog_info = get_blog_info(bobo_request, blog_id)
if not blog_info is None:
blog_list.append(blog_info)
value_dict["blog_list"] = blog_list
@ -724,21 +929,21 @@ def base():
@bobo.query('/public')
@bobo.query('/public/')
@bobo.query('/public/page/:page')
def show_frontpage(page=None):
return show_poll_list("frontpage.html", 20, page)
def show_frontpage(bobo_request, page=None):
return show_poll_list(bobo_request, "frontpage.html", 20, page)
@bobo.query('/admin')
@bobo.query('/admin/')
@bobo.query('/admin/page/:page')
def show_admin_page(page=None, page_size=20):
def show_admin_page(bobo_request, page=None, page_size=20):
try:
page_size = int(page_size)
except ValueError:
page_size = 30
return show_poll_list("admin.html", page_size, page, filter_private=False)
return show_poll_list(bobo_request, "admin.html", page_size, page, filter_private=False)
def show_poll_list(render_file, page_size, page=None, filter_private=True):
value_dict = get_default_values()
def show_poll_list(bobo_request, render_file, page_size, page=None, filter_private=True):
value_dict = get_default_values(bobo_request)
polls = Poll.select().orderBy("-timestamp_creation")
# TODO: speed the filtering up by using SQL statements (see sqlobject "filter")
if filter_private:
@ -767,11 +972,11 @@ def show_poll_list(render_file, page_size, page=None, filter_private=True):
value_dict["page"] = page
return render(render_file, **value_dict)
def render_poll_admin(poll, add_related, del_related):
value_dict = get_default_values()
def render_poll_admin(bobo_request, poll, add_related, del_related):
value_dict = get_default_values(bobo_request)
errors = {}
if not add_related is None:
other_poll_id = extract_poll_admin_id(add_related)
other_poll_id = extract_poll_id(add_related, admin=True)
if other_poll_id == poll.id:
errors["related"] = u"Wortschlucker kann nicht mit sich selbst verknüpft werden"
elif other_poll_id is None:
@ -783,7 +988,7 @@ def render_poll_admin(poll, add_related, del_related):
else:
PollRelation(first=poll.id, second=other_poll_id)
if not del_related is None:
other_poll_id = extract_poll_admin_id(del_related)
other_poll_id = extract_poll_id(del_related, admin=True)
if other_poll_id is None:
errors["related"] = u"Wortschlucker nicht gefunden"
else:
@ -800,8 +1005,8 @@ def render_poll_admin(poll, add_related, del_related):
@bobo.query('/:poll_hash')
@bobo.query('/:poll_hash/')
def show_one_poll(poll_hash=None, add_related=None, del_related=None):
value_dict = get_default_values()
def show_one_poll(bobo_request, poll_hash=None, add_related=None, del_related=None):
value_dict = get_default_values(bobo_request)
poll_id = get_poll_id(poll_hash)
if not poll_id is None:
value_dict["poll"] = Poll.get(poll_id)
@ -809,14 +1014,14 @@ def show_one_poll(poll_hash=None, add_related=None, del_related=None):
else:
admin_poll_id = get_poll_admin_id(poll_hash)
if not admin_poll_id is None:
return render_poll_admin(Poll.get(admin_poll_id), add_related, del_related)
return render_poll_admin(bobo_request, Poll.get(admin_poll_id), add_related, del_related)
else:
return bobo.redirect(BASE_DICT["base_url"])
@bobo.query('/node/:pagename')
def show_static_nodes(pagename=None):
def show_static_nodes(bobo_request, pagename=None):
""" meant for serving hand-changed, automatically styled content. """
value_dict = get_default_values()
value_dict = get_default_values(bobo_request)
return render("nodes/"+pagename, **value_dict)
def get_static_file(filename):
@ -843,8 +1048,7 @@ def static_files(p1=None, p2=None, p3=None):
pathname = os.path.join(pathname, subdir)
return get_static_file(pathname)
for table in (Poll, ContentSubmission, PollSetting, PollRelation):
for table in (Poll, ContentSubmission, PollSetting, PollRelation, Profile, ProfilePolls):
#table.dropTable()
if not table.tableExists():
table.createTable()