aevaluate
- async langsmith.evaluation._arunner.aevaluate(target: ATARGET_T | AsyncIterable[dict] | Runnable | str | uuid.UUID | schemas.TracerSession, /, data: DATA_T | AsyncIterable[schemas.Example] | Iterable[schemas.Example] | None = None, evaluators: Sequence[EVALUATOR_T | AEVALUATOR_T] | None = None, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None, metadata: dict | None = None, experiment_prefix: str | None = None, description: str | None = None, max_concurrency: int | None = 0, num_repetitions: int = 1, client: langsmith.Client | None = None, blocking: bool = True, experiment: schemas.TracerSession | str | uuid.UUID | None = None, upload_results: bool = True, **kwargs: Any) -> AsyncExperimentResults
Evaluate an async target system on a given dataset.
- Parameters:
target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]) – The target system or experiment(s) to evaluate. Can be an async function that takes a dict and returns a dict, a langchain Runnable, an existing experiment ID, or a two-tuple of experiment IDs.
data (Union[DATA_T, AsyncIterable[schemas.Example]]) – The dataset to evaluate on. Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.
evaluators (Optional[Sequence[EVALUATOR_T | AEVALUATOR_T]]) – A list of evaluators to run on each example. Both sync and async evaluators are accepted. Defaults to None.
summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]) – A list of summary evaluators to run on the entire dataset. Defaults to None.
metadata (Optional[dict]) – Metadata to attach to the experiment. Defaults to None.
experiment_prefix (Optional[str]) – A prefix to provide for your experiment name. Defaults to None.
description (Optional[str]) – A description of the experiment.
max_concurrency (int | None) – The maximum number of concurrent evaluations to run. If None then no limit is set. If 0 then no concurrency. Defaults to 0.
num_repetitions (int) – The number of times to run the evaluation. Each item in the dataset will be run and evaluated this many times. Defaults to 1.
client (Optional[langsmith.Client]) – The LangSmith client to use. Defaults to None.
blocking (bool) – Whether to block until the evaluation is complete. Defaults to True.
experiment (Optional[schemas.TracerSession]) – An existing experiment to extend. If provided, experiment_prefix is ignored. For advanced usage only.
load_nested – Whether to load all child runs for the experiment. Default is to only load the top-level root runs. Should only be specified when evaluating an existing experiment.
upload_results (bool) – Whether to upload the results to LangSmith. Defaults to True.
kwargs (Any) –
- Returns:
An async iterator over the experiment results.
- Return type:
AsyncIterator[ExperimentResultRow]
- Environment:
- LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and cost during testing. Committing the cache files to your repository is recommended for faster CI/CD runs. Requires the ‘langsmith[vcr]’ package to be installed.
Examples
>>> from typing import Sequence
>>> from langsmith import Client, aevaluate
>>> from langsmith.schemas import Example, Run
>>> client = Client()
>>> dataset = client.clone_public_dataset(
...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
... )
>>> dataset_name = "Evaluate Examples"
Basic usage:
>>> def accuracy(run: Run, example: Example):
...     # Row-level evaluator for accuracy.
...     pred = run.outputs["output"]
...     expected = example.outputs["answer"]
...     return {"score": expected.lower() == pred.lower()}
>>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
...     # Experiment-level evaluator for precision.
...     # TP / (TP + FP)
...     predictions = [run.outputs["output"].lower() for run in runs]
...     expected = [example.outputs["answer"].lower() for example in examples]
...     # yes and no are the only possible answers
...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
...     return {"score": tp / (tp + fp)}
>>> import asyncio
>>> async def apredict(inputs: dict) -> dict:
...     # This can be any async function or just an API call to your app.
...     await asyncio.sleep(0.1)
...     return {"output": "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment",
...         description="Evaluate the accuracy of the model asynchronously.",
...         metadata={
...             "my-prompt-version": "abcd-1234",
...         },
...     )
... )
View the evaluation results for experiment:...
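The precision summary evaluator in the basic example divides by tp + fp, which is zero when the target never predicts "yes". A minimal sketch of a guarded variant follows. It works on plain lists of lowercase strings rather than Run and Example objects, and the name safe_precision is hypothetical, not part of langsmith. Returning a score of None for the undefined case is an assumption; other conventions (such as 0.0) are equally possible.

```python
from typing import Sequence


def safe_precision(predictions: Sequence[str], expected: Sequence[str]) -> dict:
    """Precision for the "yes" class, TP / (TP + FP), with a zero-division guard."""
    pairs = list(zip(predictions, expected))
    # True positives: predicted "yes" and the reference answer is "yes".
    tp = sum(p == "yes" and e == "yes" for p, e in pairs)
    # False positives: predicted "yes" but the reference answer is anything else.
    fp = sum(p == "yes" and e != "yes" for p, e in pairs)
    if tp + fp == 0:
        # No positive predictions, so precision is undefined.
        # Reporting None here is an assumption of this sketch.
        return {"key": "precision", "score": None}
    return {"key": "precision", "score": tp / (tp + fp)}


result = safe_precision(["yes", "yes", "no"], ["yes", "no", "no"])
print(result["score"])
```

Inside a real summary evaluator, the two lists would be built from runs and examples exactly as in the precision function of the basic example.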
Evaluating over only a subset of the examples using an async generator:
>>> async def example_generator():
...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
...     for example in examples:
...         yield example
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=example_generator(),
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Subset Experiment",
...         description="Evaluate a subset of examples asynchronously.",
...     )
... )
View the evaluation results for experiment:...
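The async-generator pattern in example_generator can be generalized to any synchronous iterable. The sketch below is standalone and uses integers in place of schemas.Example objects. The helper names as_async_iter and collect are hypothetical and are not langsmith functions.

```python
import asyncio
import itertools
from typing import AsyncIterator, Iterable, List, Optional, TypeVar

T = TypeVar("T")


async def as_async_iter(
    items: Iterable[T], limit: Optional[int] = None
) -> AsyncIterator[T]:
    """Yield at most `limit` items from a sync iterable as an async iterator."""
    # islice with a stop of None passes every item through unchanged.
    for item in itertools.islice(items, limit):
        yield item


async def collect(aiter: AsyncIterator[T]) -> List[T]:
    """Drain an async iterator into a list."""
    return [item async for item in aiter]


subset = asyncio.run(collect(as_async_iter(range(10), limit=3)))
print(subset)  # [0, 1, 2]
```

An async generator built this way has the same shape as the data=example_generator() argument shown above.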
Streaming each prediction as it completes, for easier and earlier debugging:
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Streaming Experiment",
...         description="Streaming predictions for debugging.",
...         blocking=False,
...     )
... )
View the evaluation results for experiment:...
>>> async def aenumerate(iterable):
...     async for elem in iterable:
...         print(elem)
>>> asyncio.run(aenumerate(results))
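One reason to stream results is to stop at the first problematic row instead of waiting for the whole run. The sketch below illustrates that idea with a stand-in async generator. The generator fake_results, the helper first_failure, and the row shape (a dict with "row" and "score" keys) are all assumptions for illustration and do not reflect the actual structure of ExperimentResultRow.

```python
import asyncio
from typing import AsyncIterator, Optional


async def fake_results() -> AsyncIterator[dict]:
    """Stand-in for a streamed results object: yields rows as they finish."""
    for i in range(3):
        await asyncio.sleep(0.01)  # simulate a prediction completing
        yield {"row": i, "score": i % 2 == 0}


async def first_failure(results: AsyncIterator[dict]) -> Optional[dict]:
    """Return the first row with a falsy score, or None if every row passes."""
    async for row in results:
        if not row["score"]:
            return row
    return None


failure = asyncio.run(first_failure(fake_results()))
print(failure)
```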
Running without concurrency:
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment Without Concurrency",
...         description="This was run without concurrency.",
...         max_concurrency=0,
...     )
... )
View the evaluation results for experiment:...
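To make the three documented max_concurrency settings concrete (0 for no concurrency, None for no limit, a positive integer for a bound), here is a standalone asyncio sketch that counts how many target calls overlap under each setting. It does not reproduce langsmith's internal scheduling; PeakTracker, run_all, and peak_for are hypothetical names introduced only for this illustration.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Sequence


class PeakTracker:
    """Hypothetical async target that records how many calls overlap."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def target(self, inputs: dict) -> dict:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # stand-in for real work
        self.in_flight -= 1
        return {"output": inputs["x"] * 2}


async def run_all(
    target: Callable[[dict], Awaitable[dict]],
    examples: Sequence[dict],
    max_concurrency: Optional[int] = 0,
) -> List[dict]:
    if max_concurrency == 0:
        # No concurrency: finish each call before starting the next.
        return [await target(example) for example in examples]
    if max_concurrency is None:
        # No limit: start every call at once.
        return list(await asyncio.gather(*(target(e) for e in examples)))
    # Bounded: a semaphore caps the number of calls in flight.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(example: dict) -> dict:
        async with semaphore:
            return await target(example)

    return list(await asyncio.gather(*(bounded(e) for e in examples)))


def peak_for(max_concurrency: Optional[int]) -> int:
    """Run six examples and report the highest number of overlapping calls."""
    tracker = PeakTracker()
    examples = [{"x": i} for i in range(6)]
    asyncio.run(run_all(tracker.target, examples, max_concurrency))
    return tracker.peak


print(peak_for(0), peak_for(2), peak_for(None))  # 1 2 6
```

With the default of 0, total wall-clock time grows linearly with the number of examples, so a positive bound is usually the practical choice when the target calls a rate-limited API.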
Using async evaluators:
>>> async def helpfulness(run: Run, example: Example):
...     # Row-level evaluator for helpfulness.
...     await asyncio.sleep(5)  # Replace with your LLM API call
...     return {"score": run.outputs["output"] == "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[helpfulness],
...         summary_evaluators=[precision],
...         experiment_prefix="My Helpful Experiment",
...         description="Applying async evaluators example.",
...     )
... )
View the evaluation results for experiment:...
Changed in version 0.2.0: ‘max_concurrency’ default updated from None (no limit on concurrency) to 0 (no concurrency at all).