Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow a workflow to support multiple adaptor versions #610

Merged
merged 12 commits into from
Feb 23, 2024
38 changes: 37 additions & 1 deletion integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);
const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('step:complete', ({ payload }) => {
t.is(payload.reason, 'success');

// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
Expand Down Expand Up @@ -192,7 +194,7 @@ test.serial('run parallel jobs', async (t) => {
// });
});

test('run a http adaptor job', async (t) => {
test.serial('run a http adaptor job', async (t) => {
const job = createJob({
adaptor: '@openfn/[email protected]',
body: `get("https://jsonplaceholder.typicode.com/todos/1");
Expand All @@ -212,3 +214,37 @@ test('run a http adaptor job', async (t) => {
completed: false,
});
});

test.serial.only('use different versions of the same adaptor', async (t) => {
// http@5 exported an axios global - so run this job and validate that the global is there
const job1 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (!axios) {
throw new Error('AXIOS NOT FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// http@6 no longer exports axios - so throw an error if we see it
const job2 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (axios) {
throw new Error('AXIOS FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// Just for fun, run each job a couple of times to make sure that there's no wierd caching or ordering anything
const steps = [job1, job2, job1, job2];
const attempt = createRun([], steps, []);

const result = await run(t, attempt);
t.log(result);
t.falsy(result.errors);
});
12 changes: 6 additions & 6 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import {
} from './lifecycle';
import preloadCredentials from './preload-credentials';
import { ExecutionError } from '../errors';
import type { RunOptions } from '../worker/thread/run';

const execute = async (context: ExecutionContext) => {
const { state, callWorker, logger, options } = context;
try {
// TODO catch and "throw" nice clean autoinstall errors
const adaptorPaths = await autoinstall(context);
await autoinstall(context);

// TODO catch and "throw" nice clean compile errors
try {
await compile(context);
} catch (e: any) {
Expand All @@ -49,10 +48,10 @@ const execute = async (context: ExecutionContext) => {
const whitelist = options.whitelist?.map((w) => w.toString());

const runOptions = {
adaptorPaths,
whitelist,
statePropsToRemove: options.statePropsToRemove,
};
whitelist,
repoDir: options.repoDir,
} as RunOptions;

const workerOptions = {
memoryLimitMb: options.memoryLimitMb,
Expand Down Expand Up @@ -116,6 +115,7 @@ const execute = async (context: ExecutionContext) => {
error(context, { workflowId: state.plan.id, error: evt.error });
},
};

return callWorker(
'run',
[state.plan, state.input || {}, runOptions || {}],
Expand Down
9 changes: 4 additions & 5 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { execute, createLoggers } from './helpers';
import serializeError from '../../util/serialize-error';
import { JobErrorPayload } from '../../events';

type RunOptions = {
adaptorPaths: Record<string, { path: string }>;
export type RunOptions = {
repoDir: string;
whitelist?: RegExp[];
sanitize: SanitizePolicies;
statePropsToRemove?: string[];
Expand All @@ -26,8 +26,7 @@ const eventMap = {

register({
run: (plan: ExecutionPlan, input: State, runOptions: RunOptions) => {
const { adaptorPaths, whitelist, sanitize, statePropsToRemove } =
runOptions;
const { repoDir, whitelist, sanitize, statePropsToRemove } = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
sanitize,
Expand All @@ -52,7 +51,7 @@ register({
logger,
jobLogger,
linker: {
modules: adaptorPaths,
repo: repoDir,
whitelist,
cacheKey: plan.id,
},
Expand Down
7 changes: 7 additions & 0 deletions packages/runtime/src/execute/compile-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
import compileFunction from '../modules/compile-function';
import { conditionContext, Context } from './context';
import { ExecutionPlan, Job, StepEdge, Workflow } from '@openfn/lexicon';
import { getNameAndVersion } from '../modules/repo';

const compileEdges = (
from: string,
Expand Down Expand Up @@ -127,6 +128,12 @@ export default (plan: ExecutionPlan) => {
'name',
]);

if ((step as Job).adaptor) {
const job = step as Job;
const { name, version } = getNameAndVersion(job.adaptor!);
newStep.linker = { [name]: { version: version! } };
}

if (step.next) {
trapErrors(() => {
newStep.next = compileEdges(stepId, step.next!, context);
Expand Down
15 changes: 12 additions & 3 deletions packages/runtime/src/execute/expression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
assertSecurityKill,
} from '../errors';
import type { JobModule, ExecutionContext } from '../types';
import { ModuleInfoMap } from '../modules/linker';

export type ExecutionErrorWrapper = {
state: any;
Expand All @@ -28,7 +29,10 @@ export type ExecutionErrorWrapper = {
export default (
ctx: ExecutionContext,
expression: string | Operation[],
input: State
input: State,
// allow custom linker options to be passed for this step
// this lets us use multiple versions of the same adaptor in a workflow
moduleOverrides?: ModuleInfoMap
) =>
new Promise(async (resolve, reject) => {
let duration = Date.now();
Expand All @@ -42,7 +46,8 @@ export default (
const { operations, execute } = await prepareJob(
expression,
context,
opts
opts,
moduleOverrides
);
// Create the main reducer function
const reducer = (execute || defaultExecute)(
Expand Down Expand Up @@ -125,11 +130,15 @@ export const wrapOperation = (
const prepareJob = async (
expression: string | Operation[],
context: Context,
opts: Options = {}
opts: Options = {},
moduleOverrides: ModuleInfoMap = {}
): Promise<JobModule> => {
if (typeof expression === 'string') {
const exports = await loadModule(expression, {
...opts.linker,
// allow module paths and versions to be overriden from the defaults
// TODO I think this is too harsh and path information will be lost
modules: Object.assign({}, opts.linker?.modules, moduleOverrides),
context,
log: opts.logger,
});
Expand Down
6 changes: 5 additions & 1 deletion packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,17 @@ const executeStep = async (

const timerId = `step-${jobId}`;
logger.timer(timerId);

// TODO can we include the adaptor version here?
// How would we get it?
logger.info(`Starting step ${jobName}`);

const startTime = Date.now();
try {
// TODO include the upstream job?
notify(NOTIFY_JOB_START, { jobId });
result = await executeExpression(ctx, job.expression, state);

result = await executeExpression(ctx, job.expression, state, step.linker);
} catch (e: any) {
didError = true;
if (e.hasOwnProperty('error') && e.hasOwnProperty('state')) {
Expand Down
3 changes: 2 additions & 1 deletion packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import run from './runtime';
import run, { Options } from './runtime';
export default run;
export type { Options };

import type { ModuleInfo, ModuleInfoMap } from './modules/linker';
export type { ModuleInfo, ModuleInfoMap };
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/src/modules/module-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export default async (
opts: Options = {}
): Promise<ModuleExports> => {
validate(src);

const context = opts.context || vm.createContext();
const linker = opts.linker || mainLinker;

Expand Down
5 changes: 5 additions & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
NOTIFY_INIT_START,
NOTIFY_STATE_LOAD,
} from './events';
import { ModuleInfoMap } from './modules/linker';

export type CompiledEdge =
| boolean
Expand All @@ -23,6 +24,10 @@ export type CompiledStep = Omit<Step, 'next'> & {
id: StepId;
next?: Record<StepId, CompiledEdge>;

// custom overrides for the linker
// This lets us set version or even path per job
linker?: ModuleInfoMap;

[other: string]: any;
};

Expand Down
25 changes: 25 additions & 0 deletions packages/runtime/test/execute/compile-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ test('should reset job ids for each call', (t) => {
t.is(second.workflow.steps['job-1'].expression, 'x');
});

test('should write adaptor versions', (t) => {
const plan = {
workflow: {
steps: [
{
id: 'x',
expression: '.',
adaptor: '[email protected]',
},
{
id: 'y',
expression: '.',
adaptor: '[email protected]',
},
],
},
options: {},
};

const { workflow } = compilePlan(plan);
const { x, y } = workflow.steps;
t.deepEqual(x.linker, { x: { version: '1.0' } });
t.deepEqual(y.linker, { y: { version: '1.0' } });
});

test('should set the start to steps[0]', (t) => {
const plan: ExecutionPlan = {
workflow: {
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/test/modules/module-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ test('load a module with an import', async (t) => {
t.assert(m.default === 20);
});

test('load a module with aribtrary exports', async (t) => {
test('load a module with aribitrary exports', async (t) => {
const src = 'export const x = 10; export const y = 20;';

const m = await loadModule(src);
Expand Down
100 changes: 100 additions & 0 deletions packages/runtime/test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,106 @@ test('run from an adaptor', async (t) => {
t.deepEqual(result, { data: 22 });
});

test('run a workflow using the repo and load the default version', async (t) => {
const expression = `
import result from 'ultimate-answer';
export default [() => result];
`;
const plan = {
workflow: {
steps: [
{
id: 'a',
expression,
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
},
}
);

t.deepEqual(result, 43);
});

test('run a workflow using the repo using a specific version', async (t) => {
const expression = `
import result from 'ultimate-answer';
export default [() => result];
`;
const plan = {
workflow: {
steps: [
{
id: 'a',
expression,
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
modules: {
'ultimate-answer': { version: '1.0.0' },
},
},
}
);

t.deepEqual(result, 42);
});

test('run a workflow using the repo with multiple versions of the same adaptor', async (t) => {
const plan = {
workflow: {
steps: [
{
id: 'a',
expression: `import result from 'ultimate-answer';
export default [(s) => { s.data.a = result; return s;}];`,
adaptor: '[email protected]',
next: { b: true },
},
{
id: 'b',
expression: `import result from 'ultimate-answer';
export default [(s) => { s.data.b = result; return s;}];`,
adaptor: '[email protected]',
},
],
},
};

const result: any = await run(
plan,
{},
{
linker: {
repo: path.resolve('test/__repo__'),
},
}
);

t.deepEqual(result, {
data: {
a: 42,
b: 43,
},
});
});

// https://github.com/OpenFn/kit/issues/520
test('run from an adaptor with error', async (t) => {
const expression = `
Expand Down