Compare commits
No commits in common. "330e3c89ea8cff7eeb8225acf4222b564f7a0f9e" and "2ebb0b7ee4f6e85a3511e961965e1fed150cd6b7" have entirely different histories.
330e3c89ea
...
2ebb0b7ee4
9 changed files with 63 additions and 124 deletions
|
@ -1,8 +1,8 @@
|
||||||
from djeveric.emails import ConfirmationEmail
|
from djeveric.emails import ConfirmationEmail
|
||||||
|
|
||||||
|
|
||||||
class TrustBridgeConfirmationEmail(ConfirmationEmail):
|
class ConfidantConfirmationEmail(ConfirmationEmail):
|
||||||
subject = "TODO"
|
subject = "TODO"
|
||||||
|
|
||||||
def get_body(self, context: dict[str]) -> str:
|
def get_message(self, context):
|
||||||
return "{token}".format(**context)
|
return '"token": "{token}", "uid": "{uid}", "rtid": "{rtid}", "rid": "{rid}"'.format(**context)
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from django.conf import settings
|
|
||||||
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
|
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
|
||||||
from django.contrib.auth.models import PermissionsMixin
|
from django.contrib.auth.models import PermissionsMixin
|
||||||
from django.contrib.auth.validators import UnicodeUsernameValidator
|
from django.contrib.auth.validators import UnicodeUsernameValidator
|
||||||
|
@ -6,11 +5,8 @@ from django.core.mail import send_mail
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from djeveric.fields import ConfirmationField
|
|
||||||
from djeveric.models import ConfirmableModelMixin
|
|
||||||
|
|
||||||
from userausfall import ldap
|
from userausfall import ldap
|
||||||
from userausfall.emails import TrustBridgeConfirmationEmail
|
|
||||||
|
|
||||||
|
|
||||||
class MissingUserAttribute(Exception):
|
class MissingUserAttribute(Exception):
|
||||||
|
@ -101,18 +97,14 @@ class User(PermissionsMixin, AbstractBaseUser):
|
||||||
raise PasswordMismatch("The given password does not match the user's password.")
|
raise PasswordMismatch("The given password does not match the user's password.")
|
||||||
return ldap.create_account(self.username, raw_password)
|
return ldap.create_account(self.username, raw_password)
|
||||||
|
|
||||||
def get_primary_email(self):
|
def get_or_create_trust_bridge(self):
|
||||||
"""Returns the primary email address for this user."""
|
try:
|
||||||
return f"{self.username}@{settings.USERAUSFALL['PRIMARY_EMAIL_DOMAIN']}"
|
return self.trust_bridge
|
||||||
|
except TrustBridge.DoesNotExist:
|
||||||
|
return TrustBridge.objects.create(trust_taker=self)
|
||||||
|
|
||||||
|
|
||||||
class TrustBridge(ConfirmableModelMixin, models.Model):
|
class TrustBridge(models.Model):
|
||||||
is_trusted = ConfirmationField(email_class=TrustBridgeConfirmationEmail)
|
is_trusted = models.BooleanField(default=False)
|
||||||
trust_giver = models.ForeignKey("User", on_delete=models.SET_NULL, null=True)
|
trust_giver = models.ForeignKey("User", on_delete=models.SET_NULL, null=True)
|
||||||
trust_taker = models.OneToOneField("User", on_delete=models.CASCADE, related_name="trust_bridge")
|
trust_taker = models.OneToOneField("User", on_delete=models.CASCADE, related_name="trust_bridge")
|
||||||
|
|
||||||
def get_confirmation_email_recipient(self) -> str:
|
|
||||||
return self.trust_giver.get_primary_email()
|
|
||||||
|
|
||||||
def _has_confirmation_recipient(self):
|
|
||||||
return self.trust_giver is not None
|
|
||||||
|
|
|
@ -1,26 +1,10 @@
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
|
|
||||||
from userausfall.models import TrustBridge, User
|
from userausfall.models import TrustBridge
|
||||||
from userausfall.views import get_authenticated_user
|
|
||||||
|
|
||||||
|
|
||||||
class UserSerializer(serializers.HyperlinkedModelSerializer):
|
class TrustBridgeSerializer(serializers.ModelSerializer):
|
||||||
class Meta:
|
|
||||||
model = User
|
|
||||||
fields = ["username"]
|
|
||||||
# the UniqueValidator for username prevents successful validation for existing users
|
|
||||||
extra_kwargs = {"username": {"validators": []}}
|
|
||||||
|
|
||||||
|
|
||||||
class TrustBridgeSerializer(serializers.HyperlinkedModelSerializer):
|
|
||||||
trust_giver = UserSerializer()
|
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = TrustBridge
|
model = TrustBridge
|
||||||
fields = ["is_trusted", "trust_giver"]
|
fields = ["is_trusted", "trust_giver"]
|
||||||
read_only_fields = ["is_trusted"]
|
read_only_fields = ["is_trusted"]
|
||||||
|
|
||||||
def create(self, validated_data):
|
|
||||||
user = get_authenticated_user(self.context["request"])
|
|
||||||
trust_giver, _ = User.objects.get_or_create(username=validated_data["trust_giver"]["username"])
|
|
||||||
return TrustBridge.objects.create(trust_taker=user, trust_giver=trust_giver)
|
|
||||||
|
|
|
@ -2,29 +2,7 @@ from rest_framework import status
|
||||||
|
|
||||||
from userausfall.models import User
|
from userausfall.models import User
|
||||||
from userausfall.rest_api.tests.userausfall import UserausfallAPITestCase
|
from userausfall.rest_api.tests.userausfall import UserausfallAPITestCase
|
||||||
|
from userausfall.tests import UserMixin
|
||||||
|
|
||||||
class UserMixin:
|
|
||||||
user: User
|
|
||||||
password: str
|
|
||||||
username: str
|
|
||||||
|
|
||||||
def create_user(self):
|
|
||||||
self.username = f"test{User.objects.count()}"
|
|
||||||
self.password = "test12345"
|
|
||||||
self.user = User.objects.create_user(self.username, self.password)
|
|
||||||
return self.user
|
|
||||||
|
|
||||||
def ensure_user_exists(self):
|
|
||||||
if not hasattr(self, "user"):
|
|
||||||
self.create_user()
|
|
||||||
|
|
||||||
def authenticate_user(self):
|
|
||||||
self.ensure_user_exists()
|
|
||||||
if hasattr(self.client, "force_authentication"):
|
|
||||||
self.client.force_authenticate(user=self.user)
|
|
||||||
else:
|
|
||||||
self.client.force_login(user=self.user)
|
|
||||||
|
|
||||||
|
|
||||||
class UserTestCase(UserMixin, UserausfallAPITestCase):
|
class UserTestCase(UserMixin, UserausfallAPITestCase):
|
||||||
|
|
|
@ -1,65 +1,34 @@
|
||||||
from django.core import mail
|
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
|
|
||||||
from userausfall.models import TrustBridge
|
from userausfall.rest_api.tests import UserausfallAPITestCase
|
||||||
from userausfall.rest_api.tests import UserausfallAPITestCase, UserMixin
|
from userausfall.tests import UserMixin
|
||||||
|
|
||||||
|
|
||||||
class TrustBridgeTestCase(UserMixin, UserausfallAPITestCase):
|
class TrustBridgeTestCase(UserMixin, UserausfallAPITestCase):
|
||||||
def create_trust_bridge(self):
|
|
||||||
self.trust_giver = self.create_user()
|
|
||||||
self.create_user()
|
|
||||||
self.trust_bridge = TrustBridge.objects.create(trust_taker=self.user, trust_giver=self.trust_giver)
|
|
||||||
return self.trust_bridge
|
|
||||||
|
|
||||||
def test_create_trust_bridge(self):
|
|
||||||
"""Create a trust bridge for the current user."""
|
|
||||||
url = "/trust-bridges/"
|
|
||||||
trust_giver = self.create_user()
|
|
||||||
self.create_user()
|
|
||||||
self.authenticate_user()
|
|
||||||
response = self.client.post(
|
|
||||||
self.get_api_url(url),
|
|
||||||
{
|
|
||||||
"trust_giver": {
|
|
||||||
"username": trust_giver.username,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
|
||||||
self.assertEqual(TrustBridge.objects.count(), 1)
|
|
||||||
self.assertEqual(TrustBridge.objects.get().trust_giver, trust_giver)
|
|
||||||
|
|
||||||
def test_retrieve_trust_bridge(self):
|
def test_retrieve_trust_bridge(self):
|
||||||
"""Retrieve the trust bridge information of a user without an ldap account."""
|
"""
|
||||||
url = "/trust-bridges/{pk}/"
|
Retrieve the trust bridge information of a user without an ldap account.
|
||||||
self.create_trust_bridge()
|
"""
|
||||||
|
url = "/trust-bridge/"
|
||||||
self.authenticate_user()
|
self.authenticate_user()
|
||||||
response = self.client.get(self.get_api_url(url, pk=self.trust_bridge.pk))
|
response = self.client.get(self.get_api_url(url))
|
||||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
response.data,
|
response.data,
|
||||||
{
|
{
|
||||||
"is_trusted": False,
|
"is_trusted": False,
|
||||||
"trust_giver": {
|
"trust_giver": None,
|
||||||
"username": self.trust_giver.username,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_send_confirmation_email_on_creation(self):
|
def test_update_trust_bridge(self):
|
||||||
"""When setting a trust giver a confirmation email is sent."""
|
"""
|
||||||
self.create_trust_bridge()
|
Update the trust giver of the user's trust bridge.
|
||||||
self.assertEqual(len(mail.outbox), 1)
|
"""
|
||||||
self.assertIn(self.user.trust_bridge.get_confirmation_token(), mail.outbox[0].body)
|
url = "/trust-bridge/"
|
||||||
|
other_user = self.create_user()
|
||||||
def test_confirm_trust_bridge(self):
|
self.create_user()
|
||||||
"""The trust giver may confirm the trust bridge."""
|
self.authenticate_user()
|
||||||
url = "/trust-bridges/{pk}/confirm/"
|
response = self.client.put(self.get_api_url(url), {"trust_giver": other_user.pk})
|
||||||
self.create_trust_bridge()
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
response = self.client.post(
|
self.assertEqual(self.user.trust_bridge.trust_giver, other_user)
|
||||||
self.get_api_url(url, pk=self.trust_bridge.pk), {"token": self.trust_bridge.get_confirmation_token()}
|
|
||||||
)
|
|
||||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
|
||||||
self.trust_bridge.refresh_from_db()
|
|
||||||
self.assertTrue(self.trust_bridge.is_trusted)
|
|
||||||
|
|
|
@ -2,10 +2,9 @@ from django.urls import include, path
|
||||||
from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView
|
from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView
|
||||||
from rest_framework import routers
|
from rest_framework import routers
|
||||||
|
|
||||||
from userausfall.rest_api.views import TrustBridgeViewSet
|
from userausfall.rest_api.views import TrustBridgeView
|
||||||
|
|
||||||
router = routers.SimpleRouter()
|
router = routers.SimpleRouter()
|
||||||
router.register(r"trust-bridges", TrustBridgeViewSet, "trust-bridge")
|
|
||||||
|
|
||||||
urlpatterns = [
|
urlpatterns = [
|
||||||
path("", include(router.urls)),
|
path("", include(router.urls)),
|
||||||
|
@ -13,4 +12,5 @@ urlpatterns = [
|
||||||
path("schema/", SpectacularAPIView.as_view(), name="schema"),
|
path("schema/", SpectacularAPIView.as_view(), name="schema"),
|
||||||
path("schema/swagger-ui/", SpectacularSwaggerView.as_view(url_name="schema"), name="swagger-ui"),
|
path("schema/swagger-ui/", SpectacularSwaggerView.as_view(url_name="schema"), name="swagger-ui"),
|
||||||
path("schema/redoc/", SpectacularRedocView.as_view(url_name="schema"), name="redoc"),
|
path("schema/redoc/", SpectacularRedocView.as_view(url_name="schema"), name="redoc"),
|
||||||
|
path("trust-bridge/", TrustBridgeView.as_view()),
|
||||||
]
|
]
|
||||||
|
|
|
@ -1,21 +1,17 @@
|
||||||
from djeveric.views import ConfirmModelMixin
|
from rest_framework import generics, status, viewsets
|
||||||
from rest_framework import mixins, status, viewsets
|
|
||||||
from rest_framework.decorators import action
|
from rest_framework.decorators import action
|
||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
|
|
||||||
from userausfall.models import MissingUserAttribute, PasswordMismatch, TrustBridge, User
|
from userausfall.models import MissingUserAttribute, PasswordMismatch, User
|
||||||
from userausfall.rest_api.serializers import TrustBridgeSerializer
|
from userausfall.rest_api.serializers import TrustBridgeSerializer
|
||||||
from userausfall.views import get_authenticated_user
|
from userausfall.views import get_authenticated_user
|
||||||
|
|
||||||
|
|
||||||
class TrustBridgeViewSet(
|
class TrustBridgeView(generics.RetrieveUpdateAPIView):
|
||||||
ConfirmModelMixin, mixins.CreateModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet
|
|
||||||
):
|
|
||||||
queryset = TrustBridge.objects
|
|
||||||
serializer_class = TrustBridgeSerializer
|
serializer_class = TrustBridgeSerializer
|
||||||
|
|
||||||
def get_basic_queryset(self):
|
def get_object(self):
|
||||||
return self.queryset.filter(trust_taker=get_authenticated_user(self.request))
|
return get_authenticated_user(self.request).get_or_create_trust_bridge()
|
||||||
|
|
||||||
|
|
||||||
class UserViewSet(viewsets.GenericViewSet):
|
class UserViewSet(viewsets.GenericViewSet):
|
||||||
|
|
|
@ -200,7 +200,3 @@ USERAUSFALL_LDAP = {
|
||||||
"ADMIN_USER_DN": "cn=admin,dc=local",
|
"ADMIN_USER_DN": "cn=admin,dc=local",
|
||||||
"ADMIN_USER_PASSWORD": os.environ.get("USERAUSFALL_LDAP_PASSWORD"),
|
"ADMIN_USER_PASSWORD": os.environ.get("USERAUSFALL_LDAP_PASSWORD"),
|
||||||
}
|
}
|
||||||
|
|
||||||
USERAUSFALL = {
|
|
||||||
"PRIMARY_EMAIL_DOMAIN": "systemausfall.org",
|
|
||||||
}
|
|
||||||
|
|
24
userausfall/tests.py
Normal file
24
userausfall/tests.py
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
from userausfall.models import User
|
||||||
|
|
||||||
|
|
||||||
|
class UserMixin:
|
||||||
|
user: User
|
||||||
|
password: str
|
||||||
|
username: str
|
||||||
|
|
||||||
|
def create_user(self):
|
||||||
|
self.username = f"test{User.objects.count()}"
|
||||||
|
self.password = "test12345"
|
||||||
|
self.user = User.objects.create_user(self.username, self.password)
|
||||||
|
return self.user
|
||||||
|
|
||||||
|
def ensure_user_exists(self):
|
||||||
|
if not hasattr(self, "user"):
|
||||||
|
self.create_user()
|
||||||
|
|
||||||
|
def authenticate_user(self):
|
||||||
|
self.ensure_user_exists()
|
||||||
|
if hasattr(self.client, "force_authentication"):
|
||||||
|
self.client.force_authenticate(user=self.user)
|
||||||
|
else:
|
||||||
|
self.client.force_login(user=self.user)
|
Reference in a new issue