Docs/Guides/Multi Agent

Multi-Agent Systems

Monitor complex multi-agent collaborations and workflows with comprehensive observability.

Overview

Multi-agent systems involve multiple AI agents working together to solve complex problems. AgenticAnts provides complete visibility into:

  • Agent Communication - Track messages between agents
  • Workflow Orchestration - Monitor complex workflows
  • Collaboration Patterns - Understand agent interactions
  • Performance Bottlenecks - Identify slow agents
  • Cost Attribution - Track costs per agent

Architecture Patterns

1. Sequential Workflow

Agents work in a pipeline, passing results to the next agent:

python
import os

from agenticants import AgenticAnts
from agenticants.integrations import langchain

ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
langchain.auto_instrument(ants)


def sequential_workflow(input_text: str):
    """Run three agents as a pipeline under a single workflow trace.

    The preprocessor output feeds the analyzer, whose output feeds the
    generator. Each agent call is wrapped in its own span.

    Args:
        input_text: Raw text handed to the first agent.

    Returns:
        The value produced by the response generator.

    Raises:
        Exception: Anything raised by an agent is recorded on the trace
            and then re-raised unchanged.

    Note:
        ``preprocess_agent``, ``analyzer_agent`` and ``generator_agent`` are
        not defined in this snippet; they must exist in the caller's scope.
    """
    # Create main workflow trace
    workflow_trace = ants.trace.create(
        name="sequential-workflow",
        input=input_text,
        metadata={
            "workflow_type": "sequential",
            "total_agents": 3
        }
    )

    try:
        # Agent 1: Text Preprocessor
        with workflow_trace.span("text-preprocessor") as span:
            preprocessed = preprocess_agent.process(input_text)
            span.set_metadata({
                "agent": "preprocessor",
                "output_length": len(preprocessed)
            })

        # Agent 2: Content Analyzer
        with workflow_trace.span("content-analyzer") as span:
            analysis = analyzer_agent.process(preprocessed)
            span.set_metadata({
                "agent": "analyzer",
                "analysis_type": analysis.type
            })

        # Agent 3: Response Generator
        with workflow_trace.span("response-generator") as span:
            response = generator_agent.process(analysis)
            span.set_metadata({
                "agent": "generator",
                "response_length": len(response)
            })

        workflow_trace.complete(
            output=response,
            metadata={
                "total_agents": 3,
                "success": True
            }
        )
        return response
    except Exception as error:
        # Record the failure so the trace is not left open, then let the
        # caller see the original exception.
        workflow_trace.error(error=str(error))
        raise

2. Parallel Processing

Multiple agents work simultaneously on different tasks:

typescript
const ants = new AgenticAnts({
  apiKey: process.env.AGENTICANTS_API_KEY
})

/**
 * Runs sentiment analysis, entity extraction and summarization concurrently
 * under one workflow trace and returns the combined result.
 *
 * Each agent call is wrapped in its own span. If any of them rejects, the
 * failure is recorded on the trace and the original error is re-thrown.
 *
 * `sentimentAgent`, `entityAgent` and `summarizerAgent` are not defined in
 * this snippet; they must exist in the surrounding scope.
 *
 * @param inputData Object whose `text` field is passed to every agent.
 * @returns An object with `sentiment`, `entities` and `summary` fields.
 */
async function parallelWorkflow(inputData: any) {
  // Create main workflow trace
  const workflowTrace = await ants.trace.create({
    name: 'parallel-workflow',
    input: inputData,
    metadata: {
      workflowType: 'parallel',
      totalAgents: 3
    }
  })

  try {
    // Run agents in parallel
    const [sentiment, entities, summary] = await Promise.all([
      // Agent 1: Sentiment Analysis
      workflowTrace.span('sentiment-analyzer', async (span) => {
        const result = await sentimentAgent.analyze(inputData.text)
        span.setMetadata({
          agent: 'sentiment-analyzer',
          score: result.score
        })
        return result
      }),

      // Agent 2: Entity Extraction
      workflowTrace.span('entity-extractor', async (span) => {
        const result = await entityAgent.extract(inputData.text)
        span.setMetadata({
          agent: 'entity-extractor',
          count: result.entities.length
        })
        return result
      }),

      // Agent 3: Text Summarizer
      workflowTrace.span('text-summarizer', async (span) => {
        const result = await summarizerAgent.summarize(inputData.text)
        span.setMetadata({
          agent: 'text-summarizer',
          // The summary text lives on `result.text` (it is read as
          // `summary.text` below), so measure that field.
          summaryLength: result.text.length
        })
        return result
      })
    ])

    // Combine results
    const combinedResult = {
      sentiment: sentiment.score,
      entities: entities.entities,
      summary: summary.text
    }

    await workflowTrace.complete({
      output: combinedResult,
      metadata: {
        totalAgents: 3,
        success: true
      }
    })

    return combinedResult
  } catch (error) {
    await workflowTrace.error({
      // A caught value is not guaranteed to be an Error instance.
      error: error instanceof Error ? error.message : String(error),
      metadata: { success: false }
    })
    throw error
  }
}

3. Hierarchical Coordination

A coordinator agent manages multiple worker agents:

python
class HierarchicalMultiAgent:
    """Coordinator/worker workflow traced as one hierarchical trace.

    The coordinator produces a plan, each plan step is executed by the worker
    it is assigned to, and the coordinator synthesizes the collected results.
    """

    def __init__(self):
        self.coordinator = CoordinatorAgent()
        # Keyed by the worker name that plan steps refer to.
        self.workers = {
            'researcher': ResearcherAgent(),
            'writer': WriterAgent(),
            'reviewer': ReviewerAgent()
        }

    def process_task(self, task: str):
        """Plan, execute and synthesize ``task`` under a single trace.

        Args:
            task: Task description passed to the coordinator.

        Returns:
            The value returned by ``coordinator.synthesize``.

        Raises:
            Exception: Any failure is recorded on the trace and re-raised.
        """
        # Create main workflow trace
        workflow_trace = ants.trace.create(
            name="hierarchical-workflow",
            input=task,
            metadata={
                "workflow_type": "hierarchical",
                "coordinator": "coordinator-agent",
                "workers": list(self.workers.keys())
            }
        )

        try:
            # Coordinator analyzes task
            with workflow_trace.span("coordinator-analysis") as span:
                plan = self.coordinator.analyze_task(task)
                span.set_metadata({
                    "agent": "coordinator",
                    "plan_steps": len(plan.steps)
                })

            # Execute plan with workers
            results = {}
            for step in plan.steps:
                worker_name = step.assigned_worker
                with workflow_trace.span(f"worker-{worker_name}") as span:
                    result = self.workers[worker_name].execute(step)
                    # Results are keyed by worker, so a worker assigned to
                    # several steps keeps only its latest result.
                    results[worker_name] = result
                    span.set_metadata({
                        "agent": worker_name,
                        "step": step.name,
                        "result_size": len(str(result))
                    })

            # Coordinator synthesizes results
            with workflow_trace.span("coordinator-synthesis") as span:
                final_result = self.coordinator.synthesize(results)
                span.set_metadata({
                    "agent": "coordinator",
                    "final_result_length": len(str(final_result))
                })

            workflow_trace.complete(
                output=final_result,
                metadata={
                    "total_steps": len(plan.steps),
                    "success": True
                }
            )
            return final_result
        except Exception as error:
            workflow_trace.error(error=str(error))
            # Bare raise re-raises the active exception as-is.
            raise

Agent Communication Monitoring

Message Passing Tracking

python
class CommunicatingAgents:
    """Routes messages between named agents, tracing every delivery."""

    def __init__(self):
        # Keyed by the agent name used as ``to_agent`` in send_message.
        self.agents = {
            'agent_a': AgentA(),
            'agent_b': AgentB(),
            'agent_c': AgentC()
        }

    def send_message(self, from_agent: str, to_agent: str, message: dict):
        """Deliver ``message`` to ``to_agent`` and trace the exchange.

        Args:
            from_agent: Name of the sender; recorded as trace metadata only.
            to_agent: Key in ``self.agents`` of the receiving agent.
            message: Payload; its ``"type"`` and ``"id"`` entries, when
                present, are copied into the trace metadata.

        Returns:
            Whatever the receiving agent's ``receive_message`` returns.

        Raises:
            Exception: Any failure (including an unknown ``to_agent``) is
                recorded on the trace and re-raised.
        """
        # Track inter-agent communication
        comm_trace = ants.trace.create(
            name="agent-communication",
            input=message,
            metadata={
                "from_agent": from_agent,
                "to_agent": to_agent,
                "message_type": message.get("type"),
                "communication_id": message.get("id")
            }
        )

        try:
            # Send message
            response = self.agents[to_agent].receive_message(message)

            comm_trace.complete(
                output=response,
                metadata={
                    # NOTE(review): duration is read before complete() has
                    # run — confirm the SDK exposes a value at this point.
                    "response_time": comm_trace.duration,
                    "success": True
                }
            )
            return response
        except Exception as error:
            comm_trace.error(error=str(error))
            # Bare raise re-raises the active exception as-is.
            raise

Conversation Flow Tracking

typescript
class ConversationFlow {
  // Agents keyed by the `agentId` that messages refer to.
  private agents: Map<string, any> = new Map()

  /**
   * Replays every message of a conversation through its agent, recording
   * one span per message under a single conversation trace.
   *
   * Messages are processed one after another, in order. If a message names
   * an agent that is not registered, or an agent rejects, the failure is
   * recorded on the trace and re-thrown.
   *
   * @param conversation Conversation whose `messages` are processed.
   */
  async processConversation(conversation: Conversation) {
    const conversationTrace = await ants.trace.create({
      name: 'conversation-flow',
      input: conversation,
      metadata: {
        conversationId: conversation.id,
        participantCount: conversation.participants.length,
        messageCount: conversation.messages.length
      }
    })

    try {
      for (const message of conversation.messages) {
        await conversationTrace.span(`message-${message.id}`, async (span) => {
          const agent = this.agents.get(message.agentId)
          if (agent === undefined) {
            // Fail with a descriptive error instead of a TypeError on
            // `undefined.processMessage`.
            throw new Error(`No agent registered for id ${message.agentId}`)
          }

          const response = await agent.processMessage(message)

          span.setMetadata({
            agentId: message.agentId,
            messageType: message.type,
            responseLength: response.length
          })

          return response
        })
      }

      await conversationTrace.complete({
        output: conversation.responses,
        metadata: { success: true }
      })
    } catch (error) {
      await conversationTrace.error({
        // A caught value is not guaranteed to be an Error instance.
        error: error instanceof Error ? error.message : String(error),
        metadata: { success: false }
      })
      throw error
    }
  }
}

Workflow Orchestration

Complex Workflow with Decision Points

python
class ComplexWorkflow:
    """Workflow whose execution path is chosen by a decision agent."""

    def __init__(self):
        self.decision_agent = DecisionAgent()
        # Keyed by the path name the decision agent can return.
        self.execution_agents = {
            'path_a': PathAAgent(),
            'path_b': PathBAgent(),
            'path_c': PathCAgent()
        }

    def execute_workflow(self, input_data: dict):
        """Decide on a path, run the matching agent and trace both steps.

        Args:
            input_data: Payload given to the decision agent and then to the
                selected execution agent.

        Returns:
            The value returned by the selected agent's ``execute``.

        Raises:
            Exception: Any failure is recorded on the trace and re-raised.
        """
        workflow_trace = ants.trace.create(
            name="complex-workflow",
            input=input_data,
            metadata={
                "workflow_type": "complex",
                "has_decision_points": True
            }
        )

        try:
            # Step 1: Decision making
            with workflow_trace.span("decision-making") as span:
                decision = self.decision_agent.decide(input_data)
                span.set_metadata({
                    "agent": "decision-agent",
                    "decision": decision.path,
                    "confidence": decision.confidence
                })

            # Step 2: Execute based on decision. Any path other than
            # path_a / path_b falls back to path_c.
            path_key = decision.path if decision.path in ("path_a", "path_b") else "path_c"
            path_slug = path_key.replace("_", "-")

            with workflow_trace.span(f"{path_slug}-execution") as span:
                result = self.execution_agents[path_key].execute(input_data)
                span.set_metadata({
                    "agent": f"{path_slug}-agent",
                    # NOTE(review): duration is read while the span is still
                    # open — confirm the SDK exposes a value at this point.
                    "execution_time": span.duration
                })

            workflow_trace.complete(
                output=result,
                metadata={
                    "decision_path": decision.path,
                    "success": True
                }
            )
            return result
        except Exception as error:
            workflow_trace.error(error=str(error))
            # Propagate the failure instead of silently returning None.
            raise

Performance Monitoring

Agent Performance Comparison

python
def monitor_agent_performance(agents=('agent_a', 'agent_b', 'agent_c'), period="last_7_days"):
    """Print latency, success-rate, volume and cost metrics per agent.

    A warning line is printed when average latency exceeds 5000 ms, success
    rate drops below 95 %, or total cost exceeds 100.

    Args:
        agents: Names of the agents to report on. Defaults to the three
            example agents.
        period: Reporting window passed to ``ants.metrics.get_agent_metrics``.
    """
    # Get performance metrics for all agents
    for agent in agents:
        metrics = ants.metrics.get_agent_metrics(
            agent_name=agent,
            period=period
        )

        print(f"\n{agent} Performance:")
        print(f" Average latency: {metrics.avg_latency}ms")
        print(f" Success rate: {metrics.success_rate}%")
        print(f" Total requests: {metrics.total_requests}")
        print(f" Cost: ${metrics.total_cost}")

        # Check for performance issues
        if metrics.avg_latency > 5000:
            print(" ⚠️ High latency detected!")

        if metrics.success_rate < 95:
            print(" ⚠️ Low success rate detected!")

        if metrics.total_cost > 100:
            print(" ⚠️ High cost detected!")

Workflow Bottleneck Analysis

typescript
/**
 * Logs, for every workflow of the last 7 days, its three slowest steps and
 * every step whose error rate is above 5 %, worst first.
 */
async function analyzeWorkflowBottlenecks() {
  const workflows = await ants.metrics.getWorkflowMetrics({
    period: 'last_7_days'
  })

  for (const workflow of workflows) {
    console.log(`\nWorkflow: ${workflow.name}`)

    // Find slowest steps. Sort a copy: Array.prototype.sort works in place
    // and would otherwise reorder the steps on the fetched workflow object.
    const slowestSteps = [...workflow.steps]
      .sort((a, b) => b.avgLatency - a.avgLatency)
      .slice(0, 3)

    console.log('Slowest steps:')
    slowestSteps.forEach(step => {
      console.log(` ${step.name}: ${step.avgLatency}ms avg`)
    })

    // Find error-prone steps (filter already returns a new array)
    const errorProneSteps = workflow.steps
      .filter(step => step.errorRate > 5)
      .sort((a, b) => b.errorRate - a.errorRate)

    if (errorProneSteps.length > 0) {
      console.log('Error-prone steps:')
      errorProneSteps.forEach(step => {
        console.log(` ${step.name}: ${step.errorRate}% error rate`)
      })
    }
  }
}

Cost Attribution

Track Costs Per Agent

python
def analyze_agent_costs():
    """Print 30-day cost and token usage per agent, then each agent's share.

    The share is reported as 0.0 % for every agent when the combined cost is
    zero, instead of failing on a division by zero.
    """
    agents = ['agent_a', 'agent_b', 'agent_c']

    total_cost = 0
    agent_costs = {}

    for agent in agents:
        cost_metrics = ants.finops.get_agent_costs(
            agent_name=agent,
            period="last_30_days"
        )

        agent_costs[agent] = cost_metrics.total_cost
        total_cost += cost_metrics.total_cost

        print(f"{agent}: ${cost_metrics.total_cost:.2f}")
        print(f" Token usage: {cost_metrics.total_tokens:,}")
        print(f" Cost per token: ${cost_metrics.cost_per_token:.6f}")

    print(f"\nTotal cost: ${total_cost:.2f}")

    # Cost distribution
    for agent, cost in agent_costs.items():
        # Guard against ZeroDivisionError when nothing was spent.
        percentage = (cost / total_cost) * 100 if total_cost else 0.0
        print(f"{agent}: {percentage:.1f}% of total cost")

Optimize Expensive Agents

typescript
/**
 * Logs the 30-day cost of every agent (as returned by the finops API with
 * `sortBy: 'cost'`) followed by any optimization recommendations for it.
 */
async function optimizeExpensiveAgents() {
  const costQuery = {
    period: 'last_30_days',
    sortBy: 'cost'
  }
  const costReport = await ants.finops.getAgentCosts(costQuery)

  console.log('Agent cost analysis:')

  for (const entry of costReport) {
    console.log(`\n${entry.name}: $${entry.totalCost}`)

    // Ask for optimization recommendations for this agent
    const suggestions = await ants.finops.getOptimizations({
      agent: entry.name
    })

    // Nothing to report for this agent
    if (!(suggestions.length > 0)) {
      continue
    }

    console.log('Optimization opportunities:')
    for (const suggestion of suggestions) {
      console.log(` - ${suggestion.title}: Save $${suggestion.savings}/month`)
    }
  }
}

Best Practices

1. Design for Observability

python
import os

from agenticants import AgenticAnts


class ObservableMultiAgent:
    """Template for a multi-agent workflow that is traced end to end."""

    def __init__(self):
        self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY'))
        # Agents taking part in the workflow; execute_workflow reports how
        # many there are, so the attribute must always exist.
        self.agents = {}

    def process_with_agents(self, input_data):
        """Run the actual agent logic; must be provided by a subclass.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "process_with_agents must be implemented by a subclass"
        )

    def execute_workflow(self, input_data):
        """Run ``process_with_agents`` inside a main trace and one span.

        Args:
            input_data: Payload recorded as trace input and handed to
                ``process_with_agents``.

        Returns:
            The value returned by ``process_with_agents``.

        Raises:
            Exception: Any failure is recorded on the trace and re-raised.
        """
        # Always create a main trace
        main_trace = self.ants.trace.create(
            name="observable-workflow",
            input=input_data,
            metadata={
                "workflow_version": "1.0",
                "environment": "production"
            }
        )

        try:
            # Use spans for each agent interaction
            with main_trace.span("agent-interaction") as span:
                result = self.process_with_agents(input_data)
                span.set_metadata({
                    "agents_used": len(self.agents),
                    "result_size": len(str(result))
                })

            main_trace.complete(output=result)
            return result
        except Exception as error:
            main_trace.error(error=str(error))
            # Bare raise re-raises the active exception as-is.
            raise

2. Implement Circuit Breakers

typescript
class ResilientMultiAgent {
  // Circuit breakers keyed by agent name. Agents without an entry run
  // unprotected.
  private circuitBreakers: Map<string, CircuitBreaker> = new Map()

  /**
   * Runs `operation` for an agent, honouring that agent's circuit breaker.
   *
   * If a breaker exists for `agentName` and is open, the operation is not
   * started and an error is thrown. Otherwise the outcome of the operation
   * is reported to the breaker (when there is one) and then returned or
   * re-thrown.
   *
   * @param agentName Key used to look up the circuit breaker.
   * @param operation Async work to run on behalf of the agent.
   * @returns The value the operation resolves to.
   */
  async executeWithCircuitBreaker<T>(
    agentName: string,
    operation: () => Promise<T>
  ): Promise<T> {
    const breaker = this.circuitBreakers.get(agentName)

    if (breaker && breaker.isOpen()) {
      throw new Error(`Circuit breaker open for ${agentName}`)
    }

    try {
      const result = await operation()
      breaker?.recordSuccess()
      return result
    } catch (error) {
      breaker?.recordFailure()
      throw error
    }
  }
}

3. Monitor Agent Health

python
def monitor_agent_health():
    """Print status, uptime and last check time for each example agent.

    When an agent's status is anything other than "healthy", a warning and
    the reported issues are printed as well.
    """
    for name in ('agent_a', 'agent_b', 'agent_c'):
        report = ants.sre.get_agent_health(name)

        summary_lines = (
            f"\n{name} Health:",
            f" Status: {report.status}",
            f" Uptime: {report.uptime}%",
            f" Last check: {report.last_check}",
        )
        for line in summary_lines:
            print(line)

        if report.status != "healthy":
            print(" ⚠️ Health issues detected!")
            print(f" Issues: {report.issues}")

Troubleshooting

Common Issues

Issue: Agents not communicating properly

python
# Debug agent communication
def debug_agent_communication():
    """Print sender, receiver and payload of failed agent-communication traces.

    Queries traces named "agent-communication" with status "error" over the
    "last_24h" period.
    """
    failure_filter = {
        "name": "agent-communication",
        "status": "error",
        "period": "last_24h"
    }

    for failed in ants.traces.query(failure_filter):
        report = (
            f"Communication error: {failed.error}",
            f"From: {failed.metadata.get('from_agent')}",
            f"To: {failed.metadata.get('to_agent')}",
            f"Message: {failed.input}",
        )
        for line in report:
            print(line)

Issue: Workflow taking too long

typescript
// Debug workflow performance
/**
 * Logs every workflow from the last 24 hours that exceeded the 30 second
 * threshold, together with its three longest-running spans.
 */
async function debugWorkflowPerformance() {
  const slowWorkflows = await ants.metrics.getSlowWorkflows({
    threshold: 30000, // 30 seconds
    period: 'last_24h'
  })

  for (const workflow of slowWorkflows) {
    console.log(`Slow workflow: ${workflow.name}`)
    console.log(`Duration: ${workflow.duration}ms`)

    // Analyze spans to find bottlenecks
    const spans = await ants.traces.getSpans(workflow.traceId)

    // Sort a copy: Array.prototype.sort works in place and would otherwise
    // reorder the fetched `spans` array.
    const slowestSpans = [...spans]
      .sort((a, b) => b.duration - a.duration)
      .slice(0, 3)

    console.log('Slowest spans:')
    slowestSpans.forEach(span => {
      console.log(` ${span.name}: ${span.duration}ms`)
    })
  }
}

Next Steps

Example Projects


Congratulations! 🎉 You now have comprehensive monitoring for your multi-agent systems with complete visibility into agent interactions, performance, and costs.

© 2026 ANTS Platform, Inc. · Docs v1.0 · Last updated June 2026