diff --git a/sssd_test_framework/hosts/ad.py b/sssd_test_framework/hosts/ad.py index 82cf3093..d603795c 100644 --- a/sssd_test_framework/hosts/ad.py +++ b/sssd_test_framework/hosts/ad.py @@ -307,6 +307,12 @@ def restore(self, backup_data: Any | None) -> None: }} }} + # Clean up certificate directories + if (Test-Path "C:\pki") {{ + Write-Host "Cleaning up certificate directories in C:\pki" + Remove-Item "C:\pki" -Recurse -Force -ErrorAction SilentlyContinue + }} + # If we got here, make sure we exit with 0 Exit 0 """, diff --git a/sssd_test_framework/misc/__init__.py b/sssd_test_framework/misc/__init__.py index 43dda168..d5209c03 100644 --- a/sssd_test_framework/misc/__init__.py +++ b/sssd_test_framework/misc/__init__.py @@ -313,3 +313,29 @@ def get_attr(data: dict[str, Any], key: str, default: Any | None = None) -> Any return default return value[0] if len(value) == 1 else value return value + + +def parse_cert_info(output: str) -> dict[str, list[str]]: + """ + Parse certutil output into dictionary. + + :param output: certutil output. + :type output: str + :returns: Dictionary of certificate attributes. + :rtype: dict[str, list[str]] + """ + lines = [line.strip() for line in (output or "").splitlines() if line.strip()] + return attrs_parse(lines) + + +def parse_ad_object_info(output: str) -> dict[str, list[str]]: + """ + Parse AD object output into dictionary. + + :param output: PowerShell AD object output. + :type output: str + :returns: Dictionary of AD object attributes. + :rtype: dict[str, list[str]] + """ + lines = [line.strip() for line in (output or "").splitlines() if line.strip()] + return attrs_parse(lines) diff --git a/sssd_test_framework/roles/ad.py b/sssd_test_framework/roles/ad.py index 78c855e3..0149bdae 100644 --- a/sssd_test_framework/roles/ad.py +++ b/sssd_test_framework/roles/ad.py @@ -2,6 +2,9 @@ from __future__ import annotations +import os +import re +import uuid from abc import ABC from datetime import datetime from typing import Any, TypeAlias @@ -10,9 +13,17 @@ from pytest_mh.conn import ProcessResult from ..hosts.ad import ADHost -from ..misc import attrs_include_value, attrs_parse, attrs_to_hash, ip_version, seconds_to_timespan +from ..misc import ( + attrs_include_value, + attrs_parse, + attrs_to_hash, + ip_version, + parse_ad_object_info, + parse_cert_info, + seconds_to_timespan, +) from .base import BaseObject, BaseWindowsRole, DeleteAttribute -from .generic import GenericPasswordPolicy +from .generic import GenericCertificateAuthority, GenericPasswordPolicy from .ldap import LDAPNetgroupMember from .nfs import NFSExport @@ -30,6 +41,7 @@ "GPO", "ADDNSServer", "ADDNSZone", + "ADCertificateAuthority", ] @@ -141,6 +153,20 @@ def test_example_autofs(client: Client, ad: AD, nfs: NFS): } """ + self._ca = ADCertificateAuthority(self.host) + """ + AD Certificate Authority server management. + + Provides certificate operations: + + - Request certificates using templates + - Request smartcard certificates with Enrollment Agent + - Revoke certificates with configurable reasons + - Manage certificate holds + - Export certificates as PFX + - Retrieve certificate and template details + """ + @property def password_policy(self) -> ADPasswordPolicy: """ @@ -159,6 +185,44 @@ def test_example(client: Client, ad: AD): """ return self._password_policy + @property + def ca(self) -> ADCertificateAuthority: + """ + AD Certificate Authority management. + + Provides certificate operations: + - Request certificates using templates + - Request smartcard certificates with Enrollment Agent + - Revoke certificates with configurable reasons + - Manage certificate holds + - Export certificates as PFX + - Retrieve certificate and template details + + .. code-block:: python + :caption: Example usage + + # Basic certificate request + cert, key, csr = ad.ca.request( + template="User", + subject="CN=testuser" + ) + + # Smartcard certificate request + cert, key, csr = ad.ca.request_smartcard( + template="SmartcardLogon", + subject="CN=testuser", + enrollment_agent_cert_hash="abc123...", + requester_name="Administrator" + ) + + # Revoke certificate + ad.ca.revoke(cert, reason="key_compromise") + + # Export as PFX + ad.ca.export_pfx(cert, "C:\\temp\\cert.pfx") + """ + return self._ca + @property def naming_context(self) -> str: """ @@ -2331,3 +2395,387 @@ def print(self) -> str: ADNetgroupMember: TypeAlias = LDAPNetgroupMember[ADUser, ADNetgroup] + + +class ADCertificateAuthority(GenericCertificateAuthority): + """ + Certificate Authority server management. + + This class allows requesting, revoking, placing/removing certificate holds, + and retrieving certificate information via certreq and certutil commands. + """ + + def __init__(self, host: ADHost) -> None: + """ + Initialize the AD Certificate Authority helper. + + :param host: Remote AD host. + :type host: ADHost + """ + self.host = host + self.cli = host.cli + self.temp_dir = f"C:\\pki\\ad_test_certs_{os.getpid()}_{uuid.uuid4().hex}" + self._create_temp_dir() + + def _create_temp_dir(self) -> None: + """Create temporary directory for certificate files.""" + self.host.conn.run(f'New-Item -ItemType Directory -Path "{self.temp_dir}" -Force') + + def __request_enrollment( + self, + template: str, + subject: str, + key_size: int = 2048, + ) -> tuple[str, str, str]: + """ + Request a certificate enrollment (enrollment certificate). + + This method prepares an INF file and requests an enrollment certificate. + It is intended for internal use, e.g., by smartcard certificate requests. + + :param template: Certificate template name (e.g., "User"). + :type template: str + :param subject: Certificate subject (e.g., "CN=testuser"). + :type subject: str + :param key_size: RSA key size in bits. + :type key_size: int + :returns: A tuple of (certificate_path, key_path, csr_path). + :rtype: tuple[str, str, str] + :raises RuntimeError: If certificate request fails. + """ + base = re.sub(r"[^a-zA-Z0-9._-]", "_", subject) + inf_path = os.path.join(self.temp_dir, f"{base}.inf") + req_path = os.path.join(self.temp_dir, f"{base}.req") + cert_path = os.path.join(self.temp_dir, f"{base}.cer") + + # Create INF file using PSIni + self.host.conn.run( + f""" + $iniPath = "{inf_path}" + New-PsIniFile -Path $iniPath + Add-PsIniSection -Path $iniPath -Section "NewRequest" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "Subject" -Value '"{subject}"' + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "KeyLength" -Value "{key_size}" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "MachineKeySet" -Value "no" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "Exportable" -Value "yes" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "RequestType" -Value "PKCS10" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "KeyUsage" -Value "0xA0" + Add-PsIniSection -Path $iniPath -Section "RequestAttributes" + Set-PsIniKey -Path $iniPath -Section "RequestAttributes" -Key "CertificateTemplate" -Value '"{template}"' + """ + ) + + result = self.host.conn.run(f'certreq -new "{inf_path}" "{req_path}"', raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"Certificate request generation failed: {result.stderr}") + + result = self.host.conn.run( + f'certreq -submit -config "{self._get_ca_config()}" "{req_path}" "{cert_path}"', raise_on_error=False + ) + if result.rc != 0: + raise RuntimeError(f"Certificate submission failed: {result.stderr}") + + key_ref = inf_path + + return cert_path, key_ref, req_path + + def request_basic( + self, + template: str, + subject: str, + key_size: int = 2048, + ) -> tuple[str, str, str]: + """ + Request a basic certificate from the AD CA without enrollment agent. + + This method can be used for simple certificate requests that don't + require enrollment agent certificates. + + :param template: Certificate template name (e.g., "User"). + :type template: str + :param subject: Certificate subject (e.g., "CN=testuser"). + :type subject: str + :param key_size: RSA key size in bits. + :type key_size: int + :returns: A tuple of (certificate_path, key_path, csr_path). + :rtype: tuple[str, str, str] + :raises RuntimeError: If certificate request fails. + """ + return self.__request_enrollment(template, subject, key_size) + + def request( + self, + template: str, + subject: str, + enrollment_agent_cert_hash: str, + requester_name: str, + ) -> tuple[str, str, str]: + """ + Request a smartcard certificate using Enrollment Agent. + + :param template: Smartcard certificate template name. + :type template: str + :param subject: Certificate subject. + :type subject: str + :param enrollment_agent_cert_hash: Hash of the Enrollment Agent certificate. + :type enrollment_agent_cert_hash: str + :param requester_name: Name of the enrollment agent requester. + :type requester_name: str + :returns: A tuple of (certificate_path, key_path, csr_path). + :rtype: tuple[str, str, str] + :raises RuntimeError: If smartcard certificate request fails. + """ + base = re.sub(r"[^a-zA-Z0-9._-]", "_", subject) + inf_path = os.path.join(self.temp_dir, f"{base}.inf") + req_path = os.path.join(self.temp_dir, f"{base}.req") + signed_req_path = os.path.join(self.temp_dir, f"{base}_signed.req") + cert_path = os.path.join(self.temp_dir, f"{base}.cer") + + # Create INF file using PSIni + self.host.conn.run( + f""" + $iniPath = "{inf_path}" + New-PsIniFile -Path $iniPath + Add-PsIniSection -Path $iniPath -Section "NewRequest" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "Subject" -Value '"{subject}"' + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "KeySpec" -Value "1" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "KeyUsage" -Value "0xA0" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "Exportable" -Value "TRUE" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "RequestType" -Value "CMC" + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "RequesterName" -Value '"{requester_name}"' + Set-PsIniKey -Path $iniPath -Section "NewRequest" -Key "ProviderName" ` + -Value '"Microsoft Software Key Storage Provider"' + Add-PsIniSection -Path $iniPath -Section "RequestAttributes" + Set-PsIniKey -Path $iniPath -Section "RequestAttributes" -Key "CertificateTemplate" -Value '"{template}"' + """ + ) + + result = self.host.conn.run(f'certreq -new "{inf_path}" "{req_path}"', raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"Smartcard certificate request generation failed: {result.stderr}") + + result = self.host.conn.run( + f'certreq -q -sign -cert "{enrollment_agent_cert_hash}" "{req_path}" "{signed_req_path}"', + raise_on_error=False, + ) + if result.rc != 0: + raise RuntimeError(f"Request signing failed: {result.stderr}") + + result = self.host.conn.run( + f'certreq -submit -config "{self._get_ca_config()}" "{signed_req_path}" "{cert_path}"', + raise_on_error=False, + ) + if result.rc != 0: + raise RuntimeError(f"Smartcard certificate submission failed: {result.stderr}") + + return cert_path, inf_path, req_path + + def revoke(self, cert_path: str, reason: str = "unspecified") -> None: + """ + Revoke a certificate in AD CA. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :param reason: Reason for revocation. + :type reason: str + :raises RuntimeError: If revocation fails. + """ + serial = self._get_cert_serial(cert_path) + reason_code = self._revocation_reason_to_code(reason) + + result = self.host.conn.run( + f'certutil -config "{self._get_ca_config()}" -revoke {serial} {reason_code}', raise_on_error=False + ) + if result.rc != 0: + raise RuntimeError(f"Certificate revocation failed: {result.stderr}") + + def revoke_hold(self, cert_path: str) -> None: + """ + Place a certificate on hold. + + :param cert_path: Path to the certificate file. + :type cert_path: str + """ + self.revoke(cert_path, reason="certificate_hold") + + def revoke_hold_remove(self, cert_path: str) -> None: + """ + Remove hold from a certificate. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :raises RuntimeError: If hold removal fails. + """ + serial = self._get_cert_serial(cert_path) + + result = self.host.conn.run( + f'certutil -config "{self._get_ca_config()}" -revoke {serial} 8', raise_on_error=False # 8 = removeFromCRL + ) + if result.rc != 0: + raise RuntimeError(f"Certificate hold removal failed: {result.stderr}") + + def get(self, cert_path: str) -> dict[str, list[str]]: + """ + Retrieve certificate details from AD CA. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :returns: A dictionary of certificate attributes. + :rtype: dict[str, list[str]] + :raises ValueError: If the certificate is not found. + """ + result = self.host.conn.run(f'certutil "{cert_path}"', raise_on_error=False) + if result.rc != 0: + raise ValueError(f"Certificate not found or invalid: {result.stderr}") + + return parse_cert_info(result.stdout) + + def export_pfx( + self, cert_path: str, pfx_path: str, password: str = "Secret123", include_chain: bool = False + ) -> None: + """ + Export certificate as PFX file. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :param pfx_path: Path where to save the PFX file. + :type pfx_path: str + :param password: Password for the PFX file. + :type password: str + :param include_chain: Whether to include certificate chain. + :type include_chain: bool + :raises RuntimeError: If export fails. + """ + result = self.host.conn.run(f'certreq -accept "{cert_path}"', raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"Certificate acceptance failed: {result.stderr}") + + thumbprint = self._get_cert_thumbprint(cert_path) + + export_cmd = f""" + $cert = Get-ChildItem -Path "Cert:\\\\LocalMachine\\\\My" + | Where-Object {{ $_.Thumbprint -eq "{thumbprint}" }} + if ($cert) {{ + $pfxPassword = ConvertTo-SecureString -String "{password}" -AsPlainText -Force + Export-PfxCertificate -Cert $cert -FilePath "{pfx_path}" -Password $pfxPassword + }} else {{ + Write-Error "Certificate with thumbprint {thumbprint} not found in store" + exit 1 + }} + """ + + result = self.host.conn.run(export_cmd, raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"PFX export failed: {result.stderr}") + + def get_certificate_template(self, template_name: str) -> dict[str, list[str]]: + """ + Get certificate template information. + + :param template_name: Name of the certificate template. + :type template_name: str + :returns: Dictionary of template attributes. + :rtype: dict[str, list[str]] + """ + result = self.host.conn.run( + f""" + $configNC = (Get-ADRootDSE).configurationNamingContext + $templateDN = "CN={template_name},CN=Certificate Templates,CN=Public Key Services,CN=Services," + $configNC + Get-ADObject -Identity $templateDN -Properties * + """, + raise_on_error=False, + ) + + if result.rc != 0: + raise ValueError(f"Certificate template '{template_name}' not found: {result.stderr}") + + return parse_ad_object_info(result.stdout) + + def _get_ca_config(self) -> str: + """ + Get CA configuration string. + + :returns: CA configuration string. + :rtype: str + """ + result = self.host.conn.run("certutil -dump", raise_on_error=False) + if result.rc == 0: + for line in result.stdout_lines: + if "Config:" in line: + return line.split(":", 1)[1].strip() + + return f"{self.host.hostname}\\{self.host.domain}-CA" + + def _get_cert_serial(self, cert_path: str) -> str: + """ + Extract certificate serial number. + + :param cert_path: Path to certificate file. + :type cert_path: str + :returns: Certificate serial number. + :rtype: str + :raises RuntimeError: If serial extraction fails. + """ + result = self.host.conn.run(f'certutil "{cert_path}"', raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"Failed to get certificate serial: {result.stderr}") + + for line in result.stdout_lines: + if "Serial Number:" in line: + return line.split(":", 1)[1].strip() + + raise RuntimeError("Serial number not found in certificate output") + + def _get_cert_thumbprint(self, cert_path: str) -> str: + """ + Extract certificate thumbprint. + + :param cert_path: Path to certificate file. + :type cert_path: str + :returns: Certificate thumbprint. + :rtype: str + :raises RuntimeError: If thumbprint extraction fails. + """ + result = self.host.conn.run(f'certutil "{cert_path}"', raise_on_error=False) + if result.rc != 0: + raise RuntimeError(f"Failed to get certificate thumbprint: {result.stderr}") + + for line in result.stdout_lines: + if "Thumbprint:" in line: + return line.split(":", 1)[1].strip() + + raise RuntimeError("Thumbprint not found in certificate output") + + def _revocation_reason_to_code(self, reason: str) -> int: + """ + Map revocation reason to numeric code. + + :param reason: Revocation reason. + :type reason: str + :returns: Numeric reason code. + :rtype: int + """ + reason_map = { + "unspecified": 0, + "key_compromise": 1, + "ca_compromise": 2, + "affiliation_changed": 3, + "superseded": 4, + "cessation_of_operation": 5, + "certificate_hold": 6, + "remove_from_crl": 8, + "privilege_withdrawn": 9, + "aa_compromise": 10, + } + return reason_map[reason] + + def cleanup(self) -> None: + """ + Clean up temporary certificate directory. + + This method removes the temporary directory created for certificate operations. + """ + self.host.conn.run( + f'if (Test-Path "{self.temp_dir}") {{ Remove-Item "{self.temp_dir}" -Recurse -Force }}', + raise_on_error=False, + ) diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index 51e3ac7d..d5de80c5 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -31,6 +31,7 @@ "GenericGPO", "GenericDNSServer", "GenericDNSZone", + "GenericCertificateAuthority", ] @@ -345,6 +346,36 @@ def test_example(client: Client, provider: GenericProvider, nfs: NFS): """ pass + @property + @abstractmethod + def ca(self) -> GenericCertificateAuthority: + """ + Certificate Authority management. + + Provides certificate operations across different providers. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_certificate_operations(client: Client, provider: GenericProvider): + # Request certificate (parameters vary by provider) + cert, key, csr = provider.ca.request(...) + + # Revoke certificate + provider.ca.revoke(cert, reason="key_compromise") + + # Place certificate on hold + provider.ca.revoke_hold(cert) + + # Remove hold + provider.ca.revoke_hold_remove(cert) + + # Get certificate details + cert_details = provider.ca.get(cert) + """ + pass + class GenericADProvider(GenericProvider): """ @@ -1537,3 +1568,69 @@ def print(self) -> str: :rtype: str """ pass + + +class GenericCertificateAuthority(ABC): + """ + Generic Certificate Authority management. + + Provides a unified interface for certificate operations across different providers + (IPA, AD, etc.). + """ + + @abstractmethod + def request(self, *args, **kwargs) -> tuple[str, str, str]: + """ + Request a certificate from the CA. + + Returns a tuple of (certificate_path, key_path, csr_path). + The specific parameters depend on the provider implementation. + + :returns: A tuple of (certificate_path, key_path, csr_path). + :rtype: tuple[str, str, str] + """ + pass + + @abstractmethod + def revoke(self, cert_path: str, reason: str = "unspecified") -> None: + """ + Revoke a certificate. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :param reason: Reason for revocation. + :type reason: str + """ + pass + + @abstractmethod + def revoke_hold(self, cert_path: str) -> None: + """ + Place a certificate on hold. + + :param cert_path: Path to the certificate file. + :type cert_path: str + """ + pass + + @abstractmethod + def revoke_hold_remove(self, cert_path: str) -> None: + """ + Remove hold from a certificate. + + :param cert_path: Path to the certificate file. + :type cert_path: str + """ + pass + + @abstractmethod + def get(self, cert_path: str) -> dict[str, list[str]]: + """ + Retrieve certificate details. + + :param cert_path: Path to the certificate file. + :type cert_path: str + :returns: A dictionary of certificate attributes. + :rtype: dict[str, list[str]] + """ + pass diff --git a/sssd_test_framework/roles/ipa.py b/sssd_test_framework/roles/ipa.py index 8215e3fa..e7181ea2 100644 --- a/sssd_test_framework/roles/ipa.py +++ b/sssd_test_framework/roles/ipa.py @@ -8,7 +8,7 @@ import uuid from itertools import groupby from textwrap import dedent -from typing import Any, Literal, Optional +from typing import Any, Optional from pytest_mh import MultihostHost from pytest_mh.cli import CLIBuilder, CLIBuilderArgs @@ -28,7 +28,7 @@ from ..utils.sssctl import SSSCTLUtils from ..utils.sssd import SSSDUtils from .base import BaseLinuxRole, BaseObject -from .generic import GenericNetgroupMember, GenericPasswordPolicy +from .generic import GenericCertificateAuthority, GenericNetgroupMember, GenericPasswordPolicy from .nfs import NFSExport __all__ = [ @@ -51,19 +51,6 @@ "IPAHBAC", ] -RevocationReason = Literal[ - "unspecified", - "key_compromise", - "ca_compromise", - "affiliation_changed", - "superseded", - "cessation_of_operation", - "certificate_hold", - "remove_from_crl", - "privilege_withdrawn", - "aa_compromise", -] - class IPA(BaseLinuxRole[IPAHost]): """ @@ -181,7 +168,7 @@ def test_example(client: Client, ipa: IPA, nfs: NFS): } """ - self.ca = IPACertificateAuthority(self.host, self.fs) + self._ca = IPACertificateAuthority(self.host, self.fs) """ IPA Certificate Authority management. @@ -215,6 +202,36 @@ def test_example(client: Client, ipa: IPA): """ return self._password_policy + @property + def ca(self) -> IPACertificateAuthority: + """ + IPA Certificate Authority management. + + Provides certificate operations: + - Request certificates for services/users + - Revoke certificates with configurable reasons + - Manage certificate holds + - Retrieve certificate details + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.IPA) + def test_example(client: Client, ipa: IPA): + # Request certificate + cert, key, csr = ipa.ca.request(principal="HTTP/client.ipa.test") + + # Revoke certificate + ipa.ca.revoke(cert, reason="key_compromise") + + # Place on hold + ipa.ca.revoke_hold(cert) + + # Remove hold + ipa.ca.revoke_hold_remove(cert) + """ + return self._ca + @property def naming_context(self) -> str: """ @@ -2770,7 +2787,7 @@ def print(self) -> str: return result -class IPACertificateAuthority: +class IPACertificateAuthority(GenericCertificateAuthority): """ Provides helper methods for FreeIPA Certificate Authority operations. @@ -2885,14 +2902,14 @@ def request( return cert_path, key_path, csr_path - def revoke(self, cert_path: str, reason: RevocationReason = "unspecified") -> None: + def revoke(self, cert_path: str, reason: str = "unspecified") -> None: """ Revoke a certificate in IPA. :param cert_path: Path to the certificate file. :type cert_path: str :param reason: Reason for revocation. - :type reason: RevocationReason + :type reason: str :raises RuntimeError: If revocation fails. """ serial = self._get_cert_serial(cert_path) @@ -2992,12 +3009,12 @@ def _get_cert_serial(self, cert_path: str) -> str: return out.split("=", 1)[1].lower() return out.lower() - def _revocation_reason_to_code(self, reason: RevocationReason) -> int: + def _revocation_reason_to_code(self, reason: str) -> int: """ Map a revocation reason string to its corresponding numeric code. :param reason: Revocation reason string. - :type reason: RevocationReason + :type reason: str :returns: Numeric reason code. :rtype: int """