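import hmac

from django.conf import settings
from ldap3 import Connection, MOCK_SYNC, SAFE_SYNC, Server
from ldap3.utils.dn import escape_rdn

# Mock connection shared by every LDAPManager created in test mode.
_test_connection = None


class LDAPManager:
    """Create and check accounts stored as ``cn=<username>,dc=local`` entries.

    Uses the in-memory mock connection when
    ``settings.USERAUSFALL_LDAP_IS_TEST`` is truthy, otherwise an
    admin-bound connection to the LDAP server on localhost.
    """

    def __init__(self):
        if getattr(settings, "USERAUSFALL_LDAP_IS_TEST", False):
            self.connection = self._get_test_connection()
        else:
            self.connection = self._get_connection()

    def create_account(self, username, raw_password):
        """Add the account entry; return True when the server accepted it.

        NOTE(review): ``raw_password`` is sent as the ``userPassword`` value
        unchanged. Whether it is hashed at rest depends on the directory
        server's configuration, which this file does not show.
        """
        succeeded, _ = self._unpack(
            self.connection.add(
                self._account_dn(username),
                ["simpleSecurityObject", "organizationalRole"],
                {"userPassword": raw_password},
            )
        )
        return succeeded

    def has_account(self, username):
        """Return True when an account entry exists for ``username``."""
        found, _ = self._unpack(
            self.connection.search(
                self._account_dn(username),
                "(objectclass=simpleSecurityObject)",
            )
        )
        return found

    def is_valid_account_data(self, username, raw_password):
        """Return True when ``raw_password`` equals the stored ``userPassword``.

        NOTE(review): this reads the stored value back over the admin
        connection and compares it locally, so it only works while the
        directory keeps the value exactly as it was written. Verifying with
        a bind as the account's DN would avoid reading the secret at all.
        """
        found, response = self._unpack(
            self.connection.search(
                self._account_dn(username),
                "(objectclass=simpleSecurityObject)",
                attributes=["userPassword"],
            )
        )
        if not found:
            return False

        if response is None:
            # Strategies that return a bare status keep results on the connection.
            entries = self.connection.entries
            if not entries:
                return False
            stored = entries[0]["userPassword"].value
        else:
            # SAFE_SYNC hands the response back instead of storing it.
            records = [r for r in response if r.get("type") == "searchResEntry"]
            if not records:
                return False
            stored = records[0].get("attributes", {}).get("userPassword")

        return self._password_matches(stored, raw_password)

    def drop_test_connection(self):
        """Unbind the current connection and forget the shared mock connection."""
        global _test_connection
        self.connection.unbind()
        self.connection = None
        _test_connection = None

    @staticmethod
    def _account_dn(username):
        """Build the account DN with ``username`` escaped as an RDN value.

        Without escaping, a username containing DN metacharacters such as
        ``,`` or ``=`` would change which entry the DN addresses.
        """
        return f"cn={escape_rdn(username)},dc=local"

    @staticmethod
    def _unpack(outcome):
        """Normalize an operation result to ``(succeeded, response_or_None)``.

        SAFE_SYNC returns ``(status, result, response, request)``, a tuple
        that is always truthy, so the status has to be taken from it
        explicitly. MOCK_SYNC returns the status directly.
        """
        if isinstance(outcome, tuple):
            status = outcome[0] if outcome else False
            response = outcome[2] if len(outcome) > 2 else None
            return bool(status), (response or [])
        return bool(outcome), None

    @staticmethod
    def _as_bytes(value):
        """Return ``value`` as bytes, or None when it is neither str nor bytes."""
        if isinstance(value, str):
            return value.encode("utf-8")
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return None

    @classmethod
    def _password_matches(cls, stored, raw_password):
        """Compare the stored value(s) with ``raw_password`` in constant time."""
        expected = cls._as_bytes(raw_password)
        if expected is None:
            return False
        candidates = stored if isinstance(stored, (list, tuple)) else [stored]
        matched = False
        for candidate in candidates:
            actual = cls._as_bytes(candidate)
            # compare_digest avoids leaking the match position through timing.
            if actual is not None and hmac.compare_digest(actual, expected):
                matched = True
        return matched

    def _get_connection(self):
        """Open an admin-bound SAFE_SYNC connection to the local LDAP server."""
        # NOTE(review): no TLS is configured here; the admin simple bind is
        # only kept off the network because the server is on localhost.
        server = Server("localhost")
        return Connection(
            server,
            settings.USERAUSFALL_LDAP["ADMIN_USER_DN"],
            settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"],
            client_strategy=SAFE_SYNC,
            auto_bind=True,
        )

    def _get_test_connection(self):
        """Return the shared MOCK_SYNC connection, creating it on first use."""
        global _test_connection
        if _test_connection is None:
            server = Server("testserver")
            # Fixed credentials are for the in-memory mock directory only.
            _test_connection = Connection(
                server, user="cn=admin,dc=local", password="admin_secret", client_strategy=MOCK_SYNC
            )
            _test_connection.strategy.add_entry("cn=admin,dc=local", {"userPassword": "admin_secret"})
            _test_connection.bind()
        return _test_connection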