This repository has been archived on 2022-05-05. You can view files and clone it, but cannot push or open issues or pull requests.
userausfall/djeveric/__init__.py

154 lines
5 KiB
Python

from django.contrib.auth import get_user_model
from django.contrib.auth.tokens import default_token_generator
from django.contrib.contenttypes.models import ContentType
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from rest_framework import serializers, status
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from rest_framework.settings import api_settings
from rest_framework.views import APIView
def encode_pk(resource):
"""Encode the primary key of a resource with Base64 for usage in URLs."""
return urlsafe_base64_encode(force_bytes(resource.pk))
class ConfirmationError(Exception):
"""An error occurring while checking a confirmation."""
def __init__(self, message=None, errors=None):
if errors is None:
self.errors = {}
else:
self.errors = errors
if message is not None:
non_field_errors = self.errors.get(api_settings.NON_FIELD_ERRORS_KEY, [])
non_field_errors.append(message)
self.errors[api_settings.NON_FIELD_ERRORS_KEY] = non_field_errors
class Email:
"""
Base class for an email message.
"""
subject = ""
def __init__(self, user):
self.user = user
def get_subject(self):
return self.subject
def get_message(self, context):
raise NotImplementedError()
def send(self, context):
self.user.email_user(self.get_subject(), self.get_message(context))
class Confirmation:
"""
Base class for handling a confirmation process.
"""
email_class = Email
def __init__(self, user, resource):
self.user = user
self.resource = resource
def check(self):
if not self.has_permission(self.user, self.resource):
raise ConfirmationError("Permission denied")
if self.is_confirmed(self.resource):
raise ConfirmationError("Already confirmed")
self.confirm(self.resource)
def confirm(self, resource):
"""Overwrite this method to supply operations to confirm the resource."""
pass
def get_email(self):
return self.email_class(self.user)
def has_permission(self, user, resource) -> bool:
"""Overwrite this method returning if a user may confirm a resource."""
return False
def is_confirmed(self, resource) -> bool:
"""Overwrite this method to tell if a resource is confirmed."""
return False
def send_request(self):
if self.is_confirmed(self.resource):
return
self.get_email().send({
"token": default_token_generator.make_token(self.user),
"uid": encode_pk(self.user),
"rtid": encode_pk(ContentType.objects.get_for_model(self.resource)),
"rid": encode_pk(self.resource),
})
class ConfirmationSerializer(serializers.Serializer):
"""
Serializer class for confirmation requests.
"""
token = serializers.CharField()
uid = serializers.CharField()
rtid = serializers.CharField()
rid = serializers.CharField()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user = None
self.resource_type = None
self.resource = None
def validate_uid(self, value):
self.user = self.get_model_object(get_user_model(), value)
return value
def validate_rtid(self, value):
self.resource_type = self.get_model_object(ContentType, value)
return value
def validate(self, data):
# we need to be sure that the rtid was already decoded
self.resource = self.get_model_object(self.resource_type.model_class(), data["rid"])
return data
def get_model_object(self, model, oid):
try:
# urlsafe_base64_decode() decodes to bytestring
oid = urlsafe_base64_decode(oid).decode()
return model._default_manager.get(id=oid)
except (TypeError, ValueError, OverflowError, model.DoesNotExist):
raise ValidationError("Error while decoding object id")
class ConfirmationView(APIView):
"""
View for creating a confirmation API endpoint.
"""
confirmation_class = Confirmation
serializer_class = ConfirmationSerializer
def post(self, request, format=None):
try:
self.check_confirmation(request.data)
return Response({}, status=status.HTTP_204_NO_CONTENT)
except ConfirmationError as e:
return Response(e.errors, status=status.HTTP_400_BAD_REQUEST)
def check_confirmation(self, data):
serializer = self.serializer_class(data=data)
if not serializer.is_valid():
raise ConfirmationError(errors=serializer.errors)
token = serializer.validated_data["token"]
if default_token_generator.check_token(serializer.user, token):
# confirm resource or raise an exception otherwise
self.confirmation_class(serializer.user, serializer.resource).check()
else:
raise ConfirmationError("Invalid token")