@@ -15,6 +15,7 @@ import (
1515 pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"
1616
1717 "github.com/buchgr/bazel-remote/v2/cache"
18+ "github.com/buchgr/bazel-remote/v2/utils/validate"
1819)
1920
2021var (
@@ -374,3 +375,153 @@ func (s *grpcServer) fillDirectories(ctx context.Context, resp *pb.GetTreeRespon
374375
375376 return nil
376377}
378+
379+ func (s * grpcServer ) SpliceBlob (ctx context.Context , req * pb.SpliceBlobRequest ) (* pb.SpliceBlobResponse , error ) {
380+
381+ if req == nil {
382+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
383+ "SpliceBlob called with nil SpliceBlobRequest" )
384+ }
385+
386+ if req .BlobDigest == nil {
387+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
388+ "SpliceBlob called with nil SpliceBlobRequest.BlobDigest" )
389+ }
390+
391+ if req .BlobDigest .SizeBytes == 0 || req .BlobDigest .Hash == emptySha256 {
392+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
393+ "SpliceBlob called to create the empty blob?" )
394+ }
395+
396+ if req .BlobDigest .SizeBytes < 0 {
397+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
398+ "SpliceBlob called with negative SpliceBlobRequest.BlobDigest.SizeBytes" )
399+ }
400+
401+ if s .maxCasBlobSizeBytes > 0 && req .BlobDigest .SizeBytes > s .maxCasBlobSizeBytes {
402+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
403+ "SpliceBlob called to create blob with size %d, which is greater than the max configured blob size %d" ,
404+ req .BlobDigest .SizeBytes , s .maxCasBlobSizeBytes )
405+ }
406+
407+ if ! validate .HashKeyRegex .MatchString (req .BlobDigest .Hash ) {
408+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
409+ "SpliceBlob called with invalid SpliceBlobRequest.BlobDigest.Hash: %s" ,
410+ req .BlobDigest .Hash )
411+ }
412+
413+ if len (req .ChunkDigests ) == 0 {
414+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
415+ "SpliceBlob called with no SpliceBlobRequest.ChunkDigests" )
416+ }
417+
418+ chunkTotal := int64 (0 )
419+ for _ , chunkDigest := range req .ChunkDigests {
420+ if chunkDigest == nil {
421+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
422+ "SpliceBlob called with a nil value in SpliceBlobRequest.ChunkDigests" )
423+ }
424+
425+ if chunkDigest .SizeBytes < 0 {
426+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
427+ "SpliceBlob called with a negative Digest in SpliceBlobRequest.ChunkDigests" )
428+ }
429+
430+ if chunkDigest .SizeBytes == 0 || chunkDigest .Hash == emptySha256 {
431+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
432+ "SpliceBlob called with an empty blob in SpliceBlobRequest.ChunkDigests" )
433+ }
434+
435+ // chunkDigest.SizeBytes must be positive if we reached this point.
436+ // Add it to chunkTotal (which then must be positive, unless there
437+ // was an overflow).
438+
439+ chunkTotal += chunkDigest .SizeBytes
440+
441+ if chunkTotal <= 0 {
442+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
443+ "Overflow in SpliceBlobRequest.ChunkDigests, does not match SpliceBlobRequest.BlobDigest.SizeBytes" )
444+ }
445+ }
446+
447+ if chunkTotal != req .BlobDigest .SizeBytes {
448+ return nil , grpc_status .Errorf (codes .InvalidArgument ,
449+ "SpliceBlob called with SpliceBlobRequest.ChunkDigests sizes that don't match SpliceBlobRequest.BlobDigest.SizeBytes" )
450+ }
451+
452+ pr , pw := io .Pipe ()
453+ writerResultChan := make (chan error , 1 )
454+
455+ go func () {
456+ defer pw .Close ()
457+
458+ for _ , chunkDigest := range req .ChunkDigests {
459+ rc , _ , err := s .cache .Get (ctx , cache .CAS , chunkDigest .Hash , chunkDigest .SizeBytes , 0 )
460+ if err != nil {
461+ rc .Close ()
462+ writerResultChan <- grpc_status .Errorf (codes .Unknown ,
463+ "SpliceBlob failed to get chunk %s/%d: %s" ,
464+ chunkDigest .Hash , chunkDigest .SizeBytes , err )
465+ return
466+ }
467+
468+ if rc == nil {
469+ writerResultChan <- grpc_status .Errorf (codes .NotFound ,
470+ "SpliceBlob called with nonexistent blob: %s/%d" ,
471+ chunkDigest .Hash , chunkDigest .SizeBytes )
472+ return
473+ }
474+
475+ // We can assume that the size returned by s.cache.Get equals chunkDigest.SizeBytes,
476+ // because we checked that is was not -1 in the chunkTotal check performed earlier.
477+
478+ copiedBytes , err := io .Copy (pw , rc )
479+ if err != nil {
480+ rc .Close ()
481+ writerResultChan <- grpc_status .Errorf (codes .Unknown ,
482+ "SpliceBlob failed to copy chunk %s/%d: %s" ,
483+ chunkDigest .Hash , chunkDigest .SizeBytes , err )
484+ return
485+ }
486+
487+ if copiedBytes != chunkDigest .SizeBytes {
488+ rc .Close ()
489+ writerResultChan <- grpc_status .Errorf (codes .Unknown ,
490+ "SpliceBlob copied unpexpected number of bytes (%d) from chunk %s/%d" ,
491+ copiedBytes , chunkDigest .Hash , chunkDigest .SizeBytes )
492+ return
493+ }
494+
495+ rc .Close ()
496+ }
497+
498+ writerResultChan <- nil
499+ }()
500+
501+ err := s .cache .Put (ctx , cache .CAS , req .BlobDigest .Hash , req .BlobDigest .SizeBytes , pr )
502+ if err != nil {
503+
504+ select {
505+ case writerErr , ok := <- writerResultChan :
506+ if ok && writerErr != nil {
507+ // Return the more specific writerErr.
508+ return nil , writerErr
509+ }
510+ default :
511+ }
512+
513+ return nil , grpc_status .Errorf (codes .Unknown ,
514+ "Failed to splice blob %s/%d: %s" ,
515+ req .BlobDigest .Hash , req .BlobDigest .SizeBytes , err )
516+ }
517+
518+ resp := pb.SpliceBlobResponse {
519+ BlobDigest : req .BlobDigest ,
520+ }
521+
522+ return & resp , nil
523+ }
524+
525+ func (s * grpcServer ) SplitBlob (ctx context.Context , req * pb.SplitBlobRequest ) (* pb.SplitBlobResponse , error ) {
526+ return nil , grpc_status .Errorf (codes .Unimplemented , "method SplitBlob not implemented" )
527+ }
0 commit comments