aevaluate

async langsmith.evaluation._arunner.aevaluate(target: ATARGET_T | AsyncIterable[dict] | Runnable | str | uuid.UUID | schemas.TracerSession, /, data: DATA_T | AsyncIterable[schemas.Example] | Iterable[schemas.Example] | None = None, evaluators: Sequence[EVALUATOR_T | AEVALUATOR_T] | None = None, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None, metadata: dict | None = None, experiment_prefix: str | None = None, description: str | None = None, max_concurrency: int | None = 0, num_repetitions: int = 1, client: langsmith.Client | None = None, blocking: bool = True, experiment: schemas.TracerSession | str | uuid.UUID | None = None, upload_results: bool = True, **kwargs: Any) -> AsyncExperimentResults

Evaluate an async target system on a given dataset.

Parameters:
  • target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]) – The target system or experiment(s) to evaluate. Can be an async function that takes a dict and returns a dict, a langchain Runnable, an existing experiment ID, or a two-tuple of experiment IDs.

  • data (Union[DATA_T, AsyncIterable[schemas.Example]]) – The dataset to evaluate on. Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.

  • evaluators (Optional[Sequence[EVALUATOR_T | AEVALUATOR_T]]) – A list of evaluators to run on each example. Evaluators may be sync or async functions. Defaults to None.

  • summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]) – A list of summary evaluators to run on the entire dataset. Defaults to None.

  • metadata (Optional[dict]) – Metadata to attach to the experiment. Defaults to None.

  • experiment_prefix (Optional[str]) – A prefix to provide for your experiment name. Defaults to None.

  • description (Optional[str]) – A description of the experiment.

  • max_concurrency (int | None) – The maximum number of concurrent evaluations to run. If None, no limit is set. If 0, evaluations run without concurrency. Defaults to 0.

  • num_repetitions (int) – The number of times to run the evaluation. Each item in the dataset will be run and evaluated this many times. Defaults to 1.

  • client (Optional[langsmith.Client]) – The LangSmith client to use. Defaults to None.

  • blocking (bool) – Whether to block until the evaluation is complete. Defaults to True.

  • experiment (Optional[schemas.TracerSession]) – An existing experiment to extend. If provided, experiment_prefix is ignored. For advanced usage only.

  • load_nested – Whether to load all child runs for the experiment. Default is to only load the top-level root runs. Should only be specified when evaluating an existing experiment.

  • upload_results (bool) – Whether to upload the results to LangSmith. Defaults to True.

  • kwargs (Any) –
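
The three settings of max_concurrency described above (None, 0, or a positive integer) can be illustrated with a small standard-library sketch. This is not LangSmith's internal implementation; `run_with_limit`, `fake_target`, and `demo` are hypothetical names used only to show one way the documented semantics could behave.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Optional, Tuple


async def run_with_limit(
    func: Callable[[dict], Awaitable[dict]],
    inputs: Iterable[dict],
    max_concurrency: Optional[int] = 0,
) -> List[dict]:
    """Hypothetical helper mirroring the documented max_concurrency semantics."""
    items = list(inputs)
    if max_concurrency == 0:
        # 0: no concurrency, so each call finishes before the next one starts.
        return [await func(item) for item in items]
    if max_concurrency is None:
        # None: no limit, so every call is started at once.
        return list(await asyncio.gather(*(func(item) for item in items)))

    # N > 0: at most N calls are in flight at any time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: dict) -> dict:
        async with semaphore:
            return await func(item)

    return list(await asyncio.gather(*(bounded(item) for item in items)))


async def demo(max_concurrency: Optional[int]) -> Tuple[int, List[dict]]:
    """Run a fake target over six inputs and report the peak number in flight."""
    in_flight = 0
    peak = 0

    async def fake_target(inputs: dict) -> dict:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # Stands in for a call to your app.
        in_flight -= 1
        return {"output": inputs["x"] * 2}

    outputs = await run_with_limit(
        fake_target, [{"x": i} for i in range(6)], max_concurrency
    )
    return peak, outputs
```

Calling `asyncio.run(demo(0))` runs the six calls one after another, while `asyncio.run(demo(2))` lets at most two overlap. The outputs come back in input order in every case.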

Returns:

An async iterator over the experiment results.

Return type:

AsyncIterator[ExperimentResultRow]

Environment:
  • LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and cost during testing. Committing the cache files to your repository is recommended for faster CI/CD runs. Requires the ‘langsmith[vcr]’ package to be installed.
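
As a minimal sketch, the variable can be set from Python before the evaluation runs. The directory name `tests/cassettes` is an assumption chosen for illustration, as is the premise that the variable's value is the directory in which cache files are stored; setting it in your shell or CI configuration works equally well.

```python
import os

# Assumed cache location; pick any directory you intend to commit to the repository.
CACHE_DIR = "tests/cassettes"

# Set the variable before running the evaluation so the cached calls can be reused.
os.environ["LANGSMITH_TEST_CACHE"] = CACHE_DIR
```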

Examples

>>> from typing import Sequence
>>> from langsmith import Client, aevaluate
>>> from langsmith.schemas import Example, Run
>>> client = Client()
>>> dataset = client.clone_public_dataset(
...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
... )
>>> dataset_name = "Evaluate Examples"

Basic usage:

>>> def accuracy(run: Run, example: Example):
...     # Row-level evaluator for accuracy.
...     pred = run.outputs["output"]
...     expected = example.outputs["answer"]
...     return {"score": expected.lower() == pred.lower()}
>>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
...     # Experiment-level evaluator for precision.
...     # TP / (TP + FP)
...     predictions = [run.outputs["output"].lower() for run in runs]
...     expected = [example.outputs["answer"].lower() for example in examples]
...     # yes and no are the only possible answers
...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
...     return {"score": tp / (tp + fp)}
>>> import asyncio
>>> async def apredict(inputs: dict) -> dict:
...     # This can be any async function or just an API call to your app.
...     await asyncio.sleep(0.1)
...     return {"output": "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment",
...         description="Evaluate the accuracy of the model asynchronously.",
...         metadata={
...             "my-prompt-version": "abcd-1234",
...         },
...     )
... )  
View the evaluation results for experiment:...

Evaluating over only a subset of the examples using an async generator:

>>> async def example_generator():
...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
...     for example in examples:
...         yield example
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=example_generator(),
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Subset Experiment",
...         description="Evaluate a subset of examples asynchronously.",
...     )
... )  
View the evaluation results for experiment:...

Streaming each prediction to debug more easily and eagerly:

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Streaming Experiment",
...         description="Streaming predictions for debugging.",
...         blocking=False,
...     )
... )  
View the evaluation results for experiment:...
>>> async def aenumerate(iterable):
...     async for elem in iterable:
...         print(elem)
>>> asyncio.run(aenumerate(results))
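
The same streaming pattern can also be written so that the results are created and consumed inside a single coroutine, and therefore a single event loop. The sketch below uses a hypothetical stand-in, `fake_results`, in place of the object returned by aevaluate, so it runs without a LangSmith account; the row contents are made up for illustration.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_results() -> AsyncIterator[dict]:
    """Hypothetical stand-in for the async iterator returned with blocking=False."""
    for i in range(3):
        await asyncio.sleep(0.01)  # Simulates a row becoming available.
        yield {"row": i}


async def consume(results: AsyncIterator[dict]) -> List[dict]:
    """Handle each row as soon as it is yielded."""
    seen = []
    async for row in results:
        print(row)
        seen.append(row)
    return seen


async def main() -> List[dict]:
    # With LangSmith this line would be:
    #     results = await aevaluate(apredict, data=dataset_name, blocking=False)
    results = fake_results()
    return await consume(results)
```

Running `asyncio.run(main())` prints each row as it arrives and returns the collected rows.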

Running without concurrency:

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment Without Concurrency",
...         description="This was run without concurrency.",
...         max_concurrency=0,
...     )
... )  
View the evaluation results for experiment:...

Using async evaluators:

>>> async def helpfulness(run: Run, example: Example):
...     # Row-level evaluator for helpfulness.
...     await asyncio.sleep(5)  # Replace with your LLM API call
...     return {"score": run.outputs["output"] == "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[helpfulness],
...         summary_evaluators=[precision],
...         experiment_prefix="My Helpful Experiment",
...         description="Applying async evaluators example.",
...     )
... )  
View the evaluation results for experiment:...

Changed in version 0.2.0: ‘max_concurrency’ default updated from None (no limit on concurrency) to 0 (no concurrency at all).