Skip to content

Commit bb12177

Browse files
committed
Update attribute parser and importer
1 parent 2cacefa commit bb12177

File tree

8 files changed

+272
-143
lines changed

8 files changed

+272
-143
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.8.3
2+
current_version = 0.8.4
33
commit = True
44
tag = False
55

objutils/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
The first parameter is always the codec name.
1414
"""
1515

16-
__version__ = "0.8.3"
16+
__version__ = "0.8.4"
1717

1818
__all__ = [
1919
"Image",

objutils/dwarf/traverser.py

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,40 @@ class AttributeParser:
132132
# Little = 0
133133
# Big = 1
134134

135-
def __init__(self, session):
136-
self.session = session
135+
def __init__(self, session_or_path, *, import_if_needed: bool = True, force_import: bool = False, quiet: bool = True):
136+
"""
137+
Create an AttributeParser.
138+
139+
Parameters
140+
----------
141+
session_or_path:
142+
Either an existing SQLAlchemy session (backward compatible) or a path to an ELF/.prgdb file.
143+
If a path is given, the corresponding program database will be opened (and imported if needed).
144+
import_if_needed: bool
145+
When a path to an ELF is provided, import DWARF into a sibling .prgdb if it doesn't exist yet.
146+
force_import: bool
147+
Force re-import when creating the database from an ELF path.
148+
quiet: bool
149+
Suppress non-error output during on-demand import when a path is provided.
150+
"""
151+
# Lazy import to avoid heavy module import/cycles at module import time.
152+
from objutils.elf import open_program_database # local import by design
153+
154+
# Determine whether we received a session or a filesystem path.
155+
if hasattr(session_or_path, "query"):
156+
# Assume it's a SQLAlchemy session (backward compatible path)
157+
self.session = session_or_path
158+
self._model = None
159+
else:
160+
# Treat as a path (str or Path-like)
161+
db_model = open_program_database(
162+
session_or_path,
163+
import_if_needed=import_if_needed,
164+
force_import=force_import,
165+
quiet=quiet,
166+
)
167+
self._model = db_model
168+
self.session = db_model.session
137169
self.type_stack: set[int] = set()
138170
self.parsed_types: dict = {}
139171
self.att_types: dict = defaultdict(set)
@@ -161,6 +193,82 @@ def __init__(self, session):
161193
self.stack_machine = factory.stack_machine
162194
self.dwarf_expression = factory.dwarf_expression
163195

196+
@lru_cache(maxsize=64 * 1024)
def type_tree(self, obj: Union[int, model.DebugInformationEntry, DIEAttribute]) -> dict[str, Any] | CircularReference:
    """Resolve ``obj`` to a type DIE and return the result of ``parse_type``.

    ``obj`` may be:

    - an ``int``: used directly as the DIE offset handed to ``parse_type``;
    - a ``DIEAttribute``: its value is resolved to an offset, using the
      attribute's ``entry`` (when present) as the context DIE;
    - any object exposing ``attributes_map`` or ``attributes``: treated as a
      DIE whose ``type`` attribute is looked up and then resolved.

    Placeholder results (each a dict with ``tag`` and an empty ``attrs``):

    - ``<no-type>``: the DIE has no ``type`` attribute;
    - ``<invalid>``: the attribute value could not be resolved to an offset;
    - ``<unsupported>``: ``obj`` is none of the accepted kinds.

    Results are memoised by ``lru_cache``; ``self`` and ``obj`` form the key,
    so ``obj`` must be hashable.
    """
    # A plain integer needs no resolution at all.
    if isinstance(obj, int):
        return self.parse_type(obj)

    # Work out which attribute carries the reference and which DIE provides
    # the context for resolving it.
    if isinstance(obj, DIEAttribute):
        reference = obj
        owner = getattr(obj, "entry", None)
    elif hasattr(obj, "attributes_map") or hasattr(obj, "attributes"):
        owner = obj
        reference = self._get_attr(owner, "type")
        if reference is None:
            return {"tag": "<no-type>", "attrs": {}}
    else:
        return {"tag": "<unsupported>", "attrs": {}}

    # Both the attribute and the DIE paths end the same way.
    absolute = self._resolve_type_offset(reference, owner)
    if absolute is None:
        return {"tag": "<invalid>", "attrs": {}}
    return self.parse_type(absolute)
239+
240+
def _resolve_type_offset(
    self,
    type_attr: DIEAttribute,
    context_die: Optional[model.DebugInformationEntry],
) -> Optional[int]:
    """Resolve a DW_AT_type attribute's value to an absolute DIE offset.

    Parameters
    ----------
    type_attr:
        Attribute whose ``raw_value`` holds the reference and whose ``form``
        tells whether that reference is CU-relative.
    context_die:
        DIE supplying ``cu_start`` for CU-relative forms; may be ``None``,
        in which case no base is added.

    Returns
    -------
    The integer offset, with ``context_die.cu_start`` added when the form is
    one of DW_FORM_ref1/ref2/ref4/ref8/ref_udata. ``None`` if ``raw_value``
    is missing or cannot be converted to an integer.
    """
    raw = getattr(type_attr, "raw_value", None)
    if raw is None:
        return None
    try:
        off = int(raw)
    except (TypeError, ValueError, OverflowError):
        # Not an integer-like value: the reference cannot be interpreted.
        return None

    form = getattr(type_attr, "form", None)
    if form is None or context_die is None:
        # Without a form we cannot tell whether the value is relative, and
        # without a context DIE there is no base to add. Return it as-is.
        # (Guarding on ``form is None`` also prevents a missing form from
        # matching a ``None`` fallback in the tuple built below.)
        return off

    # Forms that are looked up defensively; members absent from
    # AttributeForm are dropped instead of being kept as ``None``.
    cu_relative_forms = tuple(
        member
        for member in (
            getattr(AttributeForm, form_name, None)
            for form_name in (
                "DW_FORM_ref1",
                "DW_FORM_ref2",
                "DW_FORM_ref4",
                "DW_FORM_ref8",
                "DW_FORM_ref_udata",
            )
        )
        if member is not None
    )
    if form in cu_relative_forms:
        try:
            off += int(getattr(context_die, "cu_start", 0) or 0)
        except (TypeError, ValueError):
            # Best effort: an unusable cu_start leaves the offset unadjusted.
            pass
    return off
271+
164272
# The cache lives per-instance because "self" participates in the key.
165273
@lru_cache(maxsize=8192)
166274
def get_die(self, offset: int) -> model.DebugInformationEntry | None:

objutils/elf/__init__.py

Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def fetch(
254254
self,
255255
sections: str = None,
256256
name_pattern: str = None,
257-
symbol_list: str = None, # comma-separated list of symbols
257+
symbol_list: typing.Optional[list[str]] = None, # list of exact symbol names (no longer a comma-separated string)
258258
bindings: str = None,
259259
access: str = None,
260260
types_str: str = None,
@@ -315,7 +315,7 @@ def fetch(
315315
flt.append(defs.SymbolType.STT_TLS)
316316
query = query.filter(model.Elf_Symbol.st_type.in_(flt))
317317
if symbol_list:
318-
name_flt = frozenset(symbol_list.split(","))
318+
name_flt = frozenset(symbol_list)
319319
query = query.filter(model.Elf_Symbol.symbol_name.in_(name_flt))
320320
query = query.order_by(model.Elf_Symbol.section_name)
321321
if order_by_value:
@@ -1061,3 +1061,153 @@ def create_image(
10611061
if callback:
10621062
callback("stop", None)
10631063
return img
1064+
1065+
1066+
def import_dwarf_to_db(
1067+
elf_path: str,
1068+
out_db: str | None,
1069+
*,
1070+
quiet: bool = False,
1071+
verbose: bool = False,
1072+
run_lines: bool = True,
1073+
run_pubnames: bool = True,
1074+
run_aranges: bool = True,
1075+
run_mac: bool = False,
1076+
force: bool = False,
1077+
) -> int:
1078+
"""Import DWARF data from ELF into a .prgdb database file.
1079+
1080+
Returns an exit code compatible with the CLI script.
1081+
"""
1082+
# Local import to avoid potential circular import at module import time.
1083+
from objutils.dwarf import DwarfProcessor
1084+
1085+
def _print(msg: str):
1086+
if not quiet:
1087+
print(msg)
1088+
1089+
elf_p = Path(elf_path)
1090+
if not elf_p.exists() or not elf_p.is_file():
1091+
_print(f"ELF file not found: {elf_path}")
1092+
return 2
1093+
1094+
default_db_path = elf_p.with_suffix(model.DB_EXTENSION)
1095+
try:
1096+
if force:
1097+
if default_db_path.exists():
1098+
try:
1099+
default_db_path.unlink()
1100+
except Exception:
1101+
pass
1102+
if out_db:
1103+
outp = Path(out_db)
1104+
if outp.exists():
1105+
try:
1106+
outp.unlink()
1107+
except Exception:
1108+
pass
1109+
except Exception:
1110+
pass
1111+
1112+
try:
1113+
ep = ElfParser(str(elf_p))
1114+
except Exception as e:
1115+
_print(f"Failed to open ELF file '{elf_path}': {e}")
1116+
return 2
1117+
1118+
if verbose:
1119+
_print(str(ep))
1120+
1121+
try:
1122+
dp = DwarfProcessor(ep)
1123+
except TypeError as te:
1124+
_print(f"No DWARF sections available in '{elf_path}': {te}")
1125+
return 1
1126+
1127+
if run_pubnames:
1128+
try:
1129+
dp.pubnames()
1130+
except Exception as e:
1131+
if verbose:
1132+
_print(f"Warning: pubnames failed: {e}")
1133+
if run_aranges:
1134+
try:
1135+
dp.aranges()
1136+
except Exception as e:
1137+
if verbose:
1138+
_print(f"Warning: aranges failed: {e}")
1139+
if run_lines:
1140+
try:
1141+
dp.do_lines()
1142+
except Exception as e:
1143+
if verbose:
1144+
_print(f"Warning: do_lines failed: {e}")
1145+
1146+
try:
1147+
dp.do_dbg_info()
1148+
except Exception as e:
1149+
_print(f"Error while parsing .debug_info: {e}")
1150+
return 3
1151+
1152+
if run_mac:
1153+
try:
1154+
dp.do_mac_info()
1155+
except Exception as e:
1156+
if verbose:
1157+
_print(f"Warning: do_mac_info failed: {e}")
1158+
1159+
try:
1160+
if out_db:
1161+
src_db = default_db_path
1162+
dst_db = Path(out_db)
1163+
if str(dst_db.resolve()) != str(src_db.resolve()):
1164+
if not src_db.exists():
1165+
with model.Model(str(dst_db)) as _mdb: # type: ignore[attr-defined]
1166+
pass
1167+
else:
1168+
try:
1169+
ep.db.close()
1170+
except Exception:
1171+
pass
1172+
import shutil as _shutil
1173+
1174+
_shutil.copyfile(str(src_db), str(dst_db))
1175+
_print(f"Wrote database: {dst_db}")
1176+
else:
1177+
_print(f"Database available at: {src_db}")
1178+
else:
1179+
_print(f"Database available at: {default_db_path}")
1180+
except Exception as e:
1181+
_print(f"Failed to write/copy database: {e}")
1182+
return 4
1183+
1184+
return 0
1185+
1186+
1187+
def open_program_database(
    path: str | os.PathLike,
    *,
    import_if_needed: bool = True,
    force_import: bool = False,
    quiet: bool = True,
) -> model.Model:
    """Open a program database (.prgdb) or derive it from an ELF file.

    Parameters
    ----------
    path:
        Either a database file (suffix equal to ``model.DB_EXTENSION``,
        compared case-insensitively), which is opened directly, or an ELF
        file whose sibling database (same name, ``model.DB_EXTENSION``
        suffix) is used.
    import_if_needed:
        When the sibling database does not exist, import DWARF from the ELF
        to create it. If False and the database is missing,
        ``FileNotFoundError`` is raised.
    force_import:
        Re-import from the ELF even when the sibling database already
        exists. Previously this flag had no effect, because the import only
        ran when there was no database to replace.
    quiet:
        Forwarded to ``import_dwarf_to_db``.

    Raises
    ------
    FileNotFoundError
        The sibling database is missing and ``import_if_needed`` is False.
    RuntimeError
        ``import_dwarf_to_db`` returned a non-zero exit code.
    """
    p = Path(path)
    if p.suffix.lower() == model.DB_EXTENSION:
        return model.Model(str(p))

    # Treat as ELF input; determine the sibling .prgdb
    elf_path = p
    db_path = elf_path.with_suffix(model.DB_EXTENSION)
    db_missing = not db_path.exists()

    if db_missing and not import_if_needed:
        raise FileNotFoundError(str(db_path))

    if db_missing or force_import:
        rc = import_dwarf_to_db(str(elf_path), str(db_path), quiet=quiet, force=force_import)
        if rc != 0:
            raise RuntimeError(f"Failed to import DWARF from '{elf_path}' (rc={rc})")
    return model.Model(str(db_path))

0 commit comments

Comments
 (0)