Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions windows_auth/apps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import atexit

from django.conf import settings
from django.db import DatabaseError
from ldap3.core.exceptions import LDAPException
from django.apps import AppConfig
Expand All @@ -13,7 +14,7 @@ class WindowsAuthConfig(AppConfig):
default_auto_field = 'django.db.models.AutoField'

def ready(self):
from windows_auth.conf import WAUTH_IGNORE_SETTING_WARNINGS, WAUTH_PRELOAD_DOMAINS, WAUTH_DOMAINS
from windows_auth.conf import wauth_settings
from windows_auth.settings import DEFAULT_DOMAIN_SETTING
from windows_auth.ldap import get_ldap_manager, close_connections

Expand All @@ -23,11 +24,17 @@ def ready(self):
# You can avoid this behavior by using "runserver --noreload" parameter,
# or modifying the WAUTH_PRELOAD_DOMAINS setting to False.

if (
not hasattr(settings, "WAUTH_DOMAINS")
and not getattr(settings, "WAUTH_IGNORE_SETTING_WARNINGS", False)
):
logger.warn("The required setting WAUTH_DOMAINS is missing.")

# check about users with domain missing from settings
if not WAUTH_IGNORE_SETTING_WARNINGS and DEFAULT_DOMAIN_SETTING not in WAUTH_DOMAINS:
if not wauth_settings.WAUTH_IGNORE_SETTING_WARNINGS and DEFAULT_DOMAIN_SETTING not in wauth_settings.WAUTH_DOMAINS:
try:
from windows_auth.models import LDAPUser
missing_domains = LDAPUser.objects.exclude(domain__in=WAUTH_DOMAINS.keys())
missing_domains = LDAPUser.objects.exclude(domain__in=wauth_settings.WAUTH_DOMAINS.keys())
if missing_domains.exists():
for result in missing_domains.values("domain").annotate(count=Count("pk")):
logger.warning(f"Settings for domain \"{result.get('domain')}\" are missing from WAUTH_DOMAINS "
Expand All @@ -37,9 +44,9 @@ def ready(self):
logger.warn(e)

# configure default preload domains
preload_domains = WAUTH_PRELOAD_DOMAINS
preload_domains = wauth_settings.WAUTH_PRELOAD_DOMAINS
if preload_domains in (None, True):
preload_domains = list(WAUTH_DOMAINS.keys())
preload_domains = list(wauth_settings.WAUTH_DOMAINS.keys())
if DEFAULT_DOMAIN_SETTING in preload_domains:
preload_domains.remove(DEFAULT_DOMAIN_SETTING)

Expand Down
6 changes: 3 additions & 3 deletions windows_auth/backends.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from django.contrib.auth.backends import RemoteUserBackend

from windows_auth.conf import WAUTH_USE_SPN, WAUTH_LOWERCASE_USERNAME
from windows_auth.conf import wauth_settings
from windows_auth.models import LDAPUser


Expand All @@ -19,12 +19,12 @@ def clean_username(self, username: str):
:param username: raw REMOTE_USER header value
:return: cleaned sAMAccountName value from the
"""
if WAUTH_USE_SPN:
if wauth_settings.WAUTH_USE_SPN:
sam_account_name, self.domain = username.rsplit("@", 2)
else:
self.domain, sam_account_name = username.split("\\", 2)

if WAUTH_LOWERCASE_USERNAME:
if wauth_settings.WAUTH_LOWERCASE_USERNAME:
return str(sam_account_name).lower()
else:
return sam_account_name
Expand Down
81 changes: 53 additions & 28 deletions windows_auth/conf.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
from typing import Callable, Union, Optional, Iterable
from dataclasses import dataclass, field, fields, Field
from typing import Callable, Union, Optional, Iterable, Dict, TYPE_CHECKING, Set

from django.conf import settings
from django.core.signals import setting_changed
from django.dispatch import receiver
from django.http import HttpResponse

from django.utils import timezone

from windows_auth import logger

if not hasattr(settings, "WAUTH_DOMAINS"):
logger.warn("The required setting WAUTH_DOMAINS is missing.")

# Settings for each domain
WAUTH_DOMAINS: dict = getattr(settings, "WAUTH_DOMAINS", {})

# Expect REMOTE_USER value to be in SPN scheme
WAUTH_USE_SPN: bool = getattr(settings, "WAUTH_USE_SPN", False)
# Minimum time until automatic re-sync
WAUTH_RESYNC_DELTA: Optional[Union[str, int, timezone.timedelta]] = getattr(settings, "WAUTH_RESYNC_DELTA",
timezone.timedelta(days=1))
# Use cache instead of model for determining re-sync
WAUTH_USE_CACHE: bool = getattr(settings, "WAUTH_USE_CACHE", False)
# Raise exception and return Error 500 when user failed to synced to domain
WAUTH_REQUIRE_RESYNC: bool = getattr(settings, "WAUTH_REQUIRE_RESYNC", settings.DEBUG or False)
# Choose custom HTTP Response to send when LDAP Sync fails
WAUTH_ERROR_RESPONSE: Optional[Union[int, HttpResponse, Callable]] = getattr(settings, "WAUTH_ERROR_RESPONSE", None)
# Lowercase the username from the REMOTE_USER. Used for correct non-case sensitive LDAP backends.
WAUTH_LOWERCASE_USERNAME: bool = getattr(settings, "WAUTH_LOWERCASE_USERNAME", True)
# Skip verification of domain settings on server startup
WAUTH_IGNORE_SETTING_WARNINGS: bool = getattr(settings, "WAUTH_IGNORE_SETTING_WARNINGS", False)
# List of domains to preload and connect during process startup
WAUTH_PRELOAD_DOMAINS: Optional[Iterable[str]] = getattr(settings, "WAUTH_PRELOAD_DOMAINS", None)
# User to impersonate when using SimulateWindowsAuthMiddleware
WAUTH_SIMULATE_USER: str = getattr(settings, "WAUTH_SIMULATE_USER", "")
if TYPE_CHECKING:
from windows_auth.settings import LDAPSettings


@dataclass()
class WAUTHSettings:
WAUTH_DOMAINS: Dict[str, Union["LDAPSettings"]] = field(default_factory=lambda: {})
WAUTH_USE_SPN: bool = False
WAUTH_RESYNC_DELTA: Optional[Union[str, int, timezone.timedelta]] = timezone.timedelta(days=1)
WAUTH_USE_CACHE: bool = False
# Raise exception and return Error 500 when user failed to synced to domain
WAUTH_REQUIRE_RESYNC: bool = field(default_factory=lambda: settings.DEBUG)
WAUTH_ERROR_RESPONSE: Optional[Union[int, HttpResponse, Callable]] = None
# Lowercase the username from the REMOTE_USER. Used for connecting case-insensitive LDAP backends.
WAUTH_LOWERCASE_USERNAME: bool = True
# Skip verification of domain settings on server startup
WAUTH_IGNORE_SETTING_WARNINGS: bool = False
# List of domains to preload and connect during process startup
WAUTH_PRELOAD_DOMAINS: Optional[Iterable[str]] = None
# User to impersonate when using SimulateWindowsAuthMiddleware
WAUTH_SIMULATE_USER: str = ""

@classmethod
def build_settings(cls):
return cls(**{
f.name: getattr(settings, f.name)
for f in fields(cls)
if hasattr(settings, f.name)
})

def update_setting(self, name: str, value) -> None:
self.__setattr__(name, value)


if settings.configured:
wauth_settings = WAUTHSettings.build_settings()


@receiver(setting_changed)
def _update_setting(sender=None, setting=None, value=None, enter=None, **kwargs):
if not wauth_settings:
return

if setting in map(lambda f: f.name, fields(WAUTHSettings)):
wauth_settings.update_setting(setting, value)

# update bound settings
if setting == "DEBUG" and not hasattr(settings, "WAUTH_REQUIRE_RESYNC"):
wauth_settings.update_setting("WAUTH_REQUIRE_RESYNC", value)
4 changes: 2 additions & 2 deletions windows_auth/decorators.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.contrib.auth.decorators import user_passes_test
from django.utils import timezone

from windows_auth.conf import WAUTH_USE_CACHE
from windows_auth.conf import wauth_settings
from windows_auth.models import LDAPUser


Expand Down Expand Up @@ -50,7 +50,7 @@ def ldap_sync_required(function=None, timedelta=None, login_url=None, allow_non_
def check_sync(user):
if user.is_authenticated and LDAPUser.objects.filter(user=user).exists():
try:
if WAUTH_USE_CACHE:
if wauth_settings.WAUTH_USE_CACHE:
user.ldap.sync()
else:
# check via database query
Expand Down
25 changes: 12 additions & 13 deletions windows_auth/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from django.utils import timezone

from windows_auth import logger
from windows_auth.conf import WAUTH_RESYNC_DELTA, WAUTH_USE_CACHE, WAUTH_REQUIRE_RESYNC, WAUTH_ERROR_RESPONSE, \
WAUTH_SIMULATE_USER
from windows_auth.conf import wauth_settings
from windows_auth.models import LDAPUser


Expand All @@ -22,17 +21,17 @@ def __call__(self, request):
"""

if (request.user and request.user.is_authenticated
and LDAPUser.objects.filter(user=request.user).exists() and WAUTH_RESYNC_DELTA not in (None, False)):
and LDAPUser.objects.filter(user=request.user).exists() and wauth_settings.WAUTH_RESYNC_DELTA not in (None, False)):
try:
# convert timeout to seconds
if isinstance(WAUTH_RESYNC_DELTA, timezone.timedelta):
timeout = WAUTH_RESYNC_DELTA.total_seconds()
if isinstance(wauth_settings.WAUTH_RESYNC_DELTA, timezone.timedelta):
timeout = wauth_settings.WAUTH_RESYNC_DELTA.total_seconds()
else:
timeout = int(WAUTH_RESYNC_DELTA)
timeout = int(wauth_settings.WAUTH_RESYNC_DELTA)

ldap_user = LDAPUser.objects.get(user=request.user)

if WAUTH_USE_CACHE:
if wauth_settings.WAUTH_USE_CACHE:
# if cache does not exist
cache_key = f"wauth_resync_user_{ldap_user.user.id}"
if not cache.get(cache_key):
Expand All @@ -50,11 +49,11 @@ def __call__(self, request):
except Exception as e:
logger.exception(f"Failed to synchronize user {request.user} against LDAP")
# return error response
if WAUTH_REQUIRE_RESYNC:
if isinstance(WAUTH_ERROR_RESPONSE, int):
return HttpResponse(f"Authorization Failed.", status=WAUTH_ERROR_RESPONSE)
elif callable(WAUTH_ERROR_RESPONSE):
return WAUTH_ERROR_RESPONSE(request, e)
if wauth_settings.WAUTH_REQUIRE_RESYNC:
if isinstance(wauth_settings.WAUTH_ERROR_RESPONSE, int):
return HttpResponse(f"Authorization Failed.", status=wauth_settings.WAUTH_ERROR_RESPONSE)
elif callable(wauth_settings.WAUTH_ERROR_RESPONSE):
return wauth_settings.WAUTH_ERROR_RESPONSE(request, e)
else:
raise e
response = self.get_response(request)
Expand All @@ -69,6 +68,6 @@ def __init__(self, get_response):
def __call__(self, request: HttpRequest):
if settings.DEBUG and not request.META.get("REMOTE_USER"):
# Set remote user
request.META['REMOTE_USER'] = WAUTH_SIMULATE_USER
request.META['REMOTE_USER'] = wauth_settings.WAUTH_SIMULATE_USER
return self.get_response(request)

10 changes: 5 additions & 5 deletions windows_auth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ldap3 import Reader, Entry, Attribute

from windows_auth import logger
from windows_auth.conf import WAUTH_USE_CACHE, WAUTH_USE_SPN, WAUTH_LOWERCASE_USERNAME
from windows_auth.conf import wauth_settings
from windows_auth.ldap import LDAPManager, get_ldap_manager
from windows_auth.signals import ldap_user_sync
from windows_auth.utils import LogExecutionTime
Expand Down Expand Up @@ -40,7 +40,7 @@ def create_user(self, username: str) -> User:
:param username: Logon username (DOMAIN\username or [email protected])
:return: User object (not LDAPUser)
"""
if WAUTH_USE_SPN:
if wauth_settings.WAUTH_USE_SPN:
if "@" not in username:
raise ValueError("Username must be in [email protected] format.")

Expand All @@ -51,7 +51,7 @@ def create_user(self, username: str) -> User:

domain, sam_account_name = username.split("\\", 2)

if WAUTH_LOWERCASE_USERNAME:
if wauth_settings.WAUTH_LOWERCASE_USERNAME:
sam_account_name = sam_account_name.lower()

user = get_user_model().objects.create_user(username=sam_account_name)
Expand Down Expand Up @@ -210,13 +210,13 @@ def sync(self) -> None:
ldap_user_sync.send(self, ldap_user=ldap_user, group_reader=group_reader)

# update sync time
if not WAUTH_USE_CACHE:
if not wauth_settings.WAUTH_USE_CACHE:
with LogExecutionTime(f"Save LDAP User {self}"):
self.last_sync = timezone.now()
self.save()

def __str__(self):
if WAUTH_USE_SPN:
if wauth_settings.WAUTH_USE_SPN:
return f"{self.user.username}@{self.domain}"
else:
return f"{self.domain}\\{self.user.username}"
Expand Down
8 changes: 4 additions & 4 deletions windows_auth/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ class LDAPSettings:

@classmethod
def for_domain(cls, domain: str):
from windows_auth.conf import WAUTH_DOMAINS
from windows_auth.conf import wauth_settings

if domain not in WAUTH_DOMAINS and DEFAULT_DOMAIN_SETTING not in WAUTH_DOMAINS:
if domain not in wauth_settings.WAUTH_DOMAINS and DEFAULT_DOMAIN_SETTING not in wauth_settings.WAUTH_DOMAINS:
raise ImproperlyConfigured(f"Domain {domain} settings could not be found in WAUTH_DOMAINS setting.")

domain_settings = WAUTH_DOMAINS.get(domain, {})
domain_settings = wauth_settings.WAUTH_DOMAINS.get(domain, {})
# when setting is an LDAPSetting object
if isinstance(domain_settings, LDAPSettings):
return domain_settings

default_settings = WAUTH_DOMAINS.get(DEFAULT_DOMAIN_SETTING, {})
default_settings = wauth_settings.WAUTH_DOMAINS.get(DEFAULT_DOMAIN_SETTING, {})
# when domain setting
if isinstance(default_settings, LDAPSettings):
default_settings = asdict(default_settings)
Expand Down