Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,5 +466,5 @@ function helpText() {
` lobster 'exec --json "echo [1,2,3]" | json'\n` +
` lobster run --mode tool 'exec --json "echo [1]" | approve --prompt "ok?"'\n\n` +
`Commands:\n` +
` exec, head, json, pick, table, where, approve, clawd.invoke, state.get, state.set, diff.last, commands.list, workflows.list, workflows.run\n`;
` each, exec, head, json, map, pick, table, template, where, approve, clawd.invoke, state.get, state.set, diff.last, commands.list, workflows.list, workflows.run\n`;
}
2 changes: 2 additions & 0 deletions src/commands/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { commandsListCommand } from "./commands_list.js";
import { gogGmailSearchCommand } from "./stdlib/gog_gmail_search.js";
import { gogGmailSendCommand } from "./stdlib/gog_gmail_send.js";
import { emailTriageCommand } from "./stdlib/email_triage.js";
import { eachCommand } from "./stdlib/each.js";

export function createDefaultRegistry() {
const commands = new Map();
Expand Down Expand Up @@ -48,6 +49,7 @@ export function createDefaultRegistry() {
gogGmailSearchCommand,
gogGmailSendCommand,
emailTriageCommand,
eachCommand,
]) {
commands.set(cmd.name, cmd);
}
Expand Down
85 changes: 85 additions & 0 deletions src/commands/stdlib/each.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { runPipeline } from '../../runtime.js';
import { renderTemplate } from './template_utils.js';

function interpolateStages(stages: any[], item: any): any[] {
return stages.map((stage) => {
const args = interpolateArgs(stage.args, item);
return { ...stage, args };
});
}

function interpolateArgs(args: any, item: any): any {
const out: any = {};
for (const [key, value] of Object.entries(args)) {
if (key === '_body') {
out._body = interpolateStages(value as any[], item);
} else if (key === '_bodyRaw') {
out._bodyRaw = value; // raw text, not a template
} else if (key === '_') {
out._ = (value as any[]).map((v) =>
typeof v === 'string' ? renderTemplate(v, item) : v,
);
} else if (typeof value === 'string') {
out[key] = renderTemplate(value, item);
} else {
out[key] = value;
}
}
return out;
}

export const eachCommand = {
name: 'each',
meta: {
description: 'Run a sub-pipeline for each input item',
argsSchema: {
type: 'object',
properties: {
_body: { description: 'Parsed sub-pipeline stages (injected by parser)' },
},
required: [],
},
sideEffects: ['delegates_to_sub_pipeline'],
},
help() {
return (
`each — run a sub-pipeline for each input item\n\n` +
`Usage:\n` +
` ... | each { template --text "hello {{.name}}" }\n` +
` ... | each { map --unwrap url | exec curl "{{.}}" }\n\n` +
`Notes:\n` +
` - Each item is fed into the sub-pipeline as a single-element stream.\n` +
` - {{.field}} interpolation is applied to all string args per item.\n` +
` - Template patterns ({{...}}) in item field values will be interpolated.\n` +
` - Errors in any iteration propagate immediately (fail-fast).\n` +
` - Items are processed sequentially.\n`
);
},
async run({ input, args, ctx }: any) {
const bodyStages = args._body;
if (!Array.isArray(bodyStages) || bodyStages.length === 0) {
throw new Error('each requires a { sub-pipeline } body');
}

return {
output: (async function* () {
for await (const item of input) {
const interpolated = interpolateStages(bodyStages, item);
const result = await runPipeline({
pipeline: interpolated,
registry: ctx.registry,
stdin: ctx.stdin,
stdout: ctx.stdout,
stderr: ctx.stderr,
env: ctx.env,
mode: ctx.mode,
input: (async function* () { yield item; })(),
});
for (const out of result.items) {
yield out;
}
}
})(),
};
},
};
3 changes: 0 additions & 3 deletions src/commands/stdlib/llm_task_invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ type CacheEntry = {
storedAt: string;
};

type Transport = 'clawd';

export const llmTaskInvokeCommand = {
name: 'llm_task.invoke',
meta: {
Expand Down Expand Up @@ -198,7 +196,6 @@ export const llmTaskInvokeCommand = {
const env = ctx.env ?? process.env;

const clawdUrl = String(env.CLAWD_URL ?? '').trim();
const transport: Transport = 'clawd';
if (!clawdUrl) {
throw new Error('llm_task.invoke requires CLAWD_URL (run via Clawdbot gateway)');
}
Expand Down
21 changes: 1 addition & 20 deletions src/commands/stdlib/map.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,4 @@
function getByPath(obj: any, path: string): any {
if (path === '.' || path === 'this') return obj;
const parts = path.split('.').filter(Boolean);
let cur: any = obj;
for (const p of parts) {
if (cur == null) return undefined;
cur = cur[p];
}
return cur;
}

function renderTemplate(tpl: string, ctx: any): string {
return tpl.replace(/\{\{\s*([^}]+?)\s*\}\}/g, (_m, expr) => {
const key = String(expr ?? '').trim();
const val = getByPath(ctx, key);
if (val === undefined || val === null) return '';
if (typeof val === 'string') return val;
return JSON.stringify(val);
});
}
import { renderTemplate } from './template_utils.js';

function parseAssignments(tokens: any[]): Array<{ key: string; value: string }> {
const out: Array<{ key: string; value: string }> = [];
Expand Down
22 changes: 1 addition & 21 deletions src/commands/stdlib/template.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,5 @@
import fs from 'node:fs/promises';

function getByPath(obj: any, path: string): any {
if (path === '.' || path === 'this') return obj;
const parts = path.split('.').filter(Boolean);
let cur: any = obj;
for (const p of parts) {
if (cur == null) return undefined;
cur = cur[p];
}
return cur;
}

function renderTemplate(tpl: string, ctx: any): string {
return tpl.replace(/\{\{\s*([^}]+?)\s*\}\}/g, (_m, expr) => {
const key = String(expr ?? '').trim();
const val = getByPath(ctx, key);
if (val === undefined || val === null) return '';
if (typeof val === 'string') return val;
return JSON.stringify(val);
});
}
import { renderTemplate } from './template_utils.js';

export const templateCommand = {
name: 'template',
Expand Down
21 changes: 21 additions & 0 deletions src/commands/stdlib/template_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export function getByPath(obj: any, path: string): any {
if (path === '.' || path === 'this') return obj;
const parts = path.split('.').filter(Boolean);
let cur: any = obj;
for (const p of parts) {
if (cur == null || typeof cur !== 'object') return undefined;
if (!Object.hasOwn(cur, p)) return undefined;
cur = cur[p];
}
return cur;
}

export function renderTemplate(tpl: string, ctx: any): string {
return tpl.replace(/\{\{([^}]+)\}\}/g, (_m, expr) => {
const key = String(expr ?? '').trim();
const val = getByPath(ctx, key);
if (val === undefined || val === null) return '';
if (typeof val === 'string') return val;
return JSON.stringify(val);
});
}
114 changes: 107 additions & 7 deletions src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ function splitPipes(input) {
const parts = [];
let current = '';
let quote = null;
let braceDepth = 0;

for (let i = 0; i < input.length; i++) {
const ch = input[i];
Expand All @@ -32,7 +33,24 @@ function splitPipes(input) {
continue;
}

if (ch === '|') {
if (ch === '{') {
braceDepth++;
current += ch;
continue;
}

if (ch === '}') {
if (braceDepth > 0) {
braceDepth--;
current += ch;
continue;
}
// braceDepth === 0: treat as literal character (backward compat)
current += ch;
continue;
}

if (ch === '|' && braceDepth === 0) {
parts.push(current.trim());
current = '';
continue;
Expand Down Expand Up @@ -95,7 +113,7 @@ function tokenizeCommand(input) {
}

function parseArgs(tokens) {
const args = { _: [] };
const args: Record<string, any> = { _: [] };

for (let i = 0; i < tokens.length; i++) {
const tok = tokens[i];
Expand Down Expand Up @@ -126,15 +144,97 @@ function parseArgs(tokens) {
return args;
}

export function parsePipeline(input) {
/** Find the first lone `{` that is not inside quotes or a `{{...}}` template. Returns index or -1. */
function findUnquotedBrace(text) {
let quote = null;
for (let i = 0; i < text.length; i++) {
const ch = text[i];
if (quote) {
if (ch === '\\' && text[i + 1]) { i++; continue; }
if (ch === quote) quote = null;
continue;
}
if (ch === '"' || ch === "'") { quote = ch; continue; }
if (ch === '{') {
if (text[i + 1] === '{') {
// Skip past the {{ ... }} template expression
const close = text.indexOf('}}', i + 2);
i = close !== -1 ? close + 1 : i + 1;
continue;
}
return i;
}
}
return -1;
}

/** Find the `}` matching the `{` at openPos, respecting quotes, nesting, and `{{...}}` templates. */
function findMatchingBrace(text, openPos) {
let depth = 1;
let quote = null;
for (let i = openPos + 1; i < text.length; i++) {
const ch = text[i];
if (quote) {
if (ch === '\\' && text[i + 1]) { i++; continue; }
if (ch === quote) quote = null;
continue;
}
if (ch === '"' || ch === "'") { quote = ch; continue; }
if (ch === '{') {
if (text[i + 1] === '{') {
const close = text.indexOf('}}', i + 2);
i = close !== -1 ? close + 1 : i + 1;
continue;
}
depth++;
continue;
}
if (ch === '}') { depth--; if (depth === 0) return i; }
}
return -1;
}

const MAX_PIPELINE_DEPTH = 50;

export function parsePipeline(input, _depth = 0) {
if (_depth > MAX_PIPELINE_DEPTH) {
throw new Error(`Pipeline nesting exceeds maximum depth of ${MAX_PIPELINE_DEPTH}`);
}
const stages = splitPipes(input);
if (stages.length === 0) throw new Error('Empty pipeline');

return stages.map((stage) => {
const tokens = tokenizeCommand(stage);
if (tokens.length === 0) throw new Error('Empty command stage');
const name = tokens[0];
const args = parseArgs(tokens.slice(1));
const braceStart = findUnquotedBrace(stage);
if (braceStart === -1) {
// No brace syntax -- normal parse
const tokens = tokenizeCommand(stage);
if (tokens.length === 0) throw new Error('Empty command stage');
const name = tokens[0];
const args = parseArgs(tokens.slice(1));
return { name, args, raw: stage };
}

// Brace syntax: extract prefix, body, suffix
const prefix = stage.slice(0, braceStart);
const braceEnd = findMatchingBrace(stage, braceStart);
if (braceEnd === -1) throw new Error('Unclosed brace');

const bodyRaw = stage.slice(braceStart + 1, braceEnd).trim();
if (!bodyRaw) throw new Error('Empty body in { } block');

const suffix = stage.slice(braceEnd + 1).trim();
const preview = suffix.length > 50 ? suffix.slice(0, 50) + '...' : suffix;
if (suffix) throw new Error(`Unexpected content after closing brace: ${preview}`);

const prefixTokens = tokenizeCommand(prefix);
if (prefixTokens.length === 0) throw new Error('Empty command before { }');
const name = prefixTokens[0];
const args = parseArgs(prefixTokens.slice(1));

// Recursively parse the sub-pipeline body
args._body = parsePipeline(bodyRaw, _depth + 1);
args._bodyRaw = bodyRaw;

return { name, args, raw: stage };
});
}
Loading