diff --git a/cmd/api/api/registry_test.go b/cmd/api/api/registry_test.go new file mode 100644 index 00000000..df602bc6 --- /dev/null +++ b/cmd/api/api/registry_test.go @@ -0,0 +1,603 @@ +package api + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/go-chi/chi/v5" + "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/v1/types" + "github.com/onkernel/hypeman/lib/oapi" + "github.com/onkernel/hypeman/lib/paths" + "github.com/onkernel/hypeman/lib/registry" + "github.com/onkernel/hypeman/lib/system" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// setupRegistryTest creates a test service with a mounted OCI registry server. +// Returns the service (for API calls) and the server host (for building push URLs). +func setupRegistryTest(t *testing.T) (*ApiService, string) { + t.Helper() + + svc := newTestService(t) + p := paths.New(svc.Config.DataDir) + + reg, err := registry.New(p, svc.ImageManager) + require.NoError(t, err) + + r := chi.NewRouter() + r.Mount("/v2", reg.Handler()) + + ts := httptest.NewServer(r) + t.Cleanup(ts.Close) + + serverHost := strings.TrimPrefix(ts.URL, "http://") + return svc, serverHost +} + +func TestRegistryPushAndConvert(t *testing.T) { + svc, serverHost := setupRegistryTest(t) + + // Pull a small image from Docker Hub to push to our registry + t.Log("Pulling alpine:latest from Docker Hub...") + srcRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + + img, err := remote.Image(srcRef) + require.NoError(t, err) + + digest, err := img.Digest() + require.NoError(t, err) + t.Logf("Source image digest: %s", digest.String()) + + // Push to our test registry using digest reference + targetRef := serverHost + "/test/alpine@" + 
digest.String() + t.Logf("Pushing to %s...", targetRef) + + dstRef, err := name.ParseReference(targetRef, name.Insecure) + require.NoError(t, err) + + err = remote.Write(dstRef, img) + require.NoError(t, err) + t.Log("Push successful!") + + // Wait for image to be converted + imageName := "test/alpine@" + digest.String() + imgResp := waitForImageReady(t, svc, imageName, 60*time.Second) + assert.NotNil(t, imgResp.SizeBytes, "ready image should have size") +} + +func TestRegistryVersionCheck(t *testing.T) { + _, serverHost := setupRegistryTest(t) + + // Test /v2/ endpoint (version check) + resp, err := http.Get("http://" + serverHost + "/v2/") + require.NoError(t, err) + defer resp.Body.Close() + + // OCI Distribution Spec requires 200 OK for version check + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestRegistryPushAndCreateInstance(t *testing.T) { + // This is a full e2e test that requires KVM access + if _, err := os.Stat("/dev/kvm"); os.IsNotExist(err) { + t.Skip("/dev/kvm not available - skipping VM creation test") + } + + svc, serverHost := setupRegistryTest(t) + + // Ensure system files for VM creation + p := paths.New(svc.Config.DataDir) + systemMgr := system.NewManager(p) + err := systemMgr.EnsureSystemFiles(context.Background()) + require.NoError(t, err) + + // Pull and push alpine + t.Log("Pulling alpine:latest...") + srcRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + + img, err := remote.Image(srcRef) + require.NoError(t, err) + + digest, err := img.Digest() + require.NoError(t, err) + + targetRef := serverHost + "/test/alpine@" + digest.String() + dstRef, err := name.ParseReference(targetRef, name.Insecure) + require.NoError(t, err) + + t.Log("Pushing to test registry...") + err = remote.Write(dstRef, img) + require.NoError(t, err) + + // Wait for image to be ready + imageName := "test/alpine@" + digest.String() + waitForImageReady(t, svc, imageName, 60*time.Second) + + // Create instance 
with pushed image + t.Log("Creating instance with pushed image...") + networkEnabled := false + resp, err := svc.CreateInstance(ctx(), oapi.CreateInstanceRequestObject{ + Body: &oapi.CreateInstanceRequest{ + Name: "test-pushed-image", + Image: imageName, + Network: &struct { + Enabled *bool `json:"enabled,omitempty"` + }{ + Enabled: &networkEnabled, + }, + }, + }) + require.NoError(t, err) + + created, ok := resp.(oapi.CreateInstance201JSONResponse) + require.True(t, ok, "expected 201 response, got %T", resp) + + instance := oapi.Instance(created) + assert.Equal(t, "test-pushed-image", instance.Name) + t.Logf("Instance created: %s (state: %s)", instance.Id, instance.State) + + // Verify instance reaches Running state + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + resp, _ := svc.GetInstance(ctx(), oapi.GetInstanceRequestObject{Id: instance.Id}) + if inst, ok := resp.(oapi.GetInstance200JSONResponse); ok { + if inst.State == "Running" { + t.Log("Instance is running!") + return // Success! + } + t.Logf("Instance state: %s", inst.State) + } + time.Sleep(1 * time.Second) + } + + t.Fatal("Timeout waiting for instance to reach Running state") +} + +// TestRegistryLayerCaching verifies that pushing the same image twice +// reuses cached layers and doesn't re-upload them. 
+func TestRegistryLayerCaching(t *testing.T) { + _, serverHost := setupRegistryTest(t) + + // Pull alpine image from Docker Hub + t.Log("Pulling alpine:latest from Docker Hub...") + srcRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + + img, err := remote.Image(srcRef) + require.NoError(t, err) + + digest, err := img.Digest() + require.NoError(t, err) + + // First push - should upload all blobs + t.Log("First push - uploading all layers...") + targetRef := serverHost + "/cache-test/alpine@" + digest.String() + dstRef, err := name.ParseReference(targetRef, name.Insecure) + require.NoError(t, err) + + // Track requests during first push + var firstPushRequests []string + transport := &loggingTransport{ + transport: http.DefaultTransport, + log: func(method, path string) { + firstPushRequests = append(firstPushRequests, method+" "+path) + }, + } + + err = remote.Write(dstRef, img, remote.WithTransport(transport)) + require.NoError(t, err) + + // Count blob uploads in first push + firstPushUploads := 0 + for _, req := range firstPushRequests { + if strings.HasPrefix(req, "PUT ") && strings.Contains(req, "/blobs/uploads/") { + firstPushUploads++ + } + } + t.Logf("First push: %d blob uploads", firstPushUploads) + assert.Greater(t, firstPushUploads, 0, "First push should upload blobs") + + // Second push - should reuse cached blobs + t.Log("Second push - should reuse cached layers...") + var secondPushRequests []string + transport2 := &loggingTransport{ + transport: http.DefaultTransport, + log: func(method, path string) { + secondPushRequests = append(secondPushRequests, method+" "+path) + }, + } + + err = remote.Write(dstRef, img, remote.WithTransport(transport2)) + require.NoError(t, err) + + // Count operations in second push + secondPushUploads := 0 + secondPushManifestHead := 0 + for _, req := range secondPushRequests { + if strings.HasPrefix(req, "PUT ") && strings.Contains(req, "/blobs/uploads/") { + secondPushUploads++ 
+ } + if strings.HasPrefix(req, "HEAD ") && strings.Contains(req, "/manifests/") { + secondPushManifestHead++ + } + } + t.Logf("Second push: %d total requests, %d blob uploads", len(secondPushRequests), secondPushUploads) + + // Second push should: + // 1. Check if manifest exists (HEAD) - if yes, skip everything + // 2. NOT upload any blobs (all cached or manifest already exists) + assert.Greater(t, secondPushManifestHead, 0, "Second push should check if manifest exists") + assert.Equal(t, 0, secondPushUploads, "Second push should NOT upload any blobs (all cached)") + assert.Less(t, len(secondPushRequests), len(firstPushRequests), "Second push should make fewer requests than first") + + t.Logf("Layer caching verified: first push=%d requests, second push=%d requests", len(firstPushRequests), len(secondPushRequests)) + + // Wait for async conversion to complete to avoid cleanup issues + time.Sleep(2 * time.Second) +} + +// TestRegistrySharedLayerCaching verifies that pushing different images +// that share layers reuses the cached shared layers. 
+func TestRegistrySharedLayerCaching(t *testing.T) { + _, serverHost := setupRegistryTest(t) + + // Pull alpine image (this will be our base) + t.Log("Pulling alpine:latest...") + alpineRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + alpineImg, err := remote.Image(alpineRef) + require.NoError(t, err) + + // Get alpine layers for comparison + alpineLayers, err := alpineImg.Layers() + require.NoError(t, err) + t.Logf("Alpine has %d layers", len(alpineLayers)) + + // Push alpine first + t.Log("Pushing alpine...") + alpineDigest, _ := alpineImg.Digest() + dstRef, err := name.ParseReference(serverHost+"/shared/alpine@"+alpineDigest.String(), name.Insecure) + require.NoError(t, err) + + var firstPushBlobUploads int + transport1 := &loggingTransport{ + transport: http.DefaultTransport, + log: func(method, path string) { + if method == "PUT" && strings.Contains(path, "/blobs/uploads/") { + firstPushBlobUploads++ + } + }, + } + err = remote.Write(dstRef, alpineImg, remote.WithTransport(transport1)) + require.NoError(t, err) + t.Logf("First push (alpine): %d blob uploads", firstPushBlobUploads) + + // Now pull a different alpine-based image (e.g., alpine:3.18) + // which should share the base layer with alpine:latest + t.Log("Pulling alpine:3.18 (shares base layer)...") + alpine318Ref, err := name.ParseReference("docker.io/library/alpine:3.18") + require.NoError(t, err) + alpine318Img, err := remote.Image(alpine318Ref) + require.NoError(t, err) + + alpine318Digest, _ := alpine318Img.Digest() + dstRef2, err := name.ParseReference(serverHost+"/shared/alpine318@"+alpine318Digest.String(), name.Insecure) + require.NoError(t, err) + + var secondPushBlobUploads int + var secondPushBlobHeads int + transport2 := &loggingTransport{ + transport: http.DefaultTransport, + log: func(method, path string) { + if method == "PUT" && strings.Contains(path, "/blobs/uploads/") { + secondPushBlobUploads++ + } + if method == "HEAD" && 
strings.Contains(path, "/blobs/") { + secondPushBlobHeads++ + } + }, + } + + t.Log("Pushing alpine:3.18...") + err = remote.Write(dstRef2, alpine318Img, remote.WithTransport(transport2)) + require.NoError(t, err) + t.Logf("Second push (alpine:3.18): %d HEAD requests for blobs, %d blob uploads", secondPushBlobHeads, secondPushBlobUploads) + + // If layers are shared and caching works, the second push should upload + // fewer blobs than the total layers in the image (some are cached) + alpine318Layers, _ := alpine318Img.Layers() + t.Logf("Alpine 3.18 has %d layers, uploaded %d", len(alpine318Layers), secondPushBlobUploads) + + // The key assertion: second push should upload fewer blobs than first + // (or equal if they don't share layers, but usually alpine versions share the base) + assert.LessOrEqual(t, secondPushBlobUploads, firstPushBlobUploads, + "Second push should upload same or fewer blobs due to layer sharing") + + // Wait for async conversion + time.Sleep(2 * time.Second) +} + +// TestRegistryTagPush verifies that pushing with a tag reference (not digest) +// correctly triggers conversion. The server computes the digest from the manifest. 
+func TestRegistryTagPush(t *testing.T) { + svc, serverHost := setupRegistryTest(t) + + // Pull alpine image from Docker Hub + t.Log("Pulling alpine:latest from Docker Hub...") + srcRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + + img, err := remote.Image(srcRef) + require.NoError(t, err) + + digest, err := img.Digest() + require.NoError(t, err) + t.Logf("Source image digest: %s", digest.String()) + + // Push using TAG reference (not digest) - this is the key difference from other tests + targetRef := serverHost + "/tag-test/alpine:latest" + t.Logf("Pushing to %s (tag reference)...", targetRef) + + dstRef, err := name.ParseReference(targetRef, name.Insecure) + require.NoError(t, err) + + err = remote.Write(dstRef, img) + require.NoError(t, err) + t.Log("Push successful!") + + // The image should be registered with the computed digest, not the tag + imageName := "tag-test/alpine@" + digest.String() + waitForImageReady(t, svc, imageName, 60*time.Second) + + // Verify image appears in ListImages (GET /images) + listResp, err := svc.ListImages(ctx(), oapi.ListImagesRequestObject{}) + require.NoError(t, err) + images, ok := listResp.(oapi.ListImages200JSONResponse) + require.True(t, ok, "expected ListImages 200 response") + + var found bool + for _, img := range images { + if img.Digest == digest.String() { + found = true + assert.Equal(t, oapi.Ready, img.Status, "image in list should have Ready status") + assert.NotNil(t, img.SizeBytes, "ready image should have size") + t.Logf("Image found in ListImages: %s (status=%s, size=%d)", img.Name, img.Status, *img.SizeBytes) + break + } + } + assert.True(t, found, "pushed image should appear in ListImages response") +} + +// TestRegistryDockerV2ManifestConversion verifies that pushing an image with a +// Docker v2 manifest (as returned by local Docker daemon) is correctly converted +// to OCI format and the image conversion succeeds. 
+func TestRegistryDockerV2ManifestConversion(t *testing.T) { + svc, serverHost := setupRegistryTest(t) + + // Pull alpine image from Docker Hub (OCI format) + t.Log("Pulling alpine:latest from Docker Hub...") + srcRef, err := name.ParseReference("docker.io/library/alpine:latest") + require.NoError(t, err) + + img, err := remote.Image(srcRef) + require.NoError(t, err) + + // Wrap the image to simulate Docker v2 format (Docker daemon returns this format) + // This is what happens when using `daemon.Image()` in the CLI + dockerV2Img := &dockerV2ImageWrapper{img: img} + + // Push the Docker v2 formatted image + targetRef := serverHost + "/dockerv2-test/alpine:v1" + t.Logf("Pushing Docker v2 formatted image to %s...", targetRef) + + dstRef, err := name.ParseReference(targetRef, name.Insecure) + require.NoError(t, err) + + err = remote.Write(dstRef, dockerV2Img) + require.NoError(t, err) + t.Log("Push successful!") + + // Wait for image to be converted + // The server converts Docker v2 to OCI format internally, resulting in a different digest + imgResp := waitForImageReady(t, svc, "dockerv2-test/alpine:v1", 60*time.Second) + assert.NotNil(t, imgResp.SizeBytes, "ready image should have size") + assert.NotEmpty(t, imgResp.Digest, "image should have digest") +} + +// dockerV2ImageWrapper wraps an OCI image and returns Docker v2 media types +// to simulate what the Docker daemon returns via daemon.Image() +type dockerV2ImageWrapper struct { + img v1.Image +} + +func (w *dockerV2ImageWrapper) Layers() ([]v1.Layer, error) { + layers, err := w.img.Layers() + if err != nil { + return nil, err + } + // Wrap each layer to return Docker v2 media types + wrapped := make([]v1.Layer, len(layers)) + for i, l := range layers { + wrapped[i] = &dockerV2LayerWrapper{layer: l} + } + return wrapped, nil +} + +func (w *dockerV2ImageWrapper) MediaType() (types.MediaType, error) { + return types.DockerManifestSchema2, nil +} + +func (w *dockerV2ImageWrapper) Size() (int64, error) { + return 
w.img.Size() +} + +func (w *dockerV2ImageWrapper) ConfigName() (v1.Hash, error) { + return w.img.ConfigName() +} + +func (w *dockerV2ImageWrapper) ConfigFile() (*v1.ConfigFile, error) { + return w.img.ConfigFile() +} + +func (w *dockerV2ImageWrapper) RawConfigFile() ([]byte, error) { + return w.img.RawConfigFile() +} + +func (w *dockerV2ImageWrapper) Digest() (v1.Hash, error) { + // Compute digest of our Docker v2 manifest + rawManifest, err := w.RawManifest() + if err != nil { + return v1.Hash{}, err + } + h, _, err := v1.SHA256(strings.NewReader(string(rawManifest))) + return h, err +} + +func (w *dockerV2ImageWrapper) Manifest() (*v1.Manifest, error) { + origManifest, err := w.img.Manifest() + if err != nil { + return nil, err + } + + // Convert to Docker v2 media types + manifest := &v1.Manifest{ + SchemaVersion: origManifest.SchemaVersion, + MediaType: types.DockerManifestSchema2, + Config: v1.Descriptor{ + MediaType: types.DockerConfigJSON, + Size: origManifest.Config.Size, + Digest: origManifest.Config.Digest, + }, + } + + for _, layer := range origManifest.Layers { + manifest.Layers = append(manifest.Layers, v1.Descriptor{ + MediaType: types.DockerLayer, + Size: layer.Size, + Digest: layer.Digest, + }) + } + + return manifest, nil +} + +func (w *dockerV2ImageWrapper) RawManifest() ([]byte, error) { + manifest, err := w.Manifest() + if err != nil { + return nil, err + } + return json.Marshal(manifest) +} + +func (w *dockerV2ImageWrapper) LayerByDigest(hash v1.Hash) (v1.Layer, error) { + layer, err := w.img.LayerByDigest(hash) + if err != nil { + return nil, err + } + return &dockerV2LayerWrapper{layer: layer}, nil +} + +func (w *dockerV2ImageWrapper) LayerByDiffID(hash v1.Hash) (v1.Layer, error) { + layer, err := w.img.LayerByDiffID(hash) + if err != nil { + return nil, err + } + return &dockerV2LayerWrapper{layer: layer}, nil +} + +// dockerV2LayerWrapper wraps a layer to return Docker v2 media types +type dockerV2LayerWrapper struct { + layer v1.Layer +} + 
+func (w *dockerV2LayerWrapper) Digest() (v1.Hash, error) { + return w.layer.Digest() +} + +func (w *dockerV2LayerWrapper) DiffID() (v1.Hash, error) { + return w.layer.DiffID() +} + +func (w *dockerV2LayerWrapper) Compressed() (io.ReadCloser, error) { + return w.layer.Compressed() +} + +func (w *dockerV2LayerWrapper) Uncompressed() (io.ReadCloser, error) { + return w.layer.Uncompressed() +} + +func (w *dockerV2LayerWrapper) Size() (int64, error) { + return w.layer.Size() +} + +func (w *dockerV2LayerWrapper) MediaType() (types.MediaType, error) { + return types.DockerLayer, nil +} + +// loggingTransport wraps an http.RoundTripper and logs requests +type loggingTransport struct { + transport http.RoundTripper + log func(method, path string) +} + +func (t *loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.log(req.Method, req.URL.Path) + return t.transport.RoundTrip(req) +} + +// waitForImageReady polls GetImage until the image reaches Ready status. +// Returns the image response on success, fails the test on error or timeout. 
+func waitForImageReady(t *testing.T, svc *ApiService, imageName string, timeout time.Duration) oapi.GetImage200JSONResponse { + t.Helper() + t.Logf("Waiting for image %s to be ready...", imageName) + + deadline := time.Now().Add(timeout) + var lastStatus oapi.ImageStatus + var lastError string + + for time.Now().Before(deadline) { + resp, err := svc.GetImage(ctx(), oapi.GetImageRequestObject{Name: imageName}) + if err != nil { + time.Sleep(1 * time.Second) + continue + } + + imgResp, ok := resp.(oapi.GetImage200JSONResponse) + if !ok { + time.Sleep(1 * time.Second) + continue + } + + lastStatus = imgResp.Status + if imgResp.Error != nil { + lastError = *imgResp.Error + } + + switch imgResp.Status { + case oapi.Ready: + t.Logf("Image ready: %s (digest=%s)", imgResp.Name, imgResp.Digest) + return imgResp + case oapi.Failed: + t.Fatalf("Image conversion failed: %s", lastError) + default: + t.Logf("Image status: %s", imgResp.Status) + } + time.Sleep(2 * time.Second) + } + + t.Fatalf("Timeout waiting for image %s. 
Last status: %s, error: %s", imageName, lastStatus, lastError) + return oapi.GetImage200JSONResponse{} +} diff --git a/cmd/api/api/volumes_test.go b/cmd/api/api/volumes_test.go index 59121ea6..5b26d1d8 100644 --- a/cmd/api/api/volumes_test.go +++ b/cmd/api/api/volumes_test.go @@ -37,7 +37,7 @@ func TestGetVolume_ByName(t *testing.T) { // Create a volume createResp, err := svc.CreateVolume(ctx(), oapi.CreateVolumeRequestObject{ - Body: &oapi.CreateVolumeRequest{ + JSONBody: &oapi.CreateVolumeRequest{ Name: "my-data", SizeGb: 1, }, @@ -62,7 +62,7 @@ func TestDeleteVolume_ByName(t *testing.T) { // Create a volume _, err := svc.CreateVolume(ctx(), oapi.CreateVolumeRequestObject{ - Body: &oapi.CreateVolumeRequest{ + JSONBody: &oapi.CreateVolumeRequest{ Name: "to-delete", SizeGb: 1, }, diff --git a/cmd/api/main.go b/cmd/api/main.go index 3951d7b7..005a031a 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -108,6 +108,12 @@ func run() error { logger.Warn("JWT_SECRET not configured - API authentication will fail") } + // Verify KVM access (required for VM creation) + if err := checkKVMAccess(); err != nil { + return fmt.Errorf("KVM access check failed: %w\n\nEnsure:\n 1. KVM is enabled (check /dev/kvm exists)\n 2. User is in 'kvm' group: sudo usermod -aG kvm $USER\n 3. 
Log out and back in, or use: newgrp kvm", err) + } + logger.Info("KVM access verified") + // Validate log rotation config var logMaxSize datasize.ByteSize if err := logMaxSize.UnmarshalText([]byte(app.Config.LogMaxSize)); err != nil { @@ -178,6 +184,16 @@ func run() error { mw.JwtAuth(app.Config.JwtSecret), ).Get("/instances/{id}/exec", app.ApiService.ExecHandler) + // OCI Distribution registry endpoints for image push (outside OpenAPI spec) + r.Route("/v2", func(r chi.Router) { + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + r.Use(mw.JwtAuth(app.Config.JwtSecret)) + r.Mount("/", app.Registry.Handler()) + }) + // Authenticated API endpoints r.Group(func(r chi.Router) { // Common middleware @@ -323,3 +339,19 @@ func getRunningInstanceIDs(app *application) []string { } return running } + +// checkKVMAccess verifies KVM is available and the user has permission to use it +func checkKVMAccess() error { + f, err := os.OpenFile("/dev/kvm", os.O_RDWR, 0) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("/dev/kvm not found - KVM not enabled or not supported") + } + if os.IsPermission(err) { + return fmt.Errorf("permission denied accessing /dev/kvm - user not in 'kvm' group") + } + return fmt.Errorf("cannot access /dev/kvm: %w", err) + } + f.Close() + return nil +} diff --git a/cmd/api/wire.go b/cmd/api/wire.go index ed881ee9..60b447b5 100644 --- a/cmd/api/wire.go +++ b/cmd/api/wire.go @@ -1,4 +1,4 @@ -// +build wireinject +//go:build wireinject package main @@ -13,6 +13,7 @@ import ( "github.com/onkernel/hypeman/lib/instances" "github.com/onkernel/hypeman/lib/network" "github.com/onkernel/hypeman/lib/providers" + "github.com/onkernel/hypeman/lib/registry" "github.com/onkernel/hypeman/lib/system" "github.com/onkernel/hypeman/lib/volumes" ) @@ -27,6 +28,7 @@ type application struct { NetworkManager network.Manager InstanceManager instances.Manager VolumeManager volumes.Manager + Registry 
*registry.Registry ApiService *api.ApiService } @@ -42,8 +44,8 @@ func initializeApp() (*application, func(), error) { providers.ProvideNetworkManager, providers.ProvideInstanceManager, providers.ProvideVolumeManager, + providers.ProvideRegistry, api.New, wire.Struct(new(application), "*"), )) } - diff --git a/cmd/api/wire_gen.go b/cmd/api/wire_gen.go index 617a0cd0..d36a94ae 100644 --- a/cmd/api/wire_gen.go +++ b/cmd/api/wire_gen.go @@ -14,6 +14,7 @@ import ( "github.com/onkernel/hypeman/lib/instances" "github.com/onkernel/hypeman/lib/network" "github.com/onkernel/hypeman/lib/providers" + "github.com/onkernel/hypeman/lib/registry" "github.com/onkernel/hypeman/lib/system" "github.com/onkernel/hypeman/lib/volumes" "log/slog" @@ -45,6 +46,10 @@ func initializeApp() (*application, func(), error) { if err != nil { return nil, nil, err } + registry, err := providers.ProvideRegistry(paths, manager) + if err != nil { + return nil, nil, err + } apiService := api.New(config, manager, instancesManager, volumesManager, networkManager) mainApplication := &application{ Ctx: context, @@ -55,6 +60,7 @@ func initializeApp() (*application, func(), error) { NetworkManager: networkManager, InstanceManager: instancesManager, VolumeManager: volumesManager, + Registry: registry, ApiService: apiService, } return mainApplication, func() { @@ -73,5 +79,6 @@ type application struct { NetworkManager network.Manager InstanceManager instances.Manager VolumeManager volumes.Manager + Registry *registry.Registry ApiService *api.ApiService } diff --git a/go.sum b/go.sum index e1226b84..466b211f 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-containerregistry v0.20.6 h1:cvWX87UxxLgaH76b4hIvya6Dzz9qHB31qAwjAohdSTU= github.com/google/go-containerregistry v0.20.6/go.mod 
h1:T0x8MuoAoKX/873bkeSfLD2FAkwCDf9/HZgsFJ02E2Y= +github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= +github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -232,6 +234,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U= +golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -256,6 +260,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE= +golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 
h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= diff --git a/lib/images/manager.go b/lib/images/manager.go index b637bce5..639de5e8 100644 --- a/lib/images/manager.go +++ b/lib/images/manager.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "time" @@ -24,6 +25,9 @@ const ( type Manager interface { ListImages(ctx context.Context) ([]Image, error) CreateImage(ctx context.Context, req CreateImageRequest) (*Image, error) + // ImportLocalImage imports an image that was pushed to the local OCI cache. + // Unlike CreateImage, it does not resolve from a remote registry. + ImportLocalImage(ctx context.Context, repo, reference, digest string) (*Image, error) GetImage(ctx context.Context, name string) (*Image, error) DeleteImage(ctx context.Context, name string) error RecoverInterruptedBuilds() @@ -120,6 +124,46 @@ func (m *manager) CreateImage(ctx context.Context, req CreateImageRequest) (*Ima return m.createAndQueueImage(ref) } +// ImportLocalImage imports an image from the local OCI cache without resolving from a remote registry. +// This is used for images that were pushed directly to the hypeman registry. 
+func (m *manager) ImportLocalImage(ctx context.Context, repo, reference, digest string) (*Image, error) { + // Build the image reference string + var imageRef string + if strings.HasPrefix(reference, "sha256:") { + imageRef = repo + "@" + reference + } else { + imageRef = repo + ":" + reference + } + + // Parse and normalize + normalized, err := ParseNormalizedRef(imageRef) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrInvalidName, err.Error()) + } + + // Create a ResolvedRef directly with the provided digest + ref := NewResolvedRef(normalized, digest) + + m.createMu.Lock() + defer m.createMu.Unlock() + + // Check if we already have this digest (deduplication) + if meta, err := readMetadata(m.paths, ref.Repository(), ref.DigestHex()); err == nil { + // We have this digest already + if meta.Status == StatusReady && ref.Tag() != "" { + createTagSymlink(m.paths, ref.Repository(), ref.Tag(), ref.DigestHex()) + } + img := meta.toImage() + if meta.Status == StatusPending { + img.QueuePosition = m.queue.GetPosition(meta.Digest) + } + return img, nil + } + + // Don't have this digest yet, queue the build + return m.createAndQueueImage(ref) +} + func (m *manager) createAndQueueImage(ref *ResolvedRef) (*Image, error) { meta := &imageMetadata{ Name: ref.String(), diff --git a/lib/paths/paths.go b/lib/paths/paths.go index 05a566d8..adaf9905 100644 --- a/lib/paths/paths.go +++ b/lib/paths/paths.go @@ -9,6 +9,9 @@ // initrd/{arch}/latest -> {timestamp} // binaries/{version}/{arch}/cloud-hypervisor // oci-cache/ +// oci-layout +// index.json +// blobs/sha256/{digestHex} // builds/{ref}/ // images/ // {repository}/{digest}/ @@ -78,6 +81,26 @@ func (p *Paths) SystemOCICache() string { return filepath.Join(p.dataDir, "system", "oci-cache") } +// OCICacheBlobDir returns the path to the OCI cache blobs directory. 
+func (p *Paths) OCICacheBlobDir() string { + return filepath.Join(p.SystemOCICache(), "blobs", "sha256") +} + +// OCICacheBlob returns the path to a specific blob in the OCI cache. +func (p *Paths) OCICacheBlob(digestHex string) string { + return filepath.Join(p.OCICacheBlobDir(), digestHex) +} + +// OCICacheIndex returns the path to the OCI cache index.json. +func (p *Paths) OCICacheIndex() string { + return filepath.Join(p.SystemOCICache(), "index.json") +} + +// OCICacheLayout returns the path to the OCI cache oci-layout file. +func (p *Paths) OCICacheLayout() string { + return filepath.Join(p.SystemOCICache(), "oci-layout") +} + // SystemBuild returns the path to a system build directory. func (p *Paths) SystemBuild(ref string) string { return filepath.Join(p.dataDir, "system", "builds", ref) diff --git a/lib/providers/providers.go b/lib/providers/providers.go index 010ee3e6..158603b1 100644 --- a/lib/providers/providers.go +++ b/lib/providers/providers.go @@ -13,6 +13,7 @@ import ( "github.com/onkernel/hypeman/lib/network" hypemanotel "github.com/onkernel/hypeman/lib/otel" "github.com/onkernel/hypeman/lib/paths" + "github.com/onkernel/hypeman/lib/registry" "github.com/onkernel/hypeman/lib/system" "github.com/onkernel/hypeman/lib/volumes" "go.opentelemetry.io/otel" @@ -113,3 +114,8 @@ func ProvideVolumeManager(p *paths.Paths, cfg *config.Config) (volumes.Manager, meter := otel.GetMeterProvider().Meter("hypeman") return volumes.NewManager(p, maxTotalVolumeStorage, meter), nil } + +// ProvideRegistry provides the OCI registry for image push +func ProvideRegistry(p *paths.Paths, imageManager images.Manager) (*registry.Registry, error) { + return registry.New(p, imageManager) +} diff --git a/lib/registry/README.md b/lib/registry/README.md new file mode 100644 index 00000000..168412e3 --- /dev/null +++ b/lib/registry/README.md @@ -0,0 +1,187 @@ +# OCI Distribution Registry + +Implements an OCI Distribution Spec compliant registry that accepts pushed images and 
triggers conversion to hypeman's disk format. + +## Architecture + +```mermaid +sequenceDiagram + participant Client as Docker Client + participant Registry as Hypeman Registry + participant BlobStore as Blob Store + participant ImageMgr as Image Manager + + Client->>Registry: PUT /v2/.../blobs/{digest} + Registry->>BlobStore: Store blob + BlobStore-->>Registry: OK + Registry-->>Client: 201 Created + + Client->>Registry: PUT /v2/.../manifests/{ref} + Registry->>BlobStore: Store manifest blob + Registry-->>Client: 201 Created + + Registry->>Registry: Convert Docker v2 → OCI (if needed) + Registry->>Registry: Append to OCI layout + Registry--)ImageMgr: ImportLocalImage(ociDigest) (async) + ImageMgr->>ImageMgr: Queue conversion + ImageMgr->>ImageMgr: Unpack layers (umoci) + ImageMgr->>ImageMgr: Create ext4 disk image +``` + +## How It Works + +### Push Flow + +1. **Version Check**: Client hits `GET /v2/` to verify registry compatibility +2. **Blob Check**: Client does `HEAD /v2/{name}/blobs/{digest}` to check if layers exist +3. **Blob Upload**: Missing blobs uploaded via `POST/PATCH/PUT` sequence +4. **Manifest Upload**: Final `PUT /v2/{name}/manifests/{reference}` triggers conversion + +### Layer Caching + +Blobs are stored content-addressably in `system/oci-cache/blobs/sha256/`: + +```go +// BlobStore.Stat() - Returns size if exists, ErrNotFound otherwise +func (s *BlobStore) Stat(ctx context.Context, repo string, h v1.Hash) (int64, error) { + path := s.blobPath(h.String()) + info, err := os.Stat(path) + if os.IsNotExist(err) { + return 0, ErrNotFound // Client will upload + } + return info.Size(), nil // Client skips upload +} +``` + +When a client pushes: +- First push: HEAD returns 404 → uploads all blobs +- Second push: HEAD returns 200 with size → skips upload entirely + +### Manifest Handling + +go-containerregistry stores manifests in-memory, but we need them on disk for conversion. 
The registry intercepts manifest PUTs: + +```go +// Read manifest body and compute digest +body, _ := io.ReadAll(req.Body) +digest := computeDigest(body) + +// Store in blob store by digest +r.storeManifestBlob(digest, body) + +// Reconstruct body for underlying handler +req.Body = io.NopCloser(bytes.NewReader(body)) +r.handler.ServeHTTP(wrapper, req) + +// Trigger async conversion with computed digest +if wrapper.statusCode == http.StatusCreated { + go r.triggerConversion(repo, reference, digest) +} +``` + +### Conversion Trigger + +After a successful manifest push: + +1. Creates a `blobStoreImage` wrapper that reads from the blob store +2. If manifest is Docker v2 format, converts it to OCI format (different digest) +3. Appends to OCI layout via `layout.AppendImage()` which updates `index.json` +4. Calls `ImageManager.ImportLocalImage()` with the OCI digest to queue conversion + +### Docker v2 to OCI Conversion + +Images from the local Docker daemon use Docker v2 manifest format, but umoci (used for unpacking layers) only accepts OCI format. 
The registry handles this transparently: + +```go +// blobStoreImage detects Docker v2 and converts media types +func (img *blobStoreImage) MediaType() (types.MediaType, error) { + if isOCIMediaType(manifest.MediaType) { + return types.MediaType(manifest.MediaType), nil + } + return types.OCIManifestSchema1, nil // Convert Docker v2 → OCI +} + +// Digest returns OCI digest (differs from Docker v2 input digest) +func (img *blobStoreImage) Digest() (v1.Hash, error) { + if isOCIMediaType(manifest.MediaType) { + return v1.NewHash(img.digest) // Preserve original + } + // Compute digest of converted OCI manifest + rawManifest, _ := img.RawManifest() + return sha256Hash(rawManifest) +} +``` + +Media type conversions: +- `vnd.docker.distribution.manifest.v2+json` → `vnd.oci.image.manifest.v1+json` +- `vnd.docker.container.image.v1+json` → `vnd.oci.image.config.v1+json` +- `vnd.docker.image.rootfs.diff.tar.gzip` → `vnd.oci.image.layer.v1.tar+gzip` + +## Files + +- **`blob_store.go`** - Filesystem-backed blob storage implementing `registry.BlobHandler` +- **`registry.go`** - Registry handler wrapping go-containerregistry with manifest interception and Docker v2 → OCI conversion (`blobStoreImage`, `blobStoreLayer`) + +## Storage Layout + +``` +/var/lib/hypeman/system/oci-cache/ + oci-layout # {"imageLayoutVersion": "1.0.0"} + index.json # Manifest index with annotations + blobs/sha256/ + 2d35eb... # Layer blob (shared across all images) + 706db5... # Config blob + 85f2b7... # Manifest blob +``` + +## CLI Usage + +```bash +# Push from local Docker daemon +hypeman push myimage:latest + +# Push with custom target name +hypeman push myimage:latest my-custom-name +``` + +## Authentication + +The registry endpoints use JWT bearer token authentication. The hypeman CLI reads `HYPEMAN_API_KEY` or `HYPEMAN_BEARER_TOKEN` and passes it directly as a registry token using go-containerregistry's `RegistryToken` auth. + +**Note:** `docker push` will not work with this registry. 
Docker CLI expects the v2 registry token auth flow (WWW-Authenticate challenge → token endpoint → retry with JWT), which we don't implement. Use the hypeman CLI for pushing images. + +## Limitations + +- **No docker push support**: Docker CLI requires the v2 registry token auth flow. Use `hypeman push` instead. + +## Design Decisions + +### Why wrap go-containerregistry/pkg/registry? + +**What:** Use the existing registry implementation from go-containerregistry with custom blob storage. + +**Why:** +- Battle-tested OCI Distribution Spec compliance +- Handles chunked uploads, content negotiation, error responses +- We only need to customize storage, not protocol handling + +### Why store manifests separately? + +**What:** Intercept manifest PUT and store in blob store. + +**Why:** +- go-containerregistry stores manifests in-memory by default +- Our image manager needs to read manifests from disk +- Enables content-addressable manifest storage consistent with layers + +### Why convert Docker v2 manifests to OCI? + +**What:** Detect Docker v2 manifests and convert to OCI format before passing to umoci. + +**Why:** +- `daemon.Image()` (local Docker) returns Docker v2 manifests +- umoci only accepts OCI format (`v1.Manifest`) - Docker v2 causes "manifest data is not v1.Manifest" errors +- go-containerregistry does NOT automatically convert formats +- The converted OCI manifest has a different digest than the input Docker v2 manifest + +**Implementation:** The `blobStoreImage` wrapper transparently converts Docker v2 to OCI when the manifest is read, and computes the correct OCI digest for registration. 
diff --git a/lib/registry/blob_store.go b/lib/registry/blob_store.go
new file mode 100644
index 00000000..d218a604
--- /dev/null
+++ b/lib/registry/blob_store.go
@@ -0,0 +1,114 @@
+package registry
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+
+	v1 "github.com/google/go-containerregistry/pkg/v1"
+	"github.com/onkernel/hypeman/lib/paths"
+)
+
+// notFoundError is a custom error that matches go-containerregistry's errNotFound sentinel.
+// go-containerregistry uses errors.Is(err, errNotFound) where errNotFound = errors.New("not found").
+// By implementing Is(), we ensure our error matches their sentinel via errors.Is().
+//
+// NOTE(review): Is() matches ANY error whose message is exactly "not found",
+// not only the upstream sentinel — acceptable inside the registry handler, but
+// overly broad if this error ever escapes it; confirm it stays internal.
+type notFoundError struct{}
+
+// Error returns the exact text the upstream sentinel uses; it must remain
+// "not found" for the Is() matching below to work.
+func (e notFoundError) Error() string { return "not found" }
+
+// Is reports whether target's message is "not found", which is how this error
+// aliases go-containerregistry's unexported errNotFound sentinel.
+func (e notFoundError) Is(target error) bool {
+	return target.Error() == "not found"
+}
+
+// ErrNotFound is returned when a blob is not found.
+var ErrNotFound = notFoundError{}
+
+// BlobStore implements blob storage on the filesystem. Blobs live in a single
+// content-addressed directory keyed by sha256 hex (see blobPath), so identical
+// blobs are shared across repositories; the repo argument on each method is
+// accepted for interface compatibility but ignored.
+type BlobStore struct {
+	paths *paths.Paths
+}
+
+// NewBlobStore creates a new filesystem-backed blob store.
+func NewBlobStore(p *paths.Paths) (*BlobStore, error) {
+	blobDir := p.OCICacheBlobDir()
+	if err := os.MkdirAll(blobDir, 0755); err != nil {
+		return nil, fmt.Errorf("create blob directory: %w", err)
+	}
+	return &BlobStore{paths: p}, nil
+}
+
+// blobPath maps a digest ("sha256:<hex>" or bare hex) to its on-disk path.
+func (s *BlobStore) blobPath(digest string) string {
+	digestHex := strings.TrimPrefix(digest, "sha256:")
+	return s.paths.OCICacheBlob(digestHex)
+}
+
+// Stat returns the blob's size if it exists, or ErrNotFound so the client
+// knows to upload it. The repo argument is ignored (content-addressed store).
+func (s *BlobStore) Stat(_ context.Context, repo string, h v1.Hash) (int64, error) {
+	path := s.blobPath(h.String())
+	info, err := os.Stat(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, ErrNotFound
+		}
+		return 0, err
+	}
+	return info.Size(), nil
+}
+
+// Get opens the blob for reading; the caller closes the returned reader.
+func (s *BlobStore) Get(_ context.Context, repo string, h v1.Hash) (io.ReadCloser, error) {
+	path := s.blobPath(h.String())
+	f, err := os.Open(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, ErrNotFound
+		}
+		return nil, err
+	}
+	return f, nil
+}
+
+// Put stores a blob, verifying its content against the expected digest h.
+// The write is atomic: content goes to a uniquely named temp file and is
+// renamed into place only after the digest check passes, so concurrent
+// uploads of the same blob cannot corrupt each other and readers never see
+// partial blobs.
+func (s *BlobStore) Put(_ context.Context, repo string, h v1.Hash, r io.ReadCloser) error {
+	defer r.Close()
+	path := s.blobPath(h.String())
+	if _, err := os.Stat(path); err == nil {
+		// Blob already present: drain the body so the HTTP upload completes
+		// cleanly. Best-effort; a drain error only affects connection reuse.
+		io.Copy(io.Discard, r) //nolint:errcheck
+		return nil
+	}
+	// A fixed "<path>.tmp" name (the previous approach) let two concurrent
+	// uploads of the same digest interleave writes into one file while each
+	// upload's digest check only covered its own tee'd stream — a corrupt
+	// file could then be renamed into place. os.CreateTemp gives every
+	// upload a private file.
+	f, err := os.CreateTemp(s.paths.OCICacheBlobDir(), "upload-*.tmp")
+	if err != nil {
+		return fmt.Errorf("create temp blob file: %w", err)
+	}
+	tempPath := f.Name()
+	defer func() {
+		f.Close()           // no-op on the success path (already closed)
+		os.Remove(tempPath) // no-op once renamed into place
+	}()
+	hasher := sha256.New()
+	tee := io.TeeReader(r, hasher)
+	if _, err := io.Copy(f, tee); err != nil {
+		return fmt.Errorf("write blob: %w", err)
+	}
+	if err := f.Close(); err != nil {
+		return fmt.Errorf("close blob file: %w", err)
+	}
+	actualDigest := "sha256:" + hex.EncodeToString(hasher.Sum(nil))
+	if actualDigest != h.String() {
+		return fmt.Errorf("digest mismatch: expected %s, got %s", h.String(), actualDigest)
+	}
+	// CreateTemp creates the file with 0600; match the store's 0644 blobs.
+	if err := os.Chmod(tempPath, 0644); err != nil {
+		return fmt.Errorf("chmod blob file: %w", err)
+	}
+	if err := os.Rename(tempPath, path); err != nil {
+		return fmt.Errorf("rename blob: %w", err)
+	}
+	return nil
+}
+
+// Delete removes a blob; deleting a missing blob is not an error.
+func (s *BlobStore) Delete(_ context.Context, repo string, h v1.Hash) error {
+	path := s.blobPath(h.String())
+	err := os.Remove(path)
+	if err != nil && !os.IsNotExist(err) {
+		return err
+	}
+	return nil
+}
diff --git a/lib/registry/registry.go b/lib/registry/registry.go
new file mode 100644
index 00000000..9f43309b
--- /dev/null
+++ b/lib/registry/registry.go
@@ -0,0 +1,497 @@
+// Package registry implements an OCI Distribution Spec registry that accepts pushed images
+// and triggers conversion to hypeman's disk format.
+package registry
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"regexp"
+	"strings"
+
+	"github.com/google/go-containerregistry/pkg/registry"
+	v1 "github.com/google/go-containerregistry/pkg/v1"
+	"github.com/google/go-containerregistry/pkg/v1/empty"
+	"github.com/google/go-containerregistry/pkg/v1/layout"
+	"github.com/google/go-containerregistry/pkg/v1/types"
+	"github.com/onkernel/hypeman/lib/images"
+	"github.com/onkernel/hypeman/lib/paths"
+)
+
+// Registry provides an OCI Distribution Spec compliant registry that stores pushed images
+// in hypeman's OCI cache and triggers conversion to ext4 disk format.
+type Registry struct {
+	paths        *paths.Paths
+	imageManager images.Manager
+	blobStore    *BlobStore
+	handler      http.Handler
+}
+
+// manifestPutPattern matches PUT requests to /v2/{name}/manifests/{reference}
+var manifestPutPattern = regexp.MustCompile(`^/v2/(.+)/manifests/(.+)$`)
+
+// New creates a new Registry that stores blobs in the OCI cache directory
+// and triggers image conversion when manifests are pushed.
+func New(p *paths.Paths, imgManager images.Manager) (*Registry, error) {
+	blobStore, err := NewBlobStore(p)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create registry with custom blob handler. Manifests stay in-memory in
+	// the wrapped handler; Handler() mirrors them to disk on PUT.
+	regHandler := registry.New(
+		registry.WithBlobHandler(blobStore),
+	)
+
+	r := &Registry{
+		paths:        p,
+		imageManager: imgManager,
+		blobStore:    blobStore,
+		handler:      regHandler,
+	}
+
+	return r, nil
+}
+
+// Handler returns the http.Handler for the registry endpoints.
+// This wraps the underlying registry to intercept manifest PUTs and trigger conversion.
+//
+// Ordering is deliberate: the manifest is persisted to the blob store BEFORE
+// the wrapped handler runs (so a 201 response never races a missing file), and
+// the conversion goroutine starts only after the wrapped handler reports 201.
+func (r *Registry) Handler() http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		// Intercept manifest PUT requests to store in blob store and trigger conversion
+		if req.Method == http.MethodPut {
+			matches := manifestPutPattern.FindStringSubmatch(req.URL.Path)
+			if matches != nil {
+				repo := matches[1]
+				reference := matches[2]
+
+				// NOTE(review): unbounded read of a client-supplied body.
+				// Manifests are small in practice; consider http.MaxBytesReader.
+				body, err := io.ReadAll(req.Body)
+				req.Body.Close()
+				if err != nil {
+					http.Error(w, "failed to read body", http.StatusInternalServerError)
+					return
+				}
+
+				digest := computeDigest(body)
+
+				// Verify digest if reference is a digest
+				if strings.HasPrefix(reference, "sha256:") && reference != digest {
+					http.Error(w, fmt.Sprintf("digest mismatch: expected %s, got %s", reference, digest), http.StatusBadRequest)
+					return
+				}
+
+				// Best-effort: a persistence failure is logged but does not fail
+				// the push; the wrapped handler still serves from memory.
+				if err := r.storeManifestBlob(digest, body); err != nil {
+					fmt.Fprintf(os.Stderr, "Warning: failed to store manifest blob: %v\n", err)
+				}
+
+				// The body was consumed above; rebuild it for the wrapped handler.
+				req.Body = io.NopCloser(bytes.NewReader(body))
+				wrapper := &responseWrapper{ResponseWriter: w}
+				r.handler.ServeHTTP(wrapper, req)
+
+				// Only convert images the wrapped handler actually accepted.
+				if wrapper.statusCode == http.StatusCreated {
+					go r.triggerConversion(repo, reference, digest)
+				}
+				return
+			}
+		}
+
+		r.handler.ServeHTTP(w, req)
+	})
+}
+
+// storeManifestBlob stores a manifest in the blob store by its digest.
+func (r *Registry) storeManifestBlob(digest string, data []byte) error { + digestHex := strings.TrimPrefix(digest, "sha256:") + blobPath := r.paths.OCICacheBlob(digestHex) + + // Verify digest matches + actualDigest := computeDigest(data) + if actualDigest != digest { + return fmt.Errorf("digest mismatch: expected %s, got %s", digest, actualDigest) + } + + return os.WriteFile(blobPath, data, 0644) +} + +// responseWrapper captures the status code from the response +type responseWrapper struct { + http.ResponseWriter + statusCode int +} + +func (w *responseWrapper) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) +} + +// triggerConversion queues the image for conversion to ext4 disk format. +func (r *Registry) triggerConversion(repo, reference, dockerDigest string) { + imageRef := repo + ":" + reference + if strings.HasPrefix(reference, "sha256:") { + imageRef = repo + "@" + reference + } + + ociDigest, err := r.addToOCILayout(dockerDigest) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to add image to OCI layout for %s: %v\n", imageRef, err) + return + } + + _, err = r.imageManager.ImportLocalImage(context.Background(), repo, reference, ociDigest) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to queue image conversion for %s: %v\n", imageRef, err) + } +} + +// addToOCILayout adds the image to the OCI layout, converting Docker v2 to OCI if needed. 
+func (r *Registry) addToOCILayout(inputDigest string) (string, error) {
+	cacheDir := r.paths.SystemOCICache()
+	// Open the existing layout; on failure fall back to initializing a fresh
+	// empty layout (the first-push case).
+	// NOTE(review): FromPath can also fail on a corrupt index.json, in which
+	// case layout.Write re-initializes it — confirm that is acceptable.
+	path, err := layout.FromPath(cacheDir)
+	if err != nil {
+		path, err = layout.Write(cacheDir, empty.Index)
+		if err != nil {
+			return "", fmt.Errorf("create oci layout: %w", err)
+		}
+	}
+
+	img, err := r.imageFromBlobStore(inputDigest)
+	if err != nil {
+		return "", fmt.Errorf("create image from blob store: %w", err)
+	}
+
+	// For Docker v2 input this is the digest of the CONVERTED OCI manifest,
+	// which differs from inputDigest; see blobStoreImage.Digest.
+	digestHash, err := img.Digest()
+	if err != nil {
+		return "", fmt.Errorf("compute digest: %w", err)
+	}
+	digest := digestHash.String()
+	digestHex := digestHash.Hex
+
+	// AppendImage writes the image into the layout and updates index.json; the
+	// ref.name annotation lets later lookups find the image by digest hex.
+	err = path.AppendImage(img, layout.WithAnnotations(map[string]string{
+		"org.opencontainers.image.ref.name": digestHex,
+	}))
+	if err != nil {
+		return "", fmt.Errorf("append image to layout: %w", err)
+	}
+
+	return digest, nil
+}
+
+// imageFromBlobStore creates a v1.Image that reads from our blob store.
+// The manifest bytes are loaded eagerly; config and layer blobs are read
+// lazily from disk by blobStoreImage/blobStoreLayer.
+func (r *Registry) imageFromBlobStore(digest string) (v1.Image, error) {
+	digestHex := strings.TrimPrefix(digest, "sha256:")
+	manifestPath := r.paths.OCICacheBlob(digestHex)
+
+	manifestData, err := os.ReadFile(manifestPath)
+	if err != nil {
+		return nil, fmt.Errorf("read manifest: %w", err)
+	}
+
+	return &blobStoreImage{
+		paths:        r.paths,
+		manifestData: manifestData,
+		digest:       digest,
+	}, nil
+}
+
+// blobStoreImage implements v1.Image by reading from the blob store.
+// It transparently converts Docker v2 manifests to OCI format.
+type blobStoreImage struct {
+	paths        *paths.Paths
+	manifestData []byte // raw manifest bytes as pushed (possibly Docker v2)
+	digest       string // digest of manifestData ("sha256:<hex>")
+}
+
+// Layers returns wrapped blobStoreLayer instances for each layer in the manifest.
+func (img *blobStoreImage) Layers() ([]v1.Layer, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+
+	var layers []v1.Layer
+	for _, layerDesc := range manifest.Layers {
+		layer := &blobStoreLayer{
+			paths:     img.paths,
+			digest:    layerDesc.Digest,
+			size:      layerDesc.Size,
+			mediaType: layerDesc.MediaType,
+		}
+		layers = append(layers, layer)
+	}
+	return layers, nil
+}
+
+// MediaType returns OCI manifest type, converting from Docker v2 if needed.
+func (img *blobStoreImage) MediaType() (types.MediaType, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return "", err
+	}
+	if isOCIMediaType(manifest.MediaType) {
+		return types.MediaType(manifest.MediaType), nil
+	}
+	return types.OCIManifestSchema1, nil
+}
+
+// isOCIMediaType returns true if the media type is the OCI image manifest type.
+// types.OCIManifestSchema1 is "application/vnd.oci.image.manifest.v1+json", so a
+// single comparison suffices (a previous second clause compared against the
+// identical string literal and was dead code).
+// NOTE(review): OCI manifests MAY omit mediaType entirely; such a manifest is
+// treated as Docker v2 here and re-digested — confirm pushers always set it.
+func isOCIMediaType(mediaType string) bool {
+	return mediaType == string(types.OCIManifestSchema1)
+}
+
+// Size returns the serialized manifest size: the stored bytes for OCI input,
+// or the length of the converted OCI manifest for Docker v2 input.
+func (img *blobStoreImage) Size() (int64, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return 0, err
+	}
+	if isOCIMediaType(manifest.MediaType) {
+		return int64(len(img.manifestData)), nil
+	}
+	rawManifest, err := img.RawManifest()
+	if err != nil {
+		return 0, err
+	}
+	return int64(len(rawManifest)), nil
+}
+
+// ConfigName returns the digest of the image config blob from the manifest.
+func (img *blobStoreImage) ConfigName() (v1.Hash, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return v1.Hash{}, err
+	}
+	h, err := v1.NewHash(manifest.Config.Digest)
+	if err != nil {
+		return v1.Hash{}, err
+	}
+	return h, nil
+}
+
+// ConfigFile reads the config blob from the blob store and parses it.
+func (img *blobStoreImage) ConfigFile() (*v1.ConfigFile, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+
+	digestHex := strings.TrimPrefix(manifest.Config.Digest, "sha256:")
+	configPath := img.paths.OCICacheBlob(digestHex)
+	configData, err := os.ReadFile(configPath)
+	if err != nil {
+		return nil, fmt.Errorf("read config: %w", err)
+	}
+
+	var config v1.ConfigFile
+	if err := json.Unmarshal(configData, &config); err != nil {
+		return nil, fmt.Errorf("parse config: %w", err)
+	}
+	return &config, nil
+}
+
+// RawConfigFile returns the config blob bytes unmodified, preserving its digest.
+func (img *blobStoreImage) RawConfigFile() ([]byte, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+
+	digestHex := strings.TrimPrefix(manifest.Config.Digest, "sha256:")
+	configPath := img.paths.OCICacheBlob(digestHex)
+	return os.ReadFile(configPath)
+}
+
+// Digest returns the manifest digest. For Docker v2, returns the digest of the
+// converted OCI manifest (which differs from the original Docker v2 digest).
+func (img *blobStoreImage) Digest() (v1.Hash, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return v1.Hash{}, err
+	}
+	if isOCIMediaType(manifest.MediaType) {
+		// OCI input: preserve the digest of the stored bytes.
+		return v1.NewHash(img.digest)
+	}
+	// Docker v2 input: hash the converted OCI manifest serialization.
+	rawManifest, err := img.RawManifest()
+	if err != nil {
+		return v1.Hash{}, err
+	}
+	sum := sha256.Sum256(rawManifest)
+	return v1.Hash{
+		Algorithm: "sha256",
+		Hex:       hex.EncodeToString(sum[:]),
+	}, nil
+}
+
+// Manifest returns the parsed manifest with Docker v2 media types converted to OCI.
+func (img *blobStoreImage) Manifest() (*v1.Manifest, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+
+	// OCI input keeps its media type; anything else is rewritten to OCI.
+	targetMediaType := types.OCIManifestSchema1
+	if isOCIMediaType(manifest.MediaType) {
+		targetMediaType = types.MediaType(manifest.MediaType)
+	}
+
+	// NOTE(review): internalManifest does not parse annotations or
+	// artifactType, so any such fields on the pushed manifest are dropped by
+	// this conversion — confirm that is acceptable.
+	v1Manifest := &v1.Manifest{
+		SchemaVersion: int64(manifest.SchemaVersion),
+		MediaType:     targetMediaType,
+		Config: v1.Descriptor{
+			MediaType: convertToOCIMediaType(manifest.Config.MediaType),
+			Size:      manifest.Config.Size,
+		},
+	}
+
+	// Digest is a v1.Hash, not a string, so it is set after parsing.
+	configHash, err := v1.NewHash(manifest.Config.Digest)
+	if err != nil {
+		return nil, err
+	}
+	v1Manifest.Config.Digest = configHash
+
+	for _, layer := range manifest.Layers {
+		layerHash, err := v1.NewHash(layer.Digest)
+		if err != nil {
+			return nil, err
+		}
+		v1Manifest.Layers = append(v1Manifest.Layers, v1.Descriptor{
+			MediaType: convertToOCIMediaType(layer.MediaType),
+			Size:      layer.Size,
+			Digest:    layerHash,
+		})
+	}
+
+	return v1Manifest, nil
+}
+
+// convertToOCIMediaType converts Docker v2 media types to OCI equivalents.
+// Layer CONTENT is not touched: gzip layers stay gzip, only the label changes.
+func convertToOCIMediaType(mediaType string) types.MediaType {
+	switch mediaType {
+	case "application/vnd.docker.distribution.manifest.v2+json":
+		return types.OCIManifestSchema1
+	case "application/vnd.docker.container.image.v1+json":
+		return types.OCIConfigJSON
+	case "application/vnd.docker.image.rootfs.diff.tar.gzip":
+		return types.OCILayer
+	case "application/vnd.docker.image.rootfs.diff.tar":
+		return types.OCIUncompressedLayer
+	default:
+		// If already OCI or unknown, return as-is
+		return types.MediaType(mediaType)
+	}
+}
+
+// RawManifest returns the manifest JSON. For OCI, returns original bytes to preserve
+// digest. For Docker v2, returns the converted OCI manifest JSON.
+func (img *blobStoreImage) RawManifest() ([]byte, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+	if isOCIMediaType(manifest.MediaType) {
+		// Return stored bytes untouched so the digest is preserved.
+		return img.manifestData, nil
+	}
+	// Docker v2: serialize the converted manifest. json.Marshal emits struct
+	// fields in declaration order, so the output — and therefore the OCI
+	// digest computed from it — is deterministic for a given input.
+	v1Manifest, err := img.Manifest()
+	if err != nil {
+		return nil, err
+	}
+	return json.Marshal(v1Manifest)
+}
+
+// LayerByDigest returns the layer whose descriptor digest matches hash, or an
+// error if the manifest lists no such layer.
+func (img *blobStoreImage) LayerByDigest(hash v1.Hash) (v1.Layer, error) {
+	manifest, err := img.parseManifest()
+	if err != nil {
+		return nil, err
+	}
+
+	for _, layer := range manifest.Layers {
+		if layer.Digest == hash.String() {
+			return &blobStoreLayer{
+				paths:     img.paths,
+				digest:    layer.Digest,
+				size:      layer.Size,
+				mediaType: layer.MediaType,
+			}, nil
+		}
+	}
+	return nil, fmt.Errorf("layer not found: %s", hash.String())
+}
+
+// LayerByDiffID is unsupported: mapping a DiffID to a layer would require
+// decompressing every layer to compute uncompressed digests.
+func (img *blobStoreImage) LayerByDiffID(hash v1.Hash) (v1.Layer, error) {
+	return nil, fmt.Errorf("LayerByDiffID not implemented")
+}
+
+// internalManifest is a minimal manifest shape shared by Docker v2 and OCI
+// manifests; only the fields needed for conversion are parsed.
+type internalManifest struct {
+	SchemaVersion int    `json:"schemaVersion"`
+	MediaType     string `json:"mediaType"`
+	Config        struct {
+		MediaType string `json:"mediaType"`
+		Size      int64  `json:"size"`
+		Digest    string `json:"digest"`
+	} `json:"config"`
+	Layers []struct {
+		MediaType string `json:"mediaType"`
+		Size      int64  `json:"size"`
+		Digest    string `json:"digest"`
+	} `json:"layers"`
+}
+
+// parseManifest decodes the stored manifest bytes into internalManifest.
+func (img *blobStoreImage) parseManifest() (*internalManifest, error) {
+	var manifest internalManifest
+	if err := json.Unmarshal(img.manifestData, &manifest); err != nil {
+		return nil, fmt.Errorf("parse manifest: %w", err)
+	}
+	return &manifest, nil
+}
+
+// blobStoreLayer implements v1.Layer by reading blobs from the filesystem.
+// Layer content is served directly; media types are converted to OCI format.
+type blobStoreLayer struct {
+	paths     *paths.Paths
+	digest    string // descriptor digest ("sha256:<hex>") of the stored blob
+	size      int64  // compressed size from the manifest descriptor
+	mediaType string // original media type; normalized to OCI on read
+}
+
+// Digest returns the layer's content hash.
+func (l *blobStoreLayer) Digest() (v1.Hash, error) { + return v1.NewHash(l.digest) +} + +// DiffID returns an empty hash. Computing the actual DiffID requires decompressing +// the layer which is expensive; callers that need DiffID should compute it themselves. +func (l *blobStoreLayer) DiffID() (v1.Hash, error) { + return v1.Hash{}, nil +} + +// Compressed returns a reader for the compressed layer blob from disk. +func (l *blobStoreLayer) Compressed() (io.ReadCloser, error) { + digestHex := strings.TrimPrefix(l.digest, "sha256:") + blobPath := l.paths.OCICacheBlob(digestHex) + return os.Open(blobPath) +} + +// Uncompressed returns a reader for the layer content. Since layers are stored +// compressed, this returns the compressed stream and relies on the caller +// (go-containerregistry) to handle decompression based on MediaType. +func (l *blobStoreLayer) Uncompressed() (io.ReadCloser, error) { + return l.Compressed() +} + +// Size returns the compressed size of the layer in bytes. +func (l *blobStoreLayer) Size() (int64, error) { + return l.size, nil +} + +// MediaType returns the layer's media type, converting Docker v2 types to OCI. +func (l *blobStoreLayer) MediaType() (types.MediaType, error) { + return convertToOCIMediaType(l.mediaType), nil +} + +// computeDigest calculates SHA256 hash of data +func computeDigest(data []byte) string { + sum := sha256.Sum256(data) + return "sha256:" + hex.EncodeToString(sum[:]) +}