-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathworker.mjs
More file actions
100 lines (81 loc) · 3.08 KB
/
worker.mjs
File metadata and controls
100 lines (81 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import { createAzureManagedWorkerBuilder } from "@microsoft/durabletask-js-azuremanaged";
import { DefaultAzureCredential, ManagedIdentityCredential } from "@azure/identity";
// Endpoint used when the ENDPOINT environment variable is not set.
const EMULATOR_ENDPOINT = "http://localhost:8080";

// Runtime configuration read from the environment. A variable that is unset
// falls back to the default shown; AZURE_MANAGED_IDENTITY_CLIENT_ID has no
// default and stays undefined when absent.
const {
  ENDPOINT: endpoint = EMULATOR_ENDPOINT,
  TASKHUB: taskHub = "default",
  AZURE_MANAGED_IDENTITY_CLIENT_ID: managedIdentityClientId,
} = process.env;
/**
 * Creates the worker builder for the configured endpoint.
 *
 * When the endpoint equals EMULATOR_ENDPOINT, the builder is created from a
 * connection string with `Authentication=None`. For any other endpoint a
 * credential is created first: a ManagedIdentityCredential when a managed
 * identity client ID is configured, otherwise a DefaultAzureCredential.
 *
 * @returns The value returned by createAzureManagedWorkerBuilder.
 */
function getWorkerBuilder() {
  const targetsEmulator = endpoint === EMULATOR_ENDPOINT;

  if (!targetsEmulator) {
    let credential;
    if (managedIdentityClientId) {
      credential = new ManagedIdentityCredential({ clientId: managedIdentityClientId });
      console.log(`Using managed identity with client ID: ${managedIdentityClientId}`);
    } else {
      credential = new DefaultAzureCredential();
      console.log("Using DefaultAzureCredential authentication");
    }
    return createAzureManagedWorkerBuilder(endpoint, taskHub, credential);
  }

  // Emulator path: everything is carried in the connection string.
  const emulatorConnectionString = `Endpoint=${endpoint};Authentication=None;TaskHub=${taskHub}`;
  console.log("Using local emulator with no authentication");
  return createAzureManagedWorkerBuilder(emulatorConnectionString);
}
/**
 * Activity that produces the initial greeting.
 *
 * @param {unknown} _ctx - Activity context; not used.
 * @param {unknown} name - Name to greet. Anything that is not a non-empty
 *   string is replaced by "User".
 * @returns {Promise<string>} The greeting, which is also logged.
 */
const sayHello = async (_ctx, name) => {
  let recipient = "User";
  if (typeof name === "string" && name.length > 0) {
    recipient = name;
  }

  const greeting = `Hello ${recipient}!`;
  console.log(`sayHello -> ${greeting}`);
  return greeting;
};
/**
 * Activity that appends a follow-up question to a greeting.
 *
 * @param {unknown} _ctx - Activity context; not used.
 * @param {unknown} greeting - Greeting text. Any non-string value is replaced
 *   by "Hello User!"; an empty string is kept as-is.
 * @returns {Promise<string>} The extended text, which is also logged.
 */
const processGreeting = async (_ctx, greeting) => {
  const opening = typeof greeting !== "string" ? "Hello User!" : greeting;
  const extended = `${opening} How are you today?`;
  console.log(`processGreeting -> ${extended}`);
  return extended;
};
/**
 * Activity that appends the closing sentence to a response.
 *
 * @param {unknown} _ctx - Activity context; not used.
 * @param {unknown} response - Text produced so far. Any non-string value is
 *   replaced by "Hello User! How are you today?"; an empty string is kept.
 * @returns {Promise<string>} The final text, which is also logged.
 */
const finalizeResponse = async (_ctx, response) => {
  let body = response;
  if (typeof body !== "string") {
    body = "Hello User! How are you today?";
  }

  const closing = `${body} I hope you're doing well!`;
  console.log(`finalizeResponse -> ${closing}`);
  return closing;
};
/**
 * Orchestrator that chains three activities, feeding each one's result into
 * the next: sayHello, then processGreeting, then finalizeResponse.
 *
 * @param ctx - Orchestration context; only `callActivity` is used.
 * @param name - Input passed to the first activity.
 * @returns The value received back for the last activity call.
 */
const functionChainingOrchestrator = async function* functionChainingOrchestrator(ctx, name) {
  const pipeline = [sayHello, processGreeting, finalizeResponse];

  let carried = name;
  for (const activity of pipeline) {
    // Each yield hands the scheduled call to the caller and resumes with its result.
    carried = yield ctx.callActivity(activity, carried);
  }
  return carried;
};
// The running worker instance; stays undefined until start-up code assigns it.
let worker;

/**
 * Stops the worker (when one has been created) and terminates the process.
 *
 * A failure while stopping is logged instead of propagating, so the process
 * always exits. If stopping fails and the requested exit code was 0, the
 * process exits with 1 so the failed shutdown is not reported as success.
 *
 * @param {number} [exitCode=0] - Exit code to use when shutdown succeeds.
 * @returns {Promise<void>} Normally never settles, because the process exits.
 */
async function stopWorker(exitCode = 0) {
  let finalCode = exitCode;

  if (worker) {
    console.log("Stopping worker...");
    try {
      await worker.stop();
    } catch (error) {
      console.error("Worker failed to stop cleanly", error);
      if (finalCode === 0) {
        finalCode = 1;
      }
    }
  }

  process.exit(finalCode);
}
// On SIGINT or SIGTERM, stop the worker and exit with code 0.
for (const shutdownSignal of ["SIGINT", "SIGTERM"]) {
  process.on(shutdownSignal, async () => {
    await stopWorker(0);
  });
}
// Entry point: logs the configuration, builds the worker with the
// orchestrator and its three activities registered, and starts it.
(async () => {
  console.log("Starting Function Chaining worker...");
  console.log(`Endpoint: ${endpoint}`);
  console.log(`Task hub: ${taskHub}`);

  try {
    // Building happens inside the try block so that an error thrown while
    // creating or configuring the builder is reported and leads to a
    // non-zero exit, the same as a failure in worker.start().
    worker = getWorkerBuilder()
      .addOrchestrator(functionChainingOrchestrator)
      .addActivity(sayHello)
      .addActivity(processGreeting)
      .addActivity(finalizeResponse)
      .build();

    await worker.start();
    console.log("Worker started and waiting for orchestrations...");

    setInterval(() => {
      // Keep process running for worker mode
    }, 60_000);
  } catch (error) {
    console.error("Worker failed to start", error);
    await stopWorker(1);
  }
})();