- extract all entities and relations (without formatting)
- validate and build the knowledge graph
models.py sqlalchemy 2.0 mapped column
""" Database models for the agent monitoring system. """ import json import uuid from datetime import datetime from typing import Optional, cast from sqlalchemy import Boolean, DateTime, Float, ForeignKey, Index, Integer, String, Text, UniqueConstraint from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.types import JSON, TypeDecorator Base = declarative_base() class SafeJSON(TypeDecorator): """Custom JSON type that handles circular references using default=str""" impl = Text def process_bind_param(self, value, dialect): if value is not None: return json.dumps(value, default=str) return value def process_result_value(self, value, dialect): if value is not None: return json.loads(value) return value class Trace(Base): """Model for storing agent traces (conversations, interactions, etc.).""" __tablename__ = "traces" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) trace_id: Mapped[str] = mapped_column(String(36), unique=True, index=True, default=lambda: str(uuid.uuid4())) filename: Mapped[str] = mapped_column(String(255), nullable=True, index=True) title: Mapped[str] = mapped_column(String(255), nullable=True) description: Mapped[str] = mapped_column(Text, nullable=True) content: Mapped[str] = mapped_column(Text, nullable=True) # Full trace content content_hash: Mapped[str] = mapped_column(String(64), nullable=True, index=True) # Hash of content for deduplication upload_timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) update_timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) uploader: Mapped[str] = mapped_column(String(255), nullable=True) trace_type: Mapped[str] = mapped_column(String(50), nullable=True) # e.g., 'conversation', 'code_execution', etc. 
trace_source: Mapped[str] = mapped_column(String(50), nullable=True) # e.g., 'user_upload', 'api', 'generated' character_count: Mapped[int] = mapped_column(Integer, default=0) turn_count: Mapped[int] = mapped_column(Integer, default=0) status: Mapped[str] = mapped_column(String(50), default="uploaded") # uploaded, processed, analyzed, etc. processing_method: Mapped[str] = mapped_column(String(50), nullable=True) # e.g., 'sliding_window', 'single_pass', etc. tags: Mapped[list[str]] = mapped_column(JSON, nullable=True) # Store tags as JSON array trace_metadata: Mapped[dict] = mapped_column(JSON, nullable=True) # Additional metadata as JSON # Relationships knowledge_graphs = relationship("KnowledgeGraph", back_populates="trace", foreign_keys="KnowledgeGraph.trace_id", cascade="all, delete-orphan") __table_args__ = ( UniqueConstraint('trace_id', name='uix_trace_id'), Index('idx_trace_content_hash', 'content_hash'), Index('idx_trace_title', 'title'), Index('idx_trace_status', 'status'), ) def to_dict(self): """Convert to dictionary representation.""" return { "id": self.id, "trace_id": self.trace_id, "filename": self.filename, "title": self.title, "description": self.description, "upload_timestamp": self.upload_timestamp.isoformat() if self.upload_timestamp else None, "update_timestamp": self.update_timestamp.isoformat() if self.update_timestamp else None, "uploader": self.uploader, "trace_type": self.trace_type, "trace_source": self.trace_source, "character_count": self.character_count, "turn_count": self.turn_count, "status": self.status, "processing_method": self.processing_method, "tags": self.tags, "metadata": self.trace_metadata, "knowledge_graph_count": len(self.knowledge_graphs) if self.knowledge_graphs else 0 } @classmethod def from_content(cls, content, filename=None, title=None, description=None, trace_type=None, trace_source="user_upload", uploader=None, tags=None, trace_metadata=None): """Create a Trace instance from content.""" import hashlib trace = cls() 
trace.trace_id = str(uuid.uuid4()) trace.filename = cast(str, filename) trace.title = title or f"Trace {trace.trace_id[:8]}" trace.description = cast(str, description) trace.content = content # Calculate content hash for deduplication if content: content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest() trace.content_hash = content_hash # Set character count trace.character_count = len(content) # Estimate turn count (approximate) turn_markers = [ "user:", "assistant:", "system:", "human:", "ai:", "User:", "Assistant:", "System:", "Human:", "AI:" ] turn_count = 0 for marker in turn_markers: turn_count += content.count(marker) trace.turn_count = max(1, turn_count) # At least 1 turn trace.trace_type = cast(str, trace_type) trace.trace_source = trace_source trace.uploader = cast(str, uploader) trace.tags = tags or [] trace.trace_metadata = trace_metadata or {} trace.status = "uploaded" return trace class KnowledgeGraph(Base): """Model for storing knowledge graphs.""" __tablename__ = "knowledge_graphs" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) filename: Mapped[str] = mapped_column(String(255), unique=True, index=True) creation_timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) update_timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) entity_count: Mapped[int] = mapped_column(Integer, default=0) relation_count: Mapped[int] = mapped_column(Integer, default=0) _graph_data: Mapped[Optional[str]] = mapped_column("graph_data", Text, nullable=True) # Underlying TEXT field status: Mapped[str] = mapped_column(String(50), default="created", nullable=False) # Status of processing: created, enriched, perturbed, causal # Add fields for trace and window tracking trace_id: Mapped[str] = mapped_column(String(36), ForeignKey("traces.trace_id"), nullable=True, index=True, comment="ID to group knowledge graphs from the same trace") window_index: Mapped[int] = 
mapped_column(Integer, nullable=True, comment="Sequential index of window within a trace") window_total: Mapped[int] = mapped_column(Integer, nullable=True, comment="Total number of windows in the trace") window_start_char: Mapped[int] = mapped_column(Integer, nullable=True, comment="Starting character position in the original trace") window_end_char: Mapped[int] = mapped_column(Integer, nullable=True, comment="Ending character position in the original trace") processing_run_id: Mapped[str] = mapped_column(String(36), nullable=True, index=True, comment="ID to distinguish multiple processing runs of the same trace") # Relationships entities = relationship("Entity", back_populates="graph", cascade="all, delete-orphan") relations = relationship("Relation", back_populates="graph", cascade="all, delete-orphan") trace = relationship("Trace", back_populates="knowledge_graphs", foreign_keys=[trace_id]) prompt_reconstructions = relationship( "PromptReconstruction", back_populates="knowledge_graph", cascade="all, delete-orphan" ) perturbation_tests = relationship("PerturbationTest", back_populates="knowledge_graph", cascade="all, delete-orphan") causal_analyses = relationship("CausalAnalysis", back_populates="knowledge_graph", cascade="all, delete-orphan") __table_args__ = ( UniqueConstraint('filename', name='uix_knowledge_graph_filename'), ) @property def graph_data(self): """Get the graph_data as a parsed JSON object""" if self._graph_data is None: return None if isinstance(self._graph_data, dict): # Already a dictionary, return as is return self._graph_data # Try to parse as JSON try: return json.loads(self._graph_data) except Exception: # If parsing fails, return None return None @graph_data.setter def graph_data(self, value): """Set graph_data, converting to a JSON string if it's a dictionary""" if value is None: self._graph_data = None elif isinstance(value, dict): self._graph_data = json.dumps(value) else: # Assume it's already a string self._graph_data = value 
@property def graph_content(self): """Get the graph content from graph_data field""" # Return graph_data return self.graph_data or {} @graph_content.setter def graph_content(self, data): """Set graph content from a dictionary.""" self.graph_data = data # Update counts if isinstance(data, dict): if 'entities' in data and isinstance(data['entities'], list): self.entity_count = len(data['entities']) if 'relations' in data and isinstance(data['relations'], list): self.relation_count = len(data['relations']) def get_entities_from_content(self): """Get entities directly from content field.""" data = self.graph_content entities = data.get('entities', []) if isinstance(data, dict) else [] return entities def get_relations_from_content(self): """Get relations directly from content field.""" data = self.graph_content relations = data.get('relations', []) if isinstance(data, dict) else [] return relations def get_all_entities(self, session=None): """ Get all entities, preferring database entities if available. If no database entities exist, falls back to content entities. If session is provided, queries database entities, otherwise returns content entities. """ if session: db_entities = session.query(Entity).filter_by(graph_id=self.id).all() if db_entities: return [entity.to_dict() for entity in db_entities] return self.get_entities_from_content() def get_all_relations(self, session=None): """ Get all relations, preferring database relations if available. If no database relations exist, falls back to content relations. If session is provided, queries database relations, otherwise returns content relations. 
""" if session: db_relations = session.query(Relation).filter_by(graph_id=self.id).all() if db_relations: return [relation.to_dict() for relation in db_relations] return self.get_relations_from_content() def to_dict(self): """Convert to dictionary representation.""" result = { "id": self.id, "filename": self.filename, "creation_timestamp": self.creation_timestamp.isoformat(), "entity_count": self.entity_count, "relation_count": self.relation_count, } return result @classmethod def from_dict(cls, data): """Create a KnowledgeGraph instance from a dictionary representation.""" kg = cls() kg.filename = data.get('filename') # Store content as JSON kg.content = json.dumps(data) return kg class Entity(Base): """Model for storing knowledge graph entities.""" __tablename__ = "entities" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) graph_id: Mapped[int] = mapped_column(Integer, ForeignKey("knowledge_graphs.id")) entity_id: Mapped[str] = mapped_column(String(255), index=True) # Original entity ID in the graph type: Mapped[str] = mapped_column(String(255)) name: Mapped[str] = mapped_column(String(255)) properties: Mapped[dict] = mapped_column(JSON) # Relationships graph = relationship("KnowledgeGraph", back_populates="entities") source_relations = relationship("Relation", foreign_keys="Relation.source_id", back_populates="source") target_relations = relationship("Relation", foreign_keys="Relation.target_id", back_populates="target") # Add a composite unique constraint to ensure entity_id is unique per graph __table_args__ = ( UniqueConstraint('graph_id', 'entity_id', name='uix_entity_graph_id_entity_id'), ) def to_dict(self): """Convert to dictionary representation.""" result = { "id": self.entity_id, "type": self.type, "name": self.name, "properties": self.properties or {} } return result @classmethod def from_dict(cls, data, graph_id): """Create an Entity instance from a dictionary.""" entity = cls() entity.graph_id = graph_id entity.entity_id = 
data.get('id') entity.type = data.get('type') entity.name = data.get('name') entity.properties = data.get('properties') return entity class Relation(Base): """Model for storing knowledge graph relations.""" __tablename__ = "relations" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) graph_id: Mapped[int] = mapped_column(Integer, ForeignKey("knowledge_graphs.id")) relation_id: Mapped[str] = mapped_column(String(255), index=True) # Original relation ID in the graph type: Mapped[str] = mapped_column(String(255)) source_id: Mapped[int] = mapped_column(Integer, ForeignKey("entities.id")) target_id: Mapped[int] = mapped_column(Integer, ForeignKey("entities.id")) properties: Mapped[dict] = mapped_column(JSON) # Relationships graph = relationship("KnowledgeGraph", back_populates="relations") source = relationship("Entity", foreign_keys=[source_id], back_populates="source_relations") target = relationship("Entity", foreign_keys=[target_id], back_populates="target_relations") # Add a composite unique constraint to ensure relation_id is unique per graph __table_args__ = ( UniqueConstraint('graph_id', 'relation_id', name='uix_relation_graph_id_relation_id'), ) def to_dict(self): """Convert to dictionary representation.""" result = { "id": self.relation_id, "type": self.type, "source": self.source.entity_id if self.source else None, "target": self.target.entity_id if self.target else None, "properties": self.properties or {} } return result @classmethod def from_dict(cls, data, graph_id, source_entity=None, target_entity=None): """Create a Relation instance from a dictionary.""" relation = cls() relation.graph_id = graph_id relation.relation_id = data.get('id') relation.type = data.get('type') # Set source and target if source_entity: relation.source_id = source_entity.id if target_entity: relation.target_id = target_entity.id # Set properties relation.properties = data.get('properties') return relation class PromptReconstruction(Base): """Model for 
storing prompt reconstruction results.""" __tablename__ = "prompt_reconstructions" id: Mapped[int] = mapped_column(Integer, primary_key=True) knowledge_graph_id: Mapped[int] = mapped_column(Integer, ForeignKey("knowledge_graphs.id"), nullable=False) relation_id: Mapped[str] = mapped_column(String(255), nullable=False) reconstructed_prompt: Mapped[str] = mapped_column(Text) dependencies: Mapped[dict] = mapped_column(JSON) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships knowledge_graph = relationship("KnowledgeGraph", back_populates="prompt_reconstructions") perturbation_tests = relationship("PerturbationTest", back_populates="prompt_reconstruction") def to_dict(self): return { "id": self.id, "knowledge_graph_id": self.knowledge_graph_id, "relation_id": self.relation_id, "reconstructed_prompt": self.reconstructed_prompt, "dependencies": self.dependencies, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } class PerturbationTest(Base): """Model for storing perturbation test results.""" __tablename__ = "perturbation_tests" id: Mapped[int] = mapped_column(Integer, primary_key=True) knowledge_graph_id: Mapped[int] = mapped_column(Integer, ForeignKey("knowledge_graphs.id"), nullable=False) prompt_reconstruction_id: Mapped[int] = mapped_column(Integer, ForeignKey("prompt_reconstructions.id"), nullable=False) relation_id: Mapped[str] = mapped_column(String(255), nullable=False) perturbation_type: Mapped[str] = mapped_column(String(50), nullable=False) # e.g., 'entity_removal', 'relation_removal' perturbation_set_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) test_result: Mapped[dict] = mapped_column(JSON) perturbation_score: Mapped[float] = mapped_column(Float) test_metadata: Mapped[dict] = 
mapped_column(JSON) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships knowledge_graph = relationship("KnowledgeGraph", back_populates="perturbation_tests") prompt_reconstruction = relationship("PromptReconstruction", back_populates="perturbation_tests") def to_dict(self): return { "id": self.id, "knowledge_graph_id": self.knowledge_graph_id, "prompt_reconstruction_id": self.prompt_reconstruction_id, "relation_id": self.relation_id, "perturbation_type": self.perturbation_type, "perturbation_set_id": self.perturbation_set_id, "test_result": self.test_result, "perturbation_score": self.perturbation_score, "test_metadata": self.test_metadata, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } class CausalAnalysis(Base): """Model for storing causal analysis results.""" __tablename__ = "causal_analyses" id: Mapped[int] = mapped_column(Integer, primary_key=True) knowledge_graph_id: Mapped[int] = mapped_column(Integer, ForeignKey("knowledge_graphs.id"), nullable=False) perturbation_set_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) # Analysis method and results analysis_method: Mapped[str] = mapped_column(String(50), nullable=False) # e.g., 'graph', 'component', 'dowhy' analysis_result: Mapped[dict] = mapped_column(JSON) # Store the full analysis result causal_score: Mapped[float] = mapped_column(Float) # Store the numerical causal score analysis_metadata: Mapped[dict] = mapped_column(JSON) # Store additional metadata about the analysis # Timestamps created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships knowledge_graph = relationship("KnowledgeGraph", 
back_populates="causal_analyses") # Indexes __table_args__ = ( Index("idx_causal_analyses_kgid", "knowledge_graph_id"), Index("idx_causal_analyses_method", "analysis_method"), Index("idx_causal_analyses_setid", "perturbation_set_id"), ) def to_dict(self): return { "id": self.id, "knowledge_graph_id": self.knowledge_graph_id, "perturbation_set_id": self.perturbation_set_id, "analysis_method": self.analysis_method, "analysis_result": self.analysis_result, "causal_score": self.causal_score, "analysis_metadata": self.analysis_metadata, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } class ObservabilityConnection(Base): """Model for storing AI observability platform connections.""" __tablename__ = "observability_connections" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) connection_id: Mapped[str] = mapped_column(String(36), unique=True, index=True, default=lambda: str(uuid.uuid4())) platform: Mapped[str] = mapped_column(String(50), nullable=False) # langfuse, langsmith, etc. 
public_key: Mapped[str] = mapped_column(Text, nullable=False) # Encrypted API key secret_key: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # Encrypted secret key (for Langfuse) host: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) # Host URL projects: Mapped[Optional[list[str]]] = mapped_column(JSON, nullable=True) # Available projects from the platform status: Mapped[str] = mapped_column(String(50), default="connected") connected_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) last_sync: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships fetched_traces = relationship("FetchedTrace", back_populates="connection", cascade="all, delete-orphan") def to_dict(self): return { "id": self.connection_id, "platform": self.platform, "status": self.status, "connected_at": self.connected_at.isoformat() if self.connected_at else None, "last_sync": self.last_sync.isoformat() if self.last_sync else None, "host": self.host, "projects": self.projects or [] } class FetchedTrace(Base): """Model for storing fetched traces from observability platforms.""" __tablename__ = "fetched_traces" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) trace_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True) # Original trace ID from platform name: Mapped[str] = mapped_column(String(255), nullable=False) platform: Mapped[str] = mapped_column(String(50), nullable=False) connection_id: Mapped[str] = mapped_column(String(36), ForeignKey("observability_connections.connection_id"), nullable=False) project_name: Mapped[str] = mapped_column(String(255), nullable=True, index=True) # Project name for LangSmith, null for Langfuse data: Mapped[dict] = mapped_column(SafeJSON, nullable=True) # Full trace 
data fetched_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) imported: Mapped[bool] = mapped_column(Boolean, default=False) imported_at: Mapped[datetime] = mapped_column(DateTime, nullable=True) imported_trace_id: Mapped[str] = mapped_column(String(36), nullable=True) # Reference to imported trace # Relationships connection = relationship("ObservabilityConnection", back_populates="fetched_traces") __table_args__ = ( UniqueConstraint('trace_id', 'connection_id', name='uix_fetched_trace_id_connection'), ) def _extract_generated_timestamp(self): """Extract the actual generated timestamp from trace data based on platform.""" if not self.data: return None if self.platform == "langfuse": # For Langfuse, find the earliest timestamp from traces traces = self.data.get("traces", []) if traces: timestamps = [] for trace in traces: if isinstance(trace, dict): # Check for various timestamp fields in Langfuse traces for ts_field in ["timestamp", "startTime", "createdAt"]: if ts_field in trace: timestamps.append(trace[ts_field]) break if timestamps: return min(timestamps) # Fallback to session info or other timestamps session_info = self.data.get("session_info", {}) if session_info and "createdAt" in session_info: return session_info["createdAt"] # Other fallback fields at top level for field in ["timestamp", "createdAt", "startTime"]: if field in self.data: return self.data[field] elif self.platform == "langsmith": # For LangSmith, find the earliest start_time from traces traces = self.data.get("traces", []) if traces: start_times = [] for trace in traces: if isinstance(trace, dict) and "start_time" in trace: start_times.append(trace["start_time"]) if start_times: return min(start_times) # Fallback to other timestamp fields for field in ["timestamp", "start_time", "created_at"]: if field in self.data: return self.data[field] return None def to_dict(self, preview=True): data = self.data original_stats = {} if data: # Calculate original data statistics 
import json original_json_str = json.dumps(data, ensure_ascii=False) original_stats = { "original_character_count": len(original_json_str), "original_line_count": original_json_str.count('\n') + 1, "original_size_kb": round(len(original_json_str) / 1024, 2) } if preview: # Truncate long strings to prevent browser crashes but preserve full structure from backend.routers.observability import truncate_long_strings data = truncate_long_strings(data, max_string_length=500) # Extract generated timestamp generated_timestamp = self._extract_generated_timestamp() result = { "id": self.trace_id, "name": self.name, "platform": self.platform, "fetched_at": self.fetched_at.isoformat() if self.fetched_at else None, "generated_timestamp": generated_timestamp, "imported": self.imported, "imported_at": self.imported_at.isoformat() if self.imported_at else None, "data": data } # Add original statistics to the result result.update(original_stats) return result def get_full_data(self): """Get full original data for download (no limitations)""" return { "id": self.trace_id, "name": self.name, "platform": self.platform, "fetched_at": self.fetched_at.isoformat() if self.fetched_at else None, "imported": self.imported, "imported_at": self.imported_at.isoformat() if self.imported_at else None, "data": self.data # Full original data }
Visualization

""" AI Observability Platform Integration Router Handles connections to external AI observability platforms like Langfuse and LangSmith. Provides endpoints for: - Platform connection management - Trace fetching and importing - Automated synchronization """ import base64 import json import logging import time import uuid from datetime import datetime from typing import Dict, List, Optional, cast import requests from fastapi import APIRouter, Depends, HTTPException from langsmith import Client as LangsmithClient from pydantic import BaseModel from sqlalchemy import Column from sqlalchemy.orm import Session from backend.database import get_db from backend.database.models import FetchedTrace, ObservabilityConnection from backend.database.utils import save_trace from backend.routers.observe_models import LangFuseSession, LangSmithTrace from utils.langfuse_downloader import LangfuseDownloader from utils.preprocess_traces import filter_langfuse_session, filter_langsmith_trace logger = logging.getLogger("agent_monitoring_server.routers.observability") router = APIRouter(prefix="/api/observability", tags=["observability"]) def truncate_long_strings(obj, max_string_length=500): """ Recursively process JSON object to truncate very long strings No depth limit - all keys and array items are preserved Only truncates string values that are too long """ if isinstance(obj, dict): truncated = {} # Process ALL keys, no limit on key count or depth for key, value in obj.items(): truncated[key] = truncate_long_strings(value, max_string_length) return truncated elif isinstance(obj, list): truncated = [] # Process ALL items, no limit on item count or depth for item in obj: truncated.append(truncate_long_strings(item, max_string_length)) return truncated elif isinstance(obj, str) and len(obj) > max_string_length: return f"{obj[:max_string_length]}...({len(obj)} chars)" return obj # Helper Functions for Common Operations def get_langfuse_projects(public_key: str, secret_key: str, host: 
Optional[str]) -> List[Dict]: """Fetch projects from Langfuse API""" try: # Create Basic Auth header auth_string = f"{public_key}:{secret_key}" auth_bytes = auth_string.encode('ascii') auth_b64 = base64.b64encode(auth_bytes).decode('ascii') headers = { 'Authorization': f'Basic {auth_b64}', 'Content-Type': 'application/json' } # Get projects from Langfuse API host_url = host or "https://cloud.langfuse.com" projects_url = f"{host_url}/api/public/projects" response = requests.get(projects_url, headers=headers, timeout=10) response.raise_for_status() projects_data = response.json() projects_info = [] # Extract project information if 'data' in projects_data: for project in projects_data['data']: project_info = { "id": project.get('id', ''), "name": project.get('name', ''), "description": project.get('description', ''), "created_at": project.get('createdAt', None) } projects_info.append(project_info) if not projects_info: # Fallback to default project if no projects found projects_info = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects") return projects_info except Exception as e: logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project") return [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] def get_langsmith_projects(api_key: str) -> List[Dict]: """Fetch projects from LangSmith API""" try: client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully fetched {len(projects)} LangSmith projects") # Extract project information projects_info = [] for project in projects: project_info = { "id": str(project.id), "name": project.name, "description": getattr(project, 'description', ''), "created_at": getattr(project, 'created_at', None) } projects_info.append(project_info) return projects_info except Exception as e: logger.error(f"Failed to fetch LangSmith projects: {str(e)}") 
raise def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool: """Test Langfuse connection by fetching traces""" try: downloader = LangfuseDownloader( secret_key=secret_key, public_key=public_key, host=host or "https://cloud.langfuse.com" ) # Test connection by fetching a small number of traces test_traces = downloader.download_recent_traces(limit=1) logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces") return True except Exception as e: logger.error(f"Failed to connect to Langfuse: {str(e)}") raise HTTPException(status_code=400, detail=f"Failed to connect to Langfuse: {str(e)}") from e def test_langsmith_connection(api_key: str) -> bool: """Test LangSmith connection by listing projects""" try: client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully tested LangSmith connection, found {len(projects)} projects") return True except Exception as e: logger.error(f"Failed to connect to LangSmith: {str(e)}") raise HTTPException(status_code=400, detail=f"Failed to connect to LangSmith: {str(e)}") from e def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]: """Get projects for a platform connection""" platform = platform.lower() if platform == "langfuse": test_langfuse_connection(public_key, secret_key, host) return get_langfuse_projects(public_key, secret_key, host) elif platform == "langsmith": if not public_key: raise HTTPException(status_code=400, detail="LangSmith requires an API token") test_langsmith_connection(public_key) return get_langsmith_projects(public_key) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}") def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]: """Get last fetch time for a connection and optionally a specific project""" query = 
db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id, FetchedTrace.platform == platform ) if project_name: query = query.filter(FetchedTrace.project_name == project_name) last_trace = query.order_by(FetchedTrace.fetched_at.desc()).first() return cast(datetime, last_trace.fetched_at) if last_trace else None def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str, data: Dict, project_name: Optional[str] = None) -> FetchedTrace: """Create a FetchedTrace object""" return FetchedTrace( trace_id=trace_id, name=name, platform=platform, connection_id=connection_id, project_name=project_name, data=data ) def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, limit: int = 50) -> List[Dict]: """Fetch sessions from Langfuse""" downloader = LangfuseDownloader( secret_key=cast(str, connection.secret_key), public_key=cast(str, connection.public_key), host=cast(str, connection.host) ) # Get last fetched time for this connection from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse") if from_timestamp: logger.info(f"Fetching sessions from {from_timestamp} onwards") else: logger.info("No previous fetches found, fetching all sessions") try: # List sessions to get session IDs if from_timestamp: sessions_response = downloader.client.api.sessions.list( limit=limit, from_timestamp=from_timestamp ) else: sessions_response = downloader.client.api.sessions.list(limit=limit) # Handle different response formats if hasattr(sessions_response, 'data'): sessions = [downloader._convert_to_dict(session) for session in sessions_response.data] else: sessions = [downloader._convert_to_dict(session) for session in sessions_response] logger.info(f"Found {len(sessions)} sessions") # Store each session as a fetched trace for session in sessions: try: session_id = session.get('id') if session_id: # Check if session already exists existing_session = db.query(FetchedTrace).filter( FetchedTrace.trace_id 
== session_id, FetchedTrace.connection_id == connection.connection_id ).first() if not existing_session: # Get all traces for this session - same as langfuse_downloader.py try: traces_response = downloader.client.api.trace.list(session_id=session_id) if hasattr(traces_response, 'data'): session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data] else: session_traces = [downloader._convert_to_dict(trace) for trace in traces_response] # Get detailed trace data for each trace - same as langfuse_downloader.py detailed_traces = [] for i, trace_summary in enumerate(session_traces): trace_id = trace_summary.get('id') if trace_id: try: # Add small delay between requests to avoid rate limits if i > 0: time.sleep(0.1) detailed_trace = downloader.client.api.trace.get(trace_id) if detailed_trace: trace_data = downloader._convert_to_dict(detailed_trace) detailed_traces.append(trace_data) logger.info(f"Downloaded detailed trace: {trace_id} ({i+1}/{len(session_traces)})") except Exception as e: logger.error(f"Error downloading detailed trace {trace_id}: {e}") detailed_traces.append(trace_summary) # Create session data - compatible format with LangSmith session_data = { 'run_id': session_id, 'run_name': session_id, # Use run_id as name 'project_name': "Default", 'trace_id': session_id, 'export_timestamp': datetime.now().isoformat(), 'total_traces': len(detailed_traces), 'traces': detailed_traces } # Convert to JSON-serializable format data_json = json.loads(json.dumps(session_data, default=str)) fetched_trace = create_fetched_trace( trace_id=session_id, name=session_id, # Use run_id as name platform="langfuse", connection_id=cast(str, connection.connection_id), data=data_json, project_name="Default" ) db.add(fetched_trace) logger.info(f"Stored session {session_id} with {len(detailed_traces)} traces") except Exception as e: logger.error(f"Error downloading session {session_id}: {e}") continue except Exception as e: logger.error(f"Error storing session 
{session_id}: {str(e)}") continue db.commit() logger.info(f"Fetched {len(sessions)} sessions from Langfuse") return sessions except Exception as e: logger.error(f"Error fetching sessions from Langfuse: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to fetch sessions: {str(e)}") from e def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, limit: int = 50) -> List[Dict]: """Fetch traces from LangSmith""" try: client = LangsmithClient(api_key=cast(str, connection.public_key)) logger.info("Connected to LangSmith successfully") # Get all projects try: projects = list(client.list_projects()) logger.info(f"Found {len(projects)} projects") except Exception as e: logger.error(f"Error listing projects: {e}") raise HTTPException(status_code=500, detail=f"Error listing projects: {str(e)}") from e # Export runs from each project all_traces = [] total_limit = limit # Get existing trace IDs to avoid duplicates existing_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection.connection_id, FetchedTrace.platform == "langsmith" ).all() existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces} for project in projects: project_name = project.name if not project_name: logger.info(f"Skipping project with no name (ID: {project.id})") continue logger.info(f"Exporting project: {project_name}") try: # Get last fetched time for this specific project project_from_timestamp = get_last_fetch_time( db, cast(str, connection.connection_id), "langsmith", project_name ) if project_from_timestamp: logger.info(f"Fetching {project_name} runs from {project_from_timestamp} onwards") else: logger.info(f"No previous fetches found for {project_name}, fetching all runs") # Get all runs (top-level runs only) - same as langsmith_exporter.py list_runs_kwargs = { "project_name": project_name, "is_root": True, "limit": None # No limit to get everything } # Add start_time filter if we have a project-specific timestamp if 
project_from_timestamp: list_runs_kwargs["start_time"] = project_from_timestamp runs = list(client.list_runs(**list_runs_kwargs)) logger.info(f"Found {len(runs)} runs in project {project_name}") # Process runs in batch new_traces_to_add = [] for run in runs: run_name = getattr(run, 'name', 'unnamed') run_id = str(run.id) unique_trace_id = f"{run_name}_{run_id}" # Skip if already exists if unique_trace_id in existing_trace_ids: logger.debug(f"Skipping existing trace: {unique_trace_id}") continue # Get all traces for this run (including nested children) - same as langsmith_exporter.py all_traces = [] try: # Get the root run and all its children trace_runs = client.list_runs(project_name=project_name, trace_id=run.trace_id) trace_list = list(trace_runs) # Sort traces by start_time descending (latest first) sorted_traces = sorted(trace_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min) for trace_run in sorted_traces: trace_data = trace_run.dict() if hasattr(trace_run, 'dict') else dict(trace_run) all_traces.append(trace_data) except Exception as e: logger.warning(f"Could not get child traces for run {run_id}: {e}") # Fallback to just the main run run_data = run.dict() if hasattr(run, 'dict') else dict(run) all_traces = [run_data] # Create run export structure - same format as langsmith_exporter.py run_export = { "run_id": run_id, "run_name": run_name, "project_name": project_name, "trace_id": str(run.trace_id) if hasattr(run, 'trace_id') else None, "export_timestamp": datetime.now().isoformat(), "total_traces": len(all_traces), "traces": all_traces } # Prepare for batch database insert try: # Same as langsmith_exporter.py: json.dump(..., default=str) # Convert to JSON string then back to dict to clean circular references clean_data = json.loads(json.dumps(run_export, default=str)) fetched_trace = create_fetched_trace( trace_id=unique_trace_id, name=f"{run_name}_{run_id[:8]}", platform="langsmith", connection_id=cast(str, connection.connection_id), 
data=clean_data, project_name=project_name ) new_traces_to_add.append(fetched_trace) existing_trace_ids.add(unique_trace_id) except Exception as e: logger.error(f"Error preparing trace {unique_trace_id}: {e}") continue # Stop if we've reached the limit if len(all_traces) >= total_limit: break # Batch insert new traces if new_traces_to_add: db.add_all(new_traces_to_add) logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}") # Stop if we've reached the limit across all projects if len(all_traces) >= total_limit: break except Exception as e: logger.error(f"Error exporting project {project_name}: {e}") continue # Single commit for all operations db.commit() logger.info(f"Fetched {len(all_traces)} traces from LangSmith") return all_traces except Exception as e: logger.error(f"Error fetching LangSmith traces: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to fetch traces: {str(e)}") from e # Request/Response Models class ConnectionRequest(BaseModel): platform: str publicKey: str secretKey: str host: Optional[str] = None class ConnectionResponse(BaseModel): status: str message: str connection_id: str class TraceFetchRequest(BaseModel): limit: Optional[int] = 50 start_date: Optional[str] = None end_date: Optional[str] = None class TraceImportRequest(BaseModel): trace_ids: List[str] @router.post("/connect", response_model=ConnectionResponse) async def connect_platform(request: ConnectionRequest, db: Session = Depends(get_db)): # noqa: B008 """Connect to an AI observability platform""" try: platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Get projects and test connection projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Store connection info in database connection_id = str(uuid.uuid4()) db_connection = ObservabilityConnection( connection_id=connection_id, platform=platform, public_key=public_key, secret_key=secret_key, host=request.host, 
projects=projects_info, status="connected" ) db.add(db_connection) db.commit() db.refresh(db_connection) logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}") return ConnectionResponse( status="success", message=f"Successfully connected to {platform.title()}", connection_id=connection_id ) except HTTPException: raise except Exception as e: logger.error(f"Unexpected error connecting to platform: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e @router.get("/{platform}/fetched-traces") async def get_fetched_traces(platform: str, db: Session = Depends(get_db)): # noqa: B008 """Get all fetched traces for a platform""" platform = platform.lower() # Get connection for platform connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.platform == platform, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found for {platform}") # Get all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection.connection_id ).order_by(FetchedTrace.fetched_at.desc()).all() return { "traces": [trace.to_dict() for trace in fetched_traces], "total": len(fetched_traces), "platform": platform } @router.get("/{platform}/traces/{trace_id}/download") async def download_trace_full_data(platform: str, trace_id: str, db: Session = Depends(get_db)): # noqa: B008 """Download full trace data without limitations""" platform = platform.lower() # Get connection for platform connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.platform == platform, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found for {platform}") # Get specific trace trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id, FetchedTrace.connection_id == 
connection.connection_id ).first() if not trace: raise HTTPException(status_code=404, detail="Trace not found") # Return full data without limitations return trace.get_full_data() @router.get("/connections") async def get_connections(db: Session = Depends(get_db)): # noqa: B008 """Get all active platform connections""" connections = db.query(ObservabilityConnection).all() return {"connections": [conn.to_dict() for conn in connections]} @router.put("/connections/{connection_id}") async def update_connection( connection_id: str, request: ConnectionRequest, db: Session = Depends(get_db) # noqa: B008 ): """Update an existing platform connection""" try: # Find existing connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Test connection and get projects projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Update connection in database connection.public_key = cast(Column[str], public_key) connection.secret_key = cast(Column[str], secret_key) connection.host = cast(Column[str], request.host) connection.projects = cast(Column[List[Dict]], projects_info) connection.status = cast(Column[str], "connected") db.commit() db.refresh(connection) logger.info(f"Successfully updated {platform} connection: {connection_id}") return { "status": "success", "message": f"Successfully updated {platform.title()} connection" } except HTTPException: raise except Exception as e: logger.error(f"Unexpected error updating connection: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e @router.post("/{platform}/traces") async def fetch_platform_traces( platform: str, request: TraceFetchRequest, db: Session = Depends(get_db) # noqa: B008 ): """Fetch traces from 
connected platform""" platform = platform.lower() # Find connection for platform connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.platform == platform, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found for {platform}") try: import asyncio # Run blocking operations in executor to avoid blocking the event loop loop = asyncio.get_event_loop() def sync_fetch(): # Create new db session for the thread from backend.database import get_db thread_db = next(get_db()) try: if platform == "langfuse": traces = fetch_langfuse_sessions(connection, thread_db, request.limit or 50) elif platform == "langsmith": traces = fetch_langsmith_traces(connection, thread_db, request.limit or 50) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}") # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) thread_db.commit() return traces finally: thread_db.close() # Execute in thread pool to avoid blocking traces = await loop.run_in_executor(None, sync_fetch) return { "status": "success", "message": f"Successfully fetched {len(traces)} traces from {platform}", "platform": platform, "traces_count": len(traces), "completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to fetch traces from {platform}: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to fetch traces: {str(e)}") from e @router.post("/{platform}/import") async def import_platform_traces( platform: str, request: TraceImportRequest, db: Session = Depends(get_db) # noqa: B008 ): """Import specific traces from platform to local database""" platform = platform.lower() # Find connection for platform connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.platform == platform, ObservabilityConnection.status == "connected" ).first() if not connection: raise 
HTTPException(status_code=404, detail=f"No active connection found for {platform}") try: imported_count = 0 errors = [] if platform == "langfuse": for session_id in request.trace_ids: try: fetched_session = db.query(FetchedTrace).filter( FetchedTrace.trace_id == session_id, FetchedTrace.connection_id == connection.connection_id ).first() if not fetched_session: errors.append(f"Session {session_id} not found in fetched traces") continue # Use the already fetched session data session_data = cast(Dict, fetched_session.data) preprocessed_session = filter_langfuse_session(LangFuseSession(**session_data), topk=10, max_char=1000) session_content = json.dumps(preprocessed_session, indent=2) saved_trace = save_trace( session=db, content=session_content, filename=f"langfuse_{session_id}.json", title=session_id, description=f"Processed import from Langfuse {session_id}", trace_type="langfuse_session_import", trace_source="langfuse", tags=["imported", "langfuse", "session", session_id], trace_metadata={ "session_id": session_id, "platform": "langfuse", "processing_type": "session", "total_traces": len(preprocessed_session["traces"]), } ) imported_count += 1 logger.info(f"Successfully imported Langfuse {session_id} as {saved_trace.trace_id}") # Auto-generate context documents using universal parser try: from backend.services.universal_parser_service import auto_generate_context_documents created_docs = auto_generate_context_documents(cast(str, saved_trace.trace_id), session_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for imported trace {saved_trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for imported trace {saved_trace.trace_id}: {str(e)}") except Exception as e: error_msg = f"Failed to import session {session_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) elif platform == "langsmith": # LangSmith import implementation for trace_id in request.trace_ids: 
try: # Get the fetched trace data from database fetched_trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id, FetchedTrace.connection_id == connection.connection_id ).first() if not fetched_trace: errors.append(f"Trace {trace_id} not found in fetched traces") continue # Use the already fetched data langsmith_export = cast(Dict, fetched_trace.data) preprocessed_trace = filter_langsmith_trace(LangSmithTrace(**langsmith_export), topk=10, max_char=1000) processed_content = json.dumps(preprocessed_trace, indent=2) processed_trace = save_trace( session=db, content=processed_content, filename=f"langsmith_{trace_id}.json", title=cast(str, fetched_trace.name), description="Processed import from LangSmith", trace_type="langsmith_processed_import", trace_source="langsmith", tags=["imported", "langsmith", "processed"], trace_metadata={ "original_trace_id": trace_id, "platform": "langsmith", "processing_type": "processed", } ) imported_count += 1 logger.info(f"Successfully imported LangSmith trace {trace_id} as processed ({processed_trace.trace_id})") # Auto-generate context documents using universal parser (use raw content, not processed) try: from backend.services.universal_parser_service import auto_generate_context_documents # Use the original langsmith_export for better parsing results raw_content = json.dumps(langsmith_export, indent=2, default=str) created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}") except Exception as e: error_msg = f"Failed to import trace {trace_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) db.commit() return { 
"imported": imported_count, "errors": errors, "platform": platform, "import_completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to import traces from {platform}: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to import traces: {str(e)}") from e @router.delete("/connections/{connection_id}") async def disconnect_platform(connection_id: str, db: Session = Depends(get_db)): # noqa: B008 """Disconnect from a platform""" connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = connection.platform # Delete all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).all() deleted_traces_count = len(fetched_traces) # Delete fetched traces for fetched_trace in fetched_traces: db.delete(fetched_trace) # Remove connection from database db.delete(connection) db.commit() logger.info(f"Disconnected from {platform} (connection ID: {connection_id})") logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}") return { "status": "success", "message": f"Disconnected from {platform.title()}", "deleted_fetched_traces": deleted_traces_count, "disconnected_at": datetime.now().isoformat() } # Connection-specific routes (required by frontend) @router.get("/connections/{connection_id}/fetched-traces") async def get_fetched_traces_by_connection(connection_id: str, db: Session = Depends(get_db)): # noqa: B008 """Get all fetched traces for a specific connection""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") # Get all fetched 
traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).order_by(FetchedTrace.fetched_at.desc()).all() return { "traces": [trace.to_dict() for trace in fetched_traces], "total": len(fetched_traces), "platform": connection.platform } @router.post("/connections/{connection_id}/fetch-traces") async def fetch_traces_by_connection( connection_id: str, request: TraceFetchRequest, db: Session = Depends(get_db) # noqa: B008 ): """Fetch traces from a specific connection""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") try: import asyncio # Run blocking operations in executor to avoid blocking the event loop loop = asyncio.get_event_loop() def sync_fetch(): # Create new db session for the thread from backend.database import get_db thread_db = next(get_db()) try: if cast(str, connection.platform) == "langfuse": traces = fetch_langfuse_sessions(connection, thread_db, request.limit or 50) elif cast(str, connection.platform) == "langsmith": traces = fetch_langsmith_traces(connection, thread_db, request.limit or 50) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {connection.platform}") # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) thread_db.commit() return traces finally: thread_db.close() # Execute in thread pool to avoid blocking traces = await loop.run_in_executor(None, sync_fetch) return { "status": "success", "message": f"Successfully fetched {len(traces)} traces from {connection.platform}", "platform": connection.platform, "traces_count": len(traces), "completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to fetch traces from connection {connection_id}: 
{str(e)}") raise HTTPException(status_code=500, detail=f"Failed to fetch traces: {str(e)}") from e @router.post("/connections/{connection_id}/import") async def import_traces_by_connection( connection_id: str, request: TraceImportRequest, db: Session = Depends(get_db) # noqa: B008 ): """Import specific traces from a connection to local database""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") try: imported_count = 0 errors = [] for trace_id in request.trace_ids: try: # Get trace from fetched_traces table trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id, FetchedTrace.connection_id == connection_id ).first() if not trace: errors.append(f"Trace {trace_id} not found in fetched traces for connection {connection_id}") continue # Process based on platform if cast(str, connection.platform) == "langfuse": filtered_trace = filter_langfuse_session(LangFuseSession(**trace.get_full_data()), topk=10, max_char=1000) processed_trace = save_trace( content=json.dumps(filtered_trace, indent=2, default=str), filename=f"langfuse_trace_{trace_id}", trace_type="langfuse_raw_import", session=db, ) if processed_trace: imported_count += 1 logger.info(f"Successfully imported Langfuse trace {trace_id} as {processed_trace.trace_id}") # Auto-generate context documents using universal parser try: from backend.services.universal_parser_service import auto_generate_context_documents raw_content = json.dumps(filtered_trace, indent=2, default=str) created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed 
to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}") elif cast(str, connection.platform) == "langsmith": langsmith_export = trace.get_full_data() filtered_export = filter_langsmith_trace(LangSmithTrace(**langsmith_export), topk=10, max_char=1000) processed_trace = save_trace( content=json.dumps(filtered_export, indent=2, default=str), filename=f"langsmith_trace_{trace_id}", trace_type="langsmith_raw_import", session=db ) if processed_trace: imported_count += 1 logger.info(f"Successfully imported LangSmith trace {trace_id} as {processed_trace.trace_id}") # Auto-generate context documents using universal parser (use raw content, not processed) try: from backend.services.universal_parser_service import auto_generate_context_documents # Use the original langsmith_export for better parsing results raw_content = json.dumps(langsmith_export, indent=2, default=str) created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}") except Exception as e: error_msg = f"Failed to import trace {trace_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) db.commit() return { "imported": imported_count, "errors": errors, "platform": connection.platform, "import_completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to import traces from connection {connection_id}: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to import traces: {str(e)}") from e @router.get("/traces/{trace_id}/download") async def download_trace_by_id(trace_id: str, db: Session = Depends(get_db)): # noqa: B008 """Download 
full trace data by trace ID (platform-agnostic)""" # Find the trace in fetched_traces table trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id ).first() if not trace: raise HTTPException(status_code=404, detail="Trace not found") # Return full data without limitations return trace.get_full_data()
""" AI Observability Platform Integration Router Handles connections to external AI observability platforms like Langfuse and LangSmith. Provides endpoints for: - Platform connection management - Trace fetching and importing - Automated synchronization """ import asyncio import base64 import json import logging import uuid from datetime import datetime from typing import Dict, List, Optional, cast import requests from fastapi import APIRouter, Depends, HTTPException from langsmith import Client as LangsmithClient from pydantic import BaseModel from sqlalchemy import Column from sqlalchemy.orm import Session from backend.database import get_db from backend.database.models import FetchedTrace, ObservabilityConnection from backend.database.utils import save_trace from backend.routers.observe_models import LangFuseSession, LangSmithRun, LangSmithTrace from utils.langfuse_downloader import LangfuseDownloader logger = logging.getLogger("agent_monitoring_server.routers.observability") router = APIRouter(prefix="/api/observability", tags=["observability"]) db_depend = Depends(get_db) def truncate_long_strings(obj, max_string_length=500): """ Recursively process JSON object to truncate very long strings No depth limit - all keys and array items are preserved Only truncates string values that are too long """ if isinstance(obj, dict): truncated = {} # Process ALL keys, no limit on key count or depth for key, value in obj.items(): truncated[key] = truncate_long_strings(value, max_string_length) return truncated elif isinstance(obj, list): truncated = [] # Process ALL items, no limit on item count or depth for item in obj: truncated.append(truncate_long_strings(item, max_string_length)) return truncated elif isinstance(obj, str) and len(obj) > max_string_length: return f"{obj[:max_string_length]}...({len(obj)} chars)" return obj # Helper Functions for Common Operations def get_langfuse_projects(public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]: """Fetch 
projects from Langfuse API""" try: # Create Basic Auth header auth_string = f"{public_key}:{secret_key}" auth_bytes = auth_string.encode('ascii') auth_b64 = base64.b64encode(auth_bytes).decode('ascii') headers = { 'Authorization': f'Basic {auth_b64}', 'Content-Type': 'application/json' } # Get projects from Langfuse API host_url = host or "https://cloud.langfuse.com" projects_url = f"{host_url}/api/public/projects" response = requests.get(projects_url, headers=headers, timeout=10) response.raise_for_status() projects_data = response.json() projects_info = [] # Extract project information if 'data' in projects_data: for project in projects_data['data']: project_info = { "id": project.get('id', ''), "name": project.get('name', ''), } projects_info.append(project_info) if not projects_info: # Fallback to default project if no projects found projects_info = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects") return projects_info except Exception as e: logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project") return [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] def get_langsmith_projects(api_key: str) -> List[Dict]: """Fetch projects from LangSmith API""" try: client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully fetched {len(projects)} LangSmith projects") # Extract project information projects_info = [] for project in projects: project_info = { "id": str(project.id), "name": project.name, } projects_info.append(project_info) return projects_info except Exception as e: logger.error(f"Failed to fetch LangSmith projects: {str(e)}") raise def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool: """Test Langfuse connection by fetching traces""" downloader = LangfuseDownloader( secret_key=secret_key, public_key=public_key, 
host=host or "https://cloud.langfuse.com" ) # Test connection by fetching a small number of traces test_traces = downloader.download_recent_traces(limit=1) logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces") return True def test_langsmith_connection(api_key: str) -> bool: """Test LangSmith connection by listing projects""" client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully tested LangSmith connection, found {len(projects)} projects") return True def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]: """Get projects for a platform connection""" platform = platform.lower() if platform == "langfuse": test_langfuse_connection(public_key, secret_key, host) return get_langfuse_projects(public_key, secret_key, host) elif platform == "langsmith": if not public_key: raise HTTPException(status_code=400, detail="LangSmith requires an API token") test_langsmith_connection(public_key) return get_langsmith_projects(public_key) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}") def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]: """Get last fetch time for a connection and optionally a specific project""" query = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id, FetchedTrace.platform == platform ) if project_name: query = query.filter(FetchedTrace.project_name == project_name) last_trace = query.order_by(FetchedTrace.fetched_at.desc()).first() return cast(datetime, last_trace.fetched_at) if last_trace else None def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str, data: Dict, project_name: Optional[str] = None) -> FetchedTrace: """Create a FetchedTrace object""" return FetchedTrace( trace_id=trace_id, name=name, platform=platform, connection_id=connection_id, 
project_name=project_name, data=data ) def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]: """Fetch sessions from Langfuse""" downloader = LangfuseDownloader( secret_key=cast(str, connection.secret_key), public_key=cast(str, connection.public_key), host=cast(str, connection.host) ) # Get last fetched time for this connection from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse") if from_timestamp: logger.info(f"Fetching sessions from {from_timestamp} onwards") else: logger.info("No previous fetches found, fetching all sessions") # List sessions to get session IDs if from_timestamp: sessions_response = downloader.client.api.sessions.list( limit=limit, from_timestamp=from_timestamp ) else: sessions_response = downloader.client.api.sessions.list(limit=limit) # Handle different response formats if hasattr(sessions_response, 'data'): sessions = [downloader._convert_to_dict(session) for session in sessions_response.data] else: sessions = [downloader._convert_to_dict(session) for session in sessions_response] logger.info(f"Found {len(sessions)} sessions") # Store each session as a fetched trace for session in sessions: try: session_id = session.get('id') if session_id: # Check if session already exists existing_session = db.query(FetchedTrace).filter( FetchedTrace.trace_id == session_id, FetchedTrace.connection_id == connection.connection_id ).first() if not existing_session: try: traces_response = downloader.client.api.trace.list(session_id=session_id) if hasattr(traces_response, 'data'): session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data] else: session_traces = [downloader._convert_to_dict(trace) for trace in traces_response] # Get detailed trace data for each trace detailed_traces = [] for trace_summary in session_traces: trace_id = trace_summary.get('id') if trace_id: try: detailed_trace = 
downloader.client.api.trace.get(trace_id) if detailed_trace: trace_data = downloader._convert_to_dict(detailed_trace) detailed_traces.append(trace_data) print(f"Downloaded detailed trace: {trace_id}") except Exception as e: print(f"Error downloading detailed trace {trace_id}: {e}") # Create session data - compatible format with LangSmith session_data = LangFuseSession( session_id=session_id, session_name=session_id, project_name=project_name, export_timestamp=datetime.now().isoformat(), total_traces=len(detailed_traces), traces=detailed_traces ) # Convert to JSON-serializable format data_json = session_data.model_dump() fetched_trace = create_fetched_trace( trace_id=session_id, name=session_id, platform="langfuse", connection_id=cast(str, connection.connection_id), data=data_json, project_name=project_name ) db.add(fetched_trace) logger.info(f"Stored session {session_id} with {len(session_traces)} traces") except Exception as e: logger.error(f"Error downloading session {session_id}: {e}") continue except Exception as e: logger.error(f"Error storing session {session_id}: {str(e)}") continue db.commit() logger.info(f"Fetched {len(sessions)} sessions from Langfuse") return sessions def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, project_name: str,limit: int = 50) -> List[Dict]: """Fetch traces from LangSmith""" client = LangsmithClient(api_key=cast(str, connection.public_key)) logger.info("Connected to LangSmith successfully") # Get all projects projects = list(client.list_projects()) logger.info(f"Found {len(projects)} projects") # Export runs from each project all_traces = [] total_limit = limit # Get existing trace IDs to avoid duplicates existing_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection.connection_id, FetchedTrace.platform == "langsmith" ).all() existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces} for project in projects: if not project_name: logger.info(f"Skipping 
project with no name (ID: {project.id})") continue logger.info(f"Exporting project: {project_name}") try: # Get last fetched time for this specific project last_fetch = get_last_fetch_time( db, cast(str, connection.connection_id), "langsmith", project_name ) if last_fetch: logger.info(f"Fetching {project_name} runs from {last_fetch} onwards") else: logger.info(f"No previous fetches found for {project_name}, fetching all runs") traces = list(client.list_runs(project_name=project_name, is_root=True, limit=None, start_time=last_fetch)) logger.info(f"Found {len(traces)} traces in project {project_name}") # Process runs in batch new_traces_to_add = [] for trace in traces: trace_name = getattr(trace, 'name', 'unnamed') trace_id = str(trace.id) unique_trace_id = f"{trace_name}_{trace_id}" # Skip if already exists if unique_trace_id in existing_trace_ids: logger.debug(f"Skipping existing trace: {unique_trace_id}") continue # Get all traces for this run (including nested children) - same as langsmith_exporter.py all_runs: List[LangSmithRun] = [] try: # Get the root run and all its children trace_runs = client.list_runs(project_name=project_name, trace_id=trace.trace_id) run_list = list(trace_runs) # Sort traces by start_time descending (latest first) sorted_runs = sorted(run_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min) for run in sorted_runs: run_data = run.dict() if hasattr(run, 'dict') else dict(run) all_runs.append(LangSmithRun(**run_data)) except Exception as e: logger.warning(f"Could not get child traces for run {trace_id}: {e}") # Fallback to just the main run run_data = trace.dict() if hasattr(trace, 'dict') else dict(trace) all_runs = [LangSmithRun(**run_data)] # Create run export structure - same format as langsmith_exporter.py run_export = LangSmithTrace( trace_id=trace_id, trace_name=trace_name, project_name=project_name, export_time=datetime.now().isoformat(), total_runs=len(all_runs), runs=all_runs ) # Prepare for batch database insert 
try: # Same as langsmith_exporter.py: json.dump(..., default=str) # Convert to JSON string then back to dict to clean circular references clean_data = run_export.model_dump() fetched_trace = create_fetched_trace( trace_id=unique_trace_id, name=f"{trace_name}_{trace_id[:8]}", platform="langsmith", connection_id=cast(str, connection.connection_id), data=clean_data, project_name=project_name ) new_traces_to_add.append(fetched_trace) existing_trace_ids.add(unique_trace_id) except Exception as e: logger.error(f"Error preparing trace {unique_trace_id}: {e}") continue # Stop if we've reached the limit if len(all_traces) >= total_limit: break # Batch insert new traces if new_traces_to_add: db.add_all(new_traces_to_add) logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}") # Stop if we've reached the limit across all projects if len(all_traces) >= total_limit: break except Exception as e: logger.error(f"Error exporting project {project_name}: {e}") continue # Single commit for all operations db.commit() logger.info(f"Fetched {len(all_traces)} traces from LangSmith") return all_traces # Request/Response Models class ConnectionRequest(BaseModel): platform: str publicKey: str secretKey: str host: Optional[str] = None class ConnectionResponse(BaseModel): status: str message: str connection_id: str class TraceFetchRequest(BaseModel): start_date: Optional[str] = None end_date: Optional[str] = None project_name: str limit: int = 50 class TraceImportRequest(BaseModel): trace_ids: List[str] preprocessing_method: Optional[str] = "default" @router.post("/connect", response_model=ConnectionResponse) async def connect_platform(request: ConnectionRequest, db: Session = db_depend): """Connect to an AI observability platform""" platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Get projects and test connection projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Store 
connection info in database connection_id = str(uuid.uuid4()) db_connection = ObservabilityConnection( connection_id=connection_id, platform=platform, public_key=public_key, secret_key=secret_key, host=request.host, projects=projects_info, status="connected" ) db.add(db_connection) db.commit() db.refresh(db_connection) logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}") return ConnectionResponse( status="success", message=f"Successfully connected to {platform.title()}", connection_id=connection_id ) @router.get("/connections/{connection_id}/fetched-traces") async def get_fetched_traces_by_connection(connection_id: str, db: Session = db_depend): """Get all fetched traces for a specific connection""" # Get specific connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") # Get all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).order_by(FetchedTrace.fetched_at.desc()).all() return { "traces": [trace.to_dict() for trace in fetched_traces], "total": len(fetched_traces), "platform": connection.platform, "connection_id": connection_id } @router.get("/traces/{trace_id}/download") async def download_trace_by_id(trace_id: str, db: Session = db_depend): """Download full trace data by trace ID only - simple and clean""" # Find trace directly by trace_id (much simpler!) 
trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id ).first() if not trace: raise HTTPException(status_code=404, detail="Trace not found") # Return full data without limitations return trace.get_full_data() @router.get("/connections") async def get_connections(db: Session = db_depend): """Get all active platform connections""" connections = db.query(ObservabilityConnection).all() return {"connections": [conn.to_dict() for conn in connections]} @router.put("/connections/{connection_id}") async def update_connection( connection_id: str, request: ConnectionRequest, db: Session = db_depend ): """Update an existing platform connection""" # Find existing connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Test connection and get projects projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Update connection in database connection.public_key = cast(Column, public_key) connection.secret_key = cast(Column, secret_key) connection.host = cast(Column, request.host) connection.projects = cast(Column, projects_info) connection.status = cast(Column, "connected") db.commit() db.refresh(connection) logger.info(f"Successfully updated {platform} connection: {connection_id}") return { "status": "success", "message": f"Successfully updated {platform.title()} connection" } @router.post("/connections/{connection_id}/fetch") async def fetch_platform_traces_by_connection( connection_id: str, request: TraceFetchRequest, db: Session = db_depend ): """Fetch traces from a specific platform connection""" # Find specific connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == 
"connected" ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = connection.platform.lower() # Run blocking operations in executor to avoid blocking the event loop loop = asyncio.get_event_loop() def sync_fetch(): # Create new db session for the thread thread_db = next(get_db()) try: if platform == "langfuse": traces = fetch_langfuse_sessions( connection, thread_db, limit=request.limit or 50, project_name=request.project_name ) elif platform == "langsmith": traces = fetch_langsmith_traces( connection, thread_db, limit=request.limit or 50, project_name=request.project_name ) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}") # Update last sync time connection.last_sync = cast(Column, datetime.now()) thread_db.commit() return traces finally: thread_db.close() # Execute in thread pool to avoid blocking traces = await loop.run_in_executor(None, sync_fetch) return { "status": "success", "message": f"Successfully fetched {len(traces)} traces from {platform}", "platform": platform, "connection_id": connection_id, "traces_count": len(traces), "completed_at": datetime.now().isoformat() } @router.post("/connections/{connection_id}/import") async def import_platform_traces_by_connection( connection_id: str, request: TraceImportRequest, db: Session = db_depend ): """Import specific traces from a platform connection to local database""" # Find specific connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = connection.platform.lower() imported_count = 0 errors = [] preprocessing_method = request.preprocessing_method or "default" # Choose the appropriate preprocessing function based on method if preprocessing_method == "variable_replacing": from 
utils.preprocess_traces_variable import filter_langfuse_trace, filter_langsmith_export elif preprocessing_method == "aggressive": from utils.preprocess_traces_aggressive import filter_langfuse_trace, filter_langsmith_export else: # default method - raw content preservation from utils.preprocess_traces_raw import filter_langfuse_trace, filter_langsmith_export if platform == "langfuse": # Langfuse import implementation for session_id in request.trace_ids: try: # Get the fetched session data from database fetched_session = db.query(FetchedTrace).filter( FetchedTrace.trace_id == session_id, FetchedTrace.connection_id == connection.connection_id ).first() if not fetched_session: errors.append(f"Session {session_id} not found in fetched traces") continue # Use the already fetched session data session_data = fetched_session.data session_traces = session_data.get('traces', []) # Apply Langfuse filtering to all traces in session filtered_traces = [] for trace_data in session_traces: try: filtered_trace = filter_langfuse_trace(trace_data) filtered_traces.append(filtered_trace) except Exception as e: logger.error(f"Error filtering trace {trace_data.get('id', 'unknown')}: {e}") filtered_traces.append(trace_data) session_json = { "session_id": session_id, "traces": filtered_traces } # Save as single JSON file for the entire session session_content = json.dumps(session_json, indent=2) now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') saved_trace = save_trace( session=db, content=session_content, filename=f"langfuse_{session_id}.json", title=session_id, description=f"Imported Langfuse {session_id} with {len(filtered_traces)} traces on {now}", trace_type="langfuse_session_import", trace_source="langfuse", tags=["imported", "langfuse", "session", session_id], trace_metadata={ "session_id": session_id, "platform": "langfuse", "processing_type": "session", "total_traces": len(filtered_traces) } ) imported_count += 1 logger.info(f"Imported Langfuse {session_id} with 
{len(filtered_traces)} traces as {saved_trace.trace_id}") except Exception as e: error_msg = f"Failed to import session {session_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) elif platform == "langsmith": # LangSmith import implementation for trace_id in request.trace_ids: try: # Get the fetched trace data from database fetched_trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id, FetchedTrace.connection_id == connection.connection_id ).first() if not fetched_trace: errors.append(f"Trace {trace_id} not found in fetched traces") continue # Use the already fetched data langsmith_export = cast(Dict, fetched_trace.data) # Apply LangSmith filtering filtered_export = filter_langsmith_export(langsmith_export) # Save processed version processed_content = json.dumps(filtered_export, indent=2, default=str) processed_trace = save_trace( session=db, content=processed_content, filename=f"{langsmith_export.get('trace_name', 'unknown')}_{trace_id}.json", title=cast(str, fetched_trace.name), description=f"Processed import from LangSmith on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", trace_type="langsmith_processed_import", trace_source="langsmith", tags=["imported", "langsmith", "processed"], trace_metadata={"original_trace_id": trace_id, "platform": "langsmith", "processing_type": "processed"} ) imported_count += 1 logger.info(f"Imported LangSmith trace {trace_id} as processed ({processed_trace.trace_id}) using {preprocessing_method}") except Exception as e: error_msg = f"Failed to import trace {trace_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) # Update last sync time connection.last_sync = cast(Column, datetime.now()) db.commit() return { "imported": imported_count, "errors": errors, "platform": platform, "connection_id": connection_id, "import_completed_at": datetime.now().isoformat() } @router.delete("/connections/{connection_id}") async def disconnect_platform(connection_id: str, db: Session = db_depend): 
"""Disconnect from a platform""" connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = cast(str, connection.platform) # Delete all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).all() deleted_traces_count = len(fetched_traces) # Delete fetched traces for fetched_trace in fetched_traces: db.delete(fetched_trace) # Remove connection from database db.delete(connection) db.commit() logger.info(f"Disconnected from {platform} (connection ID: {connection_id})") logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}") return { "status": "success", "message": f"Disconnected from {platform.title()}", "deleted_fetched_traces": deleted_traces_count, "disconnected_at": datetime.now().isoformat() }
Graph trimmed
📊 EVALUATION SUMMARY ============================================================ Total datasets: 7 Successful: 7 Failed: 0 📈 METHOD PERFORMANCE: unified_method: Success rate: 7/7 Avg exact score: 0.057 Avg semantic score: 0.531 Avg processing time: 65.45s Avg matching score: 0.321 💰 Avg tokens per run: 411793 💰 Avg cost per run: $0.0644 💰 Total cost: $0.4505 🤖 Model: unknown original_method: Success rate: 7/7 Avg exact score: 0.050 Avg semantic score: 0.351 Avg processing time: 56.29s Avg matching score: 0.196 💰 Avg tokens per run: 422546 💰 Avg cost per run: $0.0665 💰 Total cost: $0.4658 🤖 Model: unknown hybrid_method: Success rate: 7/7 Avg exact score: 0.097 Avg semantic score: 0.635 Avg processing time: 124.07s Avg matching score: 0.412 💰 Avg tokens per run: 427415 💰 Avg cost per run: $0.0701 💰 Total cost: $0.4908 🤖 Model: unknown clustering_method: Success rate: 7/7 Avg exact score: 0.083 Avg semantic score: 0.526 Avg processing time: 194.26s Avg matching score: 0.313 💰 Avg tokens per run: 459523 💰 Avg cost per run: $0.0809 💰 Total cost: $0.5664 🤖 Model: gpt-4o-mini direct_llm_method: Success rate: 7/7 Avg exact score: 0.076 Avg semantic score: 0.587 Avg processing time: 459.17s Avg matching score: 0.408 💰 Avg tokens per run: 104811 💰 Avg cost per run: $0.0166 💰 Total cost: $0.1163 🤖 Model: gpt-4o-mini pydantic_ai_method: Success rate: 7/7 Avg exact score: 0.064 Avg semantic score: 0.299 Avg processing time: 112.01s Avg matching score: 0.146 💰 Avg tokens per run: 106761 💰 Avg cost per run: $0.0168 💰 Total cost: $0.1010 🤖 Model: unknown sequential_pydantic: Success rate: 7/7 Avg exact score: 0.067 Avg semantic score: 0.380 Avg processing time: 27.02s Avg matching score: 0.200 💰 Avg tokens per run: 105026 💰 Avg cost per run: $0.0163 💰 Total cost: $0.1142 🤖 Model: unknown pydantic_hybrid_method: Success rate: 7/7 Avg exact score: 0.119 Avg semantic score: 0.622 Avg processing time: 68.52s Avg matching score: 0.413 💰 Avg tokens per run: 156297 💰 Avg cost per run: 
$0.0248 💰 Total cost: $0.1738 🤖 Model: unknown perfect_method: Success rate: 7/7 Avg exact score: 1.000 Avg semantic score: 1.000 Avg processing time: 0.00s Avg matching score: 1.000 💰 Avg tokens per run: 0 💰 Avg cost per run: $0.0000 💰 Total cost: $0.0000 🤖 Model: unknown dumb_method: Success rate: 7/7 Avg exact score: 0.000 Avg semantic score: 0.000 Avg processing time: 0.00s Avg matching score: 0.000 💰 Token usage: Not available 💾 CACHE PERFORMANCE: Total cache hits: 70 Total cache misses: 0 Cache hit rate: 100.0%
fixed hybrid
pydantic_hybrid_method: Success rate: 7/7 Avg exact score: 0.095 Avg semantic score: 0.692 Avg processing time: 85.63s Avg matching score: 0.461 💰 Avg tokens per run: 107946 💰 Avg cost per run: $0.0178 💰 Total cost: $0.1248 🤖 Model: unknown
Graph replaced
📊 EVALUATION SUMMARY ============================================================ Total datasets: 7 Successful: 7 Failed: 0 📈 METHOD PERFORMANCE: unified_method: Success rate: 7/7 Avg exact score: 0.122 Avg semantic score: 0.700 Avg processing time: 73.35s Avg matching score: 0.440 💰 Avg tokens per run: 318206 💰 Avg cost per run: $0.0508 💰 Total cost: $0.3558 🤖 Model: unknown original_method: Success rate: 7/7 Avg exact score: 0.106 Avg semantic score: 0.489 Avg processing time: 77.57s Avg matching score: 0.317 💰 Avg tokens per run: 335146 💰 Avg cost per run: $0.0555 💰 Total cost: $0.3883 🤖 Model: unknown hybrid_method: Success rate: 7/7 Avg exact score: 0.093 Avg semantic score: 0.720 Avg processing time: 528.95s Avg matching score: 0.445 💰 Avg tokens per run: 335928 💰 Avg cost per run: $0.0573 💰 Total cost: $0.4010 🤖 Model: unknown clustering_method: Success rate: 7/7 Avg exact score: 0.122 Avg semantic score: 0.585 Avg processing time: 209.73s Avg matching score: 0.341 💰 Avg tokens per run: 360825 💰 Avg cost per run: $0.0651 💰 Total cost: $0.4556 🤖 Model: gpt-4o-mini direct_llm_method: Success rate: 7/7 Avg exact score: 0.143 Avg semantic score: 0.767 Avg processing time: 111.73s Avg matching score: 0.579 💰 Avg tokens per run: 81373 💰 Avg cost per run: $0.0131 💰 Total cost: $0.0916 🤖 Model: gpt-4o-mini pydantic_ai_method: Success rate: 7/7 Avg exact score: 0.075 Avg semantic score: 0.385 Avg processing time: 208.52s Avg matching score: 0.231 💰 Avg tokens per run: 94392 💰 Avg cost per run: $0.0182 💰 Total cost: $0.0908 🤖 Model: unknown sequential_pydantic: Success rate: 7/7 Avg exact score: 0.126 Avg semantic score: 0.593 Avg processing time: 58.05s Avg matching score: 0.423 💰 Avg tokens per run: 84436 💰 Avg cost per run: $0.0139 💰 Total cost: $0.0976 🤖 Model: unknown pydantic_hybrid_method: Success rate: 7/7 Avg exact score: 0.115 Avg semantic score: 0.752 Avg processing time: 106.90s Avg matching score: 0.507 💰 Avg tokens per run: 122752 💰 Avg cost per run: 
$0.0202 💰 Total cost: $0.1417 🤖 Model: unknown perfect_method: Success rate: 7/7 Avg exact score: 1.000 Avg semantic score: 1.000 Avg processing time: 0.00s Avg matching score: 1.000 💰 Avg tokens per run: 0 💰 Avg cost per run: $0.0000 💰 Total cost: $0.0000 🤖 Model: unknown dumb_method: Success rate: 7/7 Avg exact score: 0.000 Avg semantic score: 0.000 Avg processing time: 0.00s Avg matching score: 0.000 💰 Token usage: Not available 💾 CACHE PERFORMANCE: Total cache hits: 63 Total cache misses: 7 Cache hit rate: 90.0% 💾 Results saved to: evaluation_results.json
fixed hybrid
pydantic_hybrid_method: Success rate: 7/7 Avg exact score: 0.136 Avg semantic score: 0.713 Avg processing time: 78.57s Avg matching score: 0.486 💰 Avg tokens per run: 84902 💰 Avg cost per run: $0.0144 💰 Total cost: $0.1011 🤖 Model: unknown
Crew Replaced
📈 METHOD PERFORMANCE: direct_llm_method: Success rate: 10/10 Avg exact score: 0.026 Avg semantic score: 0.407 Avg processing time: 24.76s Avg matching score: 0.552 💰 Avg tokens per run: 12760 💰 Avg cost per run: $0.0025 💰 Total cost: $0.0246 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.026 Avg semantic score: 0.379 Avg processing time: 47.23s Avg matching score: 0.540 💰 Avg tokens per run: 18224 💰 Avg cost per run: $0.0038 💰 Total cost: $0.0377 🤖 Model: unknown
fixed hybrid without original data
pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.026 Avg semantic score: 0.377 Avg processing time: 46.00s Avg matching score: 0.498 💰 Avg tokens per run: 14494 💰 Avg cost per run: $0.0032 💰 Total cost: $0.0316 🤖 Model: unknown
pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.026 Avg semantic score: 0.380 Avg processing time: 45.28s Avg matching score: 0.482 💰 Avg tokens per run: 14498 💰 Avg cost per run: $0.0032 💰 Total cost: $0.0317 🤖 Model: unknown
Crew Hierarchy
📈 METHOD PERFORMANCE: direct_llm_method: Success rate: 10/10 Avg exact score: 0.000 Avg semantic score: 0.333 Avg processing time: 25.11s Avg matching score: 0.495 💰 Avg tokens per run: 17159 💰 Avg cost per run: $0.0032 💰 Total cost: $0.0286 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.000 Avg semantic score: 0.310 Avg processing time: 57.32s Avg matching score: 0.487 💰 Avg tokens per run: 25673 💰 Avg cost per run: $0.0053 💰 Total cost: $0.0474 🤖 Model: unknown
reduced (fair comparison)
📈 METHOD PERFORMANCE: direct_llm_method: Success rate: 10/10 Avg exact score: 0.021 Avg semantic score: 0.407 Avg processing time: 26.41s Avg matching score: 0.544 💰 Avg tokens per run: 15161 💰 Avg cost per run: $0.0028 💰 Total cost: $0.0282 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.017 Avg semantic score: 0.407 Avg processing time: 48.62s Avg matching score: 0.538 💰 Avg tokens per run: 21968 💰 Avg cost per run: $0.0043 💰 Total cost: $0.0435 🤖 Model: unknown
trimmed 길다
수정된 hybrid가 좋음
replaced 짧다
complex한 trace with validation hybrid가 더 좋음
hierarchy가 가장 쉽다
simple한 validation only hybrid가 더 좋음
Crew AI examples
Trim only
hybrid_method: Success rate: 10/10 Avg exact score: 0.269 Avg semantic score: 0.608 Avg processing time: 44.74s Avg matching score: 0.443 💰 Avg tokens per run: 169534 💰 Avg cost per run: $0.0298 💰 Total cost: $0.2976 🤖 Model: unknown direct_llm_method: Success rate: 10/10 Avg exact score: 0.280 Avg semantic score: 0.530 Avg processing time: 22.75s Avg matching score: 0.415 💰 Avg tokens per run: 29165 💰 Avg cost per run: $0.0050 💰 Total cost: $0.0446 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.254 Avg semantic score: 0.521 Avg processing time: 34.83s Avg matching score: 0.397 💰 Avg tokens per run: 30436 💰 Avg cost per run: $0.0054 💰 Total cost: $0.0490 🤖 Model: unknown
improved hybrid
Trim + Replaced
hybrid_method: Success rate: 10/10 Avg exact score: 0.268 Avg semantic score: 0.704 Avg processing time: 44.13s Avg matching score: 0.487 💰 Avg tokens per run: 79385 💰 Avg cost per run: $0.0166 💰 Total cost: $0.1659 🤖 Model: unknown direct_llm_method: Success rate: 10/10 Avg exact score: 0.365 Avg semantic score: 0.727 Avg processing time: 26.10s Avg matching score: 0.545 💰 Avg tokens per run: 12796 💰 Avg cost per run: $0.0025 💰 Total cost: $0.0248 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.325 Avg semantic score: 0.665 Avg processing time: 49.07s Avg matching score: 0.524 💰 Avg tokens per run: 14567 💰 Avg cost per run: $0.0032 💰 Total cost: $0.0320 🤖 Model: unknown
improved hybrid
Trim + Replaced + Hierarchy
hybrid_method: Success rate: 10/10 Avg exact score: 0.313 Avg semantic score: 0.701 Avg processing time: 44.87s Avg matching score: 0.521 💰 Avg tokens per run: 96469 💰 Avg cost per run: $0.0196 💰 Total cost: $0.1955 🤖 Model: unknown direct_llm_method: Success rate: 10/10 Avg exact score: 0.338 Avg semantic score: 0.711 Avg processing time: 22.10s Avg matching score: 0.520 💰 Avg tokens per run: 15109 💰 Avg cost per run: $0.0028 💰 Total cost: $0.0279 🤖 Model: gpt-4o-mini pydantic_hybrid_method: Success rate: 10/10 Avg exact score: 0.365 Avg semantic score: 0.717 Avg processing time: 51.87s Avg matching score: 0.549 💰 Avg tokens per run: 17469 💰 Avg cost per run: $0.0038 💰 Total cost: $0.0378 🤖 Model: unknown
improved hybrid
Seonglae Cho