diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 31b71b1cb..1fff472f3 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -448,8 +448,17 @@ func newSessionWorker(client *WorkflowClient, params workerExecutionParameters, creationTaskqueue := getCreationTaskqueue(params.TaskQueue) params.BackgroundContext = context.WithValue(params.BackgroundContext, sessionEnvironmentContextKey, sessionEnvironment) params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue() + // For the resource specific task queue, we don't need to include deployment options + // Save them to restore later + deployments := params.DeploymentOptions + useBuildIDForVersioning := params.UseBuildIDForVersioning + // Disable versioning for activity worker within session, but still send deployment name for debug purpose + params.DeploymentOptions.UseVersioning = false + params.UseBuildIDForVersioning = false activityWorker := newActivityWorker(client, params, - &workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil) + &workerOverrides{ + slotSupplier: params.Tuner.GetSessionActivitySlotSupplier(), + }, env, nil) params.ActivityTaskPollerBehavior = NewPollerBehaviorSimpleMaximum( PollerBehaviorSimpleMaximumOptions{ @@ -457,6 +466,8 @@ func newSessionWorker(client *WorkflowClient, params workerExecutionParameters, }, ) params.TaskQueue = creationTaskqueue + params.DeploymentOptions = deployments + params.UseBuildIDForVersioning = useBuildIDForVersioning // Although we have session token bucket to limit session size across creation // and recreation, we also limit it here for creation only overrides := &workerOverrides{} @@ -518,7 +529,6 @@ func newActivityWorker( } else { slotSupplier = params.Tuner.GetActivityTaskSlotSupplier() } - bwo := baseWorkerOptions{ pollerRate: defaultPollerRate, slotSupplier: slotSupplier, diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index 25a31880c..65536c5d3 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -16,6 +16,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" @@ -236,6 +237,60 @@ func (ts *WorkerDeploymentTestSuite) TestBuildIDChangesOverWorkflowLifetime() { ts.Equal("2.0", lastBuildID) } +func (ts *WorkerDeploymentTestSuite) TestBuildIDWithSession() { + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + + worker := worker.New(ts.client, ts.taskQueueName, worker.Options{ + EnableSessionWorker: true, + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + + worker.RegisterWorkflowWithOptions(ts.workflows.BasicSession, workflow.RegisterOptions{ + Name: "SessionBuildIDWorkflow", + VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade, + }) + + activities2 := &Activities2{} + result := &Activities{activities2: activities2} + activities2.impl = result + worker.RegisterActivityWithOptions(activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true}) + + ts.NoError(worker.Start()) + defer worker.Stop() + + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + + ts.waitForWorkerDeployment(ctx, dHandle) + + response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1) + + _, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: response1.ConflictToken, + }) + ts.NoError(err) + + // start workflow1 with 1.0, BasicSession, auto-upgrade + wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf-1"), "SessionBuildIDWorkflow") + ts.NoError(err) + + ts.NoError(wfHandle.Get(ctx, nil)) +} + func (ts *WorkerDeploymentTestSuite) TestPinnedBehaviorThreeWorkers() { if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { ts.T().Skip("temporal server 1.27+ required")