Compare commits

...

3 commits

Author SHA1 Message Date
aldrin 8ca0f66777 feat: Add endpoint to activate users 2021-10-26 12:36:14 +02:00
aldrin bba1d7c8aa test: Add ldap tests 2021-10-26 11:11:24 +02:00
aldrin 9449f7e665 feat: Add user view set 2021-10-22 12:38:25 +02:00
11 changed files with 249 additions and 80 deletions

View file

@ -1,44 +1,62 @@
from django.conf import settings from django.conf import settings
from ldap3 import Connection, Server, SYNC from ldap3 import Connection, MOCK_SYNC, SAFE_SYNC, Server
_test_connection = None
def create_account(username, raw_password): class LDAPManager:
connection = _get_connection() def __init__(self):
is_success = connection.add( if not getattr(settings, "USERAUSFALL_LDAP_IS_TEST", False):
f"cn={username},dc=local", self.connection = self._get_connection()
["simpleSecurityObject", "organizationalRole"], else:
{"userPassword": raw_password}, self.connection = self._get_test_connection()
)
return is_success
def create_account(self, username, raw_password):
is_success = self.connection.add(
f"cn={username},dc=local",
["simpleSecurityObject", "organizationalRole"],
{"userPassword": raw_password},
)
return is_success
def account_exists(username): def has_account(self, username):
connection = _get_connection() exists = self.connection.search(f"cn={username},dc=local", "(objectclass=simpleSecurityObject)")
exists = connection.search(f"cn={username},dc=local", "(objectclass=simpleSecurityObject)") return exists
return exists
def is_valid_account_data(self, username, raw_password):
is_valid = self.connection.search(
f"cn={username},dc=local",
"(objectclass=simpleSecurityObject)",
attributes=["userPassword"],
)
if is_valid:
is_valid = self.connection.entries[0]["userPassword"].value == raw_password
return is_valid
def is_valid_account_data(username, raw_password): def drop_test_connection(self):
connection = _get_connection() global _test_connection
is_valid = connection.search( self.connection.unbind()
f"cn={username},dc=local", self.connection = None
"(objectclass=simpleSecurityObject)", _test_connection = None
attributes=["userPassword"],
)
if is_valid:
is_valid = connection.entries[0]["userPassword"].value == raw_password
return is_valid
def _get_connection(self):
server = Server("localhost")
connection = Connection(
server,
settings.USERAUSFALL_LDAP["ADMIN_USER_DN"],
settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"],
client_strategy=SAFE_SYNC,
auto_bind=True,
)
return connection
def _get_connection(): def _get_test_connection(self):
server = Server("localhost") global _test_connection
# The SAFE_SYNC client strategy doesn't seem to be present in Buster version of ldap3. We might want to use it as if _test_connection is None:
# soon as it is available (multithreading). server = Server("testserver")
connection = Connection( _test_connection = Connection(
server, server, user="cn=admin,dc=local", password="admin_secret", client_strategy=MOCK_SYNC
settings.USERAUSFALL_LDAP["ADMIN_USER_DN"], )
settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"], _test_connection.strategy.add_entry("cn=admin,dc=local", {"userPassword": "admin_secret"})
client_strategy=SYNC, _test_connection.bind()
auto_bind=True, return _test_connection
)
return connection

View file

@ -5,18 +5,13 @@ from django.contrib.auth.validators import UnicodeUsernameValidator
from django.core.mail import send_mail from django.core.mail import send_mail
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from djeveric.fields import ConfirmationField from djeveric.fields import ConfirmationField
from djeveric.models import ConfirmableModelMixin from djeveric.models import ConfirmableModelMixin
from userausfall import ldap
from userausfall.emails import TrustBridgeConfirmationEmail from userausfall.emails import TrustBridgeConfirmationEmail
from userausfall.ldap import LDAPManager
class MissingUserAttribute(Exception):
"""The user object is missing a required attribute."""
pass
class PasswordMismatch(Exception): class PasswordMismatch(Exception):
@ -89,22 +84,28 @@ class User(PermissionsMixin, AbstractBaseUser):
super().clean() super().clean()
self.email = self.__class__.objects.normalize_email(self.email) self.email = self.__class__.objects.normalize_email(self.email)
def create_ldap_account(self, raw_password):
"""Create the LDAP account which corresponds to this user."""
if not self.check_password(raw_password):
raise PasswordMismatch("The given password does not match the user's password.")
return self._ldap.create_account(self.username, raw_password)
def email_user(self, subject, message, from_email=None, **kwargs): def email_user(self, subject, message, from_email=None, **kwargs):
"""Send an email to this user.""" """Send an email to this user."""
send_mail(subject, message, from_email, [self.email], **kwargs) send_mail(subject, message, from_email, [self.email], **kwargs)
def create_ldap_account(self, raw_password):
"""Create the LDAP account which corresponds to this user."""
if not self.username:
raise MissingUserAttribute("User is missing a username.")
if not self.check_password(raw_password):
raise PasswordMismatch("The given password does not match the user's password.")
return ldap.create_account(self.username, raw_password)
def get_primary_email(self): def get_primary_email(self):
"""Returns the primary email address for this user.""" """Returns the primary email address for this user."""
return f"{self.username}@{settings.USERAUSFALL['PRIMARY_EMAIL_DOMAIN']}" return f"{self.username}@{settings.USERAUSFALL['PRIMARY_EMAIL_DOMAIN']}"
def has_ldap_account(self):
"""Returns True if an ldap account exists for the user's username."""
return self._ldap.has_account(self.username)
@cached_property
def _ldap(self):
return LDAPManager()
class TrustBridge(ConfirmableModelMixin, models.Model): class TrustBridge(ConfirmableModelMixin, models.Model):
is_trusted = ConfirmationField(email_class=TrustBridgeConfirmationEmail) is_trusted = ConfirmationField(email_class=TrustBridgeConfirmationEmail)

View file

@ -7,17 +7,39 @@ from userausfall.views import get_authenticated_user
class UserSerializer(serializers.HyperlinkedModelSerializer): class UserSerializer(serializers.HyperlinkedModelSerializer):
class Meta: class Meta:
model = User model = User
fields = ["username"] fields = ["id", "trust_bridge", "url"]
class ActivateUserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = User
fields = ["id", "password", "url"]
def validate(self, data):
if not hasattr(self.instance, "trust_bridge") or not self.instance.trust_bridge.is_trusted:
raise serializers.ValidationError("User has no trusted trust bridge")
return data
def validate_password(self, value):
if not self.instance.check_password(value):
raise serializers.ValidationError("Password does not match the user's password")
return value
class TrustBridgeUserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = User
fields = ["id", "url", "username"]
# the UniqueValidator for username prevents successful validation for existing users # the UniqueValidator for username prevents successful validation for existing users
extra_kwargs = {"username": {"validators": []}} extra_kwargs = {"username": {"validators": []}}
class TrustBridgeSerializer(serializers.HyperlinkedModelSerializer): class TrustBridgeSerializer(serializers.HyperlinkedModelSerializer):
trust_giver = UserSerializer() trust_giver = TrustBridgeUserSerializer()
class Meta: class Meta:
model = TrustBridge model = TrustBridge
fields = ["is_trusted", "trust_giver"] fields = ["id", "is_trusted", "trust_giver", "url"]
read_only_fields = ["is_trusted"] read_only_fields = ["is_trusted"]
def create(self, validated_data): def create(self, validated_data):

View file

@ -1,2 +1,3 @@
from .auth import * # noqa: F401, F403 from .auth import AuthenticationTestCase # noqa: F401, F403
from .trust_bridges import * # noqa: F401, F403 from .trust_bridges import TrustBridgeTestCase # noqa: F401, F403
from .users import UserTestCase # noqa: F401, F403

View file

@ -27,7 +27,7 @@ class UserMixin:
self.client.force_login(user=self.user) self.client.force_login(user=self.user)
class UserTestCase(UserMixin, UserausfallAPITestCase): class AuthenticationTestCase(UserMixin, UserausfallAPITestCase):
base_url = "/api/auth" base_url = "/api/auth"
def test_signup(self): def test_signup(self):

View file

@ -1,17 +1,25 @@
from django.core import mail from django.core import mail
from rest_framework import status from rest_framework import status
from userausfall.models import TrustBridge from userausfall.models import TrustBridge, User
from userausfall.rest_api.tests import UserausfallAPITestCase, UserMixin from userausfall.rest_api.tests.auth import UserMixin
from userausfall.rest_api.tests.userausfall import get_url, UserausfallAPITestCase
class TrustBridgeTestCase(UserMixin, UserausfallAPITestCase): class TrustBridgeMixin(UserMixin):
def create_trust_bridge(self): trust_bridge: TrustBridge
trust_giver: User
def create_trust_bridge(self, is_trusted=False):
self.trust_giver = self.create_user() self.trust_giver = self.create_user()
self.create_user() self.create_user()
self.trust_bridge = TrustBridge.objects.create(trust_taker=self.user, trust_giver=self.trust_giver) self.trust_bridge = TrustBridge.objects.create(
trust_taker=self.user, trust_giver=self.trust_giver, is_trusted=is_trusted
)
return self.trust_bridge return self.trust_bridge
class TrustBridgeTestCase(TrustBridgeMixin, UserausfallAPITestCase):
def test_create_trust_bridge(self): def test_create_trust_bridge(self):
"""Create a trust bridge for the current user.""" """Create a trust bridge for the current user."""
url = "/trust-bridges/" url = "/trust-bridges/"
@ -37,13 +45,17 @@ class TrustBridgeTestCase(UserMixin, UserausfallAPITestCase):
self.authenticate_user() self.authenticate_user()
response = self.client.get(self.get_api_url(url, pk=self.trust_bridge.pk)) response = self.client.get(self.get_api_url(url, pk=self.trust_bridge.pk))
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual( self.assertDictEqual(
response.data, response.data,
{ {
"id": self.trust_bridge.id,
"is_trusted": False, "is_trusted": False,
"trust_giver": { "trust_giver": {
"id": self.trust_giver.id,
"username": self.trust_giver.username, "username": self.trust_giver.username,
"url": get_url(response, "user", self.trust_giver),
}, },
"url": get_url(response, "trustbridge", self.trust_bridge),
}, },
) )

View file

@ -1,6 +1,11 @@
from rest_framework.reverse import reverse
from rest_framework.test import APITestCase from rest_framework.test import APITestCase
def get_url(response, basename, instance):
return reverse(f"{basename}-detail", [instance.pk], request=response.wsgi_request)
class UserausfallAPITestCase(APITestCase): class UserausfallAPITestCase(APITestCase):
base_url = "/api" base_url = "/api"

View file

@ -0,0 +1,72 @@
from django.test import override_settings
from rest_framework import status
from rest_framework.exceptions import ErrorDetail
from userausfall.ldap import LDAPManager
from userausfall.rest_api.tests.trust_bridges import TrustBridgeMixin
from userausfall.rest_api.tests.userausfall import get_url, UserausfallAPITestCase
@override_settings(USERAUSFALL_LDAP_IS_TEST=True)
class UserTestCase(TrustBridgeMixin, UserausfallAPITestCase):
def setUp(self) -> None:
self.ldap = LDAPManager()
def tearDown(self) -> None:
self.ldap.drop_test_connection()
def test_retrieve_user(self):
"""Retrieve the details of the current user."""
url = "/users/{pk}/"
self.authenticate_user()
response = self.client.get(self.get_api_url(url, pk=self.user.pk))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"id": self.user.id,
"trust_bridge": None,
"url": get_url(response, "user", self.user),
},
)
def test_activate_user(self):
"""Create the ldap account for the current user."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=True)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertTrue(self.user.has_ldap_account())
def test_activate_user_with_invalid_password(self):
"""Create the ldap account for the current user with an invalid password."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=True)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": "invalid"})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"password": [ErrorDetail("Password does not match the user's password", code="invalid")]}
)
def test_activate_user_without_trust_bridge(self):
"""Create the ldap account for the current user without a trust bridge."""
url = "/users/{pk}/activate/"
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"non_field_errors": [ErrorDetail("User has no trusted trust bridge", code="invalid")]}
)
def test_activate_user_with_untrusted_trust_bridge(self):
"""Create the ldap account for the current user with an untrusted trust bridge."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=False)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"non_field_errors": [ErrorDetail("User has no trusted trust bridge", code="invalid")]}
)

View file

@ -2,10 +2,11 @@ from django.urls import include, path
from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView
from rest_framework import routers from rest_framework import routers
from userausfall.rest_api.views import TrustBridgeViewSet from userausfall.rest_api.views import TrustBridgeViewSet, UserViewSet
router = routers.SimpleRouter() router = routers.SimpleRouter()
router.register(r"trust-bridges", TrustBridgeViewSet, "trust-bridge") router.register(r"trust-bridges", TrustBridgeViewSet, "trustbridge")
router.register(r"users", UserViewSet, "user")
urlpatterns = [ urlpatterns = [
path("", include(router.urls)), path("", include(router.urls)),

View file

@ -3,8 +3,8 @@ from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.response import Response from rest_framework.response import Response
from userausfall.models import MissingUserAttribute, PasswordMismatch, TrustBridge, User from userausfall.models import TrustBridge, User
from userausfall.rest_api.serializers import TrustBridgeSerializer from userausfall.rest_api.serializers import ActivateUserSerializer, TrustBridgeSerializer, UserSerializer
from userausfall.views import get_authenticated_user from userausfall.views import get_authenticated_user
@ -18,21 +18,28 @@ class TrustBridgeViewSet(
return self.queryset.filter(trust_taker=get_authenticated_user(self.request)) return self.queryset.filter(trust_taker=get_authenticated_user(self.request))
class UserViewSet(viewsets.GenericViewSet): class ActivateUserMixin:
@action(detail=True, methods=["post"]) @action(detail=True, methods=["post"])
def activate(self, request, pk=None): def activate(self, request, pk=None):
"""Create the corresponding LDAP account.""" """Create the corresponding LDAP account."""
user: User = self.get_object() instance = self.get_object()
serializer = self.get_serializer(data=request.data) serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(): serializer.is_valid(raise_exception=True)
try: self.perform_activate(instance, serializer)
# We prevent untrusted user accounts from being activated via API. return Response(status=status.HTTP_204_NO_CONTENT)
# They might be activated via Admin or programmatically.
if not user.trust_bridge.is_trusted: def perform_activate(self, instance: User, serializer):
raise MissingUserAttribute("User has no trusted trust bridge.") instance.create_ldap_account(serializer.validated_data["password"])
user.create_ldap_account(serializer.validated_data["password"])
except (MissingUserAttribute, PasswordMismatch) as e:
return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST) class UserViewSet(ActivateUserMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
return Response(status=status.HTTP_204_NO_CONTENT) serializer_class = UserSerializer
def get_queryset(self):
return User.objects.filter(pk=get_authenticated_user(self.request).pk)
def get_serializer_class(self):
if self.action == "activate":
return ActivateUserSerializer
else: else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return super().get_serializer_class()

30
userausfall/tests.py Normal file
View file

@ -0,0 +1,30 @@
from django.test import override_settings, TestCase
from userausfall.ldap import LDAPManager
@override_settings(USERAUSFALL_LDAP_IS_TEST=True)
class LDAPTestCase(TestCase):
def setUp(self) -> None:
self.username = "test"
self.password = "test12345"
self.ldap = LDAPManager()
def tearDown(self) -> None:
self.ldap.drop_test_connection()
def test_create_has_account(self):
exists = self.ldap.has_account(self.username)
self.assertFalse(exists)
is_created = self.ldap.create_account(self.username, self.password)
self.assertTrue(is_created)
exists = self.ldap.has_account(self.username)
self.assertTrue(exists)
def test_create_account_data(self):
is_valid = self.ldap.is_valid_account_data(self.username, self.password)
self.assertFalse(is_valid)
is_created = self.ldap.create_account(self.username, self.password)
self.assertTrue(is_created)
is_valid = self.ldap.is_valid_account_data(self.username, self.password)
self.assertTrue(is_valid)