Skip to content

Commit

Permalink
chore(dataobj): use dataset.Reader in dataobj.LogsReader and StreamsReader
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Mar 11, 2025
1 parent 8b04f51 commit 46c3a3a
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 228 deletions.
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/dataset/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return n, err
} else if count == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}

var passCount int // passCount tracks how many rows pass the predicate.
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataobj/internal/dataset/reader_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (pr *basicReader) ReadColumns(ctx context.Context, columns []Column, s []Ro
// Fill does not advance the offset of the basicReader.
func (pr *basicReader) Fill(ctx context.Context, columns []Column, s []Row) (n int, err error) {
if len(columns) == 0 {
return 0, fmt.Errorf("no columns to read")
return 0, fmt.Errorf("no columns to fill")
}

for partition := range partitionRows(s) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/dataobj/internal/sections/logs/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
}

for _, row := range rows[:n] {
record, err := decodeRecord(streamsColumns, row)
record, err := Decode(streamsColumns, row)
if err != nil || !yield(record) {
return err
}
Expand All @@ -89,7 +89,10 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
})
}

func decodeRecord(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
// Decode decodes a record from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
record := Record{
// Preallocate metadata to exact number of metadata columns to avoid
// oversizing.
Expand Down
7 changes: 5 additions & 2 deletions pkg/dataobj/internal/sections/streams/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
}

for _, row := range rows[:n] {
stream, err := decodeRow(streamsColumns, row)
stream, err := Decode(streamsColumns, row)
if err != nil || !yield(stream) {
return err
}
Expand All @@ -87,7 +87,10 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
})
}

func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
// Decode decodes a stream from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
var stream Stream

for columnIndex, columnValue := range row.Values {
Expand Down
Loading

0 comments on commit 46c3a3a

Please sign in to comment.