This repository has been archived on 2022-05-05. You can view files and clone it, but cannot push or open issues or pull requests.
userausfall/userausfall/ldap.py

63 lines
2.1 KiB
Python

from django.conf import settings
from ldap3 import Connection, MOCK_SYNC, SAFE_SYNC, Server
_test_connection = None
class LDAPManager:
def __init__(self):
if not getattr(settings, "USERAUSFALL_LDAP_IS_TEST", False):
self.connection = self._get_connection()
else:
self.connection = self._get_test_connection()
def create_account(self, username, raw_password):
is_success = self.connection.add(
f"cn={username},dc=local",
["simpleSecurityObject", "organizationalRole"],
{"userPassword": raw_password},
)
return is_success
def has_account(self, username):
exists = self.connection.search(f"cn={username},dc=local", "(objectclass=simpleSecurityObject)")
return exists
def is_valid_account_data(self, username, raw_password):
is_valid = self.connection.search(
f"cn={username},dc=local",
"(objectclass=simpleSecurityObject)",
attributes=["userPassword"],
)
if is_valid:
is_valid = self.connection.entries[0]["userPassword"].value == raw_password
return is_valid
def drop_test_connection(self):
global _test_connection
self.connection.unbind()
self.connection = None
_test_connection = None
def _get_connection(self):
server = Server("localhost")
connection = Connection(
server,
settings.USERAUSFALL_LDAP["ADMIN_USER_DN"],
settings.USERAUSFALL_LDAP["ADMIN_USER_PASSWORD"],
client_strategy=SAFE_SYNC,
auto_bind=True,
)
return connection
def _get_test_connection(self):
global _test_connection
if _test_connection is None:
server = Server("testserver")
_test_connection = Connection(
server, user="cn=admin,dc=local", password="admin_secret", client_strategy=MOCK_SYNC
)
_test_connection.strategy.add_entry("cn=admin,dc=local", {"userPassword": "admin_secret"})
_test_connection.bind()
return _test_connection