Skip to content

Commit

Permalink
feat: deploy functions in parallel over api (#3154)
Browse files Browse the repository at this point in the history
  • Loading branch information
sweatybridge authored Feb 17, 2025
1 parent 40bfa9f commit ef456e3
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 46 deletions.
6 changes: 5 additions & 1 deletion cmd/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"fmt"

"github.com/go-errors/errors"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/supabase/cli/internal/functions/delete"
Expand Down Expand Up @@ -69,8 +70,10 @@ var (
}
if useApi {
useDocker = false
} else if maxJobs > 1 {
return errors.New("--jobs must be used together with --use-api")
}
return deploy.Run(cmd.Context(), args, useDocker, noVerifyJWT, importMapPath, afero.NewOsFs())
return deploy.Run(cmd.Context(), args, useDocker, noVerifyJWT, importMapPath, maxJobs, afero.NewOsFs())
},
}

Expand Down Expand Up @@ -134,6 +137,7 @@ func init() {
deployFlags.BoolVar(&useLegacyBundle, "legacy-bundle", false, "Use legacy bundling mechanism.")
functionsDeployCmd.MarkFlagsMutuallyExclusive("use-api", "use-docker", "legacy-bundle")
cobra.CheckErr(deployFlags.MarkHidden("legacy-bundle"))
deployFlags.UintVarP(&maxJobs, "jobs", "j", 1, "Maximum number of parallel jobs.")
deployFlags.BoolVar(noVerifyJWT, "no-verify-jwt", false, "Disable JWT verification for the Function.")
deployFlags.StringVar(&flags.ProjectRef, "project-ref", "", "Project ref of the Supabase project.")
deployFlags.StringVar(&importMapPath, "import-map", "", "Path to import map file.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var (
Short: "Generate types from Postgres schema",
PreRunE: func(cmd *cobra.Command, args []string) error {
if postgrestV9Compat && !cmd.Flags().Changed("db-url") {
return errors.New("--postgrest-v9-compat can only be used together with --db-url")
return errors.New("--postgrest-v9-compat must used together with --db-url")
}
// Legacy commands specify language using arg, eg. gen types typescript
if len(args) > 0 && args[0] != types.LangTypescript && !cmd.Flags().Changed("lang") {
Expand Down
7 changes: 5 additions & 2 deletions internal/functions/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/supabase/cli/pkg/function"
)

func Run(ctx context.Context, slugs []string, useDocker bool, noVerifyJWT *bool, importMapPath string, fsys afero.Fs) error {
func Run(ctx context.Context, slugs []string, useDocker bool, noVerifyJWT *bool, importMapPath string, maxJobs uint, fsys afero.Fs) error {
// Load function config and project id
if err := flags.LoadConfig(fsys); err != nil {
return err
Expand All @@ -41,7 +41,10 @@ func Run(ctx context.Context, slugs []string, useDocker bool, noVerifyJWT *bool,
if err := api.UpsertFunctions(ctx, functionConfig); err != nil {
return err
}
} else if err := deploy(ctx, functionConfig, fsys); err != nil {
} else if err := deploy(ctx, functionConfig, maxJobs, fsys); errors.Is(err, errNoDeploy) {
fmt.Fprintln(os.Stderr, err)
return nil
} else if err != nil {
return err
}
fmt.Printf("Deployed Functions on project %s: %s\n", utils.Aqua(flags.ProjectRef), strings.Join(slugs, ", "))
Expand Down
14 changes: 7 additions & 7 deletions internal/functions/deploy/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDeployCommand(t *testing.T) {
}
// Run test
noVerifyJWT := true
err = Run(context.Background(), functions, true, &noVerifyJWT, "", fsys)
err = Run(context.Background(), functions, true, &noVerifyJWT, "", 1, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand Down Expand Up @@ -114,7 +114,7 @@ import_map = "./import_map.json"
outputDir := filepath.Join(utils.TempDir, fmt.Sprintf(".output_%s", slug))
require.NoError(t, afero.WriteFile(fsys, filepath.Join(outputDir, "output.eszip"), []byte(""), 0644))
// Run test
err = Run(context.Background(), nil, true, nil, "", fsys)
err = Run(context.Background(), nil, true, nil, "", 1, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand Down Expand Up @@ -164,7 +164,7 @@ import_map = "./import_map.json"
outputDir := filepath.Join(utils.TempDir, ".output_enabled-func")
require.NoError(t, afero.WriteFile(fsys, filepath.Join(outputDir, "output.eszip"), []byte(""), 0644))
// Run test
err = Run(context.Background(), nil, true, nil, "", fsys)
err = Run(context.Background(), nil, true, nil, "", 1, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand All @@ -175,7 +175,7 @@ import_map = "./import_map.json"
fsys := afero.NewMemMapFs()
require.NoError(t, utils.WriteConfig(fsys, false))
// Run test
err := Run(context.Background(), []string{"_invalid"}, true, nil, "", fsys)
err := Run(context.Background(), []string{"_invalid"}, true, nil, "", 1, fsys)
// Check error
assert.ErrorContains(t, err, "Invalid Function name.")
})
Expand All @@ -185,7 +185,7 @@ import_map = "./import_map.json"
fsys := afero.NewMemMapFs()
require.NoError(t, utils.WriteConfig(fsys, false))
// Run test
err := Run(context.Background(), nil, true, nil, "", fsys)
err := Run(context.Background(), nil, true, nil, "", 1, fsys)
// Check error
assert.ErrorContains(t, err, "No Functions specified or found in supabase/functions")
})
Expand Down Expand Up @@ -228,7 +228,7 @@ verify_jwt = false
outputDir := filepath.Join(utils.TempDir, fmt.Sprintf(".output_%s", slug))
require.NoError(t, afero.WriteFile(fsys, filepath.Join(outputDir, "output.eszip"), []byte(""), 0644))
// Run test
assert.NoError(t, Run(context.Background(), []string{slug}, true, nil, "", fsys))
assert.NoError(t, Run(context.Background(), []string{slug}, true, nil, "", 1, fsys))
// Validate api
assert.Empty(t, apitest.ListUnmatchedRequests())
})
Expand Down Expand Up @@ -272,7 +272,7 @@ verify_jwt = false
require.NoError(t, afero.WriteFile(fsys, filepath.Join(outputDir, "output.eszip"), []byte(""), 0644))
// Run test
noVerifyJwt := false
assert.NoError(t, Run(context.Background(), []string{slug}, true, &noVerifyJwt, "", fsys))
assert.NoError(t, Run(context.Background(), []string{slug}, true, &noVerifyJwt, "", 1, fsys))
// Validate api
assert.Empty(t, apitest.ListUnmatchedRequests())
})
Expand Down
83 changes: 53 additions & 30 deletions internal/functions/deploy/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,18 @@ import (
"github.com/supabase/cli/pkg/api"
"github.com/supabase/cli/pkg/cast"
"github.com/supabase/cli/pkg/config"
"github.com/supabase/cli/pkg/queue"
)

func deploy(ctx context.Context, functionConfig config.FunctionConfig, fsys afero.Fs) error {
bundleOnly := len(functionConfig) > 1
var toUpdate []api.BulkUpdateFunctionBody
var errNoDeploy = errors.New("All Functions are up to date.")

func deploy(ctx context.Context, functionConfig config.FunctionConfig, maxJobs uint, fsys afero.Fs) error {
var toDeploy []api.FunctionDeployMetadata
for slug, fc := range functionConfig {
if !fc.Enabled {
fmt.Fprintln(os.Stderr, "Skipped deploying Function:", slug)
continue
}
fmt.Fprintln(os.Stderr, "Deploying Function:", slug)
param := api.V1DeployAFunctionParams{Slug: &slug}
if bundleOnly {
param.BundleOnly = &bundleOnly
}
meta := api.FunctionDeployMetadata{
Name: &slug,
EntrypointPath: filepath.ToSlash(fc.Entrypoint),
Expand All @@ -45,29 +42,55 @@ func deploy(ctx context.Context, functionConfig config.FunctionConfig, fsys afer
for i, sf := range fc.StaticFiles {
files[i] = filepath.ToSlash(sf)
}
resp, err := upload(ctx, param, meta, fsys)
if err != nil {
toDeploy = append(toDeploy, meta)
}
if len(toDeploy) == 0 {
return errors.New(errNoDeploy)
} else if len(toDeploy) == 1 {
param := api.V1DeployAFunctionParams{Slug: toDeploy[0].Name}
_, err := upload(ctx, param, toDeploy[0], fsys)
return err
}
return bulkUpload(ctx, toDeploy, maxJobs, fsys)
}

func bulkUpload(ctx context.Context, toDeploy []api.FunctionDeployMetadata, maxJobs uint, fsys afero.Fs) error {
jq := queue.NewJobQueue(maxJobs)
toUpdate := make([]api.BulkUpdateFunctionBody, len(toDeploy))
for i, meta := range toDeploy {
fmt.Fprintln(os.Stderr, "Deploying Function:", *meta.Name)
param := api.V1DeployAFunctionParams{
Slug: meta.Name,
BundleOnly: cast.Ptr(true),
}
bundle := func() error {
resp, err := upload(ctx, param, meta, fsys)
if err != nil {
return err
}
toUpdate[i].Id = resp.Id
toUpdate[i].Name = resp.Name
toUpdate[i].Slug = resp.Slug
toUpdate[i].Version = resp.Version
toUpdate[i].EntrypointPath = resp.EntrypointPath
toUpdate[i].ImportMap = resp.ImportMap
toUpdate[i].ImportMapPath = resp.ImportMapPath
toUpdate[i].VerifyJwt = resp.VerifyJwt
toUpdate[i].Status = api.BulkUpdateFunctionBodyStatus(resp.Status)
toUpdate[i].CreatedAt = resp.CreatedAt
return nil
}
if err := jq.Put(bundle); err != nil {
return err
}
toUpdate = append(toUpdate, api.BulkUpdateFunctionBody{
CreatedAt: resp.CreatedAt,
EntrypointPath: resp.EntrypointPath,
Id: resp.Id,
ImportMap: resp.ImportMap,
ImportMapPath: resp.ImportMapPath,
Name: resp.Name,
Slug: resp.Slug,
Status: api.BulkUpdateFunctionBodyStatus(resp.Status),
VerifyJwt: resp.VerifyJwt,
Version: resp.Version,
})
}
if len(toUpdate) > 1 {
if resp, err := utils.GetSupabase().V1BulkUpdateFunctionsWithResponse(ctx, flags.ProjectRef, toUpdate); err != nil {
return errors.Errorf("failed to bulk update: %w", err)
} else if resp.JSON200 == nil {
return errors.Errorf("unexpected bulk update status %d: %s", resp.StatusCode(), string(resp.Body))
}
if err := jq.Collect(); err != nil {
return err
}
if resp, err := utils.GetSupabase().V1BulkUpdateFunctionsWithResponse(ctx, flags.ProjectRef, toUpdate); err != nil {
return errors.Errorf("failed to bulk update: %w", err)
} else if resp.JSON200 == nil {
return errors.Errorf("unexpected bulk update status %d: %s", resp.StatusCode(), string(resp.Body))
}
return nil
}
Expand Down Expand Up @@ -116,7 +139,7 @@ func writeForm(form *multipart.Writer, meta api.FunctionDeployMetadata, fsys afe
} else if fi.IsDir() {
return errors.New("file path is a directory: " + srcPath)
}
fmt.Fprintln(os.Stderr, "Uploading asset:", srcPath)
fmt.Fprintf(os.Stderr, "Uploading asset (%s): %s\n", *meta.Name, srcPath)
r := io.TeeReader(f, w)
dst, err := form.CreateFormFile("file", srcPath)
if err != nil {
Expand All @@ -138,7 +161,7 @@ func writeForm(form *multipart.Writer, meta api.FunctionDeployMetadata, fsys afe
return err
}
// TODO: replace with addFile once edge runtime supports jsonc
fmt.Fprintln(os.Stderr, "Uploading asset:", imPath)
fmt.Fprintf(os.Stderr, "Uploading asset (%s): %s\n", *meta.Name, imPath)
f, err := form.CreateFormFile("file", imPath)
if err != nil {
return errors.Errorf("failed to create import map: %w", err)
Expand Down
17 changes: 12 additions & 5 deletions internal/functions/deploy/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func TestDeployAll(t *testing.T) {

t.Run("deploys single slug", func(t *testing.T) {
c := config.FunctionConfig{"demo": {
Enabled: true,
Entrypoint: "testdata/shared/whatever.ts",
}}
// Setup in-memory fs
Expand All @@ -157,16 +158,22 @@ func TestDeployAll(t *testing.T) {
Reply(http.StatusCreated).
JSON(api.DeployFunctionResponse{})
// Run test
err := deploy(context.Background(), c, fsys)
err := deploy(context.Background(), c, 1, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
})

t.Run("deploys multiple slugs", func(t *testing.T) {
c := config.FunctionConfig{
"test-ts": {Entrypoint: "testdata/shared/whatever.ts"},
"test-js": {Entrypoint: "testdata/geometries/Geometries.js"},
"test-ts": {
Enabled: true,
Entrypoint: "testdata/shared/whatever.ts",
},
"test-js": {
Enabled: true,
Entrypoint: "testdata/geometries/Geometries.js",
},
}
// Setup in-memory fs
fsys := afero.FromIOFS{FS: testImports}
Expand All @@ -187,7 +194,7 @@ func TestDeployAll(t *testing.T) {
Reply(http.StatusOK).
JSON(api.BulkUpdateFunctionResponse{})
// Run test
err := deploy(context.Background(), c, fsys)
err := deploy(context.Background(), c, 1, fsys)
// Check error
assert.NoError(t, err)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand All @@ -205,7 +212,7 @@ func TestDeployAll(t *testing.T) {
MatchParam("slug", "demo").
ReplyError(errNetwork)
// Run test
err := deploy(context.Background(), c, fsys)
err := deploy(context.Background(), c, 1, fsys)
// Check error
assert.ErrorIs(t, err, errNetwork)
assert.Empty(t, apitest.ListUnmatchedRequests())
Expand Down

0 comments on commit ef456e3

Please sign in to comment.