Source code for langsmith.testing._internal

from __future__ import annotations

import atexit
import contextlib
import contextvars
import datetime
import functools
import importlib
import inspect
import logging
import os
import threading
import time
import uuid
import warnings
from collections import defaultdict
from concurrent.futures import Future
from pathlib import Path
from typing import (
    Any,
    Callable,
    Generator,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
)

from typing_extensions import TypedDict

from langsmith import client as ls_client
from langsmith import env as ls_env
from langsmith import run_helpers as rh
from langsmith import run_trees
from langsmith import run_trees as rt
from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal import _orjson
from langsmith._internal._serde import dumps_json
from langsmith.client import ID_TYPE

try:
    import pytest  # type: ignore

    SkipException = pytest.skip.Exception
except ImportError:

    class SkipException(Exception):  # type: ignore[no-redef]
        pass


logger = logging.getLogger(__name__)

T = TypeVar("T")
U = TypeVar("U")


@overload
def test(
    func: Callable,
) -> Callable: ...


@overload
def test(
    *,
    id: Optional[uuid.UUID] = None,
    output_keys: Optional[Sequence[str]] = None,
    client: Optional[ls_client.Client] = None,
    test_suite_name: Optional[str] = None,
) -> Callable[[Callable], Callable]: ...

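# The two @overload declarations above correspond to the two supported call
# forms of `test`. A minimal usage sketch (illustrative only; the test
# functions shown are hypothetical):
#
#     import uuid
#
#     from langsmith import test
#
#
#     @test  # bare form: test(func)
#     def test_addition():
#         assert 1 + 1 == 2
#
#
#     @test(id=str(uuid.uuid4()))  # factory form: test(**options)
#     def test_multiplication():
#         assert 3 * 4 == 12
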
def test(*args: Any, **kwargs: Any) -> Callable:
    """Trace a pytest test case in LangSmith.

    This decorator is used to trace a pytest test to LangSmith. It ensures that
    the necessary example data is created and associated with the test function.
    The decorated function will be executed as a test case, and the results will
    be recorded and reported by LangSmith.

    Args:
        - id (Optional[uuid.UUID]): A unique identifier for the test case. If not
            provided, an ID will be generated based on the test function's module
            and name.
        - output_keys (Optional[Sequence[str]]): A list of keys to be considered
            as the output keys for the test case. These keys will be extracted
            from the test function's inputs and stored as the expected outputs.
        - client (Optional[ls_client.Client]): An instance of the LangSmith client
            to be used for communication with the LangSmith service. If not
            provided, a default client will be used.
        - test_suite_name (Optional[str]): The name of the test suite to which the
            test case belongs. If not provided, the test suite name will be
            determined based on the environment or the package name.

    Returns:
        Callable: The decorated test function.

    Environment:
        - LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save
            time and costs during testing. Recommended to commit the cache files
            to your repository for faster CI/CD runs. Requires the
            'langsmith[vcr]' package to be installed.
        - LANGSMITH_TEST_TRACKING: Set to 'false' to disable LangSmith test
            tracking entirely; tests will still run, but no examples or results
            will be recorded in LangSmith.

    Example:
        For basic usage, simply decorate a test function with
        `@pytest.mark.langsmith`. Under the hood this will call the `test`
        decorator:

        .. code-block:: python

            import pytest


            # Equivalently can decorate with `test` directly:
            # from langsmith import test
            # @test
            @pytest.mark.langsmith
            def test_addition():
                assert 3 + 4 == 7

        Any code that is traced (such as code traced with `@traceable` or the
        `wrap_*` functions) will be traced within the test case for improved
        visibility and debugging.

        .. code-block:: python

            import pytest
            from langsmith import traceable


            @traceable
            def generate_numbers():
                return 3, 4


            @pytest.mark.langsmith
            def test_nested():
                # Traced code will be included in the test case
                a, b = generate_numbers()
                assert a + b == 7

        LLM calls are expensive! Cache requests by setting
        `LANGSMITH_TEST_CACHE=path/to/cache`. Check in these files to speed up
        CI/CD pipelines, so your results only change when your prompt or
        requested model changes.

        Note that this will require that you install langsmith with the `vcr`
        extra: `pip install -U "langsmith[vcr]"`

        Caching is faster if you install libyaml. See
        https://vcrpy.readthedocs.io/en/latest/installation.html#speed for more
        details.

        .. code-block:: python

            # os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"
            import openai
            import pytest
            from langsmith import wrappers

            oai_client = wrappers.wrap_openai(openai.Client())


            @pytest.mark.langsmith
            def test_openai_says_hello():
                # Traced code will be included in the test case
                response = oai_client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": "Say hello!"},
                    ],
                )
                assert "hello" in response.choices[0].message.content.lower()

        LLMs are stochastic. Naive assertions are flaky. You can use langsmith's
        `expect` to score and make approximate assertions on your results.

        .. code-block:: python

            import pytest
            from langsmith import expect


            @pytest.mark.langsmith
            def test_output_semantically_close():
                response = oai_client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": "Say hello!"},
                    ],
                )
                # The embedding_distance call logs the embedding distance to LangSmith
                expect.embedding_distance(
                    prediction=response.choices[0].message.content,
                    reference="Hello!",
                    # The following optional assertion logs a
                    # pass/fail score to LangSmith
                    # and raises an AssertionError if the assertion fails.
                ).to_be_less_than(1.0)
                # Compute damerau_levenshtein distance
                expect.edit_distance(
                    prediction=response.choices[0].message.content,
                    reference="Hello!",
                    # And then log a pass/fail score to LangSmith
                ).to_be_less_than(1.0)

        The `@test` decorator works natively with pytest fixtures. The values
        will populate the "inputs" of the corresponding example in LangSmith.

        .. code-block:: python

            import pytest


            @pytest.fixture
            def some_input():
                return "Some input"


            @pytest.mark.langsmith
            def test_with_fixture(some_input: str):
                assert "input" in some_input

        You can still use pytest.parametrize() as usual to run multiple test
        cases using the same test function.

        .. code-block:: python

            import pytest


            @pytest.mark.langsmith(output_keys=["expected"])
            @pytest.mark.parametrize(
                "a, b, expected",
                [
                    (1, 2, 3),
                    (3, 4, 7),
                ],
            )
            def test_addition_with_multiple_inputs(a: int, b: int, expected: int):
                assert a + b == expected

        By default, each test case will be assigned a consistent, unique
        identifier based on the function name and module. You can also provide a
        custom identifier using the `id` argument:

        .. code-block:: python

            import pytest
            import uuid

            example_id = uuid.uuid4()


            @pytest.mark.langsmith(id=str(example_id))
            def test_multiplication():
                assert 3 * 4 == 12

        By default, all test inputs are saved as "inputs" to a dataset. You can
        specify the `output_keys` argument to persist those keys within the
        dataset's "outputs" fields.

        .. code-block:: python

            import pytest


            @pytest.fixture
            def expected_output():
                return "input"


            @pytest.mark.langsmith(output_keys=["expected_output"])
            def test_with_expected_output(some_input: str, expected_output: str):
                assert expected_output in some_input

        To run these tests, use the pytest CLI. Or directly run the test
        functions.

        .. code-block:: python

            test_output_semantically_close()
            test_addition()
            test_nested()
            test_with_fixture("Some input")
            test_with_expected_output("Some input", "Some")
            test_multiplication()
            test_openai_says_hello()
            test_addition_with_multiple_inputs(1, 2, 3)
    """
    langtest_extra = _UTExtra(
        id=kwargs.pop("id", None),
        output_keys=kwargs.pop("output_keys", None),
        client=kwargs.pop("client", None),
        test_suite_name=kwargs.pop("test_suite_name", None),
        cache=ls_utils.get_cache_dir(kwargs.pop("cache", None)),
    )
    if kwargs:
        warnings.warn(f"Unexpected keyword arguments: {kwargs.keys()}")
    disable_tracking = ls_utils.test_tracking_is_disabled()
    if disable_tracking:
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'."
            " Skipping LangSmith test tracking."
        )

    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(
                *test_args: Any, request: Any = None, **test_kwargs: Any
            ):
                if disable_tracking:
                    return await func(*test_args, **test_kwargs)
                await _arun_test(
                    func,
                    *test_args,
                    pytest_request=request,
                    **test_kwargs,
                    langtest_extra=langtest_extra,
                )

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*test_args: Any, request: Any = None, **test_kwargs: Any):
            if disable_tracking:
                return func(*test_args, **test_kwargs)
            _run_test(
                func,
                *test_args,
                pytest_request=request,
                **test_kwargs,
                langtest_extra=langtest_extra,
            )

        return wrapper

    if args and callable(args[0]):
        return decorator(args[0])
    return decorator

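# Note on the dispatch at the end of `test`: the decorator supports both `@test`
# and `@test(...)`. A generic sketch of that pattern in isolation (illustrative
# only; no LangSmith-specific behavior is shown):
#
#     from typing import Any, Callable
#
#
#     def flexible_decorator(*args: Any, **options: Any) -> Callable:
#         def decorator(func: Callable) -> Callable:
#             return func  # a real decorator would wrap `func` using `options`
#
#         if args and callable(args[0]):  # used bare: @flexible_decorator
#             return decorator(args[0])
#         return decorator  # used with options: @flexible_decorator(x=1)
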
## Private functions


def _get_experiment_name(test_suite_name: str) -> str:
    # If this is a pytest-xdist multi-process run then we need to create the same
    # experiment name across processes. We can do this by accessing the
    # PYTEST_XDIST_TESTRUNUID env var.
    if os.environ.get("PYTEST_XDIST_TESTRUNUID") and importlib.util.find_spec("xdist"):
        id_name = test_suite_name + os.environ["PYTEST_XDIST_TESTRUNUID"]
        id_ = str(uuid.uuid5(uuid.NAMESPACE_DNS, id_name).hex[:8])
    else:
        id_ = str(uuid.uuid4().hex[:8])
    if os.environ.get("LANGSMITH_EXPERIMENT"):
        prefix = os.environ["LANGSMITH_EXPERIMENT"]
    else:
        prefix = ls_utils.get_tracer_project(False) or "TestSuiteResult"
    name = f"{prefix}:{id_}"
    return name


def _get_test_suite_name(func: Callable) -> str:
    test_suite_name = ls_utils.get_env_var("TEST_SUITE")
    if test_suite_name:
        return test_suite_name
    repo_name = ls_env.get_git_info()["repo_name"]
    try:
        mod = inspect.getmodule(func)
        if mod:
            return f"{repo_name}.{mod.__name__}"
    except BaseException:
        logger.debug("Could not determine test suite name from file path.")
    raise ValueError("Please set the LANGSMITH_TEST_SUITE environment variable.")


def _get_test_suite(
    client: ls_client.Client, test_suite_name: str
) -> ls_schemas.Dataset:
    if client.has_dataset(dataset_name=test_suite_name):
        return client.read_dataset(dataset_name=test_suite_name)
    else:
        repo = ls_env.get_git_info().get("remote_url") or ""
        description = "Test suite"
        if repo:
            description += f" for {repo}"
        try:
            return client.create_dataset(
                dataset_name=test_suite_name, description=description
            )
        except ls_utils.LangSmithConflictError:
            return client.read_dataset(dataset_name=test_suite_name)


def _start_experiment(
    client: ls_client.Client,
    test_suite: ls_schemas.Dataset,
) -> ls_schemas.TracerSession:
    experiment_name = _get_experiment_name(test_suite.name)
    try:
        return client.create_project(
            experiment_name,
            reference_dataset_id=test_suite.id,
            description="Test Suite Results.",
            metadata={
                "revision_id": ls_env.get_langchain_env_var_metadata().get(
                    "revision_id"
                )
            },
        )
    except ls_utils.LangSmithConflictError:
        return client.read_project(project_name=experiment_name)


def _get_example_id(
    func: Callable, inputs: Optional[dict], suite_id: uuid.UUID
) -> Tuple[uuid.UUID, str]:
    try:
        file_path = str(Path(inspect.getfile(func)).relative_to(Path.cwd()))
    except ValueError:
        # Fall back to module name if file path is not available
        file_path = func.__module__
    identifier = f"{suite_id}{file_path}::{func.__name__}"
    # If parametrized test, need to add inputs to identifier:
    if hasattr(func, "pytestmark") and any(
        m.name == "parametrize" for m in func.pytestmark
    ):
        identifier += _stringify(inputs)
    return uuid.uuid5(uuid.NAMESPACE_DNS, identifier), identifier[len(str(suite_id)) :]


def _end_tests(test_suite: _LangSmithTestSuite):
    git_info = ls_env.get_git_info() or {}
    test_suite.shutdown()
    dataset_version = test_suite.get_version()
    dataset_id = test_suite._dataset.id
    test_suite.client.update_project(
        test_suite.experiment_id,
        end_time=datetime.datetime.now(datetime.timezone.utc),
        metadata={
            **git_info,
            "dataset_version": dataset_version,
            "revision_id": ls_env.get_langchain_env_var_metadata().get("revision_id"),
        },
    )
    if dataset_version and git_info["commit"] is not None:
        test_suite.client.update_dataset_tag(
            dataset_id=dataset_id,
            as_of=dataset_version,
            tag=f'git:commit:{git_info["commit"]}',
        )
    if dataset_version and git_info["branch"] is not None:
        test_suite.client.update_dataset_tag(
            dataset_id=dataset_id,
            as_of=dataset_version,
            tag=f'git:branch:{git_info["branch"]}',
        )


VT = TypeVar("VT", bound=Optional[dict])


def _serde_example_values(values: VT) -> VT:
    if values is None:
        return values
    bts = ls_client._dumps_json(values)
    return _orjson.loads(bts)


class _LangSmithTestSuite:
    _instances: Optional[dict] = None
    _lock = threading.RLock()

    def __init__(
        self,
        client: Optional[ls_client.Client],
        experiment: ls_schemas.TracerSession,
        dataset: ls_schemas.Dataset,
    ):
        self.client = client or rt.get_cached_client()
        self._experiment = experiment
        self._dataset = dataset
        self._version: Optional[datetime.datetime] = None
        self._executor = ls_utils.ContextThreadPoolExecutor(max_workers=1)
        self._example_futures: dict[ID_TYPE, list[Future]] = defaultdict(list)
        self._example_modified_at: dict[ID_TYPE, datetime.datetime] = {}
        atexit.register(_end_tests, self)

    @property
    def id(self):
        return self._dataset.id

    @property
    def experiment_id(self):
        return self._experiment.id

    @property
    def experiment(self):
        return self._experiment

    @classmethod
    def from_test(
        cls,
        client: Optional[ls_client.Client],
        func: Callable,
        test_suite_name: Optional[str] = None,
    ) -> _LangSmithTestSuite:
        client = client or rt.get_cached_client()
        test_suite_name = test_suite_name or _get_test_suite_name(func)
        with cls._lock:
            if not cls._instances:
                cls._instances = {}
            if test_suite_name not in cls._instances:
                test_suite = _get_test_suite(client, test_suite_name)
                experiment = _start_experiment(client, test_suite)
                cls._instances[test_suite_name] = cls(client, experiment, test_suite)
            return cls._instances[test_suite_name]

    @property
    def name(self):
        return self._experiment.name

    def update_version(
        self, version: datetime.datetime, example_id: uuid.UUID
    ) -> None:
        with self._lock:
            if self._version is None or version > self._version:
                self._version = version
            self._example_modified_at[example_id] = version

    def get_version(
        self, example_id: Optional[uuid.UUID] = None
    ) -> Optional[datetime.datetime]:
        with self._lock:
            if not example_id:
                return self._version
            else:
                return self._example_modified_at[example_id]

    def submit_result(
        self,
        run_id: uuid.UUID,
        error: Optional[str] = None,
        skipped: bool = False,
        pytest_plugin: Any = None,
        pytest_nodeid: Any = None,
    ) -> None:
        if skipped:
            score = None
            status = "skipped"
        elif error:
            score = 0
            status = "failed"
        else:
            score = 1
            status = "passed"
        if pytest_plugin and pytest_nodeid:
            pytest_plugin.update_process_status(pytest_nodeid, {"status": status})
        self._executor.submit(self._submit_result, run_id, score)

    def _submit_result(self, run_id: uuid.UUID, score: Optional[int]) -> None:
        self.client.create_feedback(run_id, key="pass", score=score)

    def sync_example(
        self,
        example_id: uuid.UUID,
        *,
        inputs: Optional[dict] = None,
        outputs: Optional[dict] = None,
        metadata: Optional[dict] = None,
        pytest_plugin=None,
        pytest_nodeid=None,
    ) -> None:
        if pytest_plugin and pytest_nodeid:
            update = {"inputs": inputs, "reference_outputs": outputs}
            update = {k: v for k, v in update.items() if v is not None}
            pytest_plugin.update_process_status(pytest_nodeid, update)
        future = self._executor.submit(
            self._sync_example,
            example_id,
            inputs,
            outputs,
            metadata.copy() if metadata else metadata,
        )
        with self._lock:
            self._example_futures[example_id].append(future)

    def _sync_example(
        self,
        example_id: uuid.UUID,
        inputs: Optional[dict],
        outputs: Optional[dict],
        metadata: Optional[dict],
    ) -> None:
        inputs = _serde_example_values(inputs)
        outputs = _serde_example_values(outputs)
        try:
            example = self.client.read_example(example_id=example_id)
        except ls_utils.LangSmithNotFoundError:
            example = self.client.create_example(
                example_id=example_id,
                inputs=inputs,
                outputs=outputs,
                dataset_id=self.id,
                metadata=metadata,
                created_at=self._experiment.start_time,
            )
            modified_at = example.modified_at
        else:
            if (
                (inputs is not None and inputs != example.inputs)
                or (outputs is not None and outputs != example.outputs)
                or (metadata is not None and metadata != example.metadata)
                or str(example.dataset_id) != str(self.id)
            ):
                response = self.client.update_example(
                    example_id=example.id,
                    inputs=inputs,
                    outputs=outputs,
                    metadata=metadata,
                    dataset_id=self.id,
                )
                modified_at = datetime.datetime.fromisoformat(response["modified_at"])
            else:
                modified_at = example.modified_at
        if modified_at:
            self.update_version(modified_at, example_id=example_id)

    def _submit_feedback(
        self,
        run_id: ID_TYPE,
        feedback: Union[dict, list],
        pytest_plugin: Any = None,
        pytest_nodeid: Any = None,
        **kwargs: Any,
    ):
        feedback = feedback if isinstance(feedback, list) else [feedback]
        for fb in feedback:
            if pytest_plugin and pytest_nodeid:
                val = fb["score"] if "score" in fb else fb["value"]
                pytest_plugin.update_process_status(
                    pytest_nodeid, {"feedback": {fb["key"]: val}}
                )
            self._executor.submit(
                self._create_feedback, run_id=run_id, feedback=fb, **kwargs
            )

    def _create_feedback(self, run_id: ID_TYPE, feedback: dict, **kwargs: Any) -> None:
        self.client.create_feedback(run_id, **feedback, **kwargs)

    def shutdown(self):
        self._executor.shutdown()

    def wait_example_updates(self, example_id: ID_TYPE):
        """Wait for all example updates to complete."""
        while self._example_futures[example_id]:
            self._example_futures[example_id].pop().result()

    def end_run(
        self,
        run_tree,
        example_id,
        outputs,
        end_time,
        pytest_plugin=None,
        pytest_nodeid=None,
    ) -> Future:
        return self._executor.submit(
            self._end_run,
            run_tree=run_tree,
            example_id=example_id,
            outputs=outputs,
            end_time=end_time,
            pytest_plugin=pytest_plugin,
            pytest_nodeid=pytest_nodeid,
        )

    def _end_run(
        self, run_tree, example_id, outputs, end_time, pytest_plugin, pytest_nodeid
    ) -> None:
        # TODO: remove this hack so that run durations are correct
        # Ensure example is fully updated
        self.wait_example_updates(example_id)
        # Ensure that run end time is after example modified at.
        example_modified_at = self.get_version(example_id=example_id)
        end_time = max(example_modified_at, end_time)
        run_tree.end(outputs=outputs, end_time=end_time)
        run_tree.patch()
        pytest_plugin.update_process_status(pytest_nodeid, {"logged": True})


class _TestCase:
    def __init__(
        self,
        test_suite: _LangSmithTestSuite,
        example_id: uuid.UUID,
        run_id: uuid.UUID,
        pytest_plugin: Any = None,
        pytest_nodeid: Any = None,
    ) -> None:
        self.test_suite = test_suite
        self.example_id = example_id
        self.run_id = run_id
        self.pytest_plugin = pytest_plugin
        self.pytest_nodeid = pytest_nodeid
        if pytest_plugin and pytest_nodeid:
            pytest_plugin.add_process_to_test_suite(
                test_suite._dataset.name, pytest_nodeid
            )

    def sync_example(
        self, *, inputs: Optional[dict] = None, outputs: Optional[dict] = None
    ) -> None:
        self.test_suite.sync_example(
            self.example_id,
            inputs=inputs,
            outputs=outputs,
            pytest_plugin=self.pytest_plugin,
            pytest_nodeid=self.pytest_nodeid,
        )

    def submit_feedback(self, *args, **kwargs: Any):
        self.test_suite._submit_feedback(
            *args,
            **{
                **kwargs,
                **dict(
                    pytest_plugin=self.pytest_plugin,
                    pytest_nodeid=self.pytest_nodeid,
                ),
            },
        )

    def log_outputs(self, outputs: dict) -> None:
        if self.pytest_plugin and self.pytest_nodeid:
            self.pytest_plugin.update_process_status(
                self.pytest_nodeid, {"outputs": outputs}
            )

    def submit_test_result(
        self,
        error: Optional[str] = None,
        skipped: bool = False,
    ) -> None:
        return self.test_suite.submit_result(
            self.run_id,
            error=error,
            skipped=skipped,
            pytest_plugin=self.pytest_plugin,
            pytest_nodeid=self.pytest_nodeid,
        )

    def start_time(self) -> None:
        if self.pytest_plugin and self.pytest_nodeid:
            self.pytest_plugin.update_process_status(
                self.pytest_nodeid, {"start_time": time.time()}
            )

    def end_time(self) -> None:
        if self.pytest_plugin and self.pytest_nodeid:
            self.pytest_plugin.update_process_status(
                self.pytest_nodeid, {"end_time": time.time()}
            )

    def end_run(self, run_tree, outputs: Any) -> Future:
        if not (outputs is None or isinstance(outputs, dict)):
            outputs = {"output": outputs}
        end_time = datetime.datetime.now(datetime.timezone.utc)
        return self.test_suite.end_run(
            run_tree,
            self.example_id,
            outputs,
            end_time=end_time,
            pytest_plugin=self.pytest_plugin,
            pytest_nodeid=self.pytest_nodeid,
        )


_TEST_CASE = contextvars.ContextVar[Optional[_TestCase]]("_TEST_CASE", default=None)


class _UTExtra(TypedDict, total=False):
    client: Optional[ls_client.Client]
    id: Optional[uuid.UUID]
    output_keys: Optional[Sequence[str]]
    test_suite_name: Optional[str]
    cache: Optional[str]


def _get_test_repr(func: Callable, sig: inspect.Signature) -> str:
    name = getattr(func, "__name__", None) or ""
    description = getattr(func, "__doc__", None) or ""
    if description:
        description = f" - {description.strip()}"
    return f"{name}{sig}{description}"


def _create_test_case(
    func: Callable,
    *args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **kwargs: Any,
) -> _TestCase:
    client = langtest_extra["client"] or rt.get_cached_client()
    output_keys = langtest_extra["output_keys"]
    signature = inspect.signature(func)
    inputs = rh._get_inputs_safe(signature, *args, **kwargs) or None
    outputs = None
    if output_keys:
        outputs = {}
        if not inputs:
            msg = (
                "'output_keys' should only be specified when the marked test "
                "function has input arguments."
            )
            raise ValueError(msg)
        for k in output_keys:
            outputs[k] = inputs.pop(k, None)
    test_suite = _LangSmithTestSuite.from_test(
        client, func, langtest_extra.get("test_suite_name")
    )
    example_id, example_name = _get_example_id(func, inputs, test_suite.id)
    example_id = langtest_extra["id"] or example_id
    test_suite.sync_example(
        example_id,
        inputs=inputs,
        outputs=outputs,
        metadata={"signature": _get_test_repr(func, signature), "name": example_name},
    )
    pytest_plugin = (
        pytest_request.config.pluginmanager.get_plugin("langsmith_output_plugin")
        if pytest_request
        else None
    )
    pytest_nodeid = pytest_request.node.nodeid if pytest_request else None
    if pytest_plugin:
        pytest_plugin.test_suite_urls[test_suite._dataset.name] = (
            cast(str, test_suite._dataset.url)
            + "/compare?selectedSessions="
            + str(test_suite.experiment_id)
        )
    return _TestCase(
        test_suite,
        example_id,
        run_id=uuid.uuid4(),
        pytest_plugin=pytest_plugin,
        pytest_nodeid=pytest_nodeid,
    )


def _run_test(
    func: Callable,
    *test_args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **test_kwargs: Any,
) -> None:
    test_case = _create_test_case(
        func,
        *test_args,
        **test_kwargs,
        pytest_request=pytest_request,
        langtest_extra=langtest_extra,
    )
    _TEST_CASE.set(test_case)
    func_sig = inspect.signature(func)
    func_inputs = rh._get_inputs_safe(func_sig, *test_args, **test_kwargs)

    def _test():
        test_case.start_time()
        with rh.trace(
            name=getattr(func, "__name__", "Test"),
            run_id=test_case.run_id,
            reference_example_id=test_case.example_id,
            inputs=func_inputs,
            project_name=test_case.test_suite.name,
            exceptions_to_handle=(SkipException,),
            _end_on_exit=False,
        ) as run_tree:
            try:
                result = func(*test_args, **test_kwargs)
            except SkipException as e:
                test_case.submit_test_result(error=repr(e), skipped=True)
                test_case.end_run(run_tree, {"skipped_reason": repr(e)})
                raise e
            except BaseException as e:
                test_case.submit_test_result(error=repr(e))
                test_case.end_run(run_tree, None)
                raise e
            else:
                test_case.end_run(run_tree, result)
            finally:
                test_case.end_time()
        try:
            test_case.submit_test_result()
        except BaseException as e:
            logger.warning(
                f"Failed to create feedback for run_id {test_case.run_id}:\n{e}"
            )

    if langtest_extra["cache"]:
        cache_path = Path(langtest_extra["cache"]) / f"{test_case.test_suite.id}.yaml"
    else:
        cache_path = None
    current_context = rh.get_tracing_context()
    metadata = {
        **(current_context["metadata"] or {}),
        **{
            "experiment": test_case.test_suite.experiment.name,
            "reference_example_id": str(test_case.example_id),
        },
    }
    with rh.tracing_context(
        **{**current_context, "metadata": metadata}
    ), ls_utils.with_optional_cache(
        cache_path, ignore_hosts=[test_case.test_suite.client.api_url]
    ):
        _test()


async def _arun_test(
    func: Callable,
    *test_args: Any,
    pytest_request: Any,
    langtest_extra: _UTExtra,
    **test_kwargs: Any,
) -> None:
    test_case = _create_test_case(
        func,
        *test_args,
        **test_kwargs,
        pytest_request=pytest_request,
        langtest_extra=langtest_extra,
    )
    _TEST_CASE.set(test_case)
    func_sig = inspect.signature(func)
    func_inputs = rh._get_inputs_safe(func_sig, *test_args, **test_kwargs)

    async def _test():
        test_case.start_time()
        with rh.trace(
            name=getattr(func, "__name__", "Test"),
            run_id=test_case.run_id,
            reference_example_id=test_case.example_id,
            inputs=func_inputs,
            project_name=test_case.test_suite.name,
            exceptions_to_handle=(SkipException,),
            _end_on_exit=False,
        ) as run_tree:
            try:
                result = await func(*test_args, **test_kwargs)
            except SkipException as e:
                test_case.submit_test_result(error=repr(e), skipped=True)
                test_case.end_run(run_tree, {"skipped_reason": repr(e)})
                raise e
            except BaseException as e:
                test_case.submit_test_result(error=repr(e))
                test_case.end_run(run_tree, None)
                raise e
            else:
                test_case.end_run(run_tree, result)
            finally:
                test_case.end_time()
        try:
            test_case.submit_test_result()
        except BaseException as e:
            logger.warning(
                f"Failed to create feedback for run_id {test_case.run_id}:\n{e}"
            )

    if langtest_extra["cache"]:
        cache_path = Path(langtest_extra["cache"]) / f"{test_case.test_suite.id}.yaml"
    else:
        cache_path = None
    current_context = rh.get_tracing_context()
    metadata = {
        **(current_context["metadata"] or {}),
        **{
            "experiment": test_case.test_suite.experiment.name,
            "reference_example_id": str(test_case.example_id),
        },
    }
    with rh.tracing_context(
        **{**current_context, "metadata": metadata}
    ), ls_utils.with_optional_cache(
        cache_path, ignore_hosts=[test_case.test_suite.client.api_url]
    ):
        await _test()


# For backwards compatibility
unit = test

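# Illustration of the deterministic example-ID scheme implemented by
# _get_example_id above: the suite ID, relative file path, and test name are
# hashed with uuid5, so re-running the same test updates its existing dataset
# example rather than creating a new one. (Standalone sketch with hypothetical
# values; not part of the module API.)
#
#     import uuid
#
#     suite_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
#     identifier = f"{suite_id}tests/test_math.py::test_addition"
#     first = uuid.uuid5(uuid.NAMESPACE_DNS, identifier)
#     second = uuid.uuid5(uuid.NAMESPACE_DNS, identifier)
#     assert first == second  # stable across runs and across xdist workers
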
def log_inputs(inputs: dict, /) -> None:
    """Log run inputs from within a pytest test run.

    .. warning::

        This API is in beta and might change in future versions.

    Should only be used in pytest tests decorated with @pytest.mark.langsmith.

    Args:
        inputs: Inputs to log.

    Example:
        .. code-block:: python

            from langsmith import testing as t


            @pytest.mark.langsmith
            def test_foo() -> None:
                x = 0
                y = 1
                t.log_inputs({"x": x, "y": y})
                assert foo(x, y) == 2
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'." " Skipping log_inputs."
        )
        return
    run_tree = rh.get_current_run_tree()
    test_case = _TEST_CASE.get()
    if not run_tree or not test_case:
        msg = (
            "log_inputs should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)
    run_tree.add_inputs(inputs)
    test_case.sync_example(inputs=inputs)

def log_outputs(outputs: dict, /) -> None:
    """Log run outputs from within a pytest test run.

    .. warning::

        This API is in beta and might change in future versions.

    Should only be used in pytest tests decorated with @pytest.mark.langsmith.

    Args:
        outputs: Outputs to log.

    Example:
        .. code-block:: python

            from langsmith import testing as t


            @pytest.mark.langsmith
            def test_foo() -> None:
                x = 0
                y = 1
                result = foo(x, y)
                t.log_outputs({"foo": result})
                assert result == 2
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'." " Skipping log_outputs."
        )
        return
    run_tree = rh.get_current_run_tree()
    test_case = _TEST_CASE.get()
    if not run_tree or not test_case:
        msg = (
            "log_outputs should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)
    run_tree.add_outputs(outputs)
    test_case.log_outputs(outputs)

def log_reference_outputs(outputs: dict, /) -> None:
    """Log example reference outputs from within a pytest test run.

    .. warning::

        This API is in beta and might change in future versions.

    Should only be used in pytest tests decorated with @pytest.mark.langsmith.

    Args:
        outputs: Reference outputs to log.

    Example:
        .. code-block:: python

            from langsmith import testing


            @pytest.mark.langsmith
            def test_foo() -> None:
                x = 0
                y = 1
                expected = 2
                testing.log_reference_outputs({"foo": expected})
                assert foo(x, y) == expected
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'."
            " Skipping log_reference_outputs."
        )
        return
    test_case = _TEST_CASE.get()
    if not test_case:
        msg = (
            "log_reference_outputs should only be called within a pytest test "
            "decorated with @pytest.mark.langsmith."
        )
        raise ValueError(msg)
    test_case.sync_example(outputs=outputs)

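# The three logging helpers above are typically combined in a single
# @pytest.mark.langsmith test: log_inputs and log_reference_outputs populate the
# dataset example, while log_outputs records what the test actually produced.
# Minimal sketch (`add` is a hypothetical function under test):
#
#     import pytest
#
#     from langsmith import testing as t
#
#
#     @pytest.mark.langsmith
#     def test_add() -> None:
#         x, y = 1, 2
#         t.log_inputs({"x": x, "y": y})  # stored as example inputs
#         t.log_reference_outputs({"sum": 3})  # stored as expected outputs
#         result = add(x, y)
#         t.log_outputs({"sum": result})  # stored as run outputs
#         assert result == 3
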
def log_feedback(
    feedback: Optional[Union[dict, list[dict]]] = None,
    /,
    *,
    key: str,
    score: Optional[Union[int, bool, float]] = None,
    value: Optional[Union[str, int, float, bool]] = None,
    **kwargs: Any,
) -> None:
    """Log run feedback from within a pytest test run.

    .. warning::

        This API is in beta and might change in future versions.

    Should only be used in pytest tests decorated with @pytest.mark.langsmith.

    Args:
        feedback: A prebuilt feedback dict (or list of dicts) to log.
        key: Feedback name.
        score: Numerical feedback value.
        value: Categorical feedback value.
        kwargs: Any other Client.create_feedback args.

    Example:
        .. code-block:: python

            import pytest
            from langsmith import testing as t


            @pytest.mark.langsmith
            def test_foo() -> None:
                x = 0
                y = 1
                expected = 2
                result = foo(x, y)
                t.log_feedback(key="right_type", score=isinstance(result, int))
                assert result == expected
    """
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'." " Skipping log_feedback."
        )
        return
    if feedback and any((key, score, value)):
        msg = "Must specify either 'feedback' or ('key', 'score', 'value'), not both."
        raise ValueError(msg)
    elif not (feedback or key):
        msg = "Must specify at least one of 'feedback' or ('key', 'score', 'value')."
        raise ValueError(msg)
    elif key:
        feedback = {"key": key}
        if score is not None:
            feedback["score"] = score
        if value is not None:
            feedback["value"] = value
    else:
        pass
    run_tree = rh.get_current_run_tree()
    test_case = _TEST_CASE.get()
    if not run_tree or not test_case:
        msg = (
            "log_feedback should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)
    if run_tree.session_name == "evaluators" and run_tree.metadata.get(
        "reference_run_id"
    ):
        run_id = run_tree.metadata["reference_run_id"]
        run_tree.add_outputs(
            feedback if isinstance(feedback, dict) else {"feedback": feedback}
        )
        kwargs["source_run_id"] = run_tree.id
    else:
        run_id = run_tree.trace_id
    test_case.submit_feedback(run_id, cast(Union[list, dict], feedback), **kwargs)

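# Besides numeric scores, log_feedback can record categorical feedback via
# `value` and forward extra Client.create_feedback arguments (such as `comment`)
# through **kwargs. Minimal sketch inside a @pytest.mark.langsmith test
# (`summarize` is a hypothetical function under test):
#
#     import pytest
#
#     from langsmith import testing as t
#
#
#     @pytest.mark.langsmith
#     def test_summary_tone() -> None:
#         summary = summarize("Some long document...")
#         t.log_outputs({"summary": summary})
#         t.log_feedback(key="concise", score=len(summary) < 200)
#         t.log_feedback(
#             key="tone",
#             value="neutral",
#             comment="Manually spot-checked wording.",  # passed to create_feedback
#         )
#         assert summary
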
@contextlib.contextmanager
def trace_feedback(
    *, name: str = "Feedback"
) -> Generator[Optional[run_trees.RunTree], None, None]:
    """Trace the computation of a pytest run feedback as its own run.

    .. warning::

        This API is in beta and might change in future versions.

    Args:
        name: Feedback run name. Defaults to "Feedback".

    Example:
        .. code-block:: python

            import openai
            import pytest
            from langsmith import testing as t
            from langsmith import wrappers

            oai_client = wrappers.wrap_openai(openai.Client())


            @pytest.mark.langsmith
            def test_openai_says_hello():
                # Traced code will be included in the test case
                text = "Say hello!"
                response = oai_client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": text},
                    ],
                )
                t.log_inputs({"text": text})
                t.log_outputs({"response": response.choices[0].message.content})
                t.log_reference_outputs({"response": "hello!"})

                # Use this context manager to trace any steps used for generating
                # evaluation feedback separately from the main application logic
                with t.trace_feedback():
                    grade = oai_client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[
                            {
                                "role": "system",
                                "content": "Return 1 if 'hello' is in the user message and 0 otherwise.",
                            },
                            {
                                "role": "user",
                                "content": response.choices[0].message.content,
                            },
                        ],
                    )
                    # Make sure to log relevant feedback within the context for the
                    # trace to be associated with this feedback.
                    t.log_feedback(
                        key="llm_judge", score=float(grade.choices[0].message.content)
                    )

                assert "hello" in response.choices[0].message.content.lower()
    """  # noqa: E501
    if ls_utils.test_tracking_is_disabled():
        logger.info(
            "LANGSMITH_TEST_TRACKING is set to 'false'." " Skipping trace_feedback."
        )
        yield None
        return
    parent_run = rh.get_current_run_tree()
    test_case = _TEST_CASE.get()
    if not parent_run or not test_case:
        msg = (
            "trace_feedback should only be called within a pytest test decorated with "
            "@pytest.mark.langsmith, and with tracing enabled (by setting the "
            "LANGSMITH_TRACING environment variable to 'true')."
        )
        raise ValueError(msg)
    metadata = {
        "experiment": test_case.test_suite.experiment.name,
        "reference_example_id": test_case.example_id,
        "reference_run_id": parent_run.id,
    }
    with rh.trace(
        name=name,
        inputs=parent_run.outputs,
        parent="ignore",
        project_name="evaluators",
        metadata=metadata,
    ) as run_tree:
        yield run_tree

def _stringify(x: Any) -> str:
    try:
        return dumps_json(x).decode("utf-8", errors="surrogateescape")
    except Exception:
        return str(x)

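# _stringify behavior, for reference: JSON-serializable inputs are rendered via
# dumps_json, and anything else falls back to str(). It is used by
# _get_example_id to fold parametrized inputs into a stable identifier.
# Illustrative only:
#
#     _stringify({"a": 1, "b": 2})  # JSON text of the inputs dict
#     _stringify(object())  # falls back to something like "<object object at 0x...>"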