refactor: Refactor djeveric code
This commit is contained in:
parent
aa7ebb3f0a
commit
ab1981f1bc
7 changed files with 131 additions and 81 deletions
|
@ -1,9 +1,13 @@
|
|||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.tokens import default_token_generator
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.sites.models import Site
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.http import urlsafe_base64_encode
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
|
||||
from rest_framework import serializers, status
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.settings import api_settings
|
||||
from rest_framework.views import APIView
|
||||
|
||||
|
||||
def encode_pk(resource):
|
||||
|
@ -11,16 +15,44 @@ def encode_pk(resource):
|
|||
return urlsafe_base64_encode(force_bytes(resource.pk))
|
||||
|
||||
|
||||
class AlreadyConfirmed(Exception):
|
||||
"""The given resource has already been confirmed."""
|
||||
class ConfirmationError(Exception):
|
||||
"""An error occurring while checking a confirmation."""
|
||||
|
||||
pass
|
||||
def __init__(self, message=None, errors=None):
|
||||
if errors is None:
|
||||
self.errors = {}
|
||||
else:
|
||||
self.errors = errors
|
||||
if message is not None:
|
||||
non_field_errors = self.errors.get(api_settings.NON_FIELD_ERRORS_KEY, [])
|
||||
non_field_errors.append(message)
|
||||
self.errors[api_settings.NON_FIELD_ERRORS_KEY] = non_field_errors
|
||||
|
||||
|
||||
class Email:
|
||||
"""
|
||||
Base class for an email message.
|
||||
"""
|
||||
subject = ""
|
||||
|
||||
def __init__(self, user):
|
||||
self.user = user
|
||||
|
||||
def get_subject(self):
|
||||
return self.subject
|
||||
|
||||
def get_message(self, context):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send(self, context):
|
||||
self.user.email_user(self.get_subject(), self.get_message(context))
|
||||
|
||||
|
||||
class Confirmation:
|
||||
"""
|
||||
Base class for handling a confirmation process.
|
||||
"""
|
||||
email_class = Email
|
||||
|
||||
def __init__(self, user, resource):
|
||||
self.user = user
|
||||
|
@ -28,15 +60,18 @@ class Confirmation:
|
|||
|
||||
def check(self):
|
||||
if not self.has_permission(self.user, self.resource):
|
||||
raise PermissionDenied()
|
||||
raise ConfirmationError("Permission denied")
|
||||
if self.is_confirmed(self.resource):
|
||||
raise AlreadyConfirmed()
|
||||
raise ConfirmationError("Already confirmed")
|
||||
self.confirm(self.resource)
|
||||
|
||||
def confirm(self, resource):
|
||||
"""Overwrite this method to supply operations to confirm the resource."""
|
||||
pass
|
||||
|
||||
def get_email(self):
|
||||
return self.email_class(self.user)
|
||||
|
||||
def has_permission(self, user, resource) -> bool:
|
||||
"""Overwrite this method returning if a user may confirm a resource."""
|
||||
return False
|
||||
|
@ -48,18 +83,72 @@ class Confirmation:
|
|||
def send_request(self):
|
||||
if self.is_confirmed(self.resource):
|
||||
return
|
||||
self.get_email().send({
|
||||
"token": default_token_generator.make_token(self.user),
|
||||
"uid": encode_pk(self.user),
|
||||
"rtid": encode_pk(ContentType.objects.get_for_model(self.resource)),
|
||||
"rid": encode_pk(self.resource),
|
||||
})
|
||||
|
||||
site = Site.objects.get_current()
|
||||
uid = encode_pk(self.user)
|
||||
token = default_token_generator.make_token(self.user)
|
||||
obj_type = ContentType.objects.get_for_model(self.resource)
|
||||
type_id = encode_pk(obj_type)
|
||||
obj_id = encode_pk(self.resource)
|
||||
confirmation_url = (
|
||||
f"https://{site.domain}/confirm/{uid}/{token}/{type_id}/{obj_id}"
|
||||
)
|
||||
self.user.email_user(
|
||||
f"{site.name}: Bestätigung der Anfrage",
|
||||
f"Bitte bestätige, dass du deine E-Mail-Adresse auf der Seite {site.name} ({site.domain}) eingegeben hast. "
|
||||
f"Kopiere dazu folgende URL in deinen Webbrowser:\n\n{confirmation_url}",
|
||||
)
|
||||
|
||||
class ConfirmationSerializer(serializers.Serializer):
|
||||
"""
|
||||
Serializer class for confirmation requests.
|
||||
"""
|
||||
token = serializers.CharField()
|
||||
uid = serializers.CharField()
|
||||
rtid = serializers.CharField()
|
||||
rid = serializers.CharField()
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.user = None
|
||||
self.resource_type = None
|
||||
self.resource = None
|
||||
|
||||
def validate_uid(self, value):
|
||||
self.user = self.get_model_object(get_user_model(), value)
|
||||
return value
|
||||
|
||||
def validate_rtid(self, value):
|
||||
self.resource_type = self.get_model_object(ContentType, value)
|
||||
return value
|
||||
|
||||
def validate(self, data):
|
||||
# we need to be sure that the rtid was already decoded
|
||||
self.resource = self.get_model_object(self.resource_type.model_class(), data["rid"])
|
||||
return data
|
||||
|
||||
def get_model_object(self, model, oid):
|
||||
try:
|
||||
# urlsafe_base64_decode() decodes to bytestring
|
||||
oid = urlsafe_base64_decode(oid).decode()
|
||||
return model._default_manager.get(id=oid)
|
||||
except (TypeError, ValueError, OverflowError, model.DoesNotExist):
|
||||
raise ValidationError("Error while decoding object id")
|
||||
|
||||
|
||||
class ConfirmationView(APIView):
|
||||
"""
|
||||
View for creating a confirmation API endpoint.
|
||||
"""
|
||||
confirmation_class = Confirmation
|
||||
serializer_class = ConfirmationSerializer
|
||||
|
||||
def post(self, request, format=None):
|
||||
try:
|
||||
self.check_confirmation(request.data)
|
||||
return Response({}, status=status.HTTP_204_NO_CONTENT)
|
||||
except ConfirmationError as e:
|
||||
return Response(e.errors, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def check_confirmation(self, data):
|
||||
serializer = self.serializer_class(data=data)
|
||||
if not serializer.is_valid():
|
||||
raise ConfirmationError(errors=serializer.errors)
|
||||
token = serializer.validated_data["token"]
|
||||
if default_token_generator.check_token(serializer.user, token):
|
||||
# confirm resource or raise an exception otherwise
|
||||
self.confirmation_class(serializer.user, serializer.resource).check()
|
||||
else:
|
||||
raise ConfirmationError("Invalid token")
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.tokens import default_token_generator
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.utils.http import urlsafe_base64_decode
|
||||
from rest_framework import serializers, status
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from djeveric import AlreadyConfirmed
|
||||
from userausfall.rest_api.confirmations import ConfidantConfirmation
|
||||
|
||||
|
||||
class ConfirmationSerializer(serializers.Serializer):
|
||||
uid = serializers.CharField()
|
||||
token = serializers.CharField()
|
||||
type_id = serializers.CharField()
|
||||
obj_id = serializers.CharField()
|
||||
|
||||
|
||||
class ConfirmationView(APIView):
|
||||
def post(self, request, format=None):
|
||||
serializer = ConfirmationSerializer(data=request.data)
|
||||
if serializer.is_valid():
|
||||
user = self.get_user(serializer.validated_data["uid"])
|
||||
token = serializer.validated_data["token"]
|
||||
if user is not None and default_token_generator.check_token(user, token):
|
||||
obj = self.get_obj(serializer.validated_data["type_id"], serializer.validated_data["obj_id"])
|
||||
if obj is not None:
|
||||
try:
|
||||
ConfidantConfirmation(user, obj).check()
|
||||
except AlreadyConfirmed as e:
|
||||
return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST)
|
||||
return Response(serializer.data, status=status.HTTP_204_NO_CONTENT)
|
||||
|
||||
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def get_obj(self, type_id, obj_id):
|
||||
try:
|
||||
# urlsafe_base64_decode() decodes to bytestring
|
||||
type_pk = urlsafe_base64_decode(type_id).decode()
|
||||
obj_pk = urlsafe_base64_decode(obj_id).decode()
|
||||
obj = ContentType.objects.get_for_id(type_pk).get_object_for_this_type(pk=obj_pk)
|
||||
except (TypeError, ValueError, OverflowError, models.Model.DoesNotExist):
|
||||
obj = None
|
||||
return obj
|
||||
|
||||
def get_user(self, uid):
|
||||
try:
|
||||
# urlsafe_base64_decode() decodes to bytestring
|
||||
uid = urlsafe_base64_decode(uid).decode()
|
||||
user = get_user_model().objects.get(pk=uid)
|
||||
except (TypeError, ValueError, OverflowError, models.Model.DoesNotExist):
|
||||
user = None
|
||||
return user
|
|
@ -1,8 +1,11 @@
|
|||
from djeveric import Confirmation
|
||||
from userausfall.emails import ConfidantConfirmationEmail
|
||||
from userausfall.models import User
|
||||
|
||||
|
||||
class ConfidantConfirmation(Confirmation):
|
||||
email_class = ConfidantConfirmationEmail
|
||||
|
||||
def has_permission(self, user: User, resource: User):
|
||||
return user == resource.confidant_unconfirmed
|
||||
|
8
userausfall/emails.py
Normal file
8
userausfall/emails.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from djeveric import Email
|
||||
|
||||
|
||||
class ConfidantConfirmationEmail(Email):
|
||||
subject = "TODO"
|
||||
|
||||
def get_message(self, context):
|
||||
return '"token": "{token}", "uid": "{uid}", "rtid": "{rtid}", "rid": "{rid}"'.format(**context)
|
|
@ -1,14 +1,13 @@
|
|||
from django.urls import path
|
||||
from rest_framework import routers
|
||||
|
||||
from djeveric.rest_api import ConfirmationView
|
||||
from userausfall.rest_api.views import UserViewSet
|
||||
from userausfall.rest_api.views import UserViewSet, ConfidantConfirmationView
|
||||
|
||||
router = routers.DefaultRouter(trailing_slash=True)
|
||||
router.register(r'users', UserViewSet, basename="user")
|
||||
|
||||
urlpatterns = [
|
||||
path("confirm", ConfirmationView.as_view())
|
||||
path("confirm/confidant/", ConfidantConfirmationView.as_view())
|
||||
]
|
||||
|
||||
urlpatterns += router.urls
|
||||
|
|
|
@ -2,10 +2,16 @@ from rest_framework import viewsets, status
|
|||
from rest_framework.decorators import action
|
||||
from rest_framework.response import Response
|
||||
|
||||
from djeveric import ConfirmationView
|
||||
from userausfall.models import User, MissingUserAttribute, PasswordMismatch
|
||||
from userausfall.confirmations import ConfidantConfirmation
|
||||
from userausfall.rest_api.serializers import UserSerializer, UserActivationSerializer
|
||||
|
||||
|
||||
class ConfidantConfirmationView(ConfirmationView):
|
||||
confirmation_class = ConfidantConfirmation
|
||||
|
||||
|
||||
class UserViewSet(viewsets.ModelViewSet):
|
||||
queryset = User.objects.all()
|
||||
serializer_class = UserSerializer
|
||||
|
|
|
@ -2,7 +2,7 @@ from django.db.models.signals import post_save
|
|||
from django.dispatch import receiver
|
||||
|
||||
from userausfall.models import User
|
||||
from userausfall.rest_api.confirmations import ConfidantConfirmation
|
||||
from userausfall.confirmations import ConfidantConfirmation
|
||||
|
||||
|
||||
@receiver(post_save, sender=User)
|
||||
|
|
Reference in a new issue