Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various lazy blob errors involving dedupe by chainID #5560

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
descHandlers := descHandlersOf(opts...)
if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) {
if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) {
return nil, NeedsRemoteProviderError([]digest.Digest{desc.Digest})
return nil, NeedsRemoteProviderError([]DigestDescriptionPair{{Digest: desc.Digest}})
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
if isLazy, err := cr.isLazy(ctx); err != nil {
return err
} else if isLazy && dhs[blob] == nil {
missing = append(missing, blob)
missing = append(missing, DigestDescriptionPair{Digest: blob, Description: cr.GetDescription()})
}
return nil
}); err != nil {
Expand Down
13 changes: 11 additions & 2 deletions cache/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ func descHandlersOf(opts ...RefOption) DescHandlers {

type DescHandlerKey digest.Digest

type NeedsRemoteProviderError []digest.Digest //nolint:errname
type NeedsRemoteProviderError []DigestDescriptionPair //nolint:errname

type DigestDescriptionPair struct {
Digest digest.Digest
Description string
}

func (d DigestDescriptionPair) String() string {
return fmt.Sprintf("%s: %s", d.Digest, d.Description)
}

func (m NeedsRemoteProviderError) Error() string {
return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []digest.Digest(m))
return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []DigestDescriptionPair(m))
}

type Unlazy session.Group
Expand Down
15 changes: 15 additions & 0 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Ref interface {
RefMetadata
Release(context.Context) error
IdentityMapping() *idtools.IdentityMapping
DescHandlers() DescHandlers
DescHandler(digest.Digest) *DescHandler
}

Expand Down Expand Up @@ -612,6 +613,13 @@ func (sr *immutableRef) LayerChain() RefList {
return l
}

func (sr *immutableRef) DescHandlers() DescHandlers {
// clone to prevent mutation of internal state
dhs := make(DescHandlers)
maps.Copy(dhs, sr.descHandlers)
return dhs
}

func (sr *immutableRef) DescHandler(dgst digest.Digest) *DescHandler {
return sr.descHandlers[dgst]
}
Expand Down Expand Up @@ -640,6 +648,13 @@ func (sr *mutableRef) traceLogFields() logrus.Fields {
return m
}

func (sr *mutableRef) DescHandlers() DescHandlers {
// clone to prevent mutation of internal state
dhs := make(DescHandlers)
maps.Copy(dhs, sr.descHandlers)
return dhs
}

func (sr *mutableRef) DescHandler(dgst digest.Digest) *DescHandler {
return sr.descHandlers[dgst]
}
Expand Down
4 changes: 3 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10899,6 +10899,7 @@ func testRunValidExitCodes(t *testing.T, sb integration.Sandbox) {
}

func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbox) {
requiresLinux(t)
workers.CheckFeatureCompat(t, sb,
workers.FeatureCacheExport,
workers.FeatureCacheImport,
Expand Down Expand Up @@ -11040,6 +11041,7 @@ func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbo
}

func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.Sandbox) {
requiresLinux(t)
workers.CheckFeatureCompat(t, sb,
workers.FeatureCacheExport,
workers.FeatureCacheImport,
Expand Down Expand Up @@ -11123,7 +11125,7 @@ func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.San
require.NoError(t, err)

// try to re-use the cache mount from before, ensure we successfully get
// the same one writen to in previous step
// the same one written to in previous step
def, err = llb.Image(busyboxZstdRef).Run(
llb.Shlex(`stat /mnt/bar`),
llb.AddMount("/mnt",
Expand Down
76 changes: 46 additions & 30 deletions solver/cacheopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,69 +28,85 @@ func WithCacheOptGetter(ctx context.Context, getter func(includeAncestors bool,
return context.WithValue(ctx, cacheOptGetterKey{}, getter)
}

func withAncestorCacheOpts(ctx context.Context, start *state) context.Context {
func withAncestorCacheOpts(ctx context.Context, start *sharedOp) context.Context {
return WithCacheOptGetter(ctx, func(includeAncestors bool, keys ...interface{}) map[interface{}]interface{} {
keySet := make(map[interface{}]struct{})
for _, k := range keys {
keySet[k] = struct{}{}
}
values := make(map[interface{}]interface{})
walkAncestors(ctx, start, func(st *state) bool {
if st.clientVertex.Error != "" {
walkAncestors(ctx, start, func(op *sharedOp) bool {
if op.st.clientVertex.Error != "" {
// don't use values from cancelled or otherwise error'd vertexes
return false
}
for _, res := range st.op.cacheRes {
if res.Opts == nil {
continue

for k := range keySet {
var v any
var ok bool

// check opts set from CacheMap operation
for _, res := range op.cacheRes {
if res.Opts == nil {
continue
}
v, ok = res.Opts[k]
if ok {
break
}
}
for k := range keySet {
if v, ok := res.Opts[k]; ok {
values[k] = v
delete(keySet, k)
if len(keySet) == 0 {
return true
}

// check opts set during cache load
if !ok && op.loadCacheOpts != nil {
v, ok = op.loadCacheOpts[k]
}

if ok {
values[k] = v
delete(keySet, k)
if len(keySet) == 0 {
return true
}
}
}
return !includeAncestors // stop after the first state unless includeAncestors is true

return !includeAncestors // stop after the first op unless includeAncestors is true
})
return values
})
}

func walkAncestors(ctx context.Context, start *state, f func(*state) bool) {
stack := [][]*state{{start}}
func walkAncestors(ctx context.Context, start *sharedOp, f func(*sharedOp) bool) {
stack := [][]*sharedOp{{start}}
cache := make(map[digest.Digest]struct{})
for len(stack) > 0 {
sts := stack[len(stack)-1]
if len(sts) == 0 {
ops := stack[len(stack)-1]
if len(ops) == 0 {
stack = stack[:len(stack)-1]
continue
}
st := sts[len(sts)-1]
stack[len(stack)-1] = sts[:len(sts)-1]
if st == nil {
op := ops[len(ops)-1]
stack[len(stack)-1] = ops[:len(ops)-1]
if op == nil {
continue
}
if _, ok := cache[st.origDigest]; ok {
if _, ok := cache[op.st.origDigest]; ok {
continue
}
cache[st.origDigest] = struct{}{}
if shouldStop := f(st); shouldStop {
cache[op.st.origDigest] = struct{}{}
if shouldStop := f(op); shouldStop {
return
}
stack = append(stack, []*state{})
for _, parentDgst := range st.clientVertex.Inputs {
st.solver.mu.RLock()
parent := st.solver.actives[parentDgst]
st.solver.mu.RUnlock()
stack = append(stack, []*sharedOp{})
for _, parentDgst := range op.st.clientVertex.Inputs {
op.st.solver.mu.RLock()
parent := op.st.solver.actives[parentDgst]
op.st.solver.mu.RUnlock()
if parent == nil {
bklog.G(ctx).Warnf("parent %q not found in active job list during cache opt search", parentDgst)
continue
}
stack[len(stack)-1] = append(stack[len(stack)-1], parent)
stack[len(stack)-1] = append(stack[len(stack)-1], parent.op)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion solver/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
if e.edge != nil {
op, ok := e.edge.op.(*sharedOp)
sipsma marked this conversation as resolved.
Show resolved Hide resolved
if ok && op != nil && op.st != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

op.st != nil not required anymore (or wasn't ever)

ctx = withAncestorCacheOpts(ctx, op.st)
ctx = withAncestorCacheOpts(ctx, op)
}
}

Expand Down
19 changes: 12 additions & 7 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,8 @@ type sharedOp struct {
cacheDone bool
cacheErr error

loadCacheOpts CacheOpts

slowMu sync.Mutex
slowCacheRes map[Index]digest.Digest
slowCacheErr map[Index]error
Expand All @@ -879,18 +881,18 @@ func (s *sharedOp) IgnoreCache() bool {
}

func (s *sharedOp) Cache() CacheManager {
return &cacheWithCacheOpts{s.st.combinedCacheManager(), s.st}
return &cacheWithCacheOpts{s.st.combinedCacheManager(), s}
}

type cacheWithCacheOpts struct {
CacheManager
st *state
op *sharedOp
}

func (c cacheWithCacheOpts) Records(ctx context.Context, ck *CacheKey) ([]*CacheRecord, error) {
// Allow Records accessing to cache opts through ctx. This enable to use remote provider
// during checking the cache existence.
return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.st), ck)
return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.op), ck)
}

func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) {
Expand All @@ -901,9 +903,12 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true)
res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec)
res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s), rec)
tracing.FinishWithError(span, err)
notifyCompleted(err, true)
if err == nil {
s.loadCacheOpts = res.CacheOpts()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just use interface detection on res.Sys() in here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe Load() should return it separately? Something seems off about having CacheOpts() method in the Result interface.

}
return res, err
}

Expand Down Expand Up @@ -952,7 +957,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF
if s.st.mspan.Span != nil {
ctx = trace.ContextWithSpan(ctx, s.st.mspan)
}
key, err = f(withAncestorCacheOpts(ctx, s.st), res, s.st)
key, err = f(withAncestorCacheOpts(ctx, s), res, s.st)
}
if err != nil {
select {
Expand Down Expand Up @@ -1008,7 +1013,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
if s.st.mspan.Span != nil {
ctx = trace.ContextWithSpan(ctx, s.st.mspan)
}
ctx = withAncestorCacheOpts(ctx, s.st)
ctx = withAncestorCacheOpts(ctx, s)
if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
Expand Down Expand Up @@ -1087,7 +1092,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
if s.st.mspan.Span != nil {
ctx = trace.ContextWithSpan(ctx, s.st.mspan)
}
ctx = withAncestorCacheOpts(ctx, s.st)
ctx = withAncestorCacheOpts(ctx, s)

// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
Expand Down
6 changes: 3 additions & 3 deletions solver/llbsolver/mounts/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string,
var needsRemoteProviders cache.NeedsRemoteProviderError
if errors.As(err, &needsRemoteProviders) && ref != nil {
sipsma marked this conversation as resolved.
Show resolved Hide resolved
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for _, dgst := range needsRemoteProviders {
if handler := ref.DescHandler(dgst); handler != nil {
descHandlers[dgst] = handler
for _, dgstDescPair := range needsRemoteProviders {
if handler := ref.DescHandler(dgstDescPair.Digest); handler != nil {
descHandlers[dgstDescPair.Digest] = handler
}
}
mRef, err = g.cm.GetMutable(ctx, si.ID(), descHandlers)
Expand Down
1 change: 1 addition & 0 deletions solver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3947,6 +3947,7 @@ func (r *dummyResult) ID() string { return r.id }
func (r *dummyResult) Release(context.Context) error { return nil }
func (r *dummyResult) Sys() interface{} { return r }
func (r *dummyResult) Clone() Result { return r }
func (r *dummyResult) CacheOpts() CacheOpts { return nil }

func testOpResolver(v Vertex, b Builder) (Op, error) {
if op, ok := v.Sys().(Op); ok {
Expand Down
1 change: 1 addition & 0 deletions solver/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Result interface {
Release(context.Context) error
Sys() interface{}
Clone() Result
CacheOpts() CacheOpts
}

// CachedResult is a result connected with its cache key
Expand Down
4 changes: 2 additions & 2 deletions worker/base/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm
if errors.As(err, &needsRemoteProviders) {
if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil {
var keys []interface{}
for _, dgst := range needsRemoteProviders {
keys = append(keys, cache.DescHandlerKey(dgst))
for _, dgstDescPair := range needsRemoteProviders {
keys = append(keys, cache.DescHandlerKey(dgstDescPair.Digest))
}
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for k, v := range optGetter(true, keys...) {
Expand Down
10 changes: 10 additions & 0 deletions worker/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ func (wr *WorkerRef) Release(ctx context.Context) error {
return wr.ImmutableRef.Release(ctx)
}

func (wr *WorkerRef) CacheOpts() solver.CacheOpts {
opts := solver.CacheOpts{}
if wr.ImmutableRef != nil {
for k, v := range wr.ImmutableRef.DescHandlers() {
opts[cache.DescHandlerKey(k)] = v
}
}
return opts
}

// GetRemotes method abstracts ImmutableRef's GetRemotes to allow a Worker to override.
// This is needed for moby integration.
// Use this method instead of calling ImmutableRef.GetRemotes() directly.
Expand Down