-
Notifications
You must be signed in to change notification settings - Fork 2
API Reference
Mahesh VAIKRI edited this page Mar 2, 2026
·
3 revisions
Creator: Mahesh Vaijainthymala Krishnamoorthy (Mahesh Vaikri)
Version: 1.1.0 | Tests: 818 Passed | Coverage: 80%
# Core framework
from maple import Agent, Message, Config, Result
# Autonomous AI
from maple import AutonomousAgent, AutonomousConfig, LLMConfig
from maple import Tool, ToolRegistry, MemoryManager
from maple import AgentOrchestrator
# LLM providers
from maple.llm import LLMProviderRegistry, LLMProvider
from maple.llm.types import ChatMessage, ChatRole, ToolDefinition, LLMResponse
# Infrastructure
from maple.error import CircuitBreaker
from maple.resources import ResourceManager
from maple.security import AuthenticationManager, AuthorizationManager
from maple.state import StateStore
from maple.discovery import AgentRegistry, HealthMonitor, FailureDetector
The core agent class for multi-agent communication.
class Agent:
def __init__(self, config: Config, broker=None) -> None
def start(self) -> None
def stop(self) -> None
def send(self, message: Message) -> Result[str, Dict]
def request(self, message: Message, timeout: str = "30s") -> Result[Message, Dict]
def broadcast(self, recipients: List[str], message: Message) -> Dict[str, Result]
def publish(self, topic: str, message: Message) -> Result[str, Dict]
def subscribe(self, topic: str) -> Result[None, Dict]
def establish_link(self, target_agent: str) -> Result[str, Dict]
@dataclass
class Config:
agent_id: str
broker_url: str = "localhost:8080"
capabilities: List[str] = field(default_factory=list)
security: Optional[SecurityConfig] = None
class Message:
def __init__(self, message_type: str, receiver=None, payload=None, metadata=None)
def to_dict(self) -> Dict[str, Any]
def to_json(self) -> str
@classmethod
def from_dict(cls, data: Dict) -> 'Message'
Type-safe error handling inspired by Rust.
class Result[T, E]:
@classmethod
def ok(cls, value: T) -> 'Result[T, E]'
@classmethod
def err(cls, error: E) -> 'Result[T, E]'
def is_ok(self) -> bool
def is_err(self) -> bool
def unwrap(self) -> T # Raises if Err
def unwrap_or(self, default: T) -> T
def unwrap_err(self) -> E
def map(self, f: Callable[[T], U]) -> 'Result[U, E]'
def map_err(self, f: Callable[[E], F]) -> 'Result[T, F]'
def and_then(self, f: Callable[[T], 'Result[U, E]']) -> 'Result[U, E]'
result = Result.ok(42)
assert result.is_ok()
assert result.unwrap() == 42
error = Result.err({"message": "not found"})
assert error.is_err()
value = error.unwrap_or(0) # Returns 0
# Chaining
final = parse_input(data) \
.and_then(validate) \
.and_then(process) \
.map_err(lambda e: f"Pipeline failed: {e}")
LLM-powered agent with ReAct reasoning loop.
class AutonomousAgent(Agent):
def __init__(self, agent_id: str, autonomy_config: AutonomousConfig, broker=None)
def register_tool(self, tool: Tool) -> None
def set_approval_callback(self, callback: Callable) -> None
def pursue_goal(self, description: str) -> Result[Goal, Error]
def decompose_goal(self, goal: Goal) -> Result[List[Goal], Error]
@dataclass
class AutonomousConfig:
llm: LLMConfig
max_reasoning_steps: int = 20
max_tool_calls_per_step: int = 5
working_memory_tokens: int = 8000
require_approval_for: List[str] = field(default_factory=list)
reflection_frequency: int = 5
system_prompt: Optional[str] = None
@dataclass
class LLMConfig:
provider: str # "openai" or "anthropic"
model: str # e.g. "gpt-4", "claude-sonnet-4-20250514"
api_key: Optional[str] = None
api_base: Optional[str] = None
temperature: float = 0.7
max_tokens: int = 4096
system_prompt: Optional[str] = None
timeout: float = 30.0
@dataclass
class Goal:
goal_id: str
description: str
status: str # pending, in_progress, completed, failed
sub_goals: List['Goal']
result: Optional[Any]
reasoning_trace: List[ReasoningStep]
from maple import AutonomousAgent, AutonomousConfig, LLMConfig, Tool, Result
config = AutonomousConfig(
llm=LLMConfig(provider="openai", model="gpt-4"),
max_reasoning_steps=15
)
agent = AutonomousAgent("researcher", autonomy_config=config)
# Register custom tools
agent.register_tool(Tool(
name="search",
description="Search for information",
parameters={"query": {"type": "string"}},
handler=lambda query: Result.ok(f"Results for: {query}")
))
# Agent autonomously reasons, uses tools, and reflects
result = agent.pursue_goal("Summarize recent advances in quantum computing")
if result.is_ok():
goal = result.unwrap()
print(goal.result)
@dataclass
class Tool:
name: str
description: str
parameters: Dict = field(default_factory=dict) # JSON Schema
handler: Callable[..., Result] = None
requires_approval: bool = False
tags: List[str] = field(default_factory=list)
def to_llm_definition(self) -> ToolDefinition
def execute(self, **kwargs) -> Result
class ToolRegistry:
def register(self, tool: Tool) -> Result
def get(self, name: str) -> Result[Tool, Error]
def list_tools(self, tags=None) -> List[Tool]
def get_llm_definitions(self, tags=None) -> List[ToolDefinition]
def execute(self, name: str, arguments: Dict) -> Result
When creating an AutonomousAgent, these tools are automatically available:
- `send_message` — Send messages to other agents
- `query_agents` — Find agents by capability
- `read_state` / `write_state` — Read/write distributed state
- `check_resources` — Query available resources
- `establish_link` — Create secure communication links
class MemoryManager:
working: WorkingMemory # Current context window
episodic: EpisodicMemory # Past task interactions
semantic: SemanticMemory # Learned facts and knowledge
def summarize_and_archive(self, llm_provider: LLMProvider) -> None
class WorkingMemory:
def add(self, key: str, content: str, relevance: float = 1.0) -> None
def get_context(self) -> List[Dict]
def clear(self) -> None
class EpisodicMemory:
def record(self, task_id: str, event: Dict) -> None
def recall(self, task_id: str) -> List[Dict]
def search(self, query: str, limit: int = 10) -> List[Dict]
class SemanticMemory:
def store_fact(self, key: str, fact: Any, metadata: Dict = None) -> None
def recall_fact(self, key: str) -> Result[Any, Error]
def list_facts(self, prefix: str = "") -> List[str]
class LLMProvider(ABC):
def complete(self, messages, tools=None, **kwargs) -> Result[LLMResponse, Error]
async def complete_async(self, messages, tools=None, **kwargs) -> Result[LLMResponse, Error]
def count_tokens(self, text: str) -> int
def get_usage_stats(self) -> Dict
class LLMProviderRegistry:
@classmethod
def register(cls, name: str, provider_class: type) -> None
@classmethod
def create(cls, config: LLMConfig) -> Result[LLMProvider, Error]
@dataclass
class ChatMessage:
role: ChatRole # SYSTEM, USER, ASSISTANT, TOOL
content: str
tool_calls: List[ToolCall] = None
@dataclass
class LLMResponse:
content: str
tool_calls: List[ToolCall]
usage: TokenUsage
model: str
finish_reason: str
class AgentOrchestrator:
def form_team(self, team_name, agents, roles) -> Result[str, Error]
def form_team_by_capability(self, team_name, capabilities, agents) -> Result[str, Error]
def execute_supervised(self, team_id, goal) -> Result
def execute_consensus(self, team_id, question) -> Result
def share_memory(self, team_id, key, value) -> None
from maple import AutonomousAgent, AutonomousConfig, LLMConfig
from maple.autonomy.orchestrator import AgentOrchestrator
config = AutonomousConfig(llm=LLMConfig(provider="openai", model="gpt-4"))
researcher = AutonomousAgent("researcher", autonomy_config=config)
writer = AutonomousAgent("writer", autonomy_config=config)
reviewer = AutonomousAgent("reviewer", autonomy_config=config)
orchestrator = AgentOrchestrator()
team_id = orchestrator.form_team("content_team",
agents=[researcher, writer, reviewer],
roles=["worker", "worker", "supervisor"]
).unwrap()
result = orchestrator.execute_supervised(team_id, "Write a technical blog post about MAPLE")
class DecisionLogger:
def log_decision(self, trace: DecisionTrace) -> None
def get_traces(self, goal_id: str = None) -> List[DecisionTrace]
def export_json(self, goal_id: str = None) -> str
class AgentSnapshot:
@staticmethod
def capture(agent) -> Dict # Metrics, memory, active goals, LLM usage
class AuthenticationManager:
def authenticate(self, credentials: Dict) -> Result[str, Dict]
def generate_jwt(self, agent_id: str, permissions: List[str]) -> Result[str, Dict]
def verify_jwt(self, token: str) -> Result[Dict, Dict]
def revoke_token(self, token: str) -> Result[None, Dict]
class AuthorizationManager:
def assign_role(self, agent_id: str, role: str) -> Result
def authorize_message(self, message: Message, sender_id: str) -> Result[bool, Dict]
def check_permission(self, agent_id: str, permission: str) -> bool
class ResourceManager:
def register_resource(self, resource_type: str, amount: Any) -> None
def allocate(self, request) -> Result[Dict, Dict]
def release(self, allocation) -> None
def get_available_resources(self) -> Dict
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=30.0)
def execute(self, func: Callable) -> Result
def get_state(self) -> str # "closed", "open", "half_open"
def reset(self) -> None
class AgentRegistry:
def register_agent(self, agent_id, name, capabilities=None) -> Result
def deregister_agent(self, agent_id) -> Result
def get_agent(self, agent_id) -> Result
def find_agents_by_capability(self, capability) -> List
class HealthMonitor:
def start_monitoring(self) -> None
def stop_monitoring(self) -> None
def record_heartbeat(self, agent_id, metrics) -> None
def get_agent_health(self, agent_id) -> Result
class StateStore:
def get(self, key: str) -> Result[Any, Dict]
def set(self, key: str, value: Any) -> Result[None, Dict]
def delete(self, key: str) -> Result[None, Dict]
def list_keys(self, prefix: str = "") -> List[str]

| API | Status |
|---|---|
| Core (Agent, Message, Result, Config) | Stable |
| Autonomous (AutonomousAgent, Tool, Memory) | Stable |
| LLM Providers | Stable |
| Security | Stable |
| Resources | Stable |
| Discovery | Stable |
| State Management | Stable |
Created by: Mahesh Vaijainthymala Krishnamoorthy (Mahesh Vaikri)
MAPLE v1.1.0 | Copyright (C) 2025-2026 Mahesh Vaijainthymala Krishnamoorthy (Mahesh Vaikri) | License AGPL 3.0
MAPLE v1.1.0
Links