Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion broker-core/agent-messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export interface DirectAgentDispatchInput {
target: string;
body: string;
metadata?: Record<string, unknown>;
trustedBrokerAgentId?: string;
}

export interface BroadcastAgentDispatchInput {
Expand Down Expand Up @@ -245,13 +246,57 @@ export function resolveDirectAgentTarget(agents: AgentInfo[], target: string): A
);
}

function isDescendantOf(agents: AgentInfo[], descendantId: string, ancestorId: string): boolean {
let current = agents.find((agent) => agent.id === descendantId) ?? null;
const seen = new Set<string>();
while (current?.parentAgentId) {
if (current.parentAgentId === ancestorId) return true;
if (seen.has(current.parentAgentId)) return false;
seen.add(current.parentAgentId);
current = agents.find((agent) => agent.id === current?.parentAgentId) ?? null;
}
return false;
}

function canDispatchDirectAgentMessage(
agents: AgentInfo[],
sender: AgentInfo | null,
target: AgentInfo,
metadata?: Record<string, unknown>,
): boolean {
if (!sender) return false;
if (
!target.parentAgentId &&
target.supervisionState !== "supervised" &&
target.supervisionState !== "orphaned" &&
target.supervisionState !== "stopping"
) {
return true;
}
if (sender.id === metadata?.trustedBrokerAgentId) {
return metadata.emergency === true || metadata.targetScope === "subtree";
}
if (target.parentAgentId === sender.id) return true;
if (sender.parentAgentId === target.id) return true;
if (isDescendantOf(agents, target.id, sender.id)) return true;
if (isDescendantOf(agents, sender.id, target.id)) return true;
return false;
}

export function resolveBroadcastTargets(
agents: AgentInfo[],
senderAgentId: string,
channel: string,
): AgentInfo[] {
return agents
.filter((agent) => agent.id !== senderAgentId)
.filter(
(agent) =>
!agent.parentAgentId &&
agent.supervisionState !== "supervised" &&
agent.supervisionState !== "orphaned" &&
agent.supervisionState !== "stopping",
)
.filter((agent) => agentSubscribesToBroadcastChannel(agent, channel))
.sort((left, right) => left.name.localeCompare(right.name));
}
Expand All @@ -261,10 +306,21 @@ export function dispatchDirectAgentMessage(
input: DirectAgentDispatchInput,
onDispatch?: AgentDispatchCallback,
): DirectAgentDispatchResult {
const target = resolveDirectAgentTarget(storage.getAgents(), input.target);
const agents = storage.getAgents();
const target = resolveDirectAgentTarget(agents, input.target);
if (!target) {
throw new Error(`Agent not found: ${input.target}`);
}
const sender = agents.find((agent) => agent.id === input.senderAgentId) ?? null;
const policyMetadata = {
...(input.metadata ?? {}),
...(input.trustedBrokerAgentId ? { trustedBrokerAgentId: input.trustedBrokerAgentId } : {}),
};
if (!canDispatchDirectAgentMessage(agents, sender, target, policyMetadata)) {
throw new Error(
`Agent ${input.senderAgentId} cannot message supervised agent ${target.id} without parent/subtree visibility or an explicit broker emergency override.`,
);
}

const resolvedTarget: AgentDispatchTarget = { id: target.id, name: target.name };
const metadata = buildAgentMessageMetadata(input.senderAgentName, input.body, input.metadata);
Expand Down
11 changes: 9 additions & 2 deletions broker-core/maintenance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ export function runBrokerMaintenancePass(
const agents = db
.getAgents()
.filter((agent) => agent.id !== brokerAgentId)
.filter((agent) => agent.metadata?.role !== "broker");
.filter((agent) => agent.metadata?.role !== "broker")
.filter(
(agent) =>
!agent.parentAgentId &&
agent.supervisionState !== "supervised" &&
agent.supervisionState !== "orphaned" &&
agent.supervisionState !== "stopping",
);

const agentLoads = agents.map((agent) => ({
agent,
Expand All @@ -123,7 +130,7 @@ export function runBrokerMaintenancePass(
}

const preferredAgent = backlog.preferredAgentId
? (agents.find((agent) => agent.id === backlog.preferredAgentId) ?? null)
? (db.getAgents().find((agent) => agent.id === backlog.preferredAgentId) ?? null)
: null;
if (backlog.preferredAgentId && !preferredAgent) {
const knownPreferredAgent = db.getAgentById(backlog.preferredAgentId);
Expand Down
Loading
Loading