Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class WorkflowScheduler(
// CostBasedRegionPlanGenerator considers costs to try to find an optimal plan.
new CostBasedScheduleGenerator(
workflowContext,
physicalPlan,
physicalPlan.copy(executionMode = workflowContext.workflowSettings.executionMode),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this setting should be part of Physical plan. The information is already in workflowContext, why duplicate it again in physical plan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, please do not add such configuration to physical plan.

actorId
).generate()
this.schedule = generatedSchedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ class WorkflowCompiler(
val (physicalPlan, outputPortsNeedingStorage) =
expandLogicalPlan(logicalPlan, logicalPlanPojo.opsToViewResult, None)

context.workflowSettings =
WorkflowSettings(context.workflowSettings.dataTransferBatchSize, outputPortsNeedingStorage)
context.workflowSettings = WorkflowSettings(
context.workflowSettings.dataTransferBatchSize,
outputPortsNeedingStorage,
context.workflowSettings.executionMode
)

Workflow(context, logicalPlan, physicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext, WorkflowSettings}
import org.apache.texera.amber.core.workflow.{
ExecutionMode,
PortIdentity,
WorkflowContext,
WorkflowSettings
}
import org.apache.texera.amber.engine.architecture.controller._
import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
Expand Down Expand Up @@ -119,7 +124,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand All @@ -141,7 +149,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
val expectedBatchSize = 500

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -171,7 +182,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
val expectedBatchSize = 100

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -209,7 +223,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
val expectedBatchSize = 300

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down Expand Up @@ -250,7 +267,10 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val customWorkflowSettings = WorkflowSettings(
dataTransferBatchSize = expectedBatchSize,
executionMode = ExecutionMode.STREAMING
)

val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.workflow;

public enum ExecutionMode {
STREAMING,
BATCH
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala}

case class PhysicalPlan(
operators: Set[PhysicalOp],
links: Set[PhysicalLink]
links: Set[PhysicalLink],
executionMode: ExecutionMode = ExecutionMode.STREAMING
) extends LazyLogging {

@transient private lazy val operatorMap: Map[PhysicalOpIdentity, PhysicalOp] =
Expand Down Expand Up @@ -245,6 +246,7 @@ case class PhysicalPlan(
getOperator(physicalOp.id).isInputLinkDependee(
link
) || getOperator(upstreamPhysicalOpId).isOutputLinkBlocking(link)
|| executionMode == ExecutionMode.BATCH
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ object WorkflowContext {
val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
dataTransferBatchSize = 400 // TODO: make this configurable
dataTransferBatchSize = 400,
executionMode = ExecutionMode.STREAMING
)
}
class WorkflowContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ package org.apache.texera.amber.core.workflow

case class WorkflowSettings(
dataTransferBatchSize: Int,
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
executionMode: ExecutionMode
)
2 changes: 2 additions & 0 deletions frontend/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ import { AdminSettingsComponent } from "./dashboard/component/admin/settings/adm
import { FormlyRepeatDndComponent } from "./common/formly/repeat-dnd/repeat-dnd.component";
import { NzInputNumberModule } from "ng-zorro-antd/input-number";
import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
import { NzRadioModule } from "ng-zorro-antd/radio";

registerLocaleData(en);

Expand Down Expand Up @@ -344,6 +345,7 @@ registerLocaleData(en);
NzProgressModule,
NzInputNumberModule,
NzCheckboxModule,
NzRadioModule,
],
providers: [
provideNzI18n(en_US),
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/app/common/type/gui-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
import {ExecutionMode} from "./workflow";

// Please refer to core/config/src/main/resources/gui.conf for the definition of each config item
export interface GuiConfig {
Expand All @@ -33,6 +34,7 @@ export interface GuiConfig {
productionSharedEditingServer: boolean;
pythonLanguageServerPort: string;
defaultDataTransferBatchSize: number;
defaultExecutionMode: ExecutionMode;
workflowEmailNotificationEnabled: boolean;
sharingComputingUnitEnabled: boolean;
operatorConsoleMessageBufferSize: number;
Expand Down
6 changes: 6 additions & 0 deletions frontend/src/app/common/type/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
import { WorkflowMetadata } from "../../dashboard/type/workflow-metadata.interface";
import { CommentBox, OperatorLink, OperatorPredicate, Point } from "../../workspace/types/workflow-common.interface";

export enum ExecutionMode {
STREAMING,
BATCH,
}

export interface WorkflowSettings {
dataTransferBatchSize: number;
executionMode: ExecutionMode;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

//All times in test Workflows are in PST because our local machine's timezone is PST

import { Workflow, WorkflowContent } from "../../common/type/workflow";
import { ExecutionMode, Workflow, WorkflowContent } from "../../common/type/workflow";
import { DashboardEntry } from "../type/dashboard-entry";
import { DashboardProject } from "../type/dashboard-project.interface";

Expand All @@ -39,7 +39,7 @@ export const testWorkflowContent = (operatorTypes: string[]): WorkflowContent =>
commentBoxes: [],
links: [],
operatorPositions: {},
settings: { dataTransferBatchSize: 400 },
settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.STREAMING },
});

export const testWorkflow1: Workflow = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { DashboardEntry, UserInfo } from "../../../type/dashboard-entry";
import { UserService } from "../../../../common/service/user/user.service";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { NotificationService } from "../../../../common/service/notification/notification.service";
import { WorkflowContent } from "../../../../common/type/workflow";
import { ExecutionMode, WorkflowContent } from "../../../../common/type/workflow";
import { NzUploadFile } from "ng-zorro-antd/upload";
import * as JSZip from "jszip";
import { FiltersComponent } from "../filters/filters.component";
Expand Down Expand Up @@ -230,7 +230,10 @@ export class UserWorkflowComponent implements AfterViewInit {
commentBoxes: [],
links: [],
operatorPositions: {},
settings: { dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize },
settings: {
dataTransferBatchSize: this.config.env.defaultDataTransferBatchSize,
executionMode: this.config.env.defaultExecutionMode,
},
};
let localPid = this.pid;
this.workflowPersistService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@
specific language governing permissions and limitations
under the License.
-->

<div class="settings-container">
<h4>Workflow Settings</h4>
<form
[formGroup]="settingsForm"
class="form-inline">
<b>Execution Mode:</b>
<nz-radio-group formControlName="executionMode">
<label
nz-radio
[nzValue]="ExecutionMode.BATCH"
>Batch</label
>
<br />
<label
nz-radio
[nzValue]="ExecutionMode.STREAMING"
>Streaming</label
>
</nz-radio-group>
<br />
<div class="form-group">
<label for="dataTransferBatchSize">Data Transfer Batch Size:</label>
<input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
padding: 10px;
}

h4 {
margin-bottom: 15px;
}

.form-inline {
display: flex;
flex-direction: column;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { WorkflowActionService } from "../../../service/workflow-graph/model/wor
import { WorkflowPersistService } from "src/app/common/service/workflow-persist/workflow-persist.service";
import { UserService } from "../../../../common/service/user/user.service";
import { NotificationService } from "src/app/common/service/notification/notification.service";
import { GuiConfigService } from "../../../../common/service/gui-config.service";
import { ExecutionMode } from "../../../../common/type/workflow";

@UntilDestroy()
@Component({
Expand All @@ -33,43 +33,50 @@ import { GuiConfigService } from "../../../../common/service/gui-config.service"
styleUrls: ["./settings.component.scss"],
})
export class SettingsComponent implements OnInit {
settingsForm!: FormGroup;
currentDataTransferBatchSize!: number;
isSaving: boolean = false;
settingsForm: FormGroup;

constructor(
private fb: FormBuilder,
private workflowActionService: WorkflowActionService,
private workflowPersistService: WorkflowPersistService,
private userService: UserService,
private notificationService: NotificationService,
private config: GuiConfigService
) {}

ngOnInit(): void {
this.currentDataTransferBatchSize =
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize ||
this.config.env.defaultDataTransferBatchSize;

private notificationService: NotificationService
) {
this.settingsForm = this.fb.group({
dataTransferBatchSize: [this.currentDataTransferBatchSize, [Validators.required, Validators.min(1)]],
dataTransferBatchSize: [
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
[Validators.required, Validators.min(1)],
],
executionMode: [this.workflowActionService.getWorkflowContent().settings.executionMode],
});
}

this.settingsForm.valueChanges.pipe(untilDestroyed(this)).subscribe(value => {
if (this.settingsForm.valid) {
this.confirmUpdateDataTransferBatchSize(value.dataTransferBatchSize);
}
});
ngOnInit(): void {
this.settingsForm
.get("dataTransferBatchSize")!
.valueChanges.pipe(untilDestroyed(this))
.subscribe((batchSize: number) => {
if (this.settingsForm.get("dataTransferBatchSize")!.valid) {
this.confirmUpdateDataTransferBatchSize(batchSize);
}
});

this.settingsForm
.get("executionMode")!
.valueChanges.pipe(untilDestroyed(this))
.subscribe((mode: ExecutionMode) => {
this.updateExecutionMode(mode);
});

this.workflowActionService
.workflowChanged()
.pipe(untilDestroyed(this))
.subscribe(() => {
this.currentDataTransferBatchSize =
this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize ||
this.config.env.defaultDataTransferBatchSize;
this.settingsForm.patchValue(
{ dataTransferBatchSize: this.currentDataTransferBatchSize },
{
dataTransferBatchSize: this.workflowActionService.getWorkflowContent().settings.dataTransferBatchSize,
executionMode: this.workflowActionService.getWorkflowContent().settings.executionMode,
},
{ emitEvent: false }
);
});
Expand All @@ -85,13 +92,18 @@ export class SettingsComponent implements OnInit {
}

public persistWorkflow(): void {
this.isSaving = true;
this.workflowPersistService
.persistWorkflow(this.workflowActionService.getWorkflow())
.pipe(untilDestroyed(this))
.subscribe({
error: (e: unknown) => this.notificationService.error((e as Error).message),
})
.add(() => (this.isSaving = false));
});
}

public updateExecutionMode(mode: ExecutionMode) {
this.workflowActionService.updateExecutionMode(mode);
this.persistWorkflow();
}

protected readonly ExecutionMode = ExecutionMode;
}
Loading
Loading