Commit 3ab3fb0

add block query context

kolesnikovae committed Feb 4, 2025
1 parent 45192ea commit 3ab3fb0

Showing 17 changed files with 374 additions and 130 deletions.
88 changes: 49 additions & 39 deletions api/gen/proto/go/metastore/v1/types.pb.go

(Generated file; diff not rendered by default.)

31 changes: 31 additions & 0 deletions api/gen/proto/go/metastore/v1/types_vtproto.pb.go

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions api/metastore/v1/types.proto
@@ -25,6 +25,7 @@ message BlockMeta {
   int64 min_time = 6;
   int64 max_time = 7;
   int32 created_by = 8;
+  uint64 metadata_offset = 12;
   uint64 size = 9;
   repeated Dataset datasets = 10;
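
Reader-side note (not shown in this commit): a hedged sketch of how a consumer might use the new field, assuming metadata_offset is the byte offset of the metadata section within the stored block object and that the section extends to the end of the object; bucket, path, and meta are assumed to be an objstore.Bucket, the block's object path, and its BlockMeta.

	// Hypothetical reader-side sketch; names and layout are assumptions.
	// Fetch only the metadata region instead of the whole block object.
	rc, err := bucket.GetRange(ctx, path, int64(meta.MetadataOffset), int64(meta.Size-meta.MetadataOffset))
	if err != nil {
		return err
	}
	defer rc.Close()
	raw, err := io.ReadAll(rc) // raw: encoded metadata section
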
4 changes: 4 additions & 0 deletions api/openapiv2/gen/phlare.swagger.json
@@ -634,6 +634,10 @@
           "type": "integer",
           "format": "int32"
         },
+        "metadataOffset": {
+          "type": "string",
+          "format": "uint64"
+        },
         "size": {
           "type": "string",
           "format": "uint64"
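
The new property is typed as a JSON string with uint64 format because the proto3 JSON mapping encodes 64-bit integers as decimal strings, avoiding precision loss in JavaScript clients; in a rendered BlockMeta the field appears as, for example, "metadataOffset": "1048576" (value illustrative).
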
85 changes: 71 additions & 14 deletions pkg/experiment/block/compaction.go
@@ -155,7 +155,7 @@ type CompactionPlan struct {
 	datasets     []*datasetCompaction
 	meta         *metastorev1.BlockMeta
 	strings      *metadata.StringTable
-	datasetIndex *datasetIndex
+	datasetIndex *datasetIndexWriter
 }
 
 func newBlockCompaction(
@@ -168,7 +168,7 @@
 		tenant:       tenant,
 		datasetMap:   make(map[int32]*datasetCompaction),
 		strings:      metadata.NewStringTable(),
-		datasetIndex: newDatasetIndex(),
+		datasetIndex: newDatasetIndexWriter(),
 	}
 	p.path = BuildObjectPath(tenant, shard, compactionLevel, id)
 	p.meta = &metastorev1.BlockMeta{
@@ -187,7 +187,8 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
 		err = multierror.New(err, w.Close()).Err()
 	}()
 	// Datasets are compacted in a strict order.
-	for _, s := range b.datasets {
+	for i, s := range b.datasets {
+		b.datasetIndex.resetDatasetIndex(uint32(i))
 		if err = s.compact(ctx, w); err != nil {
 			return nil, fmt.Errorf("compacting block: %w", err)
 		}
@@ -209,20 +210,21 @@ func (b *CompactionPlan) writeDatasetIndex(w *Writer) error {
 		return err
 	}
 	off := w.Offset()
-	n, err := w.ReadFrom(bytes.NewReader(b.datasetIndex.indexRewriter.buf))
+	n, err := w.ReadFrom(bytes.NewReader(b.datasetIndex.buf))
 	if err != nil {
 		return err
 	}
 	labels := metadata.NewLabelBuilder(b.strings).BuildPairs(
 		metadata.LabelNameTenantDataset,
-		metadata.LabelValueDatasetIndex,
+		metadata.LabelValueDatasetTSDBIndex,
 	)
 	b.meta.Datasets = append(b.meta.Datasets, &metastorev1.Dataset{
 		Tenant:  b.meta.Tenant,
 		Name:    0, // Anonymous.
 		MinTime: b.meta.MinTime,
 		MaxTime: b.meta.MaxTime,
+		// FIXME: We mimic the default layout: empty profiles, index, and empty symbols.
+		// Instead, it should be handled at the query time: substitute the dataset layout.
 		TableOfContents: []uint64{off, off, w.Offset()},
 		Size:            uint64(n),
 		Labels:          labels,
@@ -571,20 +573,75 @@ func (s *symbolsRewriter) loadStacktraceIDs(values []parquet.Value) {
 
 func (s *symbolsRewriter) Flush() error { return s.w.Flush() }
 
-type datasetIndex struct {
-	indexRewriter *indexRewriter
+// datasetIndexWriter is identical with indexRewriter,
+// except it writes dataset ID instead of series ID.
+type datasetIndexWriter struct {
+	series   []seriesLabels
+	chunks   []index.ChunkMeta
+	previous model.Fingerprint
 	symbols  map[string]struct{}
+	idx      uint32
+	buf      []byte
 }
 
-func newDatasetIndex() *datasetIndex {
-	return &datasetIndex{
-		indexRewriter: newIndexRewriter(),
+func newDatasetIndexWriter() *datasetIndexWriter {
+	return &datasetIndexWriter{
 		symbols: make(map[string]struct{}),
 	}
 }
 
-func (s *datasetIndex) writeRow(r ProfileEntry) error {
-	return s.indexRewriter.rewriteRow(r)
+func (rw *datasetIndexWriter) resetDatasetIndex(i uint32) { rw.idx = i }
+
+func (rw *datasetIndexWriter) writeRow(e ProfileEntry) error {
+	if rw.previous != e.Fingerprint || len(rw.series) == 0 {
+		series := e.Labels.Clone()
+		for _, l := range series {
+			rw.symbols[l.Name] = struct{}{}
+			rw.symbols[l.Value] = struct{}{}
+		}
+		rw.series = append(rw.series, seriesLabels{
+			labels:      series,
+			fingerprint: e.Fingerprint,
+		})
+		rw.chunks = append(rw.chunks, index.ChunkMeta{
+			SeriesIndex: rw.idx,
+		})
+		rw.previous = e.Fingerprint
+	}
+	return nil
 }
 
-func (s *datasetIndex) Flush() error {
-	return s.indexRewriter.Flush()
+func (rw *datasetIndexWriter) Flush() error {
+	// TODO(kolesnikovae):
+	//  * Estimate size.
+	//  * Use buffer pool.
+	w, err := memindex.NewWriter(context.Background(), 1<<20)
+	if err != nil {
+		return err
+	}
+
+	// Sort symbols
+	symbols := make([]string, 0, len(rw.symbols))
+	for s := range rw.symbols {
+		symbols = append(symbols, s)
+	}
+	sort.Strings(symbols)
+
+	// Add symbols
+	for _, symbol := range symbols {
+		if err = w.AddSymbol(symbol); err != nil {
+			return err
+		}
+	}
+
+	// Add Series
+	for i, series := range rw.series {
+		if err = w.AddSeries(storage.SeriesRef(i), series.labels, series.fingerprint, rw.chunks[i]); err != nil {
+			return err
+		}
+	}
+
+	err = w.Close()
+	rw.buf = w.ReleaseIndex()
+	return err
 }
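
Taken together: Compact tags every row passed to writeRow with the ordinal of the dataset being compacted (via resetDatasetIndex), and Flush materializes an ordinary TSDB index in which ChunkMeta.SeriesIndex carries that dataset position instead of a series reference. A minimal query-side sketch of the intended use (hypothetical; the reader side is not part of this commit):

	// Hypothetical sketch: given the ChunkMeta entries of series that matched
	// a label selector in the anonymous dataset-index entry, collect the set
	// of datasets that must be opened. SeriesIndex holds the dataset position.
	datasets := make(map[uint32]struct{})
	for _, c := range matched { // matched []index.ChunkMeta: assumed lookup result
		datasets[c.SeriesIndex] = struct{}{}
	}
	// Only the datasets in this set need to be fetched to serve the query.
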
35 changes: 35 additions & 0 deletions pkg/experiment/block/dataset.go
@@ -18,6 +18,41 @@ import (
 	"github.com/grafana/pyroscope/pkg/util/refctr"
 )
 
+type Section uint32
+
+const (
+	// Table of contents sections.
+	_ Section = iota
+	SectionProfiles
+	SectionTSDB
+	SectionSymbols
+)
+
+var allSections = []Section{
+	SectionProfiles,
+	SectionTSDB,
+	SectionSymbols,
+}
+
+var (
+	// Version-specific.
+	sectionNames   = [...][]string{1: {"invalid", "profiles", "tsdb", "symbols"}}
+	sectionIndices = [...][]int{1: {-1, 0, 1, 2}}
+)
+
+func (sc Section) open(ctx context.Context, s *Dataset) (err error) {
+	switch sc {
+	case SectionTSDB:
+		return openTSDB(ctx, s)
+	case SectionSymbols:
+		return openSymbols(ctx, s)
+	case SectionProfiles:
+		return openProfileTable(ctx, s)
+	default:
+		panic(fmt.Sprintf("bug: unknown section: %d", sc))
+	}
+}
+
 type Dataset struct {
 	meta *metastorev1.Dataset
 	obj  *Object
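
The version-keyed tables above suggest that a section's position in a dataset's table of contents depends on the block format version, with sectionIndices mapping a Section to its TOC slot (-1 for the invalid zero value). A hedged sketch of the lookup they enable (hypothetical helper, not part of the diff):

	// Hypothetical helper illustrating the sectionIndices lookup;
	// toc would be Dataset.TableOfContents, version the block format version.
	func sectionOffset(version int, toc []uint64, sc Section) (uint64, bool) {
		if version <= 0 || version >= len(sectionIndices) {
			return 0, false
		}
		i := sectionIndices[version][int(sc)]
		if i < 0 || i >= len(toc) {
			return 0, false
		}
		return toc[i], true
	}
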
11 changes: 11 additions & 0 deletions pkg/experiment/block/metadata/metadata.go
@@ -137,3 +137,14 @@ func (t *StringTable) Load(x iter.Iterator[string]) error {
 	}
 	return x.Err()
 }
+
+func OpenStringTable(src *metastorev1.BlockMeta) *StringTable {
+	t := &StringTable{
+		Dict:    make(map[string]int32, len(src.StringTable)),
+		Strings: src.StringTable,
+	}
+	for i, s := range src.StringTable {
+		t.Dict[s] = int32(i)
+	}
+	return t
+}
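
OpenStringTable rebuilds the string-to-index dictionary from a block's interned string table, the inverse of what NewStringTable accumulates during compaction. A brief usage sketch, assuming dataset names are indices into BlockMeta.StringTable (consistent with Name: 0 // Anonymous in writeDatasetIndex above):

	// Hypothetical usage: resolve interned dataset names on the read path.
	st := metadata.OpenStringTable(blockMeta) // blockMeta: *metastorev1.BlockMeta
	for _, ds := range blockMeta.Datasets {
		name := st.Strings[ds.Name] // Name indexes the block's string table
		_ = name
	}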