51 lines
1.8 KiB
Python
51 lines
1.8 KiB
Python
from django.conf import settings
|
|
from ldap3 import Connection, MOCK_SYNC, SAFE_SYNC, Server
|
|
|
|
|
|
class LDAPManager:
    """Create and verify simple LDAP accounts stored as ``cn=<username>,dc=local``.

    The connection is chosen once, at construction time: a ``SAFE_SYNC``
    connection to ``localhost`` by default, or an in-memory ``MOCK_SYNC``
    connection when the Django setting ``USERAUSFALL_LDAP_IS_TEST`` is truthy.

    The two strategies report results differently.  Per the ldap3
    documentation, thread-safe strategies such as ``SAFE_SYNC`` return a
    ``(status, result, response, request)`` tuple from every operation, while
    the plain synchronous strategies return a bare boolean and keep the
    response on the connection.  The public methods of this class hide that
    difference and always return a ``bool``.
    """

    # Search filter shared by every account lookup.
    _ACCOUNT_FILTER = "(objectclass=simpleSecurityObject)"

    def __init__(self):
        """Open the test connection if ``USERAUSFALL_LDAP_IS_TEST`` is set, else the real one."""
        if getattr(settings, "USERAUSFALL_LDAP_IS_TEST", False):
            self.connection = self._get_test_connection()
        else:
            self.connection = self._get_connection()

    def create_account(self, username, raw_password):
        """Add an entry for ``username`` with ``raw_password`` as its ``userPassword``.

        Returns:
            ``True`` if the directory reported success, ``False`` otherwise.
        """
        outcome = self.connection.add(
            self._account_dn(username),
            ["simpleSecurityObject", "organizationalRole"],
            {"userPassword": raw_password},
        )
        return self._status(outcome)

    def has_account(self, username):
        """Return ``True`` if a ``simpleSecurityObject`` entry exists for ``username``."""
        outcome = self.connection.search(self._account_dn(username), self._ACCOUNT_FILTER)
        return self._status(outcome)

    def is_valid_account_data(self, username, raw_password):
        """Return ``True`` if ``username`` exists and its stored password equals ``raw_password``.

        NOTE(review): the stored ``userPassword`` value is compared to
        ``raw_password`` as-is, so this only matches when the directory holds
        the password unhashed and in the same type (``str`` vs ``bytes``) as
        the argument -- confirm this is intended.
        """
        outcome = self.connection.search(
            self._account_dn(username),
            self._ACCOUNT_FILTER,
            attributes=["userPassword"],
        )
        if not self._status(outcome):
            return False
        return self._password_matches(outcome, raw_password)

    @staticmethod
    def _account_dn(username):
        """Build the distinguished name of the account entry for ``username``.

        NOTE(review): ``username`` is interpolated unescaped; if it can contain
        DN metacharacters (``,``, ``+``, ``=`` ...) callers must validate it or
        this should escape it (ldap3 ships ``ldap3.utils.dn.escape_rdn``).
        """
        return f"cn={username},dc=local"

    @staticmethod
    def _status(outcome):
        """Reduce an ldap3 operation return value to a plain boolean.

        Thread-safe strategies return a tuple whose first element is the
        status; a non-empty tuple is always truthy, so it must be unpacked
        rather than tested directly.
        """
        if isinstance(outcome, tuple):
            return bool(outcome[0]) if outcome else False
        return bool(outcome)

    def _password_matches(self, outcome, raw_password):
        """Compare the password of the first entry found by a search with ``raw_password``."""
        if not isinstance(outcome, tuple):
            # Synchronous strategies keep the result on the connection.
            return self.connection.entries[0]["userPassword"].value == raw_password

        # Thread-safe strategies hand the response back inside the tuple
        # instead of storing it on the connection.
        found = [item for item in (outcome[2] or []) if item.get("type") == "searchResEntry"]
        if not found:
            return False
        stored = found[0].get("attributes", {}).get("userPassword")
        if isinstance(stored, (list, tuple)):
            # Mirror the single-value unwrapping the entry ``.value`` accessor
            # performs on the synchronous path.
            if len(stored) != 1:
                return False
            stored = stored[0]
        return stored is not None and stored == raw_password

    def _get_connection(self):
        """Return a bound ``SAFE_SYNC`` connection to ``localhost``.

        The bind credentials are read from ``settings.USERAUSFALL_LDAP``
        (keys ``ADMIN_USER_DN`` and ``ADMIN_USER_PASSWORD``).
        """
        ldap_settings = settings.USERAUSFALL_LDAP
        return Connection(
            Server("localhost"),
            ldap_settings["ADMIN_USER_DN"],
            ldap_settings["ADMIN_USER_PASSWORD"],
            client_strategy=SAFE_SYNC,
            auto_bind=True,
        )

    def _get_test_connection(self):
        """Return a bound ``MOCK_SYNC`` connection pre-seeded with an admin entry."""
        server = Server("testserver")
        connection = Connection(server, user="cn=admin,dc=local", password="admin_secret", client_strategy=MOCK_SYNC)
        # The admin entry has to exist in the mock directory before the bind below.
        connection.strategy.add_entry("cn=admin,dc=local", {"userPassword": "admin_secret"})
        connection.bind()
        return connection
|