Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Next Release #613

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/integration-tests-worker

## 1.0.36

### Patch Changes

- Updated dependencies [4f5f1dd]
- Updated dependencies [58e0d11]
- Updated dependencies [58e0d11]
- @openfn/[email protected]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.0.35

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.35",
"version": "1.0.36",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
38 changes: 37 additions & 1 deletion integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);
const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('step:complete', ({ payload }) => {
t.is(payload.reason, 'success');

// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
Expand Down Expand Up @@ -192,7 +194,7 @@ test.serial('run parallel jobs', async (t) => {
// });
});

test('run a http adaptor job', async (t) => {
test.serial('run a http adaptor job', async (t) => {
const job = createJob({
adaptor: '@openfn/[email protected]',
body: `get("https://jsonplaceholder.typicode.com/todos/1");
Expand All @@ -212,3 +214,37 @@ test('run a http adaptor job', async (t) => {
completed: false,
});
});

test.serial('use different versions of the same adaptor', async (t) => {
// http@5 exported an axios global - so run this job and validate that the global is there
const job1 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (!axios) {
throw new Error('AXIOS NOT FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// http@6 no longer exports axios - so throw an error if we see it
const job2 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (axios) {
throw new Error('AXIOS FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// Just for fun, run each job a couple of times to make sure that there's no wierd caching or ordering anything
const steps = [job1, job2, job1, job2];
const attempt = createRun([], steps, []);

const result = await run(t, attempt);
t.log(result);
t.falsy(result.errors);
});
9 changes: 9 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# @openfn/cli

## 1.1.0

### Patch Changes

Allow multiple version of the same adaptor to run in the same workflow

- Updated dependencies [4f5f1dd]
- @openfn/[email protected]

## 1.0.0

### Major Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.0.0",
"version": "1.1.0",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
12 changes: 12 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# engine-multi

## 1.1.0

### Minor Changes

- 4f5f1dd: Support workflows with different versions of the same adaptor

### Patch Changes

- 58e0d11: Record adaptor versions as an array
- Updated dependencies [4f5f1dd]
- @openfn/[email protected]

## 1.0.0

### Major Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.0.0",
"version": "1.1.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
27 changes: 20 additions & 7 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}

if (!skipRepoValidation && !didValidateRepo) {
// TODO what if this throws?
// Whole server probably needs to crash, so throwing is probably appropriate
// TODO do we need to do it on EVERY call? Can we not cache it?
await ensureRepo(repoDir, logger);
didValidateRepo = true;
Expand All @@ -137,12 +135,15 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {

const v = version || 'unknown';

// Write the adaptor version to the context
// This is a reasonably accurate, but not totally bulletproof, report
// @ts-ignore
context.versions[name] = v;
// Write the adaptor version to the context for reporting later
if (!context.versions[name]) {
context.versions[name] = [];
}
if (!context.versions[name].includes(v)) {
(context.versions[name] as string[]).push(v);
}

paths[name] = {
paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
version: v,
};
Expand All @@ -152,6 +153,18 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}
}

// Write linker arguments back to the plan
for (const step of plan.workflow.steps) {
const job = step as unknown as Job;
if (paths[job.adaptor!]) {
const { name } = getNameAndVersion(job.adaptor!);
// @ts-ignore
job.linker = {
[name]: paths[job.adaptor!],
};
}
}

if (adaptorsToLoad.length) {
// Add this to the queue
const p = enqueue(adaptorsToLoad);
Expand Down
12 changes: 6 additions & 6 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import {
} from './lifecycle';
import preloadCredentials from './preload-credentials';
import { ExecutionError } from '../errors';
import type { RunOptions } from '../worker/thread/run';

const execute = async (context: ExecutionContext) => {
const { state, callWorker, logger, options } = context;
try {
// TODO catch and "throw" nice clean autoinstall errors
const adaptorPaths = await autoinstall(context);
await autoinstall(context);

// TODO catch and "throw" nice clean compile errors
try {
await compile(context);
} catch (e: any) {
Expand All @@ -49,10 +48,9 @@ const execute = async (context: ExecutionContext) => {
const whitelist = options.whitelist?.map((w) => w.toString());

const runOptions = {
adaptorPaths,
whitelist,
statePropsToRemove: options.statePropsToRemove,
};
whitelist,
} as RunOptions;

const workerOptions = {
memoryLimitMb: options.memoryLimitMb,
Expand Down Expand Up @@ -109,13 +107,15 @@ const execute = async (context: ExecutionContext) => {
jobError(context, evt);
},
[workerEvents.LOG]: (evt: workerEvents.LogEvent) => {
// console.log(evt.log.name, evt.log.message);
log(context, evt);
},
// TODO this is also untested
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
error(context, { workflowId: state.plan.id, error: evt.error });
},
};

return callWorker(
'run',
[state.plan, state.input || {}, runOptions || {}],
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const workflowStart = (
// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
});
};

Expand Down Expand Up @@ -81,7 +82,6 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
versions: context.versions,
});
};

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/classes/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
import type { ExternalEvents, EventMap } from '../events';

/**
* The ExeuctionContext class wraps an event emitter with some useful context
* The ExecutionContext class wraps an event emitter with some useful context
* and automatically appends the workflow id to each emitted events
*
* Each running workflow has its own context object
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface ExternalEvent {
workflowId: string;
}

export interface WorkflowStartPayload extends ExternalEvent {}
export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
Expand All @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
versions: Versions;
}

export interface JobCompletePayload extends ExternalEvent {
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ export type Versions = {
node: string;
engine: string;
compiler: string;
[adaptor: string]: string;

[adaptor: string]: string | string[];
};
9 changes: 4 additions & 5 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { execute, createLoggers } from './helpers';
import serializeError from '../../util/serialize-error';
import { JobErrorPayload } from '../../events';

type RunOptions = {
adaptorPaths: Record<string, { path: string }>;
export type RunOptions = {
repoDir: string;
whitelist?: RegExp[];
sanitize: SanitizePolicies;
statePropsToRemove?: string[];
Expand All @@ -26,8 +26,7 @@ const eventMap = {

register({
run: (plan: ExecutionPlan, input: State, runOptions: RunOptions) => {
const { adaptorPaths, whitelist, sanitize, statePropsToRemove } =
runOptions;
const { repoDir, whitelist, sanitize, statePropsToRemove } = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
sanitize,
Expand All @@ -52,7 +51,7 @@ register({
logger,
jobLogger,
linker: {
modules: adaptorPaths,
repo: repoDir,
whitelist,
cacheKey: plan.id,
},
Expand Down
Loading