Source code for langsmith.wrappers._openai_agents

import datetime
import logging
import uuid
from typing import Dict, Optional

from langsmith import run_trees as rt

try:
    from agents import tracing  # type: ignore[import]

    import langsmith.wrappers._agent_utils as agent_utils

    HAVE_AGENTS = True
except ImportError:
    HAVE_AGENTS = False

    class OpenAIAgentsTracingProcessor:
        """Stub class when agents package is not installed."""

        def __init__(self, *args, **kwargs):
            raise ImportError(
                "The `agents` package is not installed. "
                "Please install it with `pip install langsmith[openai-agents]`."
            )


from langsmith import client as ls_client

logger = logging.getLogger(__name__)

if HAVE_AGENTS:

[docs] class OpenAIAgentsTracingProcessor(tracing.TracingProcessor): # type: ignore[no-redef] """Tracing processor for the `OpenAI Agents SDK <https://openai.github.io/openai-agents-python/>`_. Traces all intermediate steps of your OpenAI Agent to LangSmith. Requirements: Make sure to install ``pip install -U langsmith[openai-agents]``. Args: client: An instance of langsmith.client.Client. If not provided, a default client is created. Example: .. code-block:: python from agents import ( Agent, FileSearchTool, Runner, WebSearchTool, function_tool, set_trace_processors, ) from langsmith.wrappers import OpenAIAgentsTracingProcessor set_trace_processors([OpenAIAgentsTracingProcessor()]) @function_tool def get_weather(city: str) -> str: return f"The weather in {city} is sunny" haiku_agent = Agent( name="Haiku agent", instructions="Always respond in haiku form", model="o3-mini", tools=[get_weather], ) agent = Agent( name="Assistant", tools=[WebSearchTool()], instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather", handoffs=[haiku_agent], ) result = await Runner.run( agent, "write a haiku about the weather today and tell me a recent news story about new york", ) print(result.final_output) """ # noqa: E501
[docs] def __init__(self, client: Optional[ls_client.Client] = None): self.client = client or rt.get_cached_client() self._runs: Dict[str, str] = {}
def on_trace_start(self, trace: tracing.Trace) -> None: run_name = trace.name if trace.name else "Agent workflow" trace_run_id = str(uuid.uuid4()) self._runs[trace.trace_id] = trace_run_id try: run_data: dict = dict( name=run_name, inputs={}, run_type="chain", id=trace_run_id, revision_id=None, ) self.client.create_run(**run_data) except Exception as e: logger.exception(f"Error creating trace run: {e}") def on_trace_end(self, trace: tracing.Trace) -> None: run_id = self._runs.pop(trace.trace_id, None) trace_dict = trace.export() or {} metadata = trace_dict.get("metadata") or {} if run_id: try: self.client.update_run(run_id=run_id, extra={"metadata": metadata}) except Exception as e: logger.exception(f"Error updating trace run: {e}") def on_span_start(self, span: tracing.Span) -> None: parent_run_id = self._runs.get(span.parent_id or span.trace_id) span_run_id = str(uuid.uuid4()) self._runs[span.span_id] = span_run_id run_name = agent_utils.get_run_name(span) run_type = agent_utils.get_run_type(span) extracted = agent_utils.extract_span_data(span) try: run_data: dict = dict( name=run_name, run_type=run_type, id=span_run_id, parent_run_id=parent_run_id, inputs=extracted.get("inputs", {}), ) if span.started_at: run_data["start_time"] = datetime.datetime.fromisoformat( span.started_at ) self.client.create_run(**run_data) except Exception as e: logger.exception(f"Error creating span run: {e}") def on_span_end(self, span: tracing.Span) -> None: run_id = self._runs.pop(span.span_id, None) if run_id: extracted = agent_utils.extract_span_data(span) metadata = extracted.get("metadata", {}) metadata["openai_parent_id"] = span.parent_id metadata["openai_trace_id"] = span.trace_id metadata["openai_span_id"] = span.span_id extracted["metadata"] = metadata run_data: dict = dict( run_id=run_id, error=str(span.error) if span.error else None, outputs=extracted.pop("outputs", {}), inputs=extracted.pop("inputs", {}), extra=extracted, ) if span.ended_at: run_data["end_time"] = datetime.datetime.fromisoformat( span.ended_at ) self.client.update_run(**run_data) def shutdown(self) -> None: self.client.flush() def force_flush(self) -> None: self.client.flush()