| 
5 | 5 | import shutil  | 
6 | 6 | from datetime import datetime  | 
7 | 7 | from tempfile import mkdtemp  | 
8 |  | -from typing import Dict, List, Optional  | 
 | 8 | +from typing import Dict, Iterable, List, Optional, Union  | 
9 | 9 | 
 
  | 
10 | 10 | from dateutil.parser import parse as parse_date  | 
11 | 11 | from distro import distro  | 
@@ -385,6 +385,57 @@ def process(self, output):  | 
385 | 385 |         return sysctls  | 
386 | 386 | 
 
  | 
387 | 387 | 
 
  | 
 | 388 | +class GroupInfo(TypedDict):  | 
 | 389 | +    name: str  | 
 | 390 | +    password: str  | 
 | 391 | +    gid: int  | 
 | 392 | +    user_list: list[str]  | 
 | 393 | + | 
 | 394 | + | 
 | 395 | +def _group_info_from_group_str(info: str) -> GroupInfo:  | 
 | 396 | +    """  | 
 | 397 | +    Parses an entry from /etc/group or a similar source, e.g.  | 
 | 398 | +    'plugdev:x:46:sysadmin,user2' into a GroupInfo dict object  | 
 | 399 | +    """  | 
 | 400 | + | 
 | 401 | +    fields = info.split(":")  | 
 | 402 | + | 
 | 403 | +    if len(fields) != 4:  | 
 | 404 | +        raise ValueError(f"Error parsing group '{info}', expected exactly 4 fields separated by :")  | 
 | 405 | + | 
 | 406 | +    return {  | 
 | 407 | +        "name": fields[0],  | 
 | 408 | +        "password": fields[1],  | 
 | 409 | +        "gid": int(fields[2]),  | 
 | 410 | +        "user_list": fields[3].split(","),  | 
 | 411 | +    }  | 
 | 412 | + | 
 | 413 | + | 
 | 414 | +class Group(FactBase[GroupInfo]):  | 
 | 415 | +    """  | 
 | 416 | +    Returns information on a specific group on the system.  | 
 | 417 | +    """  | 
 | 418 | + | 
 | 419 | +    def command(self, group):  | 
 | 420 | +        # FIXME: the '|| true' ensures 'process' is called, even if  | 
 | 421 | +        #        getent was unable to find information on the group  | 
 | 422 | +        #        There must be a better way to do this !  | 
 | 423 | +        #        e.g. allow facts 'process' method access to the process  | 
 | 424 | +        #        return code ?  | 
 | 425 | +        return f"getent group {group} || true"  | 
 | 426 | + | 
 | 427 | +    default = None  | 
 | 428 | + | 
 | 429 | +    def process(self, output: Iterable[str]) -> str:  | 
 | 430 | +        group_string = next(iter(output), None)  | 
 | 431 | + | 
 | 432 | +        if group_string is None:  | 
 | 433 | +            # This will happen if the group was simply not found  | 
 | 434 | +            return None  | 
 | 435 | + | 
 | 436 | +        return _group_info_from_group_str(group_string)  | 
 | 437 | + | 
 | 438 | + | 
388 | 439 | class Groups(FactBase[List[str]]):  | 
389 | 440 |     """  | 
390 | 441 |     Returns a list of groups on the system.  | 
@@ -417,7 +468,20 @@ def process(self, output) -> list[str]:  | 
417 | 468 | Crontab = crontab.Crontab  | 
418 | 469 | 
 
  | 
419 | 470 | 
 
  | 
420 |  | -class Users(FactBase):  | 
 | 471 | +class UserInfo(TypedDict):  | 
 | 472 | +    name: str  | 
 | 473 | +    comment: str  | 
 | 474 | +    home: str  | 
 | 475 | +    shell: str  | 
 | 476 | +    group: str  | 
 | 477 | +    groups: list[str]  | 
 | 478 | +    uid: int  | 
 | 479 | +    gid: int  | 
 | 480 | +    lastlog: str  | 
 | 481 | +    password: str  | 
 | 482 | + | 
 | 483 | + | 
 | 484 | +class Users(FactBase[dict[str, UserInfo]]):  | 
421 | 485 |     """  | 
422 | 486 |     Returns a dictionary of users -> details.  | 
423 | 487 | 
  | 
@@ -457,7 +521,7 @@ def command(self) -> str:  | 
457 | 521 | 
 
  | 
458 | 522 |     default = dict  | 
459 | 523 | 
 
  | 
460 |  | -    def process(self, output):  | 
 | 524 | +    def process(self, output: Iterable[str]) -> dict[str, UserInfo]:  | 
461 | 525 |         users = {}  | 
462 | 526 |         rex = r"[A-Z][a-z]{2} [A-Z][a-z]{2} {1,2}\d+ .+$"  | 
463 | 527 | 
 
  | 
 | 
0 commit comments