Compare commits

..

No commits in common. "8ca0f667775b4537e5f53962b2f5c0cffb87a0d3" and "330e3c89ea8cff7eeb8225acf4222b564f7a0f9e" have entirely different histories.

11 changed files with 80 additions and 249 deletions

View file

@ -1,62 +1,44 @@
from django.conf import settings from django.conf import settings
from ldap3 import Connection, MOCK_SYNC, SAFE_SYNC, Server from ldap3 import Connection, Server, SYNC
_test_connection = None
class LDAPManager: def create_account(username, raw_password):
def __init__(self): connection = _get_connection()
if not getattr(settings, "USERAUSFALL_LDAP_IS_TEST", False): is_success = connection.add(
self.connection = self._get_connection() f"cn={username},dc=local",
else: ["simpleSecurityObject", "organizationalRole"],
self.connection = self._get_test_connection() {"userPassword": raw_password},
)
return is_success
def create_account(self, username, raw_password):
is_success = self.connection.add(
f"cn={username},dc=local",
["simpleSecurityObject", "organizationalRole"],
{"userPassword": raw_password},
)
return is_success
def has_account(self, username): def account_exists(username):
exists = self.connection.search(f"cn={username},dc=local", "(objectclass=simpleSecurityObject)") connection = _get_connection()
return exists exists = connection.search(f"cn={username},dc=local", "(objectclass=simpleSecurityObject)")
return exists
def is_valid_account_data(self, username, raw_password):
is_valid = self.connection.search(
f"cn={username},dc=local",
"(objectclass=simpleSecurityObject)",
attributes=["userPassword"],
)
if is_valid:
is_valid = self.connection.entries[0]["userPassword"].value == raw_password
return is_valid
def drop_test_connection(self): def is_valid_account_data(username, raw_password):
global _test_connection connection = _get_connection()
self.connection.unbind() is_valid = connection.search(
self.connection = None f"cn={username},dc=local",
_test_connection = None "(objectclass=simpleSecurityObject)",
attributes=["userPassword"],
)
if is_valid:
is_valid = connection.entries[0]["userPassword"].value == raw_password
return is_valid
def _get_connection(self):
server = Server("localhost")
connection = Connection(
server,
settings.USERAUSFALL_LDAP["ADMIN_USER_DN"],
settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"],
client_strategy=SAFE_SYNC,
auto_bind=True,
)
return connection
def _get_test_connection(self): def _get_connection():
global _test_connection server = Server("localhost")
if _test_connection is None: # The SAFE_SYNC client strategy doesn't seem to be present in Buster version of ldap3. We might want to use it as
server = Server("testserver") # soon as it is available (multithreading).
_test_connection = Connection( connection = Connection(
server, user="cn=admin,dc=local", password="admin_secret", client_strategy=MOCK_SYNC server,
) settings.USERAUSFALL_LDAP["ADMIN_USER_DN"],
_test_connection.strategy.add_entry("cn=admin,dc=local", {"userPassword": "admin_secret"}) settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"],
_test_connection.bind() client_strategy=SYNC,
return _test_connection auto_bind=True,
)
return connection

View file

@ -5,13 +5,18 @@ from django.contrib.auth.validators import UnicodeUsernameValidator
from django.core.mail import send_mail from django.core.mail import send_mail
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from djeveric.fields import ConfirmationField from djeveric.fields import ConfirmationField
from djeveric.models import ConfirmableModelMixin from djeveric.models import ConfirmableModelMixin
from userausfall import ldap
from userausfall.emails import TrustBridgeConfirmationEmail from userausfall.emails import TrustBridgeConfirmationEmail
from userausfall.ldap import LDAPManager
class MissingUserAttribute(Exception):
"""The user object is missing a required attribute."""
pass
class PasswordMismatch(Exception): class PasswordMismatch(Exception):
@ -84,28 +89,22 @@ class User(PermissionsMixin, AbstractBaseUser):
super().clean() super().clean()
self.email = self.__class__.objects.normalize_email(self.email) self.email = self.__class__.objects.normalize_email(self.email)
def create_ldap_account(self, raw_password):
"""Create the LDAP account which corresponds to this user."""
if not self.check_password(raw_password):
raise PasswordMismatch("The given password does not match the user's password.")
return self._ldap.create_account(self.username, raw_password)
def email_user(self, subject, message, from_email=None, **kwargs): def email_user(self, subject, message, from_email=None, **kwargs):
"""Send an email to this user.""" """Send an email to this user."""
send_mail(subject, message, from_email, [self.email], **kwargs) send_mail(subject, message, from_email, [self.email], **kwargs)
def create_ldap_account(self, raw_password):
"""Create the LDAP account which corresponds to this user."""
if not self.username:
raise MissingUserAttribute("User is missing a username.")
if not self.check_password(raw_password):
raise PasswordMismatch("The given password does not match the user's password.")
return ldap.create_account(self.username, raw_password)
def get_primary_email(self): def get_primary_email(self):
"""Returns the primary email address for this user.""" """Returns the primary email address for this user."""
return f"{self.username}@{settings.USERAUSFALL['PRIMARY_EMAIL_DOMAIN']}" return f"{self.username}@{settings.USERAUSFALL['PRIMARY_EMAIL_DOMAIN']}"
def has_ldap_account(self):
"""Returns True if an ldap account exists for the user's username."""
return self._ldap.has_account(self.username)
@cached_property
def _ldap(self):
return LDAPManager()
class TrustBridge(ConfirmableModelMixin, models.Model): class TrustBridge(ConfirmableModelMixin, models.Model):
is_trusted = ConfirmationField(email_class=TrustBridgeConfirmationEmail) is_trusted = ConfirmationField(email_class=TrustBridgeConfirmationEmail)

View file

@ -7,39 +7,17 @@ from userausfall.views import get_authenticated_user
class UserSerializer(serializers.HyperlinkedModelSerializer): class UserSerializer(serializers.HyperlinkedModelSerializer):
class Meta: class Meta:
model = User model = User
fields = ["id", "trust_bridge", "url"] fields = ["username"]
class ActivateUserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = User
fields = ["id", "password", "url"]
def validate(self, data):
if not hasattr(self.instance, "trust_bridge") or not self.instance.trust_bridge.is_trusted:
raise serializers.ValidationError("User has no trusted trust bridge")
return data
def validate_password(self, value):
if not self.instance.check_password(value):
raise serializers.ValidationError("Password does not match the user's password")
return value
class TrustBridgeUserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = User
fields = ["id", "url", "username"]
# the UniqueValidator for username prevents successful validation for existing users # the UniqueValidator for username prevents successful validation for existing users
extra_kwargs = {"username": {"validators": []}} extra_kwargs = {"username": {"validators": []}}
class TrustBridgeSerializer(serializers.HyperlinkedModelSerializer): class TrustBridgeSerializer(serializers.HyperlinkedModelSerializer):
trust_giver = TrustBridgeUserSerializer() trust_giver = UserSerializer()
class Meta: class Meta:
model = TrustBridge model = TrustBridge
fields = ["id", "is_trusted", "trust_giver", "url"] fields = ["is_trusted", "trust_giver"]
read_only_fields = ["is_trusted"] read_only_fields = ["is_trusted"]
def create(self, validated_data): def create(self, validated_data):

View file

@ -1,3 +1,2 @@
from .auth import AuthenticationTestCase # noqa: F401, F403 from .auth import * # noqa: F401, F403
from .trust_bridges import TrustBridgeTestCase # noqa: F401, F403 from .trust_bridges import * # noqa: F401, F403
from .users import UserTestCase # noqa: F401, F403

View file

@ -27,7 +27,7 @@ class UserMixin:
self.client.force_login(user=self.user) self.client.force_login(user=self.user)
class AuthenticationTestCase(UserMixin, UserausfallAPITestCase): class UserTestCase(UserMixin, UserausfallAPITestCase):
base_url = "/api/auth" base_url = "/api/auth"
def test_signup(self): def test_signup(self):

View file

@ -1,25 +1,17 @@
from django.core import mail from django.core import mail
from rest_framework import status from rest_framework import status
from userausfall.models import TrustBridge, User from userausfall.models import TrustBridge
from userausfall.rest_api.tests.auth import UserMixin from userausfall.rest_api.tests import UserausfallAPITestCase, UserMixin
from userausfall.rest_api.tests.userausfall import get_url, UserausfallAPITestCase
class TrustBridgeMixin(UserMixin): class TrustBridgeTestCase(UserMixin, UserausfallAPITestCase):
trust_bridge: TrustBridge def create_trust_bridge(self):
trust_giver: User
def create_trust_bridge(self, is_trusted=False):
self.trust_giver = self.create_user() self.trust_giver = self.create_user()
self.create_user() self.create_user()
self.trust_bridge = TrustBridge.objects.create( self.trust_bridge = TrustBridge.objects.create(trust_taker=self.user, trust_giver=self.trust_giver)
trust_taker=self.user, trust_giver=self.trust_giver, is_trusted=is_trusted
)
return self.trust_bridge return self.trust_bridge
class TrustBridgeTestCase(TrustBridgeMixin, UserausfallAPITestCase):
def test_create_trust_bridge(self): def test_create_trust_bridge(self):
"""Create a trust bridge for the current user.""" """Create a trust bridge for the current user."""
url = "/trust-bridges/" url = "/trust-bridges/"
@ -45,17 +37,13 @@ class TrustBridgeTestCase(TrustBridgeMixin, UserausfallAPITestCase):
self.authenticate_user() self.authenticate_user()
response = self.client.get(self.get_api_url(url, pk=self.trust_bridge.pk)) response = self.client.get(self.get_api_url(url, pk=self.trust_bridge.pk))
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual( self.assertEqual(
response.data, response.data,
{ {
"id": self.trust_bridge.id,
"is_trusted": False, "is_trusted": False,
"trust_giver": { "trust_giver": {
"id": self.trust_giver.id,
"username": self.trust_giver.username, "username": self.trust_giver.username,
"url": get_url(response, "user", self.trust_giver),
}, },
"url": get_url(response, "trustbridge", self.trust_bridge),
}, },
) )

View file

@ -1,11 +1,6 @@
from rest_framework.reverse import reverse
from rest_framework.test import APITestCase from rest_framework.test import APITestCase
def get_url(response, basename, instance):
return reverse(f"{basename}-detail", [instance.pk], request=response.wsgi_request)
class UserausfallAPITestCase(APITestCase): class UserausfallAPITestCase(APITestCase):
base_url = "/api" base_url = "/api"

View file

@ -1,72 +0,0 @@
from django.test import override_settings
from rest_framework import status
from rest_framework.exceptions import ErrorDetail
from userausfall.ldap import LDAPManager
from userausfall.rest_api.tests.trust_bridges import TrustBridgeMixin
from userausfall.rest_api.tests.userausfall import get_url, UserausfallAPITestCase
@override_settings(USERAUSFALL_LDAP_IS_TEST=True)
class UserTestCase(TrustBridgeMixin, UserausfallAPITestCase):
def setUp(self) -> None:
self.ldap = LDAPManager()
def tearDown(self) -> None:
self.ldap.drop_test_connection()
def test_retrieve_user(self):
"""Retrieve the details of the current user."""
url = "/users/{pk}/"
self.authenticate_user()
response = self.client.get(self.get_api_url(url, pk=self.user.pk))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"id": self.user.id,
"trust_bridge": None,
"url": get_url(response, "user", self.user),
},
)
def test_activate_user(self):
"""Create the ldap account for the current user."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=True)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertTrue(self.user.has_ldap_account())
def test_activate_user_with_invalid_password(self):
"""Create the ldap account for the current user with an invalid password."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=True)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": "invalid"})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"password": [ErrorDetail("Password does not match the user's password", code="invalid")]}
)
def test_activate_user_without_trust_bridge(self):
"""Create the ldap account for the current user without a trust bridge."""
url = "/users/{pk}/activate/"
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"non_field_errors": [ErrorDetail("User has no trusted trust bridge", code="invalid")]}
)
def test_activate_user_with_untrusted_trust_bridge(self):
"""Create the ldap account for the current user with an untrusted trust bridge."""
url = "/users/{pk}/activate/"
self.create_trust_bridge(is_trusted=False)
self.authenticate_user()
response = self.client.post(self.get_api_url(url, pk=self.user.pk), {"password": self.password})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(
response.data, {"non_field_errors": [ErrorDetail("User has no trusted trust bridge", code="invalid")]}
)

View file

@ -2,11 +2,10 @@ from django.urls import include, path
from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView
from rest_framework import routers from rest_framework import routers
from userausfall.rest_api.views import TrustBridgeViewSet, UserViewSet from userausfall.rest_api.views import TrustBridgeViewSet
router = routers.SimpleRouter() router = routers.SimpleRouter()
router.register(r"trust-bridges", TrustBridgeViewSet, "trustbridge") router.register(r"trust-bridges", TrustBridgeViewSet, "trust-bridge")
router.register(r"users", UserViewSet, "user")
urlpatterns = [ urlpatterns = [
path("", include(router.urls)), path("", include(router.urls)),

View file

@ -3,8 +3,8 @@ from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.response import Response from rest_framework.response import Response
from userausfall.models import TrustBridge, User from userausfall.models import MissingUserAttribute, PasswordMismatch, TrustBridge, User
from userausfall.rest_api.serializers import ActivateUserSerializer, TrustBridgeSerializer, UserSerializer from userausfall.rest_api.serializers import TrustBridgeSerializer
from userausfall.views import get_authenticated_user from userausfall.views import get_authenticated_user
@ -18,28 +18,21 @@ class TrustBridgeViewSet(
return self.queryset.filter(trust_taker=get_authenticated_user(self.request)) return self.queryset.filter(trust_taker=get_authenticated_user(self.request))
class ActivateUserMixin: class UserViewSet(viewsets.GenericViewSet):
@action(detail=True, methods=["post"]) @action(detail=True, methods=["post"])
def activate(self, request, pk=None): def activate(self, request, pk=None):
"""Create the corresponding LDAP account.""" """Create the corresponding LDAP account."""
instance = self.get_object() user: User = self.get_object()
serializer = self.get_serializer(instance, data=request.data) serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True) if serializer.is_valid():
self.perform_activate(instance, serializer) try:
return Response(status=status.HTTP_204_NO_CONTENT) # We prevent untrusted user accounts from being activated via API.
# They might be activated via Admin or programmatically.
def perform_activate(self, instance: User, serializer): if not user.trust_bridge.is_trusted:
instance.create_ldap_account(serializer.validated_data["password"]) raise MissingUserAttribute("User has no trusted trust bridge.")
user.create_ldap_account(serializer.validated_data["password"])
except (MissingUserAttribute, PasswordMismatch) as e:
class UserViewSet(ActivateUserMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet): return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST)
serializer_class = UserSerializer return Response(status=status.HTTP_204_NO_CONTENT)
def get_queryset(self):
return User.objects.filter(pk=get_authenticated_user(self.request).pk)
def get_serializer_class(self):
if self.action == "activate":
return ActivateUserSerializer
else: else:
return super().get_serializer_class() return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

View file

@ -1,30 +0,0 @@
from django.test import override_settings, TestCase
from userausfall.ldap import LDAPManager
@override_settings(USERAUSFALL_LDAP_IS_TEST=True)
class LDAPTestCase(TestCase):
def setUp(self) -> None:
self.username = "test"
self.password = "test12345"
self.ldap = LDAPManager()
def tearDown(self) -> None:
self.ldap.drop_test_connection()
def test_create_has_account(self):
exists = self.ldap.has_account(self.username)
self.assertFalse(exists)
is_created = self.ldap.create_account(self.username, self.password)
self.assertTrue(is_created)
exists = self.ldap.has_account(self.username)
self.assertTrue(exists)
def test_create_account_data(self):
is_valid = self.ldap.is_valid_account_data(self.username, self.password)
self.assertFalse(is_valid)
is_created = self.ldap.create_account(self.username, self.password)
self.assertTrue(is_created)
is_valid = self.ldap.is_valid_account_data(self.username, self.password)
self.assertTrue(is_valid)