feat: map-reduce analysis across sharded workspaces #95

Open
jamubc wants to merge 1 commit into main from feat/map-reduce-analysis

@jamubc (Owner) commented Jun 5, 2026

Summary

Introduces map-reduce-analyze, a tool that enables whole-repository analysis for codebases that exceed a single Gemini context. The workspace is greedy bin-packed into byte-bounded shards; each shard is analyzed in parallel through a bounded concurrency pool; a final reduce call synthesizes the results into one answer.
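The sharding step described above can be sketched as follows. This is a minimal illustration, not the code in src/utils/sharding.ts: the PR only says "greedy bin-packed", so the first-fit-decreasing strategy here is an assumption, and `planShardsSketch`, `FileEntry`, `ShardSketch`, and the `maxShards` parameter are hypothetical names.

```typescript
// Hypothetical types for illustration; the real planShards() may differ.
interface FileEntry {
  path: string;
  bytes: number;
}

interface ShardSketch {
  files: FileEntry[];
  bytes: number;
}

// First-fit-decreasing packing (an assumed strategy): sort files largest
// first, then put each file into the first shard that still has room.
function planShardsSketch(
  files: FileEntry[],
  shardBytes: number,
  maxShards = 50,
): ShardSketch[] {
  if (!(shardBytes > 0)) {
    throw new Error("shardBytes must be a positive number");
  }
  const sorted = [...files].sort((a, b) => b.bytes - a.bytes);
  const shards: ShardSketch[] = [];

  for (const file of sorted) {
    const target = shards.find((s) => s.bytes + file.bytes <= shardBytes);
    if (target) {
      target.files.push(file);
      target.bytes += file.bytes;
      continue;
    }
    // No shard has room (or the file alone exceeds shardBytes): open a new one.
    shards.push({ files: [file], bytes: file.bytes });
    if (shards.length > maxShards) {
      throw new Error(
        `plan needs more than ${maxShards} shards; raise shardBytes to reduce the count`,
      );
    }
  }
  return shards;
}
```

In this sketch a file larger than `shardBytes` gets a shard of its own rather than being split or rejected; whether the real tool does the same is not stated in the PR.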

Key design points

  • sharding.ts and mapReduce.ts are pure utilities with no I/O or Gemini coupling — fully unit-testable with fake async functions (the tests assert the concurrency cap is never exceeded, order is preserved, and a failing shard is captured without aborting the run).
  • All file paths are confined to process.cwd() (the same guard pattern used elsewhere after CVE-2026-0755); the file walk skips symlinks and a default ignore set (node_modules, .git, dist, .gemini-mcp).
  • A hard cap of 50 shards prevents runaway API cost; the error message guides users to raise shardBytes when triggered.
  • Default concurrency (4) is read from GEMINI_MCP_MAP_CONCURRENCY and can be overridden per call (clamped to 1–10).
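The concurrency rule in the last bullet (default 4, environment override, per-call override, clamped to 1–10) might look roughly like this. `resolveConcurrency` is a hypothetical helper; the precedence of the per-call value over the environment variable is inferred from the bullet, and the fallback for unparseable input is an assumption.

```typescript
// Hypothetical helper: per-call value wins, then the env value, then the default.
// The result is always an integer in [1, 10].
function resolveConcurrency(
  perCall: number | undefined,
  envValue: string | undefined,
  fallback = 4,
): number {
  const fromEnv = envValue === undefined ? NaN : Number.parseInt(envValue, 10);
  const requested = perCall ?? (Number.isFinite(fromEnv) ? fromEnv : fallback);
  // Assumption: non-finite input (NaN, Infinity) falls back to the default.
  if (!Number.isFinite(requested)) return fallback;
  return Math.min(10, Math.max(1, Math.trunc(requested)));
}
```

A caller would pass the tool argument and `process.env.GEMINI_MCP_MAP_CONCURRENCY`; taking the environment value as a parameter keeps the helper pure and easy to unit-test.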

Verification

  • npx tsc --noEmit — clean
  • node scripts/run-tests.mjs unit integration — 85 pass / 0 fail

Notes

  • Opt-in: a new tool; no existing behavior changes.
  • Per-shard @file references also pass through the existing assertSafeFileReferences guard (defense in depth).

Adds the map-reduce-analyze tool that enables analysis of codebases too
large to fit a single Gemini context window. Files under the target root
are greedy bin-packed into byte-bounded shards; each shard is analysed in
parallel (bounded by a concurrency cap); the per-shard answers are then
synthesised into one coherent response by a final Gemini reduce call.

- src/utils/sharding.ts   — pure planShards() + collectFiles() (path-confined)
- src/utils/mapReduce.ts  — generic bounded pool; preserves order; captures shard errors
- src/tools/map-reduce-analyze.tool.ts — UnifiedTool wiring the above to Gemini
- src/constants.ts        — ENV.GEMINI_MCP_MAP_CONCURRENCY + new messages
- src/tools/index.ts      — registers mapReduceAnalyzeTool
- tests (unit + integration) covering sharding, pool, collectFiles, registry
- docs/concepts/map-reduce.md

@gemini-code-assist (Bot) left a comment

Code Review

This pull request introduces the map-reduce-analyze tool, which allows analyzing large codebases by sharding the workspace, running Gemini on each shard in parallel, and synthesizing the results. Key feedback includes a critical path traversal vulnerability in collectFiles due to unresolved symlinks, a potential process crash in mapReduce from synchronous errors, an infinite hang risk if concurrency is less than 1, and a design bypass where the tool mutates an external array instead of properly utilizing the reduceFn callback.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread src/utils/sharding.ts
Comment on lines +82 to +94
  // Confine to cwd.
  const normalizedCwd = path.resolve(cwd);
  const escapesRoot =
    resolvedRoot !== normalizedCwd &&
    !resolvedRoot.startsWith(normalizedCwd + path.sep);
  if (escapesRoot) {
    throw new Error(
      `collectFiles: root "${root}" resolves to "${resolvedRoot}" which is outside the working directory "${normalizedCwd}".`,
    );
  }

  const ignoreDirs = opts?.ignoreDirs ?? DEFAULT_IGNORE_DIRS;
  const results: Array<{ path: string; bytes: number }> = [];

security-critical critical

The path confinement check currently uses path.resolve(root) to verify if the target directory is within the working directory. However, path.resolve does not resolve symbolic links. If root is a symlink pointing to a directory outside the workspace (e.g., /etc), path.resolve will return a path within the workspace, bypassing the confinement check. When fs.readdirSync is subsequently called, it will follow the symlink and read files from the external directory, leading to an arbitrary file read / path traversal vulnerability.

To prevent this, resolve the real path of both the target root and the working directory using fs.realpathSync before performing the confinement check.

  const cwd = process.cwd();
  let resolvedRoot: string;
  try {
    resolvedRoot = fs.realpathSync(path.resolve(root));
  } catch {
    throw new Error(`collectFiles: root "${root}" does not exist or is inaccessible.`);
  }

  // Confine to cwd.
  const normalizedCwd = fs.realpathSync(cwd);
  const escapesRoot =
    resolvedRoot !== normalizedCwd &&
    !resolvedRoot.startsWith(normalizedCwd + path.sep);
  if (escapesRoot) {
    throw new Error(
      `collectFiles: root "${root}" resolves to "${resolvedRoot}" which is outside the working directory "${normalizedCwd}".`
    );
  }
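As a variation on the suggestion above, the containment test can be expressed with path.relative instead of a string-prefix comparison. This is only a sketch: `isInsideDir` and `resolveConfinedRoot` are hypothetical names, not functions from this PR, and the error text is illustrative.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// True when `candidate` is `parent` itself or lies beneath it.
// Both arguments are expected to be absolute, already-resolved paths.
function isInsideDir(parent: string, candidate: string): boolean {
  const rel = path.relative(parent, candidate);
  if (rel === "") return true; // same directory
  if (rel === ".." || rel.startsWith(".." + path.sep)) return false; // climbs out
  return !path.isAbsolute(rel); // e.g. a different drive on Windows
}

// Resolve symlinks on BOTH sides before comparing, so that a symlinked
// root pointing outside the workspace is rejected.
function resolveConfinedRoot(root: string, cwd: string): string {
  const realCwd = fs.realpathSync(cwd);
  const realRoot = fs.realpathSync(path.resolve(cwd, root)); // throws if missing
  if (!isInsideDir(realCwd, realRoot)) {
    throw new Error(
      `root "${root}" resolves to "${realRoot}", outside "${realCwd}"`,
    );
  }
  return realRoot;
}
```

Either form works once both paths have gone through fs.realpathSync; the path.relative version simply avoids hand-building the separator-terminated prefix.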

Comment thread src/utils/mapReduce.ts
Comment on lines +43 to +57
        mapFn(shards[i], i)
          .then((result) => {
            results[i] = result;
          })
          .catch((err: unknown) => {
            errors.push({ index: i, error: err });
          })
          .finally(() => {
            inFlight--;
            if (nextIndex < shards.length || inFlight > 0) {
              trySchedule();
            } else {
              resolve();
            }
          });

high

If mapFn throws synchronously (for example, when it is a non-async function that throws before returning its promise), the error escapes the promise chain. If this happens while rescheduling from inside the .finally() callback, the throw occurs in a microtask and surfaces as an unhandled promise rejection or an uncaught exception, either of which can crash the Node.js process.

To make the execution robust against synchronous errors, wrap the call to mapFn in a Promise.resolve().then(...) chain so that any synchronous error is safely converted into a promise rejection and caught by the .catch() block.

        Promise.resolve()
          .then(() => mapFn(shards[i], i))
          .then((result) => {
            results[i] = result;
          })
          .catch((err: unknown) => {
            errors.push({ index: i, error: err });
          })
          .finally(() => {
            inFlight--;
            if (nextIndex < shards.length || inFlight > 0) {
              trySchedule();
            } else {
              resolve();
            }
          });
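For comparison, a bounded pool can also be written as a fixed set of worker loops, where `try`/`await` captures synchronous throws and rejections alike. This is a sketch of a different shape than the scheduler in this diff, not a drop-in replacement; `runPool` and `ShardError` are hypothetical names.

```typescript
interface ShardError {
  index: number;
  error: unknown;
}

// Runs mapFn over items with at most `concurrency` calls in flight.
// Results keep input order; a failing item is recorded, not rethrown.
async function runPool<T, R>(
  items: readonly T[],
  concurrency: number,
  mapFn: (item: T, index: number) => Promise<R> | R,
): Promise<{ results: (R | undefined)[]; errors: ShardError[] }> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError("concurrency must be a positive integer");
  }
  // Dense array: every index exists, failed ones stay undefined.
  const results: (R | undefined)[] = Array.from(
    { length: items.length },
    () => undefined,
  );
  const errors: ShardError[] = [];
  let nextIndex = 0;

  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const i = nextIndex++; // claimed synchronously, so no two workers share an index
      try {
        // `await` inside `try` catches both a synchronous throw and a rejection.
        results[i] = await mapFn(items[i], i);
      } catch (error) {
        errors.push({ index: i, error });
      }
    }
  }

  const workerCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, errors };
}
```

With an empty `items` array no workers start and the function resolves immediately.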

Comment thread src/utils/mapReduce.ts
export async function mapReduce<T, R, O>(
  options: MapReduceOptions<T, R, O>,
): Promise<MapReduceResult<O>> {
  const { shards, mapFn, reduceFn, concurrency } = options;

medium

If concurrency is passed as 0 or a negative number, the scheduling loop condition inFlight < concurrency will immediately evaluate to false, preventing any shards from being scheduled. Furthermore, the promise will hang indefinitely because resolve() is never called.

Add a guard at the beginning of mapReduce to ensure concurrency is at least 1.

Suggested change
-  const { shards, mapFn, reduceFn, concurrency } = options;
+  const { shards, mapFn, reduceFn, concurrency } = options;
+  if (concurrency < 1) {
+    throw new Error("concurrency must be at least 1");
+  }
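One gap worth noting in the guard above: `NaN < 1` is false in JavaScript, so a NaN concurrency would pass the check, and with a loop condition of `inFlight < concurrency` (as described in this comment) nothing would be scheduled. A stricter check could look like the sketch below; `assertValidConcurrency` is a hypothetical name.

```typescript
// Rejects 0, negatives, NaN, Infinity, and fractional values.
function assertValidConcurrency(concurrency: number): void {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError(
      `concurrency must be a positive integer, got ${concurrency}`,
    );
  }
}
```

Because the tool already clamps its own input to 1–10, this matters mainly for other callers of the generic utility.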

Comment on lines +181 to +218
    // ---- Map phase ----
    const shardAnswers: string[] = new Array(shards.length).fill('');

    const { errors } = await mapReduce<Shard, string, void>({
      shards,
      concurrency,
      mapFn: async (shard, i) => {
        onProgress?.(`${STATUS_MESSAGES.MAP_REDUCE_MAPPING} ${i + 1}/${shards.length}`);
        const mapPrompt = buildMapPrompt(prompt as string, shard, i, shards.length);
        const answer = await executeGeminiCLI(mapPrompt, undefined, false, false, onProgress);
        shardAnswers[i] = answer;
        return answer;
      },
      reduceFn: () => {
        /* reduce is handled below after collecting shardAnswers */
      },
    });

    // Report per-shard errors without failing the whole run.
    if (errors.length > 0) {
      for (const e of errors) {
        const msg = e.error instanceof Error ? e.error.message : String(e.error);
        Logger.warn(`map-reduce-analyze: shard ${e.index + 1} failed: ${msg}`);
        shardAnswers[e.index] = `[Shard ${e.index + 1} analysis failed: ${msg}]`;
      }
    }

    // ---- Reduce phase ----
    onProgress?.(STATUS_MESSAGES.MAP_REDUCE_REDUCING);
    const reducePrompt = buildReducePrompt(prompt as string, shardAnswers);
    const synthesis = await executeGeminiCLI(reducePrompt, undefined, false, false, onProgress);

    const errorNote =
      errors.length > 0
        ? `\n\n> **Note:** ${errors.length} shard(s) encountered errors during analysis (shards: ${errors.map((e) => e.index + 1).join(', ')}). Results may be incomplete.`
        : '';

    return `${synthesis}${errorNote}`;

medium

The current implementation bypasses the functional design of the mapReduce utility by mutating an external shardAnswers array inside the concurrent mapFn and passing a no-op reduceFn.

Refactoring this to perform the synthesis and error-handling inside reduceFn makes the code cleaner, eliminates side-effects from the concurrent map phase, and properly leverages the map-reduce abstraction.

    // ---- Map & Reduce phases ----
    const { output: synthesis, errors } = await mapReduce<Shard, string, string>({
      shards,
      concurrency,
      mapFn: async (shard, i) => {
        onProgress?.(`${STATUS_MESSAGES.MAP_REDUCE_MAPPING} ${i + 1}/${shards.length}`);
        const mapPrompt = buildMapPrompt(prompt as string, shard, i, shards.length);
        return await executeGeminiCLI(mapPrompt, undefined, false, false, onProgress);
      },
      reduceFn: async (results, errors) => {
        const shardAnswers = results.map((ans, i) => {
          if (ans !== undefined) return ans;
          const err = errors.find((e) => e.index === i);
          const msg = err?.error instanceof Error ? err.error.message : String(err?.error);
          Logger.warn(`map-reduce-analyze: shard ${i + 1} failed: ${msg}`);
          return `[Shard ${i + 1} analysis failed: ${msg}]`;
        });

        onProgress?.(STATUS_MESSAGES.MAP_REDUCE_REDUCING);
        const reducePrompt = buildReducePrompt(prompt as string, shardAnswers);
        return await executeGeminiCLI(reducePrompt, undefined, false, false, onProgress);
      },
    });

    const errorNote =
      errors.length > 0
        ? `\n\n> **Note:** ${errors.length} shard(s) encountered errors during analysis (shards: ${errors.map((e) => e.index + 1).join(', ')}). Results may be incomplete.`
        : '';

    return `${synthesis}${errorNote}`;
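One caveat for the refactor above, under an assumption the excerpt does not confirm: if the utility allocates `results` with `new Array(n)` and never assigns the failed indices, those slots are holes, and Array.prototype.map skips holes, so the `ans !== undefined` fallback would not run for them. Iterating by index with Array.from avoids that. `fillFailedShards` is a hypothetical helper, shown without the logging call.

```typescript
// Builds a dense list of per-shard answers, substituting a placeholder
// for every index that has no result (including array holes).
function fillFailedShards(
  results: ArrayLike<string | undefined>,
  errors: ReadonlyArray<{ index: number; error: unknown }>,
): string[] {
  const byIndex = new Map(errors.map((e) => [e.index, e.error] as const));
  // Array.from with a length visits every index, unlike map() on a sparse array.
  return Array.from({ length: results.length }, (_, i) => {
    const ans = results[i];
    if (ans !== undefined) return ans;
    const err = byIndex.get(i);
    const msg = err instanceof Error ? err.message : String(err);
    return `[Shard ${i + 1} analysis failed: ${msg}]`;
  });
}
```

If the utility already initializes every slot (for example with `fill(undefined)`), the original `results.map(...)` form behaves the same and this helper is unnecessary.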
