|
| 1 | +#!/usr/bin/env python |
| 2 | +# This file is part of icspacket. |
| 3 | +# Copyright (C) 2025-present MatrixEditor @ github |
| 4 | +# |
| 5 | +# This program is free software: you can redistribute it and/or modify |
| 6 | +# it under the terms of the GNU General Public License as published by |
| 7 | +# the Free Software Foundation, either version 3 of the License, or |
| 8 | +# (at your option) any later version. |
| 9 | +# |
| 10 | +# This program is distributed in the hope that it will be useful, |
| 11 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | +# GNU General Public License for more details. |
| 14 | +# |
| 15 | +# You should have received a copy of the GNU General Public License |
| 16 | +# along with this program. If not, see <https://www.gnu.org/licenses/>. |
| 17 | +# pyright: reportUnusedCallResult=false |
| 18 | +# |
| 19 | +# Simple Script to |
| 20 | +import argparse |
| 21 | +import dataclasses |
| 22 | +import datetime |
| 23 | +import logging |
| 24 | +import json |
| 25 | + |
| 26 | +from typing import Any |
| 27 | + |
| 28 | +from rich.console import Console |
| 29 | +from rich.highlighter import ReprHighlighter |
| 30 | +from rich.markup import escape |
| 31 | +from rich.text import Text |
| 32 | +from rich.tree import Tree |
| 33 | +from scapy.layers.l2 import Dot1Q, Ether |
| 34 | + |
| 35 | +from icspacket.core import logger |
| 36 | +from icspacket.proto.iec61850._iec61850 import ( |
| 37 | + Data, |
| 38 | + IEC61850_Specific_Protocol, |
| 39 | +) |
| 40 | +from icspacket.proto.iec61850.goose import PDU, GOOSE_Client |
| 41 | + |
| 42 | +from icspacket.examples.util import add_logging_options |
| 43 | +from icspacket.proto.iec61850.path import ObjectReference |
| 44 | +from icspacket.examples.mms_utility import data_to_str |
| 45 | +from icspacket.proto.mms.data import get_floating_point_value, Timestamp |
| 46 | + |
| 47 | + |
| 48 | +class PacketHandler: |
| 49 | + def __init__(self, args) -> None: |
| 50 | + self.console = Console() |
| 51 | + self.json_out = args.json |
| 52 | + self.json_minify = args.json_minify |
| 53 | + self.target_dataref = args.data |
| 54 | + self.target_dataset_ref = args.dataset |
| 55 | + self.target_goid = args.goid |
| 56 | + |
| 57 | + def include(self, pdu: IEC61850_Specific_Protocol) -> bool: |
| 58 | + if pdu.present == IEC61850_Specific_Protocol.PRESENT.PR_goosePdu: |
| 59 | + goose_pdu = pdu.goosePdu |
| 60 | + ref = ObjectReference.from_mmsref(goose_pdu.gocbRef) |
| 61 | + if self.target_dataref and ref != self.target_dataref: |
| 62 | + return False |
| 63 | + |
| 64 | + data_set = ObjectReference.from_mmsref(goose_pdu.datSet) |
| 65 | + if self.target_dataset_ref and data_set != self.target_dataset_ref: |
| 66 | + return False |
| 67 | + |
| 68 | + go_id = goose_pdu.goID |
| 69 | + if self.target_goid and go_id != self.target_goid: |
| 70 | + return False |
| 71 | + |
| 72 | + return True |
| 73 | + |
| 74 | + def on_message(self, pkt: Ether, pdu: PDU) -> None: |
| 75 | + vlan_pkt = pkt.getlayer(Dot1Q) |
| 76 | + dest: str = pkt.dst.lower() |
| 77 | + is_std = dest.startswith("01:0c:cd") |
| 78 | + |
| 79 | + address = dest[-5:] |
| 80 | + if vlan_pkt: |
| 81 | + logging.debug( |
| 82 | + "(G: %s) (VLAN %s) 802.1Q from %s", address, vlan_pkt.vlan, pkt.src |
| 83 | + ) |
| 84 | + try: |
| 85 | + apdu = pdu.apdu(IEC61850_Specific_Protocol) |
| 86 | + except ValueError: |
| 87 | + return logging.error("Failed to parse GOOSE APDU") |
| 88 | + |
| 89 | + # logging.debug("(App: [b]%s[/]): [i]%s[/]", pdu.app_id, |
| 90 | + # apdu.present.name[3:]) |
| 91 | + include = self.include(apdu) |
| 92 | + if not include: |
| 93 | + return |
| 94 | + |
| 95 | + logging.info( |
| 96 | + "(AppId: [b]%s[/]) %s > %s%s", |
| 97 | + pdu.app_id, |
| 98 | + pkt.src, |
| 99 | + pkt.dst, |
| 100 | + "" if is_std else " ([i]non-std dest[/])", |
| 101 | + ) |
| 102 | + if apdu.present == IEC61850_Specific_Protocol.PRESENT.PR_goosePdu: |
| 103 | + goose_pdu = apdu.goosePdu |
| 104 | + assert goose_pdu # this should never happen |
| 105 | + |
| 106 | + if self.json_out: |
| 107 | + return self._format_json(pkt, vlan_pkt, pdu, apdu) |
| 108 | + |
| 109 | + ref = ObjectReference.from_mmsref(goose_pdu.gocbRef) |
| 110 | + data_set = ObjectReference.from_mmsref(goose_pdu.datSet) |
| 111 | + go_id = goose_pdu.goID |
| 112 | + |
| 113 | + label = ( |
| 114 | + f"[bold light_green]{goose_pdu.stNum}:{goose_pdu.sqNum}[/] [b]DATA[/]:" |
| 115 | + f"{ref} of [b]DATA_SET[/]:{data_set}" |
| 116 | + ) |
| 117 | + if go_id: |
| 118 | + label = f"{label} ([b]{go_id}[/])" |
| 119 | + |
| 120 | + tree = Tree(label) |
| 121 | + tree.add(f"Simulation: {goose_pdu.simulation or False}") |
| 122 | + tree.add(f"ConfRev: {goose_pdu.confRev}") |
| 123 | + tree.add(f"NDSCom: {goose_pdu.ndsCom}") |
| 124 | + |
| 125 | + ts = Timestamp.from_utc_time(goose_pdu.time) |
| 126 | + dt = datetime.datetime.fromtimestamp(ts.seconds) |
| 127 | + tree.add(f"Time: {dt}") |
| 128 | + if goose_pdu.numDatSetEntries > 0: |
| 129 | + subtree = tree.add("Data Set Entries:") |
| 130 | + for i, value in enumerate(list(goose_pdu.allData)): |
| 131 | + value_text = self._format_data(value) |
| 132 | + text = Text(f"({i}) {value.present.name[3:]}: ") |
| 133 | + text.append(value_text) |
| 134 | + subtree.add(text) |
| 135 | + |
| 136 | + self.console.print(tree) |
| 137 | + |
| 138 | + def _format_data(self, data: Data) -> Any: |
| 139 | + highlighter = ReprHighlighter() |
| 140 | + text = Text() |
| 141 | + do_highlight = True |
| 142 | + match data.present: |
| 143 | + case Data.PRESENT.PR_floating_point: |
| 144 | + text.append(str(get_floating_point_value(data.floating_point))) |
| 145 | + case Data.PRESENT.PR_boolean: |
| 146 | + text.append(str(data.boolean)) |
| 147 | + case Data.PRESENT.PR_integer: |
| 148 | + text.append(str(data.integer)) |
| 149 | + case Data.PRESENT.PR_Unsigned: |
| 150 | + text.append(str(data.unsigned)) |
| 151 | + case Data.PRESENT.PR_visible_string: |
| 152 | + text.append(escape(repr(data.visible_string or "<EMPTY>"))) |
| 153 | + case Data.PRESENT.PR_objId: |
| 154 | + text.append(escape(repr(data.objId or "<EMPTY>"))) |
| 155 | + case Data.PRESENT.PR_structure: |
| 156 | + text.append("{ ") |
| 157 | + first = True |
| 158 | + for element in list(data.structure): |
| 159 | + element_text = self._format_data(element) |
| 160 | + if not first: |
| 161 | + text.append(", ") |
| 162 | + else: |
| 163 | + first = False |
| 164 | + text.append(element_text) |
| 165 | + text.append(" }") |
| 166 | + case Data.PRESENT.PR_array: |
| 167 | + text.append("[ ") |
| 168 | + first = True |
| 169 | + for element in list(data.array): |
| 170 | + element_text = self._format_data(element) |
| 171 | + if not first: |
| 172 | + text.append(", ") |
| 173 | + else: |
| 174 | + first = False |
| 175 | + text.append(element_text) |
| 176 | + text.append(" ]") |
| 177 | + case Data.PRESENT.PR_bit_string: |
| 178 | + text.append(f"'{data.bit_string.value.to01()}'B") |
| 179 | + do_highlight = False |
| 180 | + case Data.PRESENT.PR_octet_string: |
| 181 | + text.append(f"'{data.octet_string.hex()}'H") |
| 182 | + do_highlight = False |
| 183 | + case Data.PRESENT.PR_utc_time: |
| 184 | + ts = Timestamp.from_utc_time(data.utc_time) |
| 185 | + dt = datetime.datetime.fromtimestamp(ts.seconds) |
| 186 | + text.append(str(dt)) |
| 187 | + do_highlight = False |
| 188 | + case _: |
| 189 | + text.append(str(data_to_str(data))) |
| 190 | + do_highlight = False |
| 191 | + |
| 192 | + if do_highlight: |
| 193 | + highlighter.highlight(text) |
| 194 | + return text |
| 195 | + |
| 196 | + def _format_json( |
| 197 | + self, pkt: Ether, vlan_pkt: Dot1Q, pdu: PDU, apdu: IEC61850_Specific_Protocol |
| 198 | + ) -> None: |
| 199 | + apdu_json = json.loads(apdu.jer_encode()) |
| 200 | + log_entry = { |
| 201 | + "src": pkt.src, |
| 202 | + "dst": pkt.dst, |
| 203 | + "vlan": vlan_pkt.vlan if vlan_pkt else None, |
| 204 | + "pdu": dataclasses.asdict(pdu), |
| 205 | + } |
| 206 | + del log_entry["pdu"]["raw_apdu"] |
| 207 | + log_entry["pdu"]["apdu"] = apdu_json |
| 208 | + self.console.print( |
| 209 | + json.dumps(log_entry, indent=2 if not self.json_minify else None) |
| 210 | + ) |
| 211 | + |
| 212 | + |
| 213 | +def cli_main(): |
| 214 | + from icspacket import __version__ |
| 215 | + |
| 216 | + parser = argparse.ArgumentParser( |
| 217 | + description="GOOSE Observer (requires administrative privileges)", |
| 218 | + ) |
| 219 | + # fmt: off |
| 220 | + group = parser.add_argument_group("Output Options") |
| 221 | + group.add_argument("--json", action="store_true", help="Output as JSON", default=False) |
| 222 | + group.add_argument("--json-minify", action="store_true", help="Minify the JSON output", default=False) |
| 223 | + |
| 224 | + group = parser.add_argument_group("Filter Options") |
| 225 | + group.add_argument("--data", type=ObjectReference.from_mmsref, help="Filter on data set entry (DataName)", metavar="<LDName>/<LNName>.<FC>.<DataName>", default=None) |
| 226 | + group.add_argument("--dataset", type=ObjectReference.from_mmsref, help="Filter on data set name (DataName)", metavar="<LDName>/<LNName>.<DataName>", default=None) |
| 227 | + group.add_argument("--goid", type=str, help="Filter on GoID", metavar="GOID", default=None) |
| 228 | + |
| 229 | + group = parser.add_argument_group("Input Options") |
| 230 | + group.add_argument("-I", "--interface", action="append", help="Interface to listen on", dest="interfaces", default=None) |
| 231 | + group.add_argument("-P", "--pcap", action="append", help="Read packets from a PCAP file", dest="pcaps", default=None) |
| 232 | + # fmt: on |
| 233 | + add_logging_options(parser) |
| 234 | + |
| 235 | + args = parser.parse_args() |
| 236 | + if args.json: |
| 237 | + # disable all logging output except errors |
| 238 | + args.quiet = True |
| 239 | + |
| 240 | + logger.init_from_args(args.verbosity, args.quiet, args.ts) |
| 241 | + if args.verbosity > 0: |
| 242 | + print(f"icspacket v{__version__}\n") |
| 243 | + |
| 244 | + handler = PacketHandler(args) |
| 245 | + observer = GOOSE_Client( |
| 246 | + iface=args.interfaces, inputs=args.pcaps, callback=handler.on_message |
| 247 | + ) |
| 248 | + if not args.pcaps: |
| 249 | + logging.info("Hit Ctrl+C to stop...") |
| 250 | + observer.listen_forever() |
| 251 | + else: |
| 252 | + observer.start() |
| 253 | + observer.stop(join=True) |
| 254 | + |
| 255 | + |
| 256 | +if __name__ == "__main__": |
| 257 | + cli_main() |
0 commit comments