aevaluate

async langsmith.evaluation._arunner.aevaluate(
target: ATARGET_T | AsyncIterable[dict] | Runnable | str | uuid.UUID | schemas.TracerSession,
/,
data: DATA_T | AsyncIterable[schemas.Example] | Iterable[schemas.Example] | None = None,
evaluators: Sequence[EVALUATOR_T | AEVALUATOR_T] | None = None,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None,
metadata: dict | None = None,
experiment_prefix: str | None = None,
description: str | None = None,
max_concurrency: int | None = 0,
num_repetitions: int = 1,
client: langsmith.Client | None = None,
blocking: bool = True,
experiment: schemas.TracerSession | str | uuid.UUID | None = None,
upload_results: bool = True,
error_handling: Literal['log', 'ignore'] = 'log',
**kwargs: Any,
) → AsyncExperimentResults

Evaluate an async target system on a given dataset.

Parameters:
  • target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]) – The target system or experiment(s) to evaluate. Can be an async function that takes a dict and returns a dict, a langchain Runnable, an existing experiment ID, or a two-tuple of experiment IDs.

  • data (Union[DATA_T, AsyncIterable[schemas.Example]]) – The dataset to evaluate on. Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.

  • evaluators (Optional[Sequence[EVALUATOR_T]]) – A list of evaluators to run on each example. Defaults to None.

  • summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]) – A list of summary evaluators to run on the entire dataset. Defaults to None.

  • metadata (Optional[dict]) – Metadata to attach to the experiment. Defaults to None.

  • experiment_prefix (Optional[str]) – A prefix to provide for your experiment name. Defaults to None.

  • description (Optional[str]) – A description of the experiment.

  • max_concurrency (int | None) – The maximum number of concurrent evaluations to run. If None, no limit is set; if 0, evaluations run sequentially (no concurrency). Defaults to 0.

  • num_repetitions (int) – The number of times to run the evaluation. Each item in the dataset will be run and evaluated this many times. Defaults to 1.

  • client (Optional[langsmith.Client]) – The LangSmith client to use. Defaults to None.

  • blocking (bool) – Whether to block until the evaluation is complete. Defaults to True.

  • experiment (Optional[schemas.TracerSession]) – An existing experiment to extend. If provided, experiment_prefix is ignored. For advanced usage only.

  • load_nested – Whether to load all child runs for the experiment. Default is to only load the top-level root runs. Should only be specified when evaluating an existing experiment.

  • error_handling (str, default="log") – How to handle individual run errors. 'log' traces failed runs, keeping the error message as part of the experiment; 'ignore' excludes failed runs from the experiment entirely.

  • upload_results (bool) – Whether to upload the experiment and evaluation results to LangSmith. Defaults to True.

  • kwargs (Any)

Returns:

An async iterator over the experiment results.

Return type:

AsyncIterator[ExperimentResultRow]

Environment:
  • LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and cost during testing. Recommended to commit the cache files to your repository for faster CI/CD runs. Requires the ‘langsmith[vcr]’ package to be installed.
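
For example, a minimal sketch of enabling the cache before running evaluations (the directory path is illustrative, assuming the variable takes a path to a cache directory):

>>> import os
>>> os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"  # illustrative cache directory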

Examples

>>> from typing import Sequence
>>> from langsmith import Client, aevaluate
>>> from langsmith.schemas import Example, Run
>>> client = Client()
>>> dataset = client.clone_public_dataset(
...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
... )
>>> dataset_name = "Evaluate Examples"

Basic usage:

>>> def accuracy(run: Run, example: Example):
...     # Row-level evaluator for accuracy.
...     pred = run.outputs["output"]
...     expected = example.outputs["answer"]
...     return {"score": expected.lower() == pred.lower()}
>>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
...     # Experiment-level evaluator for precision.
...     # TP / (TP + FP)
...     predictions = [run.outputs["output"].lower() for run in runs]
...     expected = [example.outputs["answer"].lower() for example in examples]
...     # yes and no are the only possible answers
...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
...     return {"score": tp / (tp + fp)}
>>> import asyncio
>>> async def apredict(inputs: dict) -> dict:
...     # This can be any async function or just an API call to your app.
...     await asyncio.sleep(0.1)
...     return {"output": "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment",
...         description="Evaluate the accuracy of the model asynchronously.",
...         metadata={
...             "my-prompt-version": "abcd-1234",
...         },
...     )
... )
View the evaluation results for experiment:...

Evaluating over only a subset of the examples using an async generator:

>>> async def example_generator():
...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
...     for example in examples:
...         yield example
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=example_generator(),
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Subset Experiment",
...         description="Evaluate a subset of examples asynchronously.",
...     )
... )
View the evaluation results for experiment:...

Streaming each prediction to debug more easily and eagerly:

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Streaming Experiment",
...         description="Streaming predictions for debugging.",
...         blocking=False,
...     )
... )
View the evaluation results for experiment:...
>>> async def aenumerate(iterable):
...     # Consume and print each result as soon as it becomes available.
...     async for elem in iterable:
...         print(elem)
>>> asyncio.run(aenumerate(results))

Running without concurrency:

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment Without Concurrency",
...         description="This was run without concurrency.",
...         max_concurrency=0,
...     )
... )
View the evaluation results for experiment:...
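
Running each example multiple times with num_repetitions (a sketch, not part of the original examples; it reuses the apredict, accuracy, and precision helpers defined above):

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         num_repetitions=3,  # run and evaluate each example three times
...         experiment_prefix="My Repeated Experiment",
...         description="Each example is run and evaluated three times.",
...     )
... )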

Using async evaluators:

>>> async def helpfulness(run: Run, example: Example):
...     # Row-level evaluator for helpfulness.
...     await asyncio.sleep(5)  # Replace with your LLM API call
...     return {"score": run.outputs["output"] == "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[helpfulness],
...         summary_evaluators=[precision],
...         experiment_prefix="My Helpful Experiment",
...         description="Applying async evaluators example.",
...     )
... )
View the evaluation results for experiment:...
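
A further sketch (not part of the original examples) combining two documented parameters, error_handling="ignore" to exclude failed runs from the experiment and upload_results=False to skip uploading results to LangSmith, reusing the apredict and accuracy helpers defined above:

>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         error_handling="ignore",  # drop failed runs instead of logging them
...         upload_results=False,  # do not upload results to LangSmith
...         experiment_prefix="My Local Experiment",
...         description="Skip failed runs and keep results local.",
...     )
... )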

Changed in version 0.2.0: ‘max_concurrency’ default updated from None (no limit on concurrency) to 0 (no concurrency at all).