"""
RFC 5322 — Internet Message Format Email Address Parser
=======================================================

Implements the lexical tokens of RFC 5322 §3.2 and the address
specification of §3.4.  When the parser is created with ``strict=False``
it additionally accepts the obsolete addressing forms of §4.4 that are
handled below (obs-phrase dots, obs-local-part, obs-domain, quoted-pairs
inside domain literals).

Usage:
    parser = AddressParser(strict=True)
    addr = parser.parse('"John Doe" <john@example.com>')
    addrs = parser.parse_address_list('alice@a.com, bob@b.com')
"""

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional


# ── Character Classes ────────────────────────────────────────────────

_ATEXT_SPECIALS = frozenset("!#$%&'*+-/=?^_`{|}~")


def _is_atext(ch: Optional[str]) -> bool:
    """Return True if *ch* is a single ``atext`` character (§3.2.3).

    ``None`` and the empty string are rejected so callers can pass the
    result of ``peek()`` directly ('' would otherwise satisfy ``in``).
    """
    if not ch or len(ch) != 1:
        return False
    if 'a' <= ch <= 'z' or 'A' <= ch <= 'Z' or '0' <= ch <= '9':
        return True
    return ch in _ATEXT_SPECIALS


def _is_dtext(ch: str) -> bool:
    """Return True if *ch* is ``dtext``: printable ASCII except ``[ \\ ]``."""
    code = ord(ch)
    return (33 <= code <= 90) or (94 <= code <= 126)


def _is_qtext(ch: str) -> bool:
    """Return True if *ch* is ``qtext``: printable ASCII except ``"`` and ``\\``."""
    code = ord(ch)
    return code == 33 or (35 <= code <= 91) or (93 <= code <= 126)


def _is_ctext(ch: str) -> bool:
    """Return True if *ch* is ``ctext``: printable ASCII except ``( ) \\``."""
    code = ord(ch)
    return (33 <= code <= 39) or (42 <= code <= 91) or (93 <= code <= 126)


def _is_wsp(ch: str) -> bool:
    """Return True if *ch* is a space or a horizontal tab."""
    return ch in (' ', '\t')


# ── Data Model ────────────────────────────────────────────────────────

@dataclass
class RFC5322Address:
    """A parsed RFC 5322 email address (a single mailbox or a group)."""
    # Display name of a mailbox, or the name of a group; None if absent.
    display_name: Optional[str] = None
    # Part before '@'; quoting and escaping are already removed.
    local_part: Optional[str] = None
    # Part after '@'; domain literals keep their surrounding brackets.
    domain: Optional[str] = None
    # True when this object represents ``name: member, member;``.
    is_group: bool = False
    group_members: list['RFC5322Address'] = field(default_factory=list)
    # Text of the comments seen while parsing, outer parentheses removed.
    comments: list[str] = field(default_factory=list)
    # The (stripped) input string this address was parsed from.
    source: str = ""

    def __repr__(self) -> str:
        if self.is_group:
            members = ", ".join(str(m) for m in self.group_members)
            return f"{self.display_name}:{members};"
        if self.display_name:
            return f"{self.display_name} <{self.local_part}@{self.domain}>"
        return f"{self.local_part}@{self.domain}"


# ── Tokenizer ─────────────────────────────────────────────────────────

class _Tokenizer:
    """Cursor over the input string with whitespace/comment helpers.

    Comments skipped by :meth:`skip_comment` are collected in
    ``comments``.  Because the grammar is parsed with backtracking, use
    :meth:`mark` / :meth:`reset` instead of saving ``pos`` alone:
    resetting only the position would record the same comment again on
    every retry.
    """

    def __init__(self, source: str):
        self.src = source
        self.pos = 0
        self.comments: list[str] = []

    def at_end(self) -> bool:
        """Return True once every character has been consumed."""
        return self.pos >= len(self.src)

    def peek(self, offset: int = 0) -> Optional[str]:
        """Return the character *offset* positions ahead, or None if out of range."""
        idx = self.pos + offset
        return self.src[idx] if 0 <= idx < len(self.src) else None

    def consume(self) -> Optional[str]:
        """Return the current character and advance; None at end of input."""
        ch = self.peek()
        if ch is not None:
            self.pos += 1
        return ch

    def skip(self, n: int = 1) -> None:
        """Advance *n* characters without running past the end."""
        self.pos = min(self.pos + n, len(self.src))

    def mark(self) -> tuple[int, int]:
        """Return a backtracking point: (position, number of comments)."""
        return self.pos, len(self.comments)

    def reset(self, mark: tuple[int, int]) -> None:
        """Rewind to *mark*, discarding comments collected since then."""
        self.pos, n_comments = mark
        del self.comments[n_comments:]

    def skip_fws(self) -> None:
        """Skip folding white space, if any is present.

        FWS = ([*WSP CRLF] 1*WSP) / obs-FWS

        All consecutive folds are consumed in a single call, so the
        result does not depend on how often callers invoke this method.
        A CRLF that is not followed by WSP is not a fold and is left
        in place.
        """
        src = self.src
        end = len(src)
        pos = self.pos
        while True:
            i = pos
            while i < end and _is_wsp(src[i]):
                i += 1
            if src.startswith('\r\n', i):
                j = i + 2
                while j < end and _is_wsp(src[j]):
                    j += 1
                if j > i + 2:
                    # CRLF followed by at least one WSP: a real fold.
                    pos = j
                    continue
            pos = i
            break
        self.pos = pos

    def skip_comment(self) -> bool:
        """Skip one (possibly nested) comment and record its text.

        comment = "(" *([FWS] ccontent) [FWS] ")"

        Returns:
            True if a complete comment was consumed.  False if the
            cursor is not at "(" or the comment is unterminated; the
            position is left unchanged in that case.
        """
        if self.peek() != '(':
            return False
        start = self.pos
        self.pos += 1
        depth = 1
        text: list[str] = []
        while depth and not self.at_end():
            ch = self.consume()
            if ch == '\\' and not self.at_end():
                # quoted-pair: keep both characters verbatim
                text.append(ch)
                text.append(self.consume())
            elif ch == '(':
                depth += 1
                text.append(ch)
            elif ch == ')':
                depth -= 1
                if depth:
                    text.append(ch)
            else:
                text.append(ch)
        if depth:
            self.pos = start
            return False
        self.comments.append(''.join(text).strip())
        return True

    def skip_cfws(self) -> None:
        """Skip any mix of folding white space and comments.

        CFWS = (1*([FWS] comment) [FWS]) / FWS
        """
        self.skip_fws()
        while self.skip_comment():
            self.skip_fws()


# ── Address Parser ────────────────────────────────────────────────────

class RFC5322SyntaxError(ValueError):
    """Raised when the input does not match the RFC 5322 address grammar."""


class AddressParser:
    """RFC 5322 compliant email address parser.

    Args:
        strict: If True, reject obs-* productions.
                If False, accept obsolete forms per §4.4.

    The parser keeps its tokenizer on the instance, so one instance
    must not be used for two parses at the same time.
    """

    def __init__(self, strict: bool = True):
        self.strict = strict
        self._tok: _Tokenizer | None = None

    # ── Public API ──────────────────────────────────────────────────

    def parse(self, raw: str) -> RFC5322Address:
        """Parse a single address (mailbox or group).

        Args:
            raw: The address text; surrounding whitespace is ignored.

        Returns:
            The parsed address with ``source`` and ``comments`` filled in.

        Raises:
            RFC5322SyntaxError: If *raw* is not a valid address or has
                trailing content after the address.
        """
        source = raw.strip()
        tok = self._tok = _Tokenizer(source)
        addr = self._address()
        tok.skip_cfws()
        if not tok.at_end():
            raise RFC5322SyntaxError(
                f"Unexpected trailing content at position {tok.pos}"
            )
        addr.source = source
        addr.comments = tok.comments[:]
        return addr

    def parse_address_list(self, raw: str) -> list[RFC5322Address]:
        """Parse a comma-separated list of addresses.

        An empty (or comment-only) input yields an empty list and a
        trailing comma is tolerated.  Every returned address carries
        the whole input as ``source`` and all comments of the list.

        Raises:
            RFC5322SyntaxError: On a malformed address, or, in strict
                mode, on anything other than "," between addresses.
        """
        source = raw.strip()
        tok = self._tok = _Tokenizer(source)
        addrs: list[RFC5322Address] = []
        tok.skip_cfws()
        if tok.at_end():
            return addrs
        addrs.append(self._address())
        tok.skip_cfws()
        while not tok.at_end():
            if tok.peek() != ',':
                if not self.strict:
                    # Best effort: keep what was parsed, ignore the rest.
                    break
                raise RFC5322SyntaxError(
                    f"Expected ',' or end at position {tok.pos}"
                )
            tok.skip()
            tok.skip_cfws()
            if tok.at_end():
                break
            addrs.append(self._address())
            tok.skip_cfws()
        for addr in addrs:
            addr.source = source
            addr.comments = tok.comments[:]
        return addrs

    # ── Grammar Productions ─────────────────────────────────────────

    def _address(self) -> RFC5322Address:
        """address = mailbox / group (group is tried first)."""
        tok = self._tok
        mark = tok.mark()
        try:
            return self._group()
        except RFC5322SyntaxError:
            tok.reset(mark)
        return self._mailbox()

    def _group(self) -> RFC5322Address:
        """group = display-name ":" [group-list] ";" [CFWS]"""
        tok = self._tok
        name = self._phrase()
        tok.skip_cfws()
        if tok.consume() != ':':
            raise RFC5322SyntaxError("Expected ':' after group display name")
        tok.skip_cfws()
        members: list[RFC5322Address] = []
        if tok.peek() != ';':
            members = self._group_list()
        tok.skip_cfws()
        if tok.consume() != ';':
            raise RFC5322SyntaxError("Expected ';' to close group")
        return RFC5322Address(display_name=name, is_group=True, group_members=members)

    def _group_list(self) -> list[RFC5322Address]:
        """group-list = mailbox-list / CFWS (empty list when nothing precedes ";")."""
        tok = self._tok
        tok.skip_cfws()
        if tok.at_end() or tok.peek() == ';':
            return []
        return self._mailbox_list()

    def _mailbox_list(self) -> list[RFC5322Address]:
        """mailbox-list = mailbox *("," mailbox)"""
        tok = self._tok
        addrs: list[RFC5322Address] = [self._mailbox()]
        tok.skip_cfws()
        while tok.peek() == ',':
            tok.skip()
            tok.skip_cfws()
            if tok.at_end():
                break
            addrs.append(self._mailbox())
            tok.skip_cfws()
        return addrs

    def _mailbox(self) -> RFC5322Address:
        """mailbox = name-addr / addr-spec"""
        tok = self._tok
        mark = tok.mark()
        try:
            return self._name_addr()
        except RFC5322SyntaxError:
            tok.reset(mark)
        local = self._local_part()
        tok.skip_cfws()
        if tok.consume() != '@':
            raise RFC5322SyntaxError("Expected '@' in addr-spec")
        tok.skip_cfws()
        domain = self._domain()
        return RFC5322Address(local_part=local, domain=domain)

    def _name_addr(self) -> RFC5322Address:
        """name-addr = [display-name] angle-addr"""
        tok = self._tok
        mark = tok.mark()
        name: Optional[str] = None
        try:
            name = self._phrase()
        except RFC5322SyntaxError:
            tok.reset(mark)
        local_part, domain, _ = self._angle_addr()
        return RFC5322Address(display_name=name, local_part=local_part, domain=domain)

    def _angle_addr(self) -> tuple[str, str, list[str]]:
        """angle-addr = [CFWS] "<" addr-spec ">" [CFWS]

        Returns:
            ``(local_part, domain, [])``; the third element is always
            an empty list.
        """
        tok = self._tok
        tok.skip_cfws()
        if tok.consume() != '<':
            raise RFC5322SyntaxError("Expected '<' in angle-addr")
        tok.skip_cfws()
        local = self._local_part()
        tok.skip_cfws()
        if tok.consume() != '@':
            raise RFC5322SyntaxError("Expected '@' in angle-addr")
        tok.skip_cfws()
        domain = self._domain()
        tok.skip_cfws()
        if tok.consume() != '>':
            raise RFC5322SyntaxError("Expected '>' to close angle-addr")
        return local, domain, []

    def _phrase(self) -> str:
        """phrase = 1*word / obs-phrase

        Words are joined with single spaces.  In non-strict mode a "."
        directly after a word is attached to that word (obs-phrase).
        A phrase must start with a word, so a leading "." is never
        consumed.

        Raises:
            RFC5322SyntaxError: If no word is found.
        """
        tok = self._tok
        words: list[str] = []
        while True:
            mark = tok.mark()
            tok.skip_cfws()
            if tok.at_end():
                break
            word: Optional[str] = None
            for production in (self._quoted_string, self._atom):
                try:
                    word = production()
                    break
                except RFC5322SyntaxError:
                    tok.reset(mark)
            if word is None:
                # Only extend an existing word; indexing words[-1] on an
                # empty list would raise IndexError.
                if words and not self.strict and tok.peek() == '.':
                    words[-1] += '.'
                    tok.skip()
                    continue
                break
            words.append(word)
            if not self.strict:
                while tok.peek() == '.':
                    words[-1] += '.'
                    tok.skip()
        if not words:
            raise RFC5322SyntaxError("Expected at least one word in phrase")
        return ' '.join(words)

    def _take_atext(self) -> str:
        """Consume and return the longest run of atext (possibly empty)."""
        tok = self._tok
        start = tok.pos
        while _is_atext(tok.peek()):
            tok.skip()
        return tok.src[start:tok.pos]

    def _atom(self) -> str:
        """atom = [CFWS] 1*atext [CFWS]"""
        tok = self._tok
        tok.skip_cfws()
        text = self._take_atext()
        if not text:
            raise RFC5322SyntaxError("Expected atext in atom")
        tok.skip_cfws()
        return text

    def _dot_atom(self) -> str:
        """dot-atom = [CFWS] 1*atext *("." 1*atext) [CFWS]

        A "." that is not directly followed by atext is left unconsumed.
        """
        tok = self._tok
        tok.skip_cfws()
        labels = [self._take_atext()]
        if not labels[0]:
            raise RFC5322SyntaxError("Expected atext in dot-atom")
        while tok.peek() == '.' and _is_atext(tok.peek(1)):
            tok.skip()
            labels.append(self._take_atext())
        tok.skip_cfws()
        return '.'.join(labels)

    def _quoted_string(self) -> str:
        """quoted-string = [CFWS] DQUOTE *([FWS] qcontent) [FWS] DQUOTE [CFWS]

        Returns the content without the quotes and with quoted-pairs
        unescaped.

        NOTE(review): runs of white space are collapsed to one space and
        the result is stripped.  RFC 5322 §3.2.4 treats WSP inside the
        quotes as part of the value; confirm this normalisation is
        intended before relying on exact local-parts.
        """
        tok = self._tok
        tok.skip_cfws()
        if tok.consume() != '"':
            raise RFC5322SyntaxError("Expected DQUOTE")
        chars: list[str] = []
        while not tok.at_end():
            ch = tok.peek()
            if ch == '"':
                break
            if ch == '\\':
                tok.skip()
                if tok.at_end():
                    raise RFC5322SyntaxError("Unterminated quoted-pair")
                chars.append(tok.consume())
            elif _is_qtext(ch):
                chars.append(tok.consume())
            elif _is_wsp(ch) or ch == '\r':
                # '\r' may start a fold (CRLF 1*WSP) with no leading WSP.
                before = tok.pos
                tok.skip_fws()
                if tok.pos == before:
                    raise RFC5322SyntaxError(
                        f"Unexpected char {repr(ch)} in quoted-string"
                    )
                if chars and chars[-1] != ' ':
                    chars.append(' ')
            else:
                raise RFC5322SyntaxError(
                    f"Unexpected char {repr(ch)} in quoted-string"
                )
        if tok.consume() != '"':
            raise RFC5322SyntaxError("Unterminated quoted-string")
        tok.skip_cfws()
        return ''.join(chars).strip()

    def _word(self) -> str:
        """word = atom / quoted-string (quoted-string is tried first)."""
        tok = self._tok
        mark = tok.mark()
        try:
            return self._quoted_string()
        except RFC5322SyntaxError:
            tok.reset(mark)
        return self._atom()

    def _local_part(self) -> str:
        """local-part = dot-atom / quoted-string / obs-local-part

        In non-strict mode, a match that is directly followed by "."
        can only be the start of an obs-local-part (``"a".b`` or
        ``a . b``), so that production is tried from the beginning.  If
        it does not match either, the shorter match is returned and the
        caller reports the stray ".".
        """
        tok = self._tok
        mark = tok.mark()
        for production in (self._quoted_string, self._dot_atom):
            try:
                value = production()
            except RFC5322SyntaxError:
                tok.reset(mark)
                continue
            if self.strict or tok.peek() != '.':
                return value
            tok.reset(mark)
            try:
                return self._obs_local_part()
            except RFC5322SyntaxError:
                tok.reset(mark)
                return production()
        # obs-local-part starts with a word, i.e. with one of the two
        # productions that just failed, so there is nothing left to try.
        raise RFC5322SyntaxError("Expected local-part")

    def _domain(self) -> str:
        """domain = dot-atom / domain-literal / obs-domain

        In non-strict mode a dot-atom directly followed by "." (for
        example ``example . com``) is re-parsed as obs-domain; if that
        fails the plain dot-atom result is returned.
        """
        tok = self._tok
        mark = tok.mark()
        try:
            return self._domain_literal()
        except RFC5322SyntaxError:
            tok.reset(mark)
        try:
            value = self._dot_atom()
        except RFC5322SyntaxError:
            tok.reset(mark)
            raise RFC5322SyntaxError("Expected domain") from None
        if self.strict or tok.peek() != '.':
            return value
        tok.reset(mark)
        try:
            return self._obs_domain()
        except RFC5322SyntaxError:
            tok.reset(mark)
            return self._dot_atom()

    def _domain_literal(self) -> str:
        """domain-literal = [CFWS] "[" *([FWS] dtext) [FWS] "]" [CFWS]

        Returns the literal including its brackets, with folding white
        space removed.  A quoted-pair inside the brackets is obs-dtext
        and is therefore only accepted in non-strict mode.
        """
        tok = self._tok
        tok.skip_cfws()
        if tok.consume() != '[':
            raise RFC5322SyntaxError("Expected '[' for domain-literal")
        chars: list[str] = []
        while not tok.at_end():
            tok.skip_fws()
            ch = tok.peek()
            if ch == '\\':
                if self.strict:
                    raise RFC5322SyntaxError(
                        "quoted-pair in domain-literal is obsolete syntax"
                    )
                tok.skip()
                if tok.at_end():
                    raise RFC5322SyntaxError("Unterminated quoted-pair in domain-literal")
                chars.append(tok.consume())
            elif ch is not None and _is_dtext(ch):
                chars.append(tok.consume())
            else:
                # "]" or an invalid character; checked just below.
                break
        tok.skip_fws()
        if tok.consume() != ']':
            raise RFC5322SyntaxError("Unterminated domain-literal")
        tok.skip_cfws()
        return '[' + ''.join(chars) + ']'

    # ── Obsolete Syntax (§4.4) ──────────────────────────────────────

    def _obs_local_part(self) -> str:
        """obs-local-part = word *("." word)

        Raises:
            RFC5322SyntaxError: If there is no leading word or a "." is
                not followed by a word.
        """
        tok = self._tok
        try:
            words = [self._word()]
            while tok.peek() == '.':
                tok.skip()
                words.append(self._word())
        except RFC5322SyntaxError as exc:
            raise RFC5322SyntaxError("Expected obs-local-part") from exc
        return '.'.join(words)

    def _obs_domain(self) -> str:
        """obs-domain = atom *("." atom)

        Raises:
            RFC5322SyntaxError: If there is no leading atom or a "." is
                not followed by an atom.
        """
        tok = self._tok
        try:
            labels = [self._atom()]
            while tok.peek() == '.':
                tok.skip()
                labels.append(self._atom())
        except RFC5322SyntaxError as exc:
            raise RFC5322SyntaxError("Expected obs-domain") from exc
        return '.'.join(labels)


# NOTE: in the original patch this module was followed by a hunk for
# source.md (two placeholder lines replaced by annotation blocks whose
# added content is not present in this view) and by the header of
# test_parser.py.  Neither is Python code, so they are only noted here.
"""
Tests for RFC 5322 email address parser.
"""
import unittest
from parser import AddressParser, RFC5322Address, RFC5322SyntaxError


class TestAddressParser(unittest.TestCase):
    """Behavioural tests for ``AddressParser`` in strict and relaxed mode.

    Inputs that contain an angle-addr spell it out in full
    (``Name <local@domain>``); the expected local parts and display
    names in the assertions depend on it.
    """

    def setUp(self):
        self.parser = AddressParser(strict=True)
        self.relaxed = AddressParser(strict=False)

    # ── Simple addr-spec ─────────────────────────────────────────────

    def test_simple_addr_spec(self):
        addr = self.parser.parse("user@example.com")
        self.assertEqual(addr.local_part, "user")
        self.assertEqual(addr.domain, "example.com")
        self.assertIsNone(addr.display_name)

    def test_addr_spec_with_dots(self):
        addr = self.parser.parse("first.last@example.co.uk")
        self.assertEqual(addr.local_part, "first.last")
        self.assertEqual(addr.domain, "example.co.uk")

    def test_addr_spec_with_plus(self):
        addr = self.parser.parse("user+tag@example.com")
        self.assertEqual(addr.local_part, "user+tag")

    # ── Display name variants ────────────────────────────────────────

    def test_display_name_quoted(self):
        addr = self.parser.parse('"John Doe" <john@example.com>')
        self.assertEqual(addr.display_name, "John Doe")
        self.assertEqual(addr.local_part, "john")

    def test_display_name_unquoted(self):
        addr = self.parser.parse("John Doe <john@example.com>")
        self.assertEqual(addr.display_name, "John Doe")

    def test_display_name_with_dots(self):
        # A "." in an unquoted display name is obs-phrase: relaxed only.
        addr = self.relaxed.parse("John Q. Public <jqp@example.com>")
        self.assertEqual(addr.display_name, "John Q. Public")

    def test_display_name_special_chars(self):
        addr = self.parser.parse("John (home) <john@example.com>")
        self.assertEqual(addr.local_part, "john")

    # ── Quoted strings in local-part ─────────────────────────────────

    def test_quoted_local_part(self):
        addr = self.parser.parse('"john.doe"@example.com')
        self.assertEqual(addr.local_part, "john.doe")

    def test_quoted_local_part_with_spaces(self):
        addr = self.parser.parse('"john doe"@example.com')
        self.assertEqual(addr.local_part, "john doe")

    def test_quoted_local_part_with_quotes(self):
        addr = self.parser.parse(r'"john\"doe"@example.com')
        self.assertEqual(addr.local_part, 'john"doe')

    # ── Domain literal ───────────────────────────────────────────────

    def test_domain_literal_ipv4(self):
        addr = self.parser.parse("user@[192.168.1.1]")
        self.assertIn("192.168.1.1", addr.domain)

    def test_domain_literal_ipv6(self):
        addr = self.parser.parse("user@[IPv6:2001:db8::1]")
        self.assertIn("IPv6:2001:db8::1", addr.domain)

    def test_domain_literal_with_comments(self):
        addr = self.parser.parse("user@[10.0.0.1] (gateway)")
        self.assertIn("10.0.0.1", addr.domain)
        self.assertIn("gateway", addr.comments)

    # ── Address list ─────────────────────────────────────────────────

    def test_address_list_single(self):
        addrs = self.parser.parse_address_list("alice@example.com")
        self.assertEqual(len(addrs), 1)
        self.assertEqual(addrs[0].local_part, "alice")

    def test_address_list_multiple(self):
        addrs = self.parser.parse_address_list(
            "alice@a.com, bob@b.com, carol@c.com"
        )
        self.assertEqual(len(addrs), 3)
        self.assertEqual(addrs[0].local_part, "alice")
        self.assertEqual(addrs[1].local_part, "bob")
        self.assertEqual(addrs[2].local_part, "carol")

    def test_address_list_with_display_names(self):
        addrs = self.parser.parse_address_list(
            '"Alice" <alice@a.com>, Bob <bob@b.com>'
        )
        self.assertEqual(len(addrs), 2)
        self.assertEqual(addrs[0].display_name, "Alice")
        self.assertEqual(addrs[1].display_name, "Bob")

    # ── Group syntax ─────────────────────────────────────────────────

    def test_group_address(self):
        addr = self.parser.parse("Team: alice@a.com, bob@b.com;")
        self.assertTrue(addr.is_group)
        self.assertEqual(addr.display_name, "Team")
        self.assertEqual(len(addr.group_members), 2)
        self.assertEqual(addr.group_members[0].local_part, "alice")
        self.assertEqual(addr.group_members[1].local_part, "bob")

    def test_empty_group(self):
        addr = self.parser.parse("Team:;")
        self.assertTrue(addr.is_group)
        self.assertEqual(addr.display_name, "Team")
        self.assertEqual(len(addr.group_members), 0)

    # ── Comments ─────────────────────────────────────────────────────

    def test_comment_in_display_name(self):
        addr = self.parser.parse("John (comment) <john@example.com>")
        self.assertEqual(addr.display_name, "John")

    def test_nested_comments(self):
        addr = self.parser.parse("(outer (inner)) user@example.com")
        self.assertEqual(addr.local_part, "user")
        self.assertIn("outer (inner)", addr.comments)

    def test_comment_after_address(self):
        addr = self.parser.parse("user@example.com (contact)")
        self.assertEqual(addr.local_part, "user")
        self.assertIn("contact", " ".join(addr.comments))

    # ── Edge cases ──────────────────────────────────────────────────

    def test_percent_style(self):
        addr = self.relaxed.parse("user%example.com@other.com")
        self.assertEqual(addr.local_part, "user%example.com")

    def test_dot_atom_with_multiple_dots(self):
        addr = self.parser.parse("a.b.c.d@example.com")
        self.assertEqual(addr.local_part, "a.b.c.d")

    def test_numbers_in_local_part(self):
        addr = self.parser.parse("user123@example.com")
        self.assertEqual(addr.local_part, "user123")

    def test_special_chars_in_local_part(self):
        addr = self.parser.parse("nice&simple@example.com")
        self.assertEqual(addr.local_part, "nice&simple")

    def test_underscore_in_domain(self):
        addr = self.parser.parse("user@my_host.com")
        self.assertEqual(addr.domain, "my_host.com")

    # ── RFC 5322 specific test cases ─────────────────────────────────

    def test_rfc5322_sample_simple(self):
        """Simple addr-spec form."""
        addr = self.parser.parse("john.doe@example.com")
        self.assertEqual(addr.local_part, "john.doe")
        self.assertEqual(addr.domain, "example.com")

    def test_rfc5322_sample_quoted_display(self):
        """Display name with quoted string."""
        addr = self.parser.parse('"John Doe" <john.doe@example.com>')
        self.assertEqual(addr.display_name, "John Doe")
        self.assertEqual(addr.local_part, "john.doe")

    def test_rfc5322_sample_bare_addr_spec(self):
        """Just addr-spec, no display name."""
        addr = self.parser.parse("john.doe@example.com")
        self.assertIsNone(addr.display_name)

    def test_rfc5322_multiple_mailboxes(self):
        """Comma-separated addresses."""
        addrs = self.parser.parse_address_list(
            "alice@example.com, bob@example.com"
        )
        self.assertEqual(len(addrs), 2)

    # ── Error handling ───────────────────────────────────────────────

    def test_invalid_missing_at(self):
        with self.assertRaises(RFC5322SyntaxError):
            self.parser.parse("notanemail")

    def test_invalid_empty_string(self):
        addrs = self.parser.parse_address_list("")
        self.assertEqual(len(addrs), 0)

    def test_invalid_unterminated_quoted(self):
        with self.assertRaises(RFC5322SyntaxError):
            self.parser.parse('"unclosed ')

    def test_unterminated_angle(self):
        with self.assertRaises(RFC5322SyntaxError):
            self.parser.parse("<user@example.com")

    # ── Real-world forms ─────────────────────────────────────────────

    def test_real_world_simple_name(self):
        addr = self.parser.parse("Alice <alice@example.com>")
        self.assertEqual(addr.display_name, "Alice")
        self.assertEqual(addr.local_part, "alice")

    def test_real_world_multi_word_name(self):
        addr = self.parser.parse("Alice Bob <alice@example.com>")
        self.assertEqual(addr.display_name, "Alice Bob")


if __name__ == "__main__":
    unittest.main()