Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
CustomDomainSearchAdmin,
)
from app.admin.abuser_lookup import AbuserLookupResult, AbuserLookupAdmin
from app.admin.domain_check import DomainCheckAdmin

__all__ = [
# Initialization
Expand Down Expand Up @@ -62,4 +63,5 @@
"CustomDomainSearchAdmin",
"AbuserLookupResult",
"AbuserLookupAdmin",
"DomainCheckAdmin",
]
23 changes: 23 additions & 0 deletions app/admin/domain_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from flask import request
from flask_admin import expose

from app.admin.base import BaseAdminView
from app.email_utils import check_domain_for_mailbox, MailboxDomainCheckResult


class DomainCheckAdmin(BaseAdminView):
@expose("/", methods=["GET"])
def index(self):
domain = request.args.get("domain", "").strip()
result: MailboxDomainCheckResult | None = None

if domain:
result = check_domain_for_mailbox(domain)

return self.render(
"admin/domain_check.html",
domain=domain,
result=result,
)
2 changes: 2 additions & 0 deletions app/admin/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from app.admin.email_search import EmailSearchAdmin
from app.admin.custom_domain_search import CustomDomainSearchAdmin
from app.admin.abuser_lookup import AbuserLookupAdmin
from app.admin.domain_check import DomainCheckAdmin


def init_admin(app: Flask):
Expand All @@ -49,6 +50,7 @@ def init_admin(app: Flask):
admin.add_view(
AbuserLookupAdmin(name="Abuser Lookup", endpoint="admin.abuser_lookup")
)
admin.add_view(DomainCheckAdmin(name="Domain Check", endpoint="admin.domain_check"))
admin.add_view(UserAdmin(User, Session))
admin.add_view(AliasAdmin(Alias, Session))
admin.add_view(MailboxAdmin(Mailbox, Session))
Expand Down
128 changes: 93 additions & 35 deletions app/email_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,60 +603,118 @@ class EmailCannotBeUsedReason(enum.Enum):
MailboxOfDisabledUser = "This email address is not allowed"


def email_can_be_used_as_mailbox_with_reason(
email_address: str,
) -> Optional[EmailCannotBeUsedReason]:
try:
domain = validate_email(
email_address, check_deliverability=False, allow_smtputf8=False
).domain
except EmailNotValidError:
LOG.d("%s is invalid email address", email_address)
return EmailCannotBeUsedReason.InvalidEmailAddress
class MailboxDomainCheckResult:
"""Detailed result of a domain-level mailbox check, for admin use."""

def __init__(
self,
can_be_used: bool,
reason: Optional[EmailCannotBeUsedReason] = None,
detail: Optional[str] = None,
):
self.can_be_used = can_be_used
self.reason = reason
self.detail = detail

if not domain:
LOG.d("no valid domain associated to %s", email_address)
return EmailCannotBeUsedReason.InvalidEmailDomain

if SLDomain.get_by(domain=domain):
LOG.d("%s is a SL domain", email_address)
return EmailCannotBeUsedReason.IsSimpleLoginDomain
def check_domain_for_mailbox(domain: str) -> MailboxDomainCheckResult:
"""Check whether a domain can be used for mailboxes.

from app.models import CustomDomain
This is the single source of truth for domain-level mailbox checks.
Returns a MailboxDomainCheckResult with a human-readable detail string.
"""
if not domain or "." not in domain:
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.InvalidEmailDomain,
detail=f"'{domain}' is not a valid domain name.",
)

if SLDomain.get_by(domain=domain):
LOG.d("domain %s is a SL domain", domain)
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.IsSimpleLoginDomain,
detail=f"'{domain}' is a SimpleLogin alias domain and cannot be used for mailboxes.",
)

custom_domain = CustomDomain.get_by(domain=domain, verified=True)
if custom_domain is not None:
LOG.d("domain %s is custom domain %s", domain, custom_domain)
return EmailCannotBeUsedReason.IsCustomDomain
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.IsCustomDomain,
detail=f"'{domain}' is a verified custom domain (ID: {custom_domain.id}, owned by user ID: {custom_domain.user_id}).",
)

if is_invalid_mailbox_domain(domain):
LOG.d("Domain %s is invalid mailbox domain", domain)
return EmailCannotBeUsedReason.InvalidMailboxDomain
LOG.d("domain %s is an invalid mailbox domain", domain)
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.InvalidMailboxDomain,
detail=f"'{domain}' is listed as an invalid mailbox domain.",
)

# check if email MX domain is disposable
mx_domains = get_mx_domain_list(domain)

# if no MX record, email is not valid
if not config.SKIP_MX_LOOKUP_ON_CHECK and not mx_domains:
LOG.d("No MX record for domain %s", domain)
return EmailCannotBeUsedReason.NoMxRecordFound
LOG.d("no MX record for domain %s", domain)
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.NoMxRecordFound,
detail=f"No MX records found for '{domain}'.",
)

mx_ips = set()
for mx_domain in mx_domains:
if is_invalid_mailbox_domain(mx_domain):
LOG.d("MX Domain %s %s is invalid mailbox domain", mx_domain, domain)
return EmailCannotBeUsedReason.InvalidMailboxDomain
LOG.d("MX domain %s for %s is an invalid mailbox domain", mx_domain, domain)
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.InvalidMailboxDomain,
detail=f"MX domain '{mx_domain}' (used by '{domain}') is listed as an invalid mailbox domain.",
)
a_record = get_a_record(mx_domain)
LOG.i(
f"Found MX Domain {mx_domain} for mailbox {email_address} with a record {a_record}"
)
LOG.i("Found MX domain %s for %s with A record %s", mx_domain, domain, a_record)
if a_record is not None:
mx_ips.add(a_record)
if len(mx_ips) > 0:
forbidden_ip = ForbiddenMxIp.filter(ForbiddenMxIp.ip.in_(list(mx_ips))).all()
if forbidden_ip:
LOG.i("Found forbidden MX ip %s", forbidden_ip)
return EmailCannotBeUsedReason.ForbiddenMxRecordFound

if mx_ips:
forbidden = ForbiddenMxIp.filter(ForbiddenMxIp.ip.in_(list(mx_ips))).all()
if forbidden:
forbidden_str = ", ".join(fi.ip for fi in forbidden)
LOG.i("Found forbidden MX IPs for domain %s: %s", domain, forbidden_str)
return MailboxDomainCheckResult(
can_be_used=False,
reason=EmailCannotBeUsedReason.ForbiddenMxRecordFound,
detail=f"Forbidden MX IP(s) detected for '{domain}': {forbidden_str}.",
)

mx_list = ", ".join(mx_domains) if mx_domains else "none found (MX check skipped)"
return MailboxDomainCheckResult(
can_be_used=True,
detail=f"'{domain}' can be used as a mailbox domain. MX record(s): {mx_list}.",
)


def email_can_be_used_as_mailbox_with_reason(
email_address: str,
) -> Optional[EmailCannotBeUsedReason]:
try:
domain = validate_email(
email_address, check_deliverability=False, allow_smtputf8=False
).domain
except EmailNotValidError:
LOG.d("%s is invalid email address", email_address)
return EmailCannotBeUsedReason.InvalidEmailAddress

if not domain:
LOG.d("no valid domain associated to %s", email_address)
return EmailCannotBeUsedReason.InvalidEmailDomain

domain_result = check_domain_for_mailbox(domain)
if not domain_result.can_be_used:
return domain_result.reason

existing_user = User.get_by(email=email_address)
if existing_user and existing_user.disabled:
Expand Down Expand Up @@ -707,7 +765,7 @@ def is_invalid_mailbox_domain(domain):
return False


def get_mx_domain_list(domain) -> [str]:
def get_mx_domain_list(domain) -> List[str]:
"""return list of MX domains for a given email.
domain name ends *without* a dot (".") at the end.
"""
Expand Down
Loading
Loading