Skip to content

Commit d801b87

Browse files
authored
LastImportTime for resource. (grafana#112153)
* LastImportTime for resource. * Make StorageBackendImpl implement GetResourceLastImportTimes * More missing implementations of GetResourceLastImportTimes * Fix import. * Skip TestGetResourceLastImportTime in TestBadgerKVStorageBackend. * Implement GetResourceLastImportTimes by mockStorageBackend * Bump test tolerance. * Fix postgres query and timezone. * Fix postgres query and timezone. * Make linter happy.
1 parent 3cdae5d commit d801b87

24 files changed

+476
-8
lines changed

pkg/registry/apis/dashboard/legacy/storage.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,12 @@ func (a *dashboardSqlAccess) ListModifiedSince(ctx context.Context, key resource
242242
}
243243
}
244244

245+
func (a *dashboardSqlAccess) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] {
246+
return func(yield func(resource.ResourceLastImportTime, error) bool) {
247+
yield(resource.ResourceLastImportTime{}, errors.New("not implemented"))
248+
}
249+
}
250+
245251
// List implements StorageBackend.
246252
func (a *dashboardSqlAccess) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
247253
if req.ResourceVersion != 0 {

pkg/registry/apis/iam/noopstorage/storage_backend.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,9 @@ func (c *StorageBackendImpl) WatchWriteEvents(ctx context.Context) (<-chan *reso
6161
func (c *StorageBackendImpl) WriteEvent(context.Context, resource.WriteEvent) (int64, error) {
6262
return 0, errNoopStorage
6363
}
64+
65+
func (c *StorageBackendImpl) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] {
66+
return func(yield func(resource.ResourceLastImportTime, error) bool) {
67+
yield(resource.ResourceLastImportTime{}, errNoopStorage)
68+
}
69+
}

pkg/registry/apis/iam/resourcepermission/storage_backend.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"k8s.io/apiserver/pkg/endpoints/request"
1414

1515
"github.com/grafana/authlib/types"
16+
1617
"github.com/grafana/grafana/apps/iam/pkg/apis/iam/v0alpha1"
1718
"github.com/grafana/grafana/pkg/infra/log"
1819
"github.com/grafana/grafana/pkg/registry/apis/iam/common"
@@ -295,3 +296,9 @@ func (s *ResourcePermSqlBackend) WriteEvent(ctx context.Context, event resource.
295296

296297
return rv, err
297298
}
299+
300+
func (s *ResourcePermSqlBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] {
301+
return func(yield func(resource.ResourceLastImportTime, error) bool) {
302+
yield(resource.ResourceLastImportTime{}, errNotImplemented)
303+
}
304+
}

pkg/storage/unified/resource/bulk.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ func grpcMetaValueIsTrue(vals []string) bool {
3333
}
3434

3535
type BulkRequestIterator interface {
36+
// Next advances the iterator to the next element if one exists.
3637
Next() bool
3738

38-
// The next event we should process
39+
// Request returns the current element. Only valid after Next() returns true.
3940
Request() *resourcepb.BulkRequest
4041

41-
// Rollback requested
42+
// RollbackRequested returns true if there was an error advancing the iterator. Checked after Next() returns true.
4243
RollbackRequested() bool
4344
}
4445

pkg/storage/unified/resource/cdk_backend.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ type cdkBackend struct {
7575
stream chan<- *WrittenEvent
7676
}
7777

78+
func (s *cdkBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
79+
return func(yield func(ResourceLastImportTime, error) bool) {
80+
yield(ResourceLastImportTime{}, errors.New("not implemented"))
81+
}
82+
}
83+
7884
func (s *cdkBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) {
7985
return 0, func(yield func(*ModifiedResource, error) bool) {
8086
yield(nil, errors.New("not implemented"))

pkg/storage/unified/resource/search_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ func (m *mockStorageBackend) ListModifiedSince(ctx context.Context, key Namespac
129129
}
130130
}
131131

132+
func (m *mockStorageBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
133+
return func(yield func(ResourceLastImportTime, error) bool) {
134+
yield(ResourceLastImportTime{}, errors.New("not implemented"))
135+
}
136+
}
137+
132138
// mockSearchBackend implements SearchBackend for testing with tracking capabilities
133139
type mockSearchBackend struct {
134140
openIndexes []NamespacedResource

pkg/storage/unified/resource/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ type BackendReadResponse struct {
8686
Error *resourcepb.ErrorResult
8787
}
8888

89+
type ResourceLastImportTime struct {
90+
NamespacedResource
91+
LastImportTime time.Time
92+
}
93+
8994
// The StorageBackend is an internal abstraction that supports interacting with
9095
// the underlying raw storage medium. This interface is never exposed directly,
9196
// it is provided by concrete instances that actually write values.
@@ -118,6 +123,9 @@ type StorageBackend interface {
118123

119124
// Get resource stats within the storage backend. When namespace is empty, it will apply to all
120125
GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error)
126+
127+
// GetResourceLastImportTimes returns import times for all namespaced resources in the backend.
128+
GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error]
121129
}
122130

123131
type ModifiedResource struct {

pkg/storage/unified/resource/storage_backend.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ import (
1515

1616
"github.com/bwmarrin/snowflake"
1717
"github.com/grafana/grafana-app-sdk/logging"
18-
"github.com/grafana/grafana/pkg/apimachinery/utils"
19-
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
20-
"github.com/grafana/grafana/pkg/util/debouncer"
2118
"github.com/prometheus/client_golang/prometheus"
2219
"go.opentelemetry.io/otel/trace"
2320
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
22+
"github.com/grafana/grafana/pkg/apimachinery/utils"
23+
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
24+
"github.com/grafana/grafana/pkg/util/debouncer"
2425
)
2526

2627
const (
@@ -1076,6 +1077,12 @@ func (k *kvStorageBackend) GetResourceStats(ctx context.Context, namespace strin
10761077
return k.dataStore.GetResourceStats(ctx, namespace, minCount)
10771078
}
10781079

1080+
func (k *kvStorageBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
1081+
return func(yield func(ResourceLastImportTime, error) bool) {
1082+
yield(ResourceLastImportTime{}, fmt.Errorf("not implemented"))
1083+
}
1084+
}
1085+
10791086
// readAndClose reads all data from a ReadCloser and ensures it's closed,
10801087
// combining any errors from both operations.
10811088
func readAndClose(r io.ReadCloser) ([]byte, error) {

pkg/storage/unified/search/bleve_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,8 +1489,8 @@ func TestConcurrentIndexUpdateAndSearchWithIndexMinUpdateInterval(t *testing.T)
14891489
if rvDiff == 0 {
14901490
// OK
14911491
} else {
1492-
// Allow returned RV to be within 10% of minInterval.
1493-
require.InDelta(t, minInterval.Milliseconds(), rvDiff, float64(minInterval.Milliseconds())*0.10)
1492+
// Allow returned RV to be within 20% of minInterval (to account for slow CI machines).
1493+
require.InDelta(t, minInterval.Milliseconds(), rvDiff, float64(minInterval.Milliseconds())*0.20)
14941494
}
14951495
}
14961496

pkg/storage/unified/sql/backend.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -955,3 +955,51 @@ func (b *backend) fetchLatestHistoryRV(ctx context.Context, x db.ContextExecer,
955955
}
956956
return res.ResourceVersion, nil
957957
}
958+
959+
func (b *backend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[resource.ResourceLastImportTime, error] {
960+
ctx, span := b.tracer.Start(ctx, tracePrefix+"GetLastImportTimes")
961+
defer span.End()
962+
963+
rows, err := dbutil.QueryRows(ctx, b.db, sqlResourceLastImportTimeQuery, &sqlResourceLastImportTimeQueryRequest{SQLTemplate: sqltemplate.New(b.dialect)})
964+
if err != nil {
965+
return func(yield func(resource.ResourceLastImportTime, error) bool) {
966+
yield(resource.ResourceLastImportTime{}, err)
967+
}
968+
}
969+
970+
return func(yield func(resource.ResourceLastImportTime, error) bool) {
971+
closeOnDefer := true
972+
defer func() {
973+
if closeOnDefer {
974+
_ = rows.Close() // Close while ignoring errors.
975+
}
976+
}()
977+
978+
for rows.Next() {
979+
// If context has finished, return early.
980+
if ctx.Err() != nil {
981+
yield(resource.ResourceLastImportTime{}, ctx.Err())
982+
return
983+
}
984+
985+
row := resource.ResourceLastImportTime{}
986+
err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.LastImportTime)
987+
if err != nil {
988+
yield(resource.ResourceLastImportTime{}, err)
989+
return
990+
}
991+
992+
if !yield(row, nil) {
993+
return
994+
}
995+
}
996+
997+
closeOnDefer = false
998+
999+
// Close and report error, if any.
1000+
err := rows.Close()
1001+
if err != nil {
1002+
yield(resource.ResourceLastImportTime{}, err)
1003+
}
1004+
}
1005+
}

0 commit comments

Comments
 (0)