diff --git a/windows_auth/apps.py b/windows_auth/apps.py index b4b5359..5ec7ed0 100644 --- a/windows_auth/apps.py +++ b/windows_auth/apps.py @@ -1,5 +1,6 @@ import atexit +from django.conf import settings from django.db import DatabaseError from ldap3.core.exceptions import LDAPException from django.apps import AppConfig @@ -13,7 +14,7 @@ class WindowsAuthConfig(AppConfig): default_auto_field = 'django.db.models.AutoField' def ready(self): - from windows_auth.conf import WAUTH_IGNORE_SETTING_WARNINGS, WAUTH_PRELOAD_DOMAINS, WAUTH_DOMAINS + from windows_auth.conf import wauth_settings from windows_auth.settings import DEFAULT_DOMAIN_SETTING from windows_auth.ldap import get_ldap_manager, close_connections @@ -23,11 +24,17 @@ def ready(self): # You can avoid this behavior by using "runserver --noreload" parameter, # or modifying the WAUTH_PRELOAD_DOMAINS setting to False. + if ( + not hasattr(settings, "WAUTH_DOMAINS") + and not getattr(settings, "WAUTH_IGNORE_SETTING_WARNINGS", False) + ): + logger.warn("The required setting WAUTH_DOMAINS is missing.") + # check about users with domain missing from settings - if not WAUTH_IGNORE_SETTING_WARNINGS and DEFAULT_DOMAIN_SETTING not in WAUTH_DOMAINS: + if not wauth_settings.WAUTH_IGNORE_SETTING_WARNINGS and DEFAULT_DOMAIN_SETTING not in wauth_settings.WAUTH_DOMAINS: try: from windows_auth.models import LDAPUser - missing_domains = LDAPUser.objects.exclude(domain__in=WAUTH_DOMAINS.keys()) + missing_domains = LDAPUser.objects.exclude(domain__in=wauth_settings.WAUTH_DOMAINS.keys()) if missing_domains.exists(): for result in missing_domains.values("domain").annotate(count=Count("pk")): logger.warning(f"Settings for domain \"{result.get('domain')}\" are missing from WAUTH_DOMAINS " @@ -37,9 +44,9 @@ def ready(self): logger.warn(e) # configure default preload domains - preload_domains = WAUTH_PRELOAD_DOMAINS + preload_domains = wauth_settings.WAUTH_PRELOAD_DOMAINS if preload_domains in (None, True): - preload_domains = list(WAUTH_DOMAINS.keys()) + preload_domains = list(wauth_settings.WAUTH_DOMAINS.keys()) if DEFAULT_DOMAIN_SETTING in preload_domains: preload_domains.remove(DEFAULT_DOMAIN_SETTING) diff --git a/windows_auth/backends.py b/windows_auth/backends.py index b314680..f3124d5 100644 --- a/windows_auth/backends.py +++ b/windows_auth/backends.py @@ -1,6 +1,6 @@ from django.contrib.auth.backends import RemoteUserBackend -from windows_auth.conf import WAUTH_USE_SPN, WAUTH_LOWERCASE_USERNAME +from windows_auth.conf import wauth_settings from windows_auth.models import LDAPUser @@ -19,12 +19,12 @@ def clean_username(self, username: str): :param username: raw REMOTE_USER header value :return: cleaned sAMAccountName value from the """ - if WAUTH_USE_SPN: + if wauth_settings.WAUTH_USE_SPN: sam_account_name, self.domain = username.rsplit("@", 2) else: self.domain, sam_account_name = username.split("\\", 2) - if WAUTH_LOWERCASE_USERNAME: + if wauth_settings.WAUTH_LOWERCASE_USERNAME: return str(sam_account_name).lower() else: return sam_account_name diff --git a/windows_auth/conf.py b/windows_auth/conf.py index fae5cee..2b21e50 100644 --- a/windows_auth/conf.py +++ b/windows_auth/conf.py @@ -1,34 +1,59 @@ -from typing import Callable, Union, Optional, Iterable +from dataclasses import dataclass, field, fields, Field +from typing import Callable, Union, Optional, Iterable, Dict, TYPE_CHECKING, Set from django.conf import settings +from django.core.signals import setting_changed +from django.dispatch import receiver from django.http import HttpResponse from django.utils import timezone -from windows_auth import logger - -if not hasattr(settings, "WAUTH_DOMAINS"): - logger.warn("The required setting WAUTH_DOMAINS is missing.") - -# Settings for each domain -WAUTH_DOMAINS: dict = getattr(settings, "WAUTH_DOMAINS", {}) - -# Expect REMOTE_USER value to be in SPN scheme -WAUTH_USE_SPN: bool = getattr(settings, "WAUTH_USE_SPN", False) -# Minimum time until automatic re-sync -WAUTH_RESYNC_DELTA: Optional[Union[str, int, timezone.timedelta]] = getattr(settings, "WAUTH_RESYNC_DELTA", - timezone.timedelta(days=1)) -# Use cache instead of model for determining re-sync -WAUTH_USE_CACHE: bool = getattr(settings, "WAUTH_USE_CACHE", False) -# Raise exception and return Error 500 when user failed to synced to domain -WAUTH_REQUIRE_RESYNC: bool = getattr(settings, "WAUTH_REQUIRE_RESYNC", settings.DEBUG or False) -# Choose custom HTTP Response to send when LDAP Sync fails -WAUTH_ERROR_RESPONSE: Optional[Union[int, HttpResponse, Callable]] = getattr(settings, "WAUTH_ERROR_RESPONSE", None) -# Lowercase the username from the REMOTE_USER. Used for correct non-case sensitive LDAP backends. -WAUTH_LOWERCASE_USERNAME: bool = getattr(settings, "WAUTH_LOWERCASE_USERNAME", True) -# Skip verification of domain settings on server startup -WAUTH_IGNORE_SETTING_WARNINGS: bool = getattr(settings, "WAUTH_IGNORE_SETTING_WARNINGS", False) -# List of domains to preload and connect during process startup -WAUTH_PRELOAD_DOMAINS: Optional[Iterable[str]] = getattr(settings, "WAUTH_PRELOAD_DOMAINS", None) -# User to impersonate when using SimulateWindowsAuthMiddleware -WAUTH_SIMULATE_USER: str = getattr(settings, "WAUTH_SIMULATE_USER", "") +if TYPE_CHECKING: + from windows_auth.settings import LDAPSettings + + +@dataclass() +class WAUTHSettings: + WAUTH_DOMAINS: Dict[str, Union["LDAPSettings"]] = field(default_factory=lambda: {}) + WAUTH_USE_SPN: bool = False + WAUTH_RESYNC_DELTA: Optional[Union[str, int, timezone.timedelta]] = timezone.timedelta(days=1) + WAUTH_USE_CACHE: bool = False + # Raise exception and return Error 500 when user failed to synced to domain + WAUTH_REQUIRE_RESYNC: bool = field(default_factory=lambda: settings.DEBUG) + WAUTH_ERROR_RESPONSE: Optional[Union[int, HttpResponse, Callable]] = None + # Lowercase the username from the REMOTE_USER. Used for connecting case-insensitive LDAP backends. + WAUTH_LOWERCASE_USERNAME: bool = True + # Skip verification of domain settings on server startup + WAUTH_IGNORE_SETTING_WARNINGS: bool = False + # List of domains to preload and connect during process startup + WAUTH_PRELOAD_DOMAINS: Optional[Iterable[str]] = None + # User to impersonate when using SimulateWindowsAuthMiddleware + WAUTH_SIMULATE_USER: str = "" + + @classmethod + def build_settings(cls): + return cls(**{ + f.name: getattr(settings, f.name) + for f in fields(cls) + if hasattr(settings, f.name) + }) + + def update_setting(self, name: str, value) -> None: + self.__setattr__(name, value) + + +if settings.configured: + wauth_settings = WAUTHSettings.build_settings() + + +@receiver(setting_changed) +def _update_setting(sender=None, setting=None, value=None, enter=None, **kwargs): + if not wauth_settings: + return + + if setting in map(lambda f: f.name, fields(WAUTHSettings)): + wauth_settings.update_setting(setting, value) + + # update bound settings + if setting == "DEBUG" and not hasattr(settings, "WAUTH_REQUIRE_RESYNC"): + wauth_settings.update_setting("WAUTH_REQUIRE_RESYNC", value) diff --git a/windows_auth/decorators.py b/windows_auth/decorators.py index a78069d..502a40e 100644 --- a/windows_auth/decorators.py +++ b/windows_auth/decorators.py @@ -1,7 +1,7 @@ from django.contrib.auth.decorators import user_passes_test from django.utils import timezone -from windows_auth.conf import WAUTH_USE_CACHE +from windows_auth.conf import wauth_settings from windows_auth.models import LDAPUser @@ -50,7 +50,7 @@ def ldap_sync_required(function=None, timedelta=None, login_url=None, allow_non_ def check_sync(user): if user.is_authenticated and LDAPUser.objects.filter(user=user).exists(): try: - if WAUTH_USE_CACHE: + if wauth_settings.WAUTH_USE_CACHE: user.ldap.sync() else: # check via database query diff --git a/windows_auth/middleware.py b/windows_auth/middleware.py index 7373704..e2f393a 100644 --- a/windows_auth/middleware.py +++ b/windows_auth/middleware.py @@ -4,8 +4,7 @@ from django.utils import timezone from windows_auth import logger -from windows_auth.conf import WAUTH_RESYNC_DELTA, WAUTH_USE_CACHE, WAUTH_REQUIRE_RESYNC, WAUTH_ERROR_RESPONSE, \ - WAUTH_SIMULATE_USER +from windows_auth.conf import wauth_settings from windows_auth.models import LDAPUser @@ -22,17 +21,17 @@ def __call__(self, request): """ if (request.user and request.user.is_authenticated - and LDAPUser.objects.filter(user=request.user).exists() and WAUTH_RESYNC_DELTA not in (None, False)): + and LDAPUser.objects.filter(user=request.user).exists() and wauth_settings.WAUTH_RESYNC_DELTA not in (None, False)): try: # convert timeout to seconds - if isinstance(WAUTH_RESYNC_DELTA, timezone.timedelta): - timeout = WAUTH_RESYNC_DELTA.total_seconds() + if isinstance(wauth_settings.WAUTH_RESYNC_DELTA, timezone.timedelta): + timeout = wauth_settings.WAUTH_RESYNC_DELTA.total_seconds() else: - timeout = int(WAUTH_RESYNC_DELTA) + timeout = int(wauth_settings.WAUTH_RESYNC_DELTA) ldap_user = LDAPUser.objects.get(user=request.user) - if WAUTH_USE_CACHE: + if wauth_settings.WAUTH_USE_CACHE: # if cache does not exist cache_key = f"wauth_resync_user_{ldap_user.user.id}" if not cache.get(cache_key): @@ -50,11 +49,11 @@ def __call__(self, request): except Exception as e: logger.exception(f"Failed to synchronize user {request.user} against LDAP") # return error response - if WAUTH_REQUIRE_RESYNC: - if isinstance(WAUTH_ERROR_RESPONSE, int): - return HttpResponse(f"Authorization Failed.", status=WAUTH_ERROR_RESPONSE) - elif callable(WAUTH_ERROR_RESPONSE): - return WAUTH_ERROR_RESPONSE(request, e) + if wauth_settings.WAUTH_REQUIRE_RESYNC: + if isinstance(wauth_settings.WAUTH_ERROR_RESPONSE, int): + return HttpResponse(f"Authorization Failed.", status=wauth_settings.WAUTH_ERROR_RESPONSE) + elif callable(wauth_settings.WAUTH_ERROR_RESPONSE): + return wauth_settings.WAUTH_ERROR_RESPONSE(request, e) else: raise e response = self.get_response(request) @@ -69,6 +68,6 @@ def __init__(self, get_response): def __call__(self, request: HttpRequest): if settings.DEBUG and not request.META.get("REMOTE_USER"): # Set remote user - request.META['REMOTE_USER'] = WAUTH_SIMULATE_USER + request.META['REMOTE_USER'] = wauth_settings.WAUTH_SIMULATE_USER return self.get_response(request) diff --git a/windows_auth/models.py b/windows_auth/models.py index 40bf340..7e74f15 100644 --- a/windows_auth/models.py +++ b/windows_auth/models.py @@ -9,7 +9,7 @@ from ldap3 import Reader, Entry, Attribute from windows_auth import logger -from windows_auth.conf import WAUTH_USE_CACHE, WAUTH_USE_SPN, WAUTH_LOWERCASE_USERNAME +from windows_auth.conf import wauth_settings from windows_auth.ldap import LDAPManager, get_ldap_manager from windows_auth.signals import ldap_user_sync from windows_auth.utils import LogExecutionTime @@ -40,7 +40,7 @@ def create_user(self, username: str) -> User: :param username: Logon username (DOMAIN\username or username@domain.com) :return: User object (not LDAPUser) """ - if WAUTH_USE_SPN: + if wauth_settings.WAUTH_USE_SPN: if "@" not in username: raise ValueError("Username must be in username@domain.com format.") @@ -51,7 +51,7 @@ def create_user(self, username: str) -> User: domain, sam_account_name = username.split("\\", 2) - if WAUTH_LOWERCASE_USERNAME: + if wauth_settings.WAUTH_LOWERCASE_USERNAME: sam_account_name = sam_account_name.lower() user = get_user_model().objects.create_user(username=sam_account_name) @@ -210,13 +210,13 @@ def sync(self) -> None: ldap_user_sync.send(self, ldap_user=ldap_user, group_reader=group_reader) # update sync time - if not WAUTH_USE_CACHE: + if not wauth_settings.WAUTH_USE_CACHE: with LogExecutionTime(f"Save LDAP User {self}"): self.last_sync = timezone.now() self.save() def __str__(self): - if WAUTH_USE_SPN: + if wauth_settings.WAUTH_USE_SPN: return f"{self.user.username}@{self.domain}" else: return f"{self.domain}\\{self.user.username}" diff --git a/windows_auth/settings.py b/windows_auth/settings.py index 8f47c17..0149b87 100644 --- a/windows_auth/settings.py +++ b/windows_auth/settings.py @@ -57,17 +57,17 @@ class LDAPSettings: @classmethod def for_domain(cls, domain: str): - from windows_auth.conf import WAUTH_DOMAINS + from windows_auth.conf import wauth_settings - if domain not in WAUTH_DOMAINS and DEFAULT_DOMAIN_SETTING not in WAUTH_DOMAINS: + if domain not in wauth_settings.WAUTH_DOMAINS and DEFAULT_DOMAIN_SETTING not in wauth_settings.WAUTH_DOMAINS: raise ImproperlyConfigured(f"Domain {domain} settings could not be found in WAUTH_DOMAINS setting.") - domain_settings = WAUTH_DOMAINS.get(domain, {}) + domain_settings = wauth_settings.WAUTH_DOMAINS.get(domain, {}) # when setting is an LDAPSetting object if isinstance(domain_settings, LDAPSettings): return domain_settings - default_settings = WAUTH_DOMAINS.get(DEFAULT_DOMAIN_SETTING, {}) + default_settings = wauth_settings.WAUTH_DOMAINS.get(DEFAULT_DOMAIN_SETTING, {}) # when domain setting if isinstance(default_settings, LDAPSettings): default_settings = asdict(default_settings)