Docs/Guides/Rag System

RAG System Monitoring

Observe retrieval-augmented generation systems end-to-end with comprehensive monitoring and optimization.

Overview

Retrieval-Augmented Generation (RAG) systems combine information retrieval with text generation. AgenticAnts provides complete observability for:

  • Retrieval Quality - Monitor vector search performance
  • Generation Performance - Track LLM response quality
  • End-to-End Latency - Measure total system performance
  • Cost Optimization - Track retrieval and generation costs
  • Quality Metrics - Monitor answer relevance and accuracy

RAG Architecture Components

Typical RAG System Flow

python
from agenticants import AgenticAnts from agenticants.integrations import langchain ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) langchain.auto_instrument(ants) class RAGSystem: def __init__(self): self.vector_store = VectorStore() self.retriever = Retriever() self.generator = Generator() def query(self, question: str, user_id: str = None): # Create main RAG trace rag_trace = ants.trace.create( name="rag-system", input=question, metadata={ "user_id": user_id, "system_type": "rag", "components": ["retrieval", "generation"] } ) try: # Step 1: Query Preprocessing with rag_trace.span("query-preprocessing") as span: processed_query = self.preprocess_query(question) span.set_metadata({ "original_length": len(question), "processed_length": len(processed_query), "preprocessing_time": span.duration }) # Step 2: Vector Retrieval with rag_trace.span("vector-retrieval") as span: retrieved_docs = self.retriever.retrieve(processed_query) span.set_metadata({ "retrieved_count": len(retrieved_docs), "retrieval_time": span.duration, "vector_db": "pinecone" }) # Step 3: Context Preparation with rag_trace.span("context-preparation") as span: context = self.prepare_context(retrieved_docs) span.set_metadata({ "context_length": len(context), "preparation_time": span.duration }) # Step 4: Generation with rag_trace.span("generation") as span: answer = self.generator.generate(question, context) span.set_metadata({ "generation_time": span.duration, "answer_length": len(answer), "model": "gpt-4" }) # Step 5: Post-processing with rag_trace.span("post-processing") as span: final_answer = self.post_process(answer) span.set_metadata({ "post_processing_time": span.duration, "final_length": len(final_answer) }) rag_trace.complete( output=final_answer, metadata={ "total_components": 5, "success": True, "total_time": rag_trace.duration } ) return final_answer except Exception as error: rag_trace.error(error=str(error)) raise error

Retrieval Quality Monitoring

Vector Search Performance

python
class RetrievalMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def monitor_retrieval(self, query: str, top_k: int = 5): retrieval_trace = self.ants.trace.create( name="vector-retrieval", input=query, metadata={ "top_k": top_k, "retrieval_type": "semantic_search" } ) try: # Track retrieval metrics start_time = time.time() # Perform vector search results = self.vector_store.similarity_search( query=query, k=top_k ) retrieval_time = time.time() - start_time # Calculate relevance scores relevance_scores = [] for result in results: score = self.calculate_relevance(query, result.content) relevance_scores.append(score) retrieval_trace.complete( output=results, metadata={ "retrieval_time": retrieval_time, "results_count": len(results), "avg_relevance": sum(relevance_scores) / len(relevance_scores), "min_relevance": min(relevance_scores), "max_relevance": max(relevance_scores) } ) return results except Exception as error: retrieval_trace.error(error=str(error)) raise error def calculate_relevance(self, query: str, content: str) -> float: # Implement relevance scoring logic # This could use semantic similarity, keyword matching, etc. pass

Retrieval Quality Metrics

typescript
class RetrievalQualityMonitor { private ants: AgenticAnts async monitorRetrievalQuality(query: string, retrievedDocs: any[]) { const qualityTrace = await this.ants.trace.create({ name: 'retrieval-quality', input: query, metadata: { retrievedCount: retrievedDocs.length, queryType: this.classifyQuery(query) } }) try { // Calculate quality metrics const metrics = { relevance: await this.calculateRelevance(query, retrievedDocs), diversity: this.calculateDiversity(retrievedDocs), coverage: this.calculateCoverage(query, retrievedDocs), freshness: this.calculateFreshness(retrievedDocs) } await qualityTrace.complete({ output: metrics, metadata: { avgRelevance: metrics.relevance.avg, diversityScore: metrics.diversity, coverageScore: metrics.coverage, freshnessScore: metrics.freshness } }) return metrics } catch (error) { await qualityTrace.error({ error: error.message }) throw error } } private classifyQuery(query: string): string { // Classify query type (factual, analytical, creative, etc.) return 'factual' // Simplified } private calculateDiversity(docs: any[]): number { // Calculate diversity of retrieved documents return 0.8 // Simplified } private calculateCoverage(query: string, docs: any[]): number { // Calculate how well documents cover the query return 0.9 // Simplified } private calculateFreshness(docs: any[]): number { // Calculate freshness of retrieved documents return 0.7 // Simplified } }

Generation Performance Monitoring

LLM Generation Tracking

python
class GenerationMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def monitor_generation(self, question: str, context: str, model: str = "gpt-4"): generation_trace = self.ants.trace.create( name="llm-generation", input=question, metadata={ "model": model, "context_length": len(context), "question_length": len(question) } ) try: # Track generation metrics start_time = time.time() # Generate response response = self.generate_response(question, context, model) generation_time = time.time() - start_time # Calculate quality metrics quality_metrics = self.evaluate_response_quality(question, response) generation_trace.complete( output=response, metadata={ "generation_time": generation_time, "response_length": len(response), "tokens_used": self.count_tokens(response), "quality_score": quality_metrics["overall_score"], "relevance_score": quality_metrics["relevance"], "coherence_score": quality_metrics["coherence"], "factual_accuracy": quality_metrics["factual_accuracy"] } ) return response except Exception as error: generation_trace.error(error=str(error)) raise error def evaluate_response_quality(self, question: str, response: str) -> dict: # Implement quality evaluation logic return { "overall_score": 0.85, "relevance": 0.9, "coherence": 0.8, "factual_accuracy": 0.85 }

Response Quality Evaluation

typescript
class ResponseQualityEvaluator { private ants: AgenticAnts async evaluateResponse(question: string, response: string, context: string) { const evaluationTrace = await this.ants.trace.create({ name: 'response-evaluation', input: { question, response, context }, metadata: { questionLength: question.length, responseLength: response.length, contextLength: context.length } }) try { // Evaluate multiple quality dimensions const evaluations = await Promise.all([ this.evaluateRelevance(question, response), this.evaluateCoherence(response), this.evaluateFactualAccuracy(response, context), this.evaluateCompleteness(question, response) ]) const overallScore = evaluations.reduce((sum, score) => sum + score, 0) / evaluations.length await evaluationTrace.complete({ output: { overallScore, evaluations }, metadata: { relevance: evaluations[0], coherence: evaluations[1], factualAccuracy: evaluations[2], completeness: evaluations[3], overallScore } }) return { overallScore, evaluations } } catch (error) { await evaluationTrace.error({ error: error.message }) throw error } } private async evaluateRelevance(question: string, response: string): Promise<number> { // Implement relevance evaluation return 0.9 } private async evaluateCoherence(response: string): Promise<number> { // Implement coherence evaluation return 0.8 } private async evaluateFactualAccuracy(response: string, context: string): Promise<number> { // Implement factual accuracy evaluation return 0.85 } private async evaluateCompleteness(question: string, response: string): Promise<number> { // Implement completeness evaluation return 0.9 } }

End-to-End RAG Monitoring

Complete RAG Pipeline Tracking

python
class CompleteRAGMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def monitor_rag_pipeline(self, question: str, user_id: str = None): # Create comprehensive RAG trace rag_trace = self.ants.trace.create( name="complete-rag-pipeline", input=question, metadata={ "user_id": user_id, "pipeline_version": "1.0", "components": ["preprocessing", "retrieval", "generation", "evaluation"] } ) try: # Step 1: Query Analysis with rag_trace.span("query-analysis") as span: query_analysis = self.analyze_query(question) span.set_metadata({ "query_type": query_analysis["type"], "complexity": query_analysis["complexity"], "keywords": query_analysis["keywords"] }) # Step 2: Retrieval with rag_trace.span("retrieval") as span: retrieved_docs = self.retrieve_documents(question) span.set_metadata({ "retrieved_count": len(retrieved_docs), "retrieval_time": span.duration, "avg_relevance": self.calculate_avg_relevance(retrieved_docs) }) # Step 3: Context Preparation with rag_trace.span("context-preparation") as span: context = self.prepare_context(retrieved_docs) span.set_metadata({ "context_length": len(context), "preparation_time": span.duration }) # Step 4: Generation with rag_trace.span("generation") as span: response = self.generate_response(question, context) span.set_metadata({ "generation_time": span.duration, "response_length": len(response), "tokens_used": self.count_tokens(response) }) # Step 5: Quality Evaluation with rag_trace.span("quality-evaluation") as span: quality_metrics = self.evaluate_response_quality(question, response, context) span.set_metadata({ "quality_score": quality_metrics["overall_score"], "evaluation_time": span.duration }) # Step 6: Post-processing with rag_trace.span("post-processing") as span: final_response = self.post_process(response) span.set_metadata({ "post_processing_time": span.duration, "final_length": len(final_response) }) rag_trace.complete( output=final_response, metadata={ "total_components": 6, "success": True, "total_time": rag_trace.duration, "quality_score": quality_metrics["overall_score"] } ) return final_response except Exception as error: rag_trace.error(error=str(error)) raise error

Cost Optimization

RAG Cost Tracking

python
class RAGCostMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def track_rag_costs(self, question: str, retrieved_docs: list, response: str): cost_trace = self.ants.trace.create( name="rag-cost-tracking", input=question, metadata={ "cost_components": ["retrieval", "generation", "evaluation"] } ) try: # Calculate retrieval costs retrieval_cost = self.calculate_retrieval_cost(retrieved_docs) # Calculate generation costs generation_cost = self.calculate_generation_cost(response) # Calculate evaluation costs evaluation_cost = self.calculate_evaluation_cost(question, response) total_cost = retrieval_cost + generation_cost + evaluation_cost cost_trace.complete( output={"total_cost": total_cost}, metadata={ "retrieval_cost": retrieval_cost, "generation_cost": generation_cost, "evaluation_cost": evaluation_cost, "total_cost": total_cost, "cost_per_token": total_cost / self.count_tokens(response) } ) return total_cost except Exception as error: cost_trace.error(error=str(error)) raise error def calculate_retrieval_cost(self, docs: list) -> float: # Calculate cost based on vector search operations return len(docs) * 0.001 # Simplified def calculate_generation_cost(self, response: str) -> float: # Calculate cost based on token usage tokens = self.count_tokens(response) return tokens * 0.00003 # Simplified def calculate_evaluation_cost(self, question: str, response: str) -> float: # Calculate cost for quality evaluation return 0.001 # Simplified

Cost Optimization Strategies

typescript
class RAGCostOptimizer { private ants: AgenticAnts async optimizeRAGCosts() { // Analyze current costs const costAnalysis = await this.ants.finops.getRAGCosts({ period: 'last_30_days' }) console.log('RAG Cost Analysis:') console.log(`Total cost: $${costAnalysis.totalCost}`) console.log(`Retrieval cost: $${costAnalysis.retrievalCost}`) console.log(`Generation cost: $${costAnalysis.generationCost}`) // Get optimization recommendations const recommendations = await this.ants.finops.getRAGOptimizations() console.log('\nOptimization Recommendations:') recommendations.forEach(rec => { console.log(`- ${rec.title}: Save $${rec.savings}/month`) console.log(` Action: ${rec.action}`) }) return recommendations } async implementOptimizations(recommendations: any[]) { for (const rec of recommendations) { switch (rec.type) { case 'reduce_retrieval_k': await this.reduceRetrievalK(rec.optimalK) break case 'optimize_context_length': await this.optimizeContextLength(rec.optimalLength) break case 'implement_caching': await this.implementCaching() break case 'use_smaller_model': await this.useSmallerModel(rec.model) break } } } }

Performance Monitoring

RAG Performance Metrics

python
class RAGPerformanceMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def monitor_rag_performance(self): # Get performance metrics for RAG system metrics = self.ants.metrics.get_rag_metrics( period="last_7_days" ) print("RAG Performance Metrics:") print(f" Average query time: {metrics.avg_query_time}ms") print(f" Average retrieval time: {metrics.avg_retrieval_time}ms") print(f" Average generation time: {metrics.avg_generation_time}ms") print(f" Average response quality: {metrics.avg_response_quality}") print(f" Success rate: {metrics.success_rate}%") print(f" Error rate: {metrics.error_rate}%") # Check for performance issues if metrics.avg_query_time > 10000: print(" ⚠️ High query time detected!") if metrics.avg_response_quality < 0.7: print(" ⚠️ Low response quality detected!") if metrics.error_rate > 5: print(" ⚠️ High error rate detected!") return metrics

Performance Optimization

typescript
class RAGPerformanceOptimizer { private ants: AgenticAnts async optimizeRAGPerformance() { // Analyze performance bottlenecks const bottlenecks = await this.ants.sre.findRAGBottlenecks({ period: 'last_7_days' }) console.log('RAG Performance Bottlenecks:') bottlenecks.forEach(bottleneck => { console.log(`- ${bottleneck.component}: ${bottleneck.avgTime}ms avg`) console.log(` Optimization: ${bottleneck.optimization}`) }) // Implement optimizations for (const bottleneck of bottlenecks) { await this.implementOptimization(bottleneck) } } private async implementOptimization(bottleneck: any) { switch (bottleneck.component) { case 'retrieval': await this.optimizeRetrieval() break case 'generation': await this.optimizeGeneration() break case 'context_preparation': await this.optimizeContextPreparation() break } } }

Quality Monitoring

Response Quality Tracking

python
class RAGQualityMonitor: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def monitor_response_quality(self, question: str, response: str, context: str): quality_trace = self.ants.trace.create( name="rag-quality-monitoring", input=question, metadata={ "quality_dimensions": ["relevance", "accuracy", "completeness", "coherence"] } ) try: # Evaluate response quality quality_metrics = self.evaluate_response_quality(question, response, context) quality_trace.complete( output=quality_metrics, metadata={ "overall_quality": quality_metrics["overall_score"], "relevance": quality_metrics["relevance"], "accuracy": quality_metrics["accuracy"], "completeness": quality_metrics["completeness"], "coherence": quality_metrics["coherence"] } ) return quality_metrics except Exception as error: quality_trace.error(error=str(error)) raise error def evaluate_response_quality(self, question: str, response: str, context: str) -> dict: # Implement comprehensive quality evaluation return { "overall_score": 0.85, "relevance": 0.9, "accuracy": 0.8, "completeness": 0.85, "coherence": 0.8 }

Best Practices

1. Comprehensive Monitoring

python
class BestPracticeRAGSystem: def __init__(self): self.ants = AgenticAnts(api_key=os.getenv('AGENTICANTS_API_KEY')) def query_with_monitoring(self, question: str, user_id: str = None): # Always create comprehensive traces rag_trace = self.ants.trace.create( name="best-practice-rag", input=question, metadata={ "user_id": user_id, "monitoring_level": "comprehensive", "quality_tracking": True, "cost_tracking": True, "performance_tracking": True } ) try: # Monitor all components with rag_trace.span("retrieval") as span: docs = self.retrieve_documents(question) span.set_metadata({ "retrieved_count": len(docs), "retrieval_time": span.duration }) with rag_trace.span("generation") as span: response = self.generate_response(question, docs) span.set_metadata({ "response_length": len(response), "generation_time": span.duration }) with rag_trace.span("quality-evaluation") as span: quality = self.evaluate_quality(question, response, docs) span.set_metadata({ "quality_score": quality["overall_score"], "evaluation_time": span.duration }) rag_trace.complete( output=response, metadata={ "success": True, "quality_score": quality["overall_score"] } ) return response except Exception as error: rag_trace.error(error=str(error)) raise error

2. Error Handling and Recovery

typescript
class ResilientRAGSystem { private ants: AgenticAnts async queryWithRecovery(question: string, userId?: string) { const ragTrace = await this.ants.trace.create({ name: 'resilient-rag', input: question, metadata: { userId, retryEnabled: true, fallbackEnabled: true } }) try { // Try primary RAG system const response = await this.primaryRAGSystem.query(question) await ragTrace.complete({ output: response, metadata: { success: true, system: 'primary' } }) return response } catch (error) { // Try fallback system try { const fallbackResponse = await this.fallbackRAGSystem.query(question) await ragTrace.complete({ output: fallbackResponse, metadata: { success: true, system: 'fallback' } }) return fallbackResponse } catch (fallbackError) { await ragTrace.error({ error: fallbackError.message, metadata: { success: false, systems: ['primary', 'fallback'] } }) throw fallbackError } } } }

Troubleshooting

Common RAG Issues

Issue: Low retrieval quality

python
def debug_retrieval_quality(): # Analyze retrieval performance traces = ants.traces.query({ "name": "vector-retrieval", "period": "last_24h" }) for trace in traces: if trace.metadata.get("avg_relevance", 0) < 0.7: print(f"Low relevance query: {trace.input}") print(f"Relevance score: {trace.metadata.get('avg_relevance')}") print(f"Retrieved docs: {trace.metadata.get('results_count')}")

Issue: High generation costs

typescript
async function debugGenerationCosts() { const expensiveTraces = await ants.traces.query({ name: 'llm-generation', period: 'last_24h', sortBy: 'cost' }) console.log('Most expensive generations:') expensiveTraces.slice(0, 10).forEach(trace => { console.log(`Cost: $${trace.metadata.cost}`) console.log(`Tokens: ${trace.metadata.tokens_used}`) console.log(`Query: ${trace.input}`) }) }

Next Steps

Example Projects


Congratulations! 🎉 You now have comprehensive monitoring for your RAG systems with complete visibility into retrieval quality, generation performance, and end-to-end metrics.

© 2026 ANTS Platform, Inc.Docs v1.0 · Last updated June 2026