from django.contrib.auth import get_user_model from django.contrib.auth.tokens import default_token_generator from django.contrib.contenttypes.models import ContentType from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode from rest_framework import serializers, status from rest_framework.exceptions import ValidationError from rest_framework.response import Response from rest_framework.settings import api_settings from rest_framework.views import APIView def encode_pk(resource): """Encode the primary key of a resource with Base64 for usage in URLs.""" return urlsafe_base64_encode(force_bytes(resource.pk)) class ConfirmationError(Exception): """An error occurring while checking a confirmation.""" def __init__(self, message=None, errors=None): if errors is None: self.errors = {} else: self.errors = errors if message is not None: non_field_errors = self.errors.get(api_settings.NON_FIELD_ERRORS_KEY, []) non_field_errors.append(message) self.errors[api_settings.NON_FIELD_ERRORS_KEY] = non_field_errors class Email: """ Base class for an email message. """ subject = "" def __init__(self, user): self.user = user def get_subject(self): return self.subject def get_message(self, context): raise NotImplementedError() def send(self, context): self.user.email_user(self.get_subject(), self.get_message(context)) class Confirmation: """ Base class for handling a confirmation process. """ email_class = Email def __init__(self, user, resource): self.user = user self.resource = resource def check(self): if not self.has_permission(self.user, self.resource): raise ConfirmationError("Permission denied") if self.is_confirmed(self.resource): raise ConfirmationError("Already confirmed") self.confirm(self.resource) def confirm(self, resource): """Overwrite this method to supply operations to confirm the resource.""" pass def get_email(self): return self.email_class(self.user) def has_permission(self, user, resource) -> bool: """Overwrite this method returning if a user may confirm a resource.""" return False def is_confirmed(self, resource) -> bool: """Overwrite this method to tell if a resource is confirmed.""" return False def send_request(self): if self.is_confirmed(self.resource): return self.get_email().send({ "token": default_token_generator.make_token(self.user), "uid": encode_pk(self.user), "rtid": encode_pk(ContentType.objects.get_for_model(self.resource)), "rid": encode_pk(self.resource), }) class ConfirmationSerializer(serializers.Serializer): """ Serializer class for confirmation requests. """ token = serializers.CharField() uid = serializers.CharField() rtid = serializers.CharField() rid = serializers.CharField() def __init__(self, **kwargs): super().__init__(**kwargs) self.user = None self.resource_type = None self.resource = None def validate_uid(self, value): self.user = self.get_model_object(get_user_model(), value) return value def validate_rtid(self, value): self.resource_type = self.get_model_object(ContentType, value) return value def validate(self, data): # we need to be sure that the rtid was already decoded self.resource = self.get_model_object(self.resource_type.model_class(), data["rid"]) return data def get_model_object(self, model, oid): try: # urlsafe_base64_decode() decodes to bytestring oid = urlsafe_base64_decode(oid).decode() return model._default_manager.get(id=oid) except (TypeError, ValueError, OverflowError, model.DoesNotExist): raise ValidationError("Error while decoding object id") class ConfirmationView(APIView): """ View for creating a confirmation API endpoint. """ confirmation_class = Confirmation serializer_class = ConfirmationSerializer def post(self, request, format=None): try: self.check_confirmation(request.data) return Response({}, status=status.HTTP_204_NO_CONTENT) except ConfirmationError as e: return Response(e.errors, status=status.HTTP_400_BAD_REQUEST) def check_confirmation(self, data): serializer = self.serializer_class(data=data) if not serializer.is_valid(): raise ConfirmationError(errors=serializer.errors) token = serializer.validated_data["token"] if default_token_generator.check_token(serializer.user, token): # confirm resource or raise an exception otherwise self.confirmation_class(serializer.user, serializer.resource).check() else: raise ConfirmationError("Invalid token")