11from __future__ import annotations
22
33import re
4+ from dataclasses import dataclass
5+ from typing import Union
46
57from typing_extensions import TypedDict , override
68
79from pyinfra .api import FactBase
810
911from .gpg import GpgFactBase
10- from .util import make_cat_files_command
12+
13+
14+ @dataclass
15+ class AptRepo :
16+ """Represents an APT repository configuration.
17+
18+ This dataclass provides type safety for APT repository definitions,
19+ supporting both legacy .list style and modern deb822 .sources formats.
20+
21+ Provides dict-like access for backward compatibility while offering
22+ full type safety for modern code.
23+ """
24+
25+ type : str # "deb" or "deb-src"
26+ url : str # Repository URL
27+ distribution : str # Suite/distribution name
28+ components : list [str ] # List of components (e.g., ["main", "contrib"])
29+ options : dict [str , Union [str , list [str ]]] # Repository options
30+
31+ # Dict-like interface for backward compatibility
32+ def __getitem__ (self , key : str ):
33+ """Dict-like access: repo['type'] works like repo.type"""
34+ return getattr (self , key )
35+
36+ def __setitem__ (self , key : str , value ):
37+ """Dict-like assignment: repo['type'] = 'deb' works like repo.type = 'deb'"""
38+ setattr (self , key , value )
39+
40+ def __contains__ (self , key : str ) -> bool :
41+ """Support 'key' in repo syntax"""
42+ return hasattr (self , key )
43+
44+ def get (self , key : str , default = None ):
45+ """Dict-like get: repo.get('type', 'deb')"""
46+ return getattr (self , key , default )
47+
48+ def keys (self ):
49+ """Return dict-like keys"""
50+ return ["type" , "url" , "distribution" , "components" , "options" ]
51+
52+ def values (self ):
53+ """Return dict-like values"""
54+ return [self .type , self .url , self .distribution , self .components , self .options ]
55+
56+ def items (self ):
57+ """Return dict-like items"""
58+ return [(k , getattr (self , k )) for k in self .keys ()]
59+
60+ @override
61+ def __eq__ (self , other ) -> bool :
62+ """Enhanced equality that works with dicts and AptRepo instances"""
63+ if isinstance (other , dict ):
64+ return (
65+ self .type == other .get ("type" )
66+ and self .url == other .get ("url" )
67+ and self .distribution == other .get ("distribution" )
68+ and self .components == other .get ("components" )
69+ and self .options == other .get ("options" )
70+ )
71+ elif isinstance (other , AptRepo ):
72+ return (
73+ self .type == other .type
74+ and self .url == other .url
75+ and self .distribution == other .distribution
76+ and self .components == other .components
77+ and self .options == other .options
78+ )
79+ return False
80+
81+ def to_json (self ):
82+ """Convert to dict for JSON serialization"""
83+ return {
84+ "type" : self .type ,
85+ "url" : self .url ,
86+ "distribution" : self .distribution ,
87+ "components" : self .components ,
88+ "options" : self .options ,
89+ }
90+
91+
92+ @dataclass
93+ class AptSourcesFile :
94+ """Represents a deb822 sources file entry before expansion into individual repositories.
95+
96+ This preserves the original multi-value fields from deb822 format,
97+ while AptRepo represents individual expanded repositories.
98+ """
99+
100+ types : list [str ] # ["deb", "deb-src"]
101+ uris : list [str ] # ["http://deb.debian.org", "https://mirror.example.com"]
102+ suites : list [str ] # ["bookworm", "bullseye"]
103+ components : list [str ] # ["main", "contrib", "non-free"]
104+ architectures : list [str ] | None = None # ["amd64", "i386"]
105+ signed_by : list [str ] | None = None # ["/path/to/key1.gpg", "/path/to/key2.gpg"]
106+ trusted : str | None = None # "yes"/"no"
107+
108+ @classmethod
109+ def from_deb822_lines (cls , lines : list [str ]) -> "AptSourcesFile | None" :
110+ """Parse deb822 stanza lines into AptSourcesFile.
111+
112+ Returns None if parsing failed or repository is disabled.
113+ """
114+ if not lines :
115+ return None
116+
117+ data : dict [str , str ] = {}
118+ for line in lines :
119+ if not line or line .startswith ("#" ):
120+ continue
121+ # Field-Name: value
122+ try :
123+ key , value = line .split (":" , 1 )
124+ except ValueError : # malformed line
125+ continue
126+ data [key .strip ()] = value .strip ()
127+
128+ # Validate required fields
129+ required = ("Types" , "URIs" , "Suites" )
130+ if not all (field in data for field in required ):
131+ return None
132+
133+ # Filter out disabled repositories
134+ enabled_str = data .get ("Enabled" , "yes" ).lower ()
135+ if enabled_str != "yes" :
136+ return None
137+
138+ # Parse fields into appropriate types
139+ return cls (
140+ types = data .get ("Types" , "" ).split (),
141+ uris = data .get ("URIs" , "" ).split (),
142+ suites = data .get ("Suites" , "" ).split (),
143+ components = data .get ("Components" , "" ).split (),
144+ architectures = (
145+ data .get ("Architectures" , "" ).split () if data .get ("Architectures" ) else None
146+ ),
147+ signed_by = data .get ("Signed-By" , "" ).split () if data .get ("Signed-By" ) else None ,
148+ trusted = data .get ("Trusted" , "" ).lower () if data .get ("Trusted" ) else None ,
149+ )
150+
151+ @classmethod
152+ def parse_sources_file (cls , lines : list [str ]) -> list [AptRepo ]:
153+ """Parse a full deb822 .sources file into AptRepo instances.
154+
155+ Splits on blank lines into stanzas and parses each one.
156+ Returns a combined list of AptRepo instances for all stanzas.
157+
158+ Args:
159+ lines: Lines from a .sources file
160+ """
161+ repos = []
162+ stanza : list [str ] = []
163+ for raw in lines + ["" ]: # sentinel blank line to flush last stanza
164+ line = raw .rstrip ("\n " )
165+ if line .strip () == "" :
166+ if stanza :
167+ sources_file = cls .from_deb822_lines (stanza )
168+ if sources_file :
169+ repos .extend (sources_file .expand_to_repos ())
170+ stanza = []
171+ continue
172+ stanza .append (line )
173+ return repos
174+
175+ def expand_to_repos (self ) -> list [AptRepo ]:
176+ """Expand this sources file entry into individual AptRepo instances."""
177+ # Build options dict in the same format as legacy parsing
178+ options : dict [str , Union [str , list [str ]]] = {}
179+
180+ if self .architectures :
181+ options ["arch" ] = (
182+ self .architectures if len (self .architectures ) > 1 else self .architectures [0 ]
183+ )
184+ if self .signed_by :
185+ options ["signed-by" ] = self .signed_by if len (self .signed_by ) > 1 else self .signed_by [0 ]
186+ if self .trusted :
187+ options ["trusted" ] = self .trusted
188+
189+ repos = []
190+ # Produce combinations – in most real-world cases these will each be one.
191+ for repo_type in self .types :
192+ for uri in self .uris :
193+ for suite in self .suites :
194+ repos .append (
195+ AptRepo (
196+ type = repo_type ,
197+ url = uri ,
198+ distribution = suite ,
199+ components = self .components .copy (), # copy to avoid shared reference
200+ options = dict (options ), # copy per entry
201+ )
202+ )
203+ return repos
11204
12205
13206def noninteractive_apt (command : str , force = False ):
@@ -32,13 +225,21 @@ def noninteractive_apt(command: str, force=False):
32225)
33226
34227
35- def parse_apt_repo (name ):
228+ def parse_apt_repo (name : str ) -> AptRepo | None :
229+ """Parse a traditional apt source line into an AptRepo.
230+
231+ Args:
232+ name: Apt source line (e.g., "deb [arch=amd64] http://example.com focal main")
233+
234+ Returns:
235+ AptRepo instance or None if parsing failed
236+ """
36237 regex = r"^(deb(?:-src)?)(?:\s+\[([^\]]+)\])?\s+([^\s]+)\s+([^\s]+)\s+([a-z-\s\d]*)$"
37238
38239 matches = re .match (regex , name )
39240
40241 if not matches :
41- return
242+ return None
42243
43244 # Parse any options
44245 options = {}
@@ -51,53 +252,97 @@ def parse_apt_repo(name):
51252
52253 options [key ] = value
53254
54- return {
55- "options" : options ,
56- "type" : matches .group (1 ),
57- "url" : matches .group (3 ),
58- "distribution" : matches .group (4 ),
59- "components" : list ( matches . group ( 5 ). split ()) ,
60- }
255+ return AptRepo (
256+ type = matches . group ( 1 ) ,
257+ url = matches .group (3 ),
258+ distribution = matches .group (4 ),
259+ components = list ( matches .group (5 ). split () ),
260+ options = options ,
261+ )
61262
62263
63- class AptSources (FactBase ):
264+ def parse_apt_list_file (lines : list [str ]) -> list [AptRepo ]:
265+ """Parse legacy .list style apt source file.
266+
267+ Each non-comment, non-empty line is a discrete repository definition in the
268+ traditional ``deb http://... suite components`` syntax.
269+ Returns a list of AptRepo instances.
270+
271+ Args:
272+ lines: Lines from a .list file
64273 """
65- Returns a list of installed apt sources:
274+ repos = []
275+ for raw in lines :
276+ line = raw .strip ()
277+ if not line or line .startswith ("#" ):
278+ continue
279+ repo = parse_apt_repo (line )
280+ if repo :
281+ repos .append (repo )
282+ return repos
66283
67- .. code:: python
68284
69- [
70- {
71- "type": "deb",
72- "url": "http://archive.ubuntu.org",
73- "distribution": "trusty",
74- "components", ["main", "multiverse"],
75- },
76- ]
285+ class AptSources (FactBase ):
286+ """Returns a list of installed apt sources (legacy .list + deb822 .sources).
287+
288+ Returns a list of AptRepo instances that behave like dicts for backward compatibility:
289+
290+ [AptRepo(type="deb", url="http://archive.ubuntu.org", ...)]
291+
292+ Each AptRepo can be accessed like a dict:
293+ repo['type'] # works like repo.type
294+ repo.get('url') # works like getattr(repo, 'url')
77295 """
78296
79297 @override
80298 def command (self ) -> str :
81- return make_cat_files_command (
82- "/etc/apt/sources.list" ,
83- "/etc/apt/sources.list.d/*.list" ,
299+ # We emit file boundary markers so the parser can select the correct
300+ # parsing function based on filename extension.
301+ return (
302+ "sh -c '"
303+ "for f in "
304+ "/etc/apt/sources.list "
305+ "/etc/apt/sources.list.d/*.list "
306+ "/etc/apt/sources.list.d/*.sources; do "
307+ '[ -e "$f" ] || continue; '
308+ 'echo "##FILE $f"; '
309+ 'cat "$f"; '
310+ "echo; "
311+ "done'"
84312 )
85313
86314 @override
87315 def requires_command (self ) -> str :
88- return "apt" # if apt installed, above should exist
316+ return "apt"
89317
90318 default = list
91319
92320 @override
93- def process (self , output ):
94- repos = []
321+ def process (self , output ): # type: ignore[override]
322+ repos : list [AptRepo ] = []
323+ current_file : str | None = None
324+ buffer : list [str ] = []
325+
326+ def flush ():
327+ nonlocal buffer , current_file , repos
328+ if current_file is None or not buffer :
329+ buffer = []
330+ return
331+
332+ if current_file .endswith (".sources" ):
333+ repos .extend (AptSourcesFile .parse_sources_file (buffer ))
334+ else : # .list files or /etc/apt/sources.list
335+ repos .extend (parse_apt_list_file (buffer ))
336+ buffer = []
95337
96338 for line in output :
97- repo = parse_apt_repo (line )
98- if repo :
99- repos .append (repo )
339+ if line .startswith ("##FILE " ):
340+ flush () # flush previous file buffer
341+ current_file = line [7 :].strip () # remove "##FILE " prefix
342+ continue
343+ buffer .append (line )
100344
345+ flush () # flush the final buffer
101346 return repos
102347
103348
0 commit comments