Skip to content

Commit 2d395cb

Browse files
Add draft indexed mcap reader
Signed-off-by: Thomas Sedlmayer <[email protected]>
1 parent 71449a1 commit 2d395cb

File tree

1 file changed

+167
-0
lines changed

1 file changed

+167
-0
lines changed

osi3trace/indexed_mcap_reader.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
from typing import Optional, Iterable, Iterator, Tuple, List, Any
2+
import io
3+
4+
from mcap.reader import SeekingReader, NonSeekingReader, breakup_chunk, _chunks_matching_topics, DecodedMessageTuple
5+
6+
from mcap._message_queue import MessageQueue
7+
from mcap.data_stream import ReadDataStream
8+
from mcap.exceptions import DecoderNotFoundError
9+
from mcap.records import (
10+
Channel,
11+
Chunk,
12+
ChunkIndex,
13+
Message,
14+
Schema,
15+
)
16+
17+
18+
class IndexedSeekingReader(SeekingReader):
    """
    Extends SeekingReader with global message indexing.

    - Every message gets a global integer index (0..N-1).
    - Builds a table mapping global index -> (chunk_offset, index_in_chunk).
    - Provides .get(global_index) to retrieve a message by index.
    """

    def __init__(
        self,
        stream,
        validate_crcs: bool = False,
        decoder_factories=(),
        record_size_limit: Optional[int] = 4 * 2**30,
    ):
        """Forward all arguments unchanged to SeekingReader.__init__."""
        super().__init__(
            stream,
            validate_crcs=validate_crcs,
            decoder_factories=decoder_factories,
            record_size_limit=record_size_limit,
        )
        # global index -> (chunk_start_offset, index_in_chunk).
        # chunk_start_offset is None when the index was built via the
        # non-seeking fallback (file without chunk indexes); in that case
        # the second element is the position in the linear message stream.
        self._index_map: List[Tuple[Optional[int], int]] = []

    def iter_messages(
        self,
        topics: Optional[Iterable[str]] = None,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        log_time_order: bool = True,
        reverse: bool = False,
    ) -> Iterator[Tuple[Optional[Schema], Channel, Message, int]]:
        """
        Same as SeekingReader.iter_messages, but yields an extra global index:
        (schema, channel, message, global_index)

        NOTE: the internal index map is rebuilt from scratch on every call,
        so indices are only meaningful relative to the filters of the most
        recent iteration.
        """
        self._index_map.clear()
        global_idx = 0

        summary = self.get_summary()
        if summary is None or len(summary.chunk_indexes) == 0:
            # No chunk indices: fall back to a linear scan with NonSeekingReader.
            self._stream.seek(0, io.SEEK_SET)
            for schema, channel, msg in NonSeekingReader(self._stream).iter_messages(
                topics, start_time, end_time, log_time_order
            ):
                # No chunk offset available; remember the linear position.
                self._index_map.append((None, global_idx))
                yield schema, channel, msg, global_idx
                global_idx += 1
            return

        message_queue = MessageQueue(log_time_order=log_time_order, reverse=reverse)
        for chunk_index in _chunks_matching_topics(
            summary, topics, start_time, end_time
        ):
            message_queue.push(chunk_index)

        while message_queue:
            next_item = message_queue.pop()
            if isinstance(next_item, ChunkIndex):
                # Skip the record opcode (1 byte) and length prefix (8 bytes)
                # to position the stream at the Chunk record body.
                self._stream.seek(next_item.chunk_start_offset + 1 + 8, io.SEEK_SET)
                chunk = Chunk.read(ReadDataStream(self._stream))
                for index_in_chunk, record in enumerate(
                    breakup_chunk(chunk, validate_crc=self._validate_crcs)
                ):
                    if not isinstance(record, Message):
                        continue
                    channel = summary.channels[record.channel_id]
                    if topics is not None and channel.topic not in topics:
                        continue
                    if start_time is not None and record.log_time < start_time:
                        continue
                    if end_time is not None and record.log_time >= end_time:
                        continue
                    schema = (
                        None
                        if channel.schema_id == 0
                        else summary.schemas[channel.schema_id]
                    )
                    message_queue.push(
                        (
                            (schema, channel, record),
                            next_item.chunk_start_offset,
                            index_in_chunk,
                        )
                    )
            else:
                (schema, channel, record), chunk_offset, idx_in_chunk = next_item
                self._index_map.append((chunk_offset, idx_in_chunk))
                yield schema, channel, record, global_idx
                global_idx += 1

    def get(self, global_index: int) -> DecodedMessageTuple:
        """
        Retrieve a message by its global index.

        Returns a DecodedMessageTuple (schema, channel, message,
        decoded_message), the same tuple shape as iter_decoded_messages().

        Raises:
            IndexError: if global_index is out of range.
            DecoderNotFoundError: if no decoder is available for the
                message's encoding.
        """
        if global_index >= len(self._index_map):
            # Index not (fully) built yet: iterate until the requested
            # message has been indexed.  NOTE(review): this rebuilds the
            # map without topic/time filters — indices from a filtered
            # iter_messages() call are invalidated here.
            for _, _, _, idx in self.iter_messages():
                if idx >= global_index:
                    break

        if global_index < 0 or global_index >= len(self._index_map):
            raise IndexError(global_index)

        chunk_offset, idx_in_chunk = self._index_map[global_index]

        if chunk_offset is None:
            # Non-seeking path: the file has no chunk indexes, so
            # SeekingReader.iter_decoded_messages() would yield nothing.
            # Re-run a linear scan with NonSeekingReader instead,
            # forwarding CRC validation and decoder factories.
            self._stream.seek(0, io.SEEK_SET)
            fallback = NonSeekingReader(
                self._stream,
                validate_crcs=self._validate_crcs,
                decoder_factories=self._decoder_factories,
            )
            for i, dm_tuple in enumerate(fallback.iter_decoded_messages()):
                if i == global_index:
                    return dm_tuple
            raise IndexError(global_index)

        # Seeking path: re-read only the single chunk that holds the message.
        # Skip the record opcode (1 byte) and length prefix (8 bytes).
        self._stream.seek(chunk_offset + 1 + 8, io.SEEK_SET)
        chunk = Chunk.read(ReadDataStream(self._stream))
        for j, record in enumerate(
            breakup_chunk(chunk, validate_crc=self._validate_crcs)
        ):
            if not isinstance(record, Message):
                continue
            # idx_in_chunk was recorded over the same enumeration of all
            # chunk records, so positions line up.
            if j == idx_in_chunk:
                summary = self.get_summary()
                channel = summary.channels[record.channel_id]
                schema = (
                    None if channel.schema_id == 0 else summary.schemas[channel.schema_id]
                )
                decoded = self._decode_message(schema, channel, record)
                return DecodedMessageTuple(schema, channel, record, decoded)

        raise IndexError(global_index)

    def _decode_message(
        self, schema: Optional[Schema], channel: Channel, message: Message
    ) -> Any:
        """Decode message.data using a cached decoder, creating one on demand.

        Mirrors the decoder-caching behavior of McapReader: the first decoder
        found for a channel is cached in self._decoders keyed by channel_id.
        """
        decoder = self._decoders.get(message.channel_id)
        if decoder is not None:
            return decoder(message.data)
        for factory in self._decoder_factories:
            decoder = factory.decoder_for(channel.message_encoding, schema)
            if decoder is not None:
                self._decoders[message.channel_id] = decoder
                return decoder(message.data)
        raise DecoderNotFoundError(
            f"no decoder factory supplied for message encoding {channel.message_encoding}, "
            f"schema {schema}"
        )

0 commit comments

Comments
 (0)