
skip adding results with invalid remotes to cache chains
Signed-off-by: Erik Sipsma <[email protected]>
sipsma committed Dec 6, 2024
1 parent 867e8fd commit 07cf45e
Showing 14 changed files with 48 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cache/remotecache/azblob/importer.go
@@ -134,7 +134,7 @@ func (ci *importer) loadManifest(ctx context.Context, name string) (*v1.CacheCha
progress.OneOff(ctx, fmt.Sprintf("found %d layers in cache", len(allLayers)))(nil)

cc := v1.NewCacheChains()
- if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+ if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
return nil, err
}

2 changes: 1 addition & 1 deletion cache/remotecache/gha/gha.go
@@ -356,7 +356,7 @@ func (ci *importer) loadScope(ctx context.Context, scope string) (*v1.CacheChain
}

cc := v1.NewCacheChains()
- if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+ if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
return nil, err
}
return cc, nil
4 changes: 2 additions & 2 deletions cache/remotecache/import.go
@@ -117,7 +117,7 @@ func (ci *contentCacheImporter) Resolve(ctx context.Context, desc ocispecs.Descr
}

cc := v1.NewCacheChains()
- if err := v1.Parse(dt, allLayers, cc); err != nil {
+ if err := v1.Parse(ctx, dt, allLayers, cc); err != nil {
return nil, err
}

@@ -238,7 +238,7 @@ func (ci *contentCacheImporter) importInlineCache(ctx context.Context, dt []byte
return errors.WithStack(err)
}
cc := v1.NewCacheChains()
- if err := v1.ParseConfig(config, layers, cc); err != nil {
+ if err := v1.ParseConfig(ctx, config, layers, cc); err != nil {
return err
}
mu.Lock()
2 changes: 1 addition & 1 deletion cache/remotecache/inline/inline.go
@@ -76,7 +76,7 @@ func (ce *exporter) ExportForLayers(ctx context.Context, layers []digest.Digest)
}

cc := v1.NewCacheChains()
- if err := v1.ParseConfig(*config, descs2, cc); err != nil {
+ if err := v1.ParseConfig(ctx, *config, descs2, cc); err != nil {
return nil, err
}

2 changes: 1 addition & 1 deletion cache/remotecache/s3/s3.go
@@ -365,7 +365,7 @@ func (i *importer) load(ctx context.Context) (*v1.CacheChains, error) {
}

cc := v1.NewCacheChains()
- if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+ if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
return nil, err
}
return cc, nil
25 changes: 23 additions & 2 deletions cache/remotecache/v1/chains.go
@@ -7,8 +7,10 @@ import (
"time"

"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/bklog"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@@ -215,7 +217,26 @@ func (c *item) removeLink(src *item) bool {
return found
}

- func (c *item) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+ func (c *item) AddResult(ctx context.Context, _ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+ if result == nil {
+ return
+ }
+
+ if len(result.Descriptors) == 0 {
+ bklog.G(ctx).Warnf("no descriptors for item %s result, skipping", c.dgst)
+ return
+ }
+
+ if result.Provider != nil {
+ for _, desc := range result.Descriptors {
+ _, err := result.Provider.Info(ctx, desc.Digest)
+ if err != nil && !cerrdefs.IsNotFound(err) {
+ bklog.G(ctx).Warnf("failed to get info for item %s descriptor %s, skipping item result: %v", c.dgst, desc.Digest, err)
+ return
+ }
+ }
+ }
+
c.resultTime = createdAt
c.result = result
}
Expand Down Expand Up @@ -305,7 +326,7 @@ func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{}
type nopRecord struct {
}

- func (c *nopRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+ func (c *nopRecord) AddResult(_ context.Context, _ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
}

func (c *nopRecord) LinkFrom(rec solver.CacheExporterRecord, index int, selector string) {
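The chains.go hunks above are the core of the change: (*item).AddResult now receives the caller's context and skips recording a result whose remote is unusable, either because the remote is nil, because it has no descriptors, or because the provider returns an unexpected error (anything other than not-found) when asked for info on one of its descriptors. Such results are dropped with a warning instead of being marshaled into the cache config; the roughly equivalent check that used to live in marshalRemote is removed from cache/remotecache/v1/utils.go further down. A minimal sketch of the same validation pattern, using a hypothetical infoGetter interface and remoteUsable helper rather than buildkit's internal types:

package cacheutil

import (
	"context"

	"github.com/containerd/containerd/content"
	cerrdefs "github.com/containerd/errdefs"
	digest "github.com/opencontainers/go-digest"
	ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

// infoGetter is an illustrative stand-in for the provider attached to a remote.
type infoGetter interface {
	Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
}

// remoteUsable mirrors the checks in AddResult above: a result is only worth
// recording if it has at least one descriptor and the provider can report info
// for each of them (a not-found error is tolerated, matching the hunk above).
func remoteUsable(ctx context.Context, descs []ocispecs.Descriptor, p infoGetter) bool {
	if len(descs) == 0 {
		return false
	}
	if p == nil {
		return true
	}
	for _, desc := range descs {
		if _, err := p.Info(ctx, desc.Digest); err != nil && !cerrdefs.IsNotFound(err) {
			return false
		}
	}
	return true
}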
4 changes: 2 additions & 2 deletions cache/remotecache/v1/chains_test.go
@@ -29,7 +29,7 @@ func TestSimpleMarshal(t *testing.T) {
Digest: dgst("d1"),
}},
}
baz.AddResult("", 0, time.Now(), r0)
baz.AddResult(context.TODO(), "", 0, time.Now(), r0)
}

addRecords()
@@ -76,7 +76,7 @@ func TestSimpleMarshal(t *testing.T) {
require.NoError(t, err)

newChains := NewCacheChains()
- err = Parse(dt, descPairs, newChains)
+ err = Parse(context.TODO(), dt, descPairs, newChains)
require.NoError(t, err)

cfg3, _, err := cc.Marshal(context.TODO())
17 changes: 9 additions & 8 deletions cache/remotecache/v1/parse.go
@@ -1,6 +1,7 @@
package cacheimport

import (
"context"
"encoding/json"

"github.com/moby/buildkit/solver"
@@ -9,27 +10,27 @@ import (
"github.com/pkg/errors"
)

- func Parse(configJSON []byte, provider DescriptorProvider, t solver.CacheExporterTarget) error {
+ func Parse(ctx context.Context, configJSON []byte, provider DescriptorProvider, t solver.CacheExporterTarget) error {
var config CacheConfig
if err := json.Unmarshal(configJSON, &config); err != nil {
return errors.WithStack(err)
}

- return ParseConfig(config, provider, t)
+ return ParseConfig(ctx, config, provider, t)
}

- func ParseConfig(config CacheConfig, provider DescriptorProvider, t solver.CacheExporterTarget) error {
+ func ParseConfig(ctx context.Context, config CacheConfig, provider DescriptorProvider, t solver.CacheExporterTarget) error {
cache := map[int]solver.CacheExporterRecord{}

for i := range config.Records {
- if _, err := parseRecord(config, i, provider, t, cache); err != nil {
+ if _, err := parseRecord(ctx, config, i, provider, t, cache); err != nil {
return err
}
}
return nil
}

- func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.CacheExporterTarget, cache map[int]solver.CacheExporterRecord) (solver.CacheExporterRecord, error) {
+ func parseRecord(ctx context.Context, cc CacheConfig, idx int, provider DescriptorProvider, t solver.CacheExporterTarget, cache map[int]solver.CacheExporterRecord) (solver.CacheExporterRecord, error) {
if r, ok := cache[idx]; ok {
if r == nil {
return nil, errors.Errorf("invalid looping record")
@@ -46,7 +47,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
cache[idx] = nil
for i, inputs := range rec.Inputs {
for _, inp := range inputs {
- src, err := parseRecord(cc, inp.LinkIndex, provider, t, cache)
+ src, err := parseRecord(ctx, cc, inp.LinkIndex, provider, t, cache)
if err != nil {
return nil, err
}
@@ -61,7 +62,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
return nil, err
}
if remote != nil {
r.AddResult("", 0, res.CreatedAt, remote)
r.AddResult(ctx, "", 0, res.CreatedAt, remote)
}
}

@@ -86,7 +87,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
}
if remote != nil {
remote.Provider = mp
r.AddResult("", 0, res.CreatedAt, remote)
r.AddResult(ctx, "", 0, res.CreatedAt, remote)
}
}

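Parse, ParseConfig, and parseRecord now take a context.Context as their first argument so that the descriptor checks inside AddResult run with the caller's context and logger; the importers above (azblob, gha, the generic content importer, inline, s3) just thread their existing ctx through. A minimal sketch of the call-site pattern after this change, with the load wrapper and its parameters standing in for whatever each importer already has around this code:

package example

import (
	"context"

	v1 "github.com/moby/buildkit/cache/remotecache/v1"
)

// load sketches the tail end of an importer after this change: the only
// difference from before is that ctx is passed down to ParseConfig.
func load(ctx context.Context, config v1.CacheConfig, allLayers v1.DescriptorProvider) (*v1.CacheChains, error) {
	cc := v1.NewCacheChains()
	if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
		return nil, err
	}
	return cc, nil
}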
15 changes: 0 additions & 15 deletions cache/remotecache/v1/utils.go
@@ -5,7 +5,6 @@ import (
"fmt"
"sort"

cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/bklog"
digest "github.com/opencontainers/go-digest"
@@ -275,20 +274,6 @@ type marshalState struct {
}

func marshalRemote(ctx context.Context, r *solver.Remote, state *marshalState) string {
- if len(r.Descriptors) == 0 {
- return ""
- }
-
- if r.Provider != nil {
- for _, d := range r.Descriptors {
- if _, err := r.Provider.Info(ctx, d.Digest); err != nil {
- if !cerrdefs.IsNotImplemented(err) {
- return ""
- }
- }
- }
- }
-
var parentID string
if len(r.Descriptors) > 1 {
r2 := &solver.Remote{
4 changes: 2 additions & 2 deletions client/client_test.go
@@ -10974,7 +10974,7 @@ func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbo
// Put a random file in the rootfs, run a cache invalidation step and then copy
// the random file to a r/w mnt.
def, err = llb.Image(busyboxGzipRef).
- Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root().
+ Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 | sha256sum > /rand"`)).Root().
Run(llb.Shlex(`echo `+identity.NewID())).Root().
Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
Marshal(sb.Context())
@@ -11008,7 +11008,7 @@ func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbo
// Run the same steps as before but with a different cache invalidation step in the middle
// The random file should still be cached from earlier and thus the output should be the same
def, err = llb.Image(busyboxGzipRef).
- Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 > /rand"`)).Root().
+ Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 | sha256sum > /rand"`)).Root().
Run(llb.Shlex(`echo `+identity.NewID())).Root().
Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
Marshal(sb.Context())
6 changes: 3 additions & 3 deletions solver/exporter.go
@@ -133,7 +133,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
if opt.CompressionOpt != nil {
for _, r := range remotes { // record all remaining remotes as well
rec := t.Add(recKey)
- rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r)
+ rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, r)
variants = append(variants, rec)
}
}
@@ -154,15 +154,15 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
if opt.CompressionOpt != nil {
for _, r := range remotes { // record all remaining remotes as well
rec := t.Add(recKey)
- rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r)
+ rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, r)
variants = append(variants, rec)
}
}
}

if remote != nil {
for _, rec := range allRec {
- rec.AddResult(k.vtx, int(k.output), v.CreatedAt, remote)
+ rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, remote)
}
}
allRec = append(allRec, variants...)
2 changes: 1 addition & 1 deletion solver/llbsolver/provenance.go
@@ -503,7 +503,7 @@ type cacheRecord struct {
ce *cacheExporter
}

- func (c *cacheRecord) AddResult(dgst digest.Digest, idx int, createdAt time.Time, result *solver.Remote) {
+ func (c *cacheRecord) AddResult(_ context.Context, dgst digest.Digest, idx int, createdAt time.Time, result *solver.Remote) {
if result == nil || dgst == "" {
return
}
2 changes: 1 addition & 1 deletion solver/scheduler_test.go
@@ -4078,7 +4078,7 @@ type testExporterRecord struct {
linkMap map[digest.Digest]struct{}
}

- func (r *testExporterRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *Remote) {
+ func (r *testExporterRecord) AddResult(_ context.Context, _ digest.Digest, _ int, createdAt time.Time, result *Remote) {
r.results++
}

2 changes: 1 addition & 1 deletion solver/types.go
@@ -137,7 +137,7 @@ type CacheExporterTarget interface {

// CacheExporterRecord is a single object being exported
type CacheExporterRecord interface {
- AddResult(vtx digest.Digest, index int, createdAt time.Time, result *Remote)
+ AddResult(ctx context.Context, vtx digest.Digest, index int, createdAt time.Time, result *Remote)
LinkFrom(src CacheExporterRecord, index int, selector string)
}

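Because the solver.CacheExporterRecord interface itself gains the context parameter, every implementation has to change its AddResult signature, as the updates to item, nopRecord, the provenance cacheRecord, and testExporterRecord above show; this is effectively a breaking change for any external implementation of the interface. A hypothetical minimal implementation, not part of buildkit, showing the updated signature; implementations with no use for the context simply ignore it, as nopRecord does:

package example

import (
	"context"
	"time"

	"github.com/moby/buildkit/solver"
	digest "github.com/opencontainers/go-digest"
)

// countingRecord is illustrative only; it counts results and ignores links.
type countingRecord struct {
	results int
}

func (r *countingRecord) AddResult(_ context.Context, _ digest.Digest, _ int, _ time.Time, _ *solver.Remote) {
	r.results++
}

func (r *countingRecord) LinkFrom(_ solver.CacheExporterRecord, _ int, _ string) {}

// Compile-time check that countingRecord satisfies the updated interface.
var _ solver.CacheExporterRecord = (*countingRecord)(nil)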
