"""Client initialization and management for AgentLens.""" import atexit import logging from typing import Optional, List from agentlens.models import TraceData from agentlens.transport import BatchTransport logger = logging.getLogger("agentlens") _client: Optional["AgentLensClient"] = None class AgentLensClient: def __init__( self, api_key: str, endpoint: str = "https://agentlens.vectry.tech", flush_interval: float = 5.0, max_batch_size: int = 10, enabled: bool = True, ) -> None: self.api_key = api_key self.endpoint = endpoint.rstrip("/") self.flush_interval = flush_interval self.max_batch_size = max_batch_size self.enabled = enabled self._transport: Optional[BatchTransport] = None self._sent_traces: List[TraceData] = [] if enabled: try: self._transport = BatchTransport( api_key=api_key, endpoint=endpoint, flush_interval=flush_interval, max_batch_size=max_batch_size, ) except ImportError: logger.warning("httpx not available, transport disabled") self.enabled = False def send_trace(self, trace: TraceData) -> None: if not self.enabled: self._sent_traces.append(trace) return if self._transport: self._transport.add(trace) def shutdown(self) -> None: if self._transport: self._transport.flush() self._transport.shutdown() self._transport = None @property def sent_traces(self) -> List[TraceData]: return self._sent_traces @property def transport(self) -> Optional["BatchTransport"]: return self._transport def init( api_key: str, endpoint: str = "https://agentlens.vectry.tech", flush_interval: float = 5.0, max_batch_size: int = 10, enabled: bool = True, ) -> None: """Initialize the AgentLens SDK. Args: api_key: Your AgentLens API key. endpoint: The AgentLens API endpoint. flush_interval: Seconds between automatic flushes (default: 5.0). max_batch_size: Number of traces before auto-flush (default: 10). enabled: Set to False to disable sending (useful for testing). """ global _client if _client is not None: _client.shutdown() _client = AgentLensClient( api_key=api_key, endpoint=endpoint, flush_interval=flush_interval, max_batch_size=max_batch_size, enabled=enabled, ) atexit.register(shutdown) logger.debug("AgentLens initialized: endpoint=%s", endpoint) def shutdown() -> None: global _client if _client is not None: _client.shutdown() _client = None logger.debug("AgentLens shutdown complete") def get_client() -> Optional[AgentLensClient]: """Get the current client instance. Returns None if not initialized.""" return _client