refactor: Add Confirmation class to djeveric

This commit is contained in:
aldrin 2021-05-21 14:46:34 +02:00
parent 73c6fbf165
commit aa7ebb3f0a
6 changed files with 92 additions and 54 deletions

View file

@ -0,0 +1,65 @@
from django.contrib.auth.tokens import default_token_generator
from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from rest_framework.exceptions import PermissionDenied
def encode_pk(resource):
"""Encode the primary key of a resource with Base64 for usage in URLs."""
return urlsafe_base64_encode(force_bytes(resource.pk))
class AlreadyConfirmed(Exception):
"""The given resource has already been confirmed."""
pass
class Confirmation:
"""
Base class for handling a confirmation process.
"""
def __init__(self, user, resource):
self.user = user
self.resource = resource
def check(self):
if not self.has_permission(self.user, self.resource):
raise PermissionDenied()
if self.is_confirmed(self.resource):
raise AlreadyConfirmed()
self.confirm(self.resource)
def confirm(self, resource):
"""Overwrite this method to supply operations to confirm the resource."""
pass
def has_permission(self, user, resource) -> bool:
"""Overwrite this method returning if a user may confirm a resource."""
return False
def is_confirmed(self, resource) -> bool:
"""Overwrite this method to tell if a resource is confirmed."""
return False
def send_request(self):
if self.is_confirmed(self.resource):
return
site = Site.objects.get_current()
uid = encode_pk(self.user)
token = default_token_generator.make_token(self.user)
obj_type = ContentType.objects.get_for_model(self.resource)
type_id = encode_pk(obj_type)
obj_id = encode_pk(self.resource)
confirmation_url = (
f"https://{site.domain}/confirm/{uid}/{token}/{type_id}/{obj_id}"
)
self.user.email_user(
f"{site.name}: Bestätigung der Anfrage",
f"Bitte bestätige, dass du deine E-Mail-Adresse auf der Seite {site.name} ({site.domain}) eingegeben hast. "
f"Kopiere dazu folgende URL in deinen Webbrowser:\n\n{confirmation_url}",
)

View file

@ -7,7 +7,8 @@ from rest_framework import serializers, status
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
from djeveric.signals import user_confirmed, AlreadyConfirmed from djeveric import AlreadyConfirmed
from userausfall.rest_api.confirmations import ConfidantConfirmation
class ConfirmationSerializer(serializers.Serializer): class ConfirmationSerializer(serializers.Serializer):
@ -27,7 +28,7 @@ class ConfirmationView(APIView):
obj = self.get_obj(serializer.validated_data["type_id"], serializer.validated_data["obj_id"]) obj = self.get_obj(serializer.validated_data["type_id"], serializer.validated_data["obj_id"])
if obj is not None: if obj is not None:
try: try:
user_confirmed.send(sender=self.__class__, user=user, instance=obj, key=None) ConfidantConfirmation(user, obj).check()
except AlreadyConfirmed as e: except AlreadyConfirmed as e:
return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST) return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST)
return Response(serializer.data, status=status.HTTP_204_NO_CONTENT) return Response(serializer.data, status=status.HTTP_204_NO_CONTENT)

View file

@ -1,34 +0,0 @@
from django.contrib.auth.tokens import default_token_generator
from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.core.mail import send_mail
from django.dispatch import Signal
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
user_confirmed = Signal(providing_args=["user", "instance", "key"])
class AlreadyConfirmed(Exception):
"""The given resource has already been confirmed."""
pass
def encode_pk(obj):
return urlsafe_base64_encode(force_bytes(obj.pk))
def request_confirmation(user, instance, key=None):
site = Site.objects.first()
uid = encode_pk(user)
token = default_token_generator.make_token(user)
obj_type = ContentType.objects.get_for_model(instance)
type_id = encode_pk(obj_type)
obj_id = encode_pk(instance)
confirmation_url = f"https://{site.domain}/confirm/{uid}/{token}/{type_id}/{obj_id}"
send_mail(
f"{site.name}: Bestätigung der Anfrage",
f"Bitte bestätige, dass du deine E-Mail-Adresse auf der Seite {site.name} ({site.domain}) eingegeben hast. "
f"Kopiere dazu folgende URL in deinen Webbrowser:\n\n{confirmation_url}",
None, [user.email]
)

View file

@ -0,0 +1,14 @@
from djeveric import Confirmation
from userausfall.models import User
class ConfidantConfirmation(Confirmation):
def has_permission(self, user: User, resource: User):
return user == resource.confidant_unconfirmed
def is_confirmed(self, resource: User):
return resource.confidant_unconfirmed == resource.confidant
def confirm(self, resource: User):
resource.confidant = resource.confidant_unconfirmed
resource.save()

View file

@ -151,6 +151,13 @@ REST_FRAMEWORK = {
), ),
} }
# Sites Framework
# https://docs.djangoproject.com/en/2.2/ref/contrib/sites/
SITE_ID = 1
USERAUSFALL_LDAP = { USERAUSFALL_LDAP = {
'ADMIN_USER_DN': 'cn=admin,dc=local', 'ADMIN_USER_DN': 'cn=admin,dc=local',
'ADMIN_USER_PASSWORD': os.environ.get('USERAUSFALL_LDAP_PASSWORD'), 'ADMIN_USER_PASSWORD': os.environ.get('USERAUSFALL_LDAP_PASSWORD'),

View file

@ -1,26 +1,11 @@
from django.core.exceptions import PermissionDenied
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from rest_framework.exceptions import PermissionDenied
from djeveric.signals import request_confirmation, user_confirmed, AlreadyConfirmed
from userausfall.models import User from userausfall.models import User
from userausfall.rest_api.confirmations import ConfidantConfirmation
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
def user_saved(sender, instance: User, **kwargs): def user_saved(sender, instance: User, **kwargs):
if (instance.confidant_unconfirmed is not None) and (instance.confidant_unconfirmed != instance.confidant): if instance.confidant_unconfirmed is not None:
request_confirmation(instance.confidant_unconfirmed, instance) ConfidantConfirmation(instance.confidant_unconfirmed, instance).send_request()
@receiver(user_confirmed)
def user_confirmed(sender, user: User, instance: User, **kwargs):
if user == instance.confidant_unconfirmed:
if instance.confidant_unconfirmed != instance.confidant:
# confirm the confidant
instance.confidant = instance.confidant_unconfirmed
instance.save()
else:
raise AlreadyConfirmed("The confidant has already been confirmed.")
else:
raise PermissionDenied()