Skip to content

Commit 4deff20

Browse files
committed
Add SpliceBlob support
This new rpc allows clients to upload a large blob in chunks (potentially in parallel), and then ask the server to join those chunks into a new large blob. The SplitBlob rpc is not yet supported, I am waiting for details of a common chunking algorithm to be decided. bazelbuild/remote-apis#282
1 parent f29a05c commit 4deff20

File tree

3 files changed

+559
-1
lines changed

3 files changed

+559
-1
lines changed

server/grpc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ const (
3737
const grpcHealthServiceName = "/grpc.health.v1.Health/Check"
3838

3939
type grpcServer struct {
40-
pb.UnimplementedContentAddressableStorageServer
4140
cache disk.Cache
4241
accessLogger cache.Logger
4342
errorLogger cache.Logger
@@ -131,6 +130,8 @@ func (s *grpcServer) GetCapabilities(ctx context.Context,
131130
SupportedCompressors: []pb.Compressor_Value{pb.Compressor_ZSTD},
132131
SupportedBatchUpdateCompressors: []pb.Compressor_Value{pb.Compressor_ZSTD},
133132
MaxCasBlobSizeBytes: s.maxCasBlobSizeBytes,
133+
BlobSpliceSupport: true,
134+
BlobSplitSupport: false,
134135
},
135136
LowApiVersion: &semver.SemVer{Major: int32(2)},
136137
HighApiVersion: &semver.SemVer{Major: int32(2), Minor: int32(3)},

server/grpc_cas.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"
1616

1717
"github.com/buchgr/bazel-remote/v2/cache"
18+
"github.com/buchgr/bazel-remote/v2/utils/validate"
1819
)
1920

2021
var (
@@ -374,3 +375,153 @@ func (s *grpcServer) fillDirectories(ctx context.Context, resp *pb.GetTreeRespon
374375

375376
return nil
376377
}
378+
379+
func (s *grpcServer) SpliceBlob(ctx context.Context, req *pb.SpliceBlobRequest) (*pb.SpliceBlobResponse, error) {
380+
381+
if req == nil {
382+
return nil, grpc_status.Errorf(codes.InvalidArgument,
383+
"SpliceBlob called with nil SpliceBlobRequest")
384+
}
385+
386+
if req.BlobDigest == nil {
387+
return nil, grpc_status.Errorf(codes.InvalidArgument,
388+
"SpliceBlob called with nil SpliceBlobRequest.BlobDigest")
389+
}
390+
391+
if req.BlobDigest.SizeBytes == 0 || req.BlobDigest.Hash == emptySha256 {
392+
return nil, grpc_status.Errorf(codes.InvalidArgument,
393+
"SpliceBlob called to create the empty blob?")
394+
}
395+
396+
if req.BlobDigest.SizeBytes < 0 {
397+
return nil, grpc_status.Errorf(codes.InvalidArgument,
398+
"SpliceBlob called with negative SpliceBlobRequest.BlobDigest.SizeBytes")
399+
}
400+
401+
if s.maxCasBlobSizeBytes > 0 && req.BlobDigest.SizeBytes > s.maxCasBlobSizeBytes {
402+
return nil, grpc_status.Errorf(codes.InvalidArgument,
403+
"SpliceBlob called to create blob with size %d, which is greater than the max configured blob size %d",
404+
req.BlobDigest.SizeBytes, s.maxCasBlobSizeBytes)
405+
}
406+
407+
if !validate.HashKeyRegex.MatchString(req.BlobDigest.Hash) {
408+
return nil, grpc_status.Errorf(codes.InvalidArgument,
409+
"SpliceBlob called with invalid SpliceBlobRequest.BlobDigest.Hash: %s",
410+
req.BlobDigest.Hash)
411+
}
412+
413+
if len(req.ChunkDigests) == 0 {
414+
return nil, grpc_status.Errorf(codes.InvalidArgument,
415+
"SpliceBlob called with no SpliceBlobRequest.ChunkDigests")
416+
}
417+
418+
chunkTotal := int64(0)
419+
for _, chunkDigest := range req.ChunkDigests {
420+
if chunkDigest == nil {
421+
return nil, grpc_status.Errorf(codes.InvalidArgument,
422+
"SpliceBlob called with a nil value in SpliceBlobRequest.ChunkDigests")
423+
}
424+
425+
if chunkDigest.SizeBytes < 0 {
426+
return nil, grpc_status.Errorf(codes.InvalidArgument,
427+
"SpliceBlob called with a negative Digest in SpliceBlobRequest.ChunkDigests")
428+
}
429+
430+
if chunkDigest.SizeBytes == 0 || chunkDigest.Hash == emptySha256 {
431+
return nil, grpc_status.Errorf(codes.InvalidArgument,
432+
"SpliceBlob called with an empty blob in SpliceBlobRequest.ChunkDigests")
433+
}
434+
435+
// chunkDigest.SizeBytes must be positive if we reached this point.
436+
// Add it to chunkTotal (which then must be positive, unless there
437+
// was an overflow).
438+
439+
chunkTotal += chunkDigest.SizeBytes
440+
441+
if chunkTotal <= 0 {
442+
return nil, grpc_status.Errorf(codes.InvalidArgument,
443+
"Overflow in SpliceBlobRequest.ChunkDigests, does not match SpliceBlobRequest.BlobDigest.SizeBytes")
444+
}
445+
}
446+
447+
if chunkTotal != req.BlobDigest.SizeBytes {
448+
return nil, grpc_status.Errorf(codes.InvalidArgument,
449+
"SpliceBlob called with SpliceBlobRequest.ChunkDigests sizes that don't match SpliceBlobRequest.BlobDigest.SizeBytes")
450+
}
451+
452+
pr, pw := io.Pipe()
453+
writerResultChan := make(chan error, 1)
454+
455+
go func() {
456+
defer pw.Close()
457+
458+
for _, chunkDigest := range req.ChunkDigests {
459+
rc, _, err := s.cache.Get(ctx, cache.CAS, chunkDigest.Hash, chunkDigest.SizeBytes, 0)
460+
if err != nil {
461+
rc.Close()
462+
writerResultChan <- grpc_status.Errorf(codes.Unknown,
463+
"SpliceBlob failed to get chunk %s/%d: %s",
464+
chunkDigest.Hash, chunkDigest.SizeBytes, err)
465+
return
466+
}
467+
468+
if rc == nil {
469+
writerResultChan <- grpc_status.Errorf(codes.NotFound,
470+
"SpliceBlob called with nonexistent blob: %s/%d",
471+
chunkDigest.Hash, chunkDigest.SizeBytes)
472+
return
473+
}
474+
475+
// We can assume that the size returned by s.cache.Get equals chunkDigest.SizeBytes,
476+
// because we checked that is was not -1 in the chunkTotal check performed earlier.
477+
478+
copiedBytes, err := io.Copy(pw, rc)
479+
if err != nil {
480+
rc.Close()
481+
writerResultChan <- grpc_status.Errorf(codes.Unknown,
482+
"SpliceBlob failed to copy chunk %s/%d: %s",
483+
chunkDigest.Hash, chunkDigest.SizeBytes, err)
484+
return
485+
}
486+
487+
if copiedBytes != chunkDigest.SizeBytes {
488+
rc.Close()
489+
writerResultChan <- grpc_status.Errorf(codes.Unknown,
490+
"SpliceBlob copied unpexpected number of bytes (%d) from chunk %s/%d",
491+
copiedBytes, chunkDigest.Hash, chunkDigest.SizeBytes)
492+
return
493+
}
494+
495+
rc.Close()
496+
}
497+
498+
writerResultChan <- nil
499+
}()
500+
501+
err := s.cache.Put(ctx, cache.CAS, req.BlobDigest.Hash, req.BlobDigest.SizeBytes, pr)
502+
if err != nil {
503+
504+
select {
505+
case writerErr, ok := <-writerResultChan:
506+
if ok && writerErr != nil {
507+
// Return the more specific writerErr.
508+
return nil, writerErr
509+
}
510+
default:
511+
}
512+
513+
return nil, grpc_status.Errorf(codes.Unknown,
514+
"Failed to splice blob %s/%d: %s",
515+
req.BlobDigest.Hash, req.BlobDigest.SizeBytes, err)
516+
}
517+
518+
resp := pb.SpliceBlobResponse{
519+
BlobDigest: req.BlobDigest,
520+
}
521+
522+
return &resp, nil
523+
}
524+
525+
func (s *grpcServer) SplitBlob(ctx context.Context, req *pb.SplitBlobRequest) (*pb.SplitBlobResponse, error) {
526+
return nil, grpc_status.Errorf(codes.Unimplemented, "method SplitBlob not implemented")
527+
}

0 commit comments

Comments
 (0)