"""Client for interacting with the LangSmith API.Use the client to customize API keys / workspace connections, SSL certs,etc. for tracing.Also used to create, read, update, and delete LangSmith resourcessuch as runs (~trace spans), datasets, examples (~records),feedback (~metrics), projects (tracer sessions/groups), etc.For detailed API documentation, visit: https://docs.smith.langchain.com/."""from__future__importannotationsimportatexitimportcollectionsimportconcurrent.futuresascfimportcontextlibimportdatetimeimportfunctoolsimportimportlibimportimportlib.metadataimportioimportitertoolsimportjsonimportloggingimportosimportrandomimportthreadingimporttimeimporttracebackimporttypingimportuuidimportwarningsimportweakreffrominspectimportsignaturefrompathlibimportPathfromqueueimportPriorityQueuefromtypingimport(TYPE_CHECKING,Any,AsyncIterable,Callable,DefaultDict,Dict,Iterable,Iterator,List,Literal,Mapping,Optional,Sequence,Tuple,Type,Union,cast,)fromurllibimportparseasurllib_parseimportrequestsfromrequestsimportadaptersasrequests_adaptersfromrequests_toolbeltimport(# type: ignore[import-untyped]multipartasrqtb_multipart,)fromtyping_extensionsimportTypeGuard,overloadfromurllib3.poolmanagerimportPoolKey# type: ignore[attr-defined, import-untyped]fromurllib3.utilimportRetry# type: 
ignore[import-untyped]importlangsmithfromlangsmithimportenvasls_envfromlangsmithimportschemasasls_schemasfromlangsmithimportutilsasls_utilsfromlangsmith._internalimport_orjsonfromlangsmith._internal._background_threadimport(TracingQueueItem,)fromlangsmith._internal._background_threadimport(tracing_control_thread_funcas_tracing_control_thread_func,)fromlangsmith._internal._beta_decoratorimportwarn_betafromlangsmith._internal._compressed_tracesimportCompressedTracesfromlangsmith._internal._constantsimport(_AUTO_SCALE_UP_NTHREADS_LIMIT,_BLOCKSIZE_BYTES,_BOUNDARY,_SIZE_LIMIT_BYTES,)fromlangsmith._internal._multipartimport(MultipartPart,MultipartPartsAndContext,join_multipart_parts_and_context,)fromlangsmith._internal._operationsimport(SerializedFeedbackOperation,SerializedRunOperation,combine_serialized_queue_operations,compress_multipart_parts_and_context,serialize_feedback_dict,serialize_run_dict,serialized_feedback_operation_to_multipart_parts_and_context,serialized_run_operation_to_multipart_parts_and_context,)fromlangsmith._internal._serdeimportdumps_jsonas_dumps_jsonfromlangsmith.schemasimportAttachmentInfoHAS_OTEL=Falsetry:ifls_utils.is_truish(ls_utils.get_env_var("OTEL_ENABLED")):fromopentelemetryimporttraceasotel_trace# type: ignore[import]fromlangsmith._internal.otel._otel_clientimport(get_otlp_tracer_provider,)fromlangsmith._internal.otel._otel_exporterimportOTELExporterHAS_OTEL=TrueexceptImportError:raiseImportError("To use OTEL tracing, you must install it with `pip install langsmith[otel]`")try:fromzoneinfoimportZoneInfo# type: ignore[import-not-found]exceptImportError:classZoneInfo:# type: ignore[no-redef]"""Introduced in python 3.9."""try:fromopentelemetry.sdk.traceimportTracerProvider# type: ignore[import-not-found]exceptImportError:
[docs]classTracerProvider:# type: ignore[no-redef]"""Used for optional OTEL tracing."""
if TYPE_CHECKING:
    # Heavy / optional imports used only for type annotations.
    import pandas as pd  # type: ignore

    from langchain_core.runnables import Runnable

    from langsmith import schemas
    from langsmith.evaluation import evaluator as ls_evaluator
    from langsmith.evaluation._arunner import (
        AEVALUATOR_T,
        ATARGET_T,
        AsyncExperimentResults,
    )
    from langsmith.evaluation._runner import (
        COMPARATIVE_EVALUATOR_T,
        DATA_T,
        EVALUATOR_T,
        EXPERIMENT_T,
        SUMMARY_EVALUATOR_T,
        TARGET_T,
        ComparativeExperimentResults,
        ExperimentResults,
    )

logger = logging.getLogger(__name__)
_urllib3_logger = logging.getLogger("urllib3.connectionpool")

# Header name used to authenticate against the LangSmith API.
X_API_KEY = "x-api-key"
EMPTY_SEQ: tuple[Dict, ...] = ()
# urllib3 >= 2.0 accepts a per-pool blocksize; detect it once at import time.
URLLIB3_SUPPORTS_BLOCKSIZE = "key_blocksize" in signature(PoolKey).parameters


def _parse_token_or_url(
    url_or_token: Union[str, uuid.UUID],
    api_url: str,
    num_parts: int = 2,
    kind: str = "dataset",
) -> Tuple[str, str]:
    """Parse a public dataset URL or share token.

    Returns:
        Tuple[str, str]: The (api_url, share_token) pair.

    Raises:
        LangSmithUserError: If the URL does not contain a share token.
    """
    # A bare UUID (object or string) is already a share token.
    try:
        if isinstance(url_or_token, uuid.UUID) or uuid.UUID(url_or_token):
            return api_url, str(url_or_token)
    except ValueError:
        pass

    # Then it's a URL
    parsed_url = urllib_parse.urlparse(str(url_or_token))
    # Extract the UUID from the path
    path_parts = parsed_url.path.split("/")
    if len(path_parts) >= num_parts:
        token_uuid = path_parts[-num_parts]
        # Validates the token; raises LangSmithUserError if malformed.
        _as_uuid(token_uuid, var="token parts")
    else:
        raise ls_utils.LangSmithUserError(f"Invalid public {kind} URL: {url_or_token}")
    # Map the public web hosts onto their API hosts.
    if parsed_url.netloc == "smith.langchain.com":
        api_url = "https://api.smith.langchain.com"
    elif parsed_url.netloc == "beta.smith.langchain.com":
        api_url = "https://beta.api.smith.langchain.com"
    return api_url, token_uuid


def _is_langchain_hosted(url: str) -> bool:
    """Check if the URL is langchain hosted.

    Args:
        url (str): The URL to check.

    Returns:
        bool: True if the URL is langchain hosted, False otherwise.
    """
    try:
        netloc = urllib_parse.urlsplit(url).netloc.split(":")[0]
        return netloc.endswith("langchain.com")
    except Exception:
        # Malformed URLs are treated as not hosted.
        return False


ID_TYPE = Union[uuid.UUID, str]
RUN_TYPE_T = Literal[
    "tool", "chain", "llm", "retriever", "embedding", "prompt", "parser"
]


@functools.lru_cache(maxsize=1)
def _default_retry_config() -> Retry:
    """Get the default retry configuration.

    If urllib3 version is 1.26 or greater, retry on all methods.

    Returns:
        Retry: The default retry configuration.
    """
    retry_params = dict(
        total=3,
        status_forcelist=[502, 503, 504, 408, 425],
        backoff_factor=0.5,
        # Sadly urllib3 1.x doesn't support backoff_jitter
        raise_on_redirect=False,
        raise_on_status=False,
        respect_retry_after_header=True,
    )

    # the `allowed_methods` keyword is not available in urllib3 < 1.26
    # check to see if urllib3 version is 1.26 or greater
    urllib3_version = importlib.metadata.version("urllib3")
    use_allowed_methods = tuple(map(int, urllib3_version.split("."))) >= (1, 26)

    if use_allowed_methods:
        # Retry on all methods
        retry_params["allowed_methods"] = None

    return ls_utils.LangSmithRetry(**retry_params)  # type: ignore


def close_session(session: requests.Session) -> None:
    """Close the session.

    Args:
        session (requests.Session): The session to close.
    """
    logger.debug("Closing Client.session")
    session.close()


def _validate_api_key_if_hosted(api_url: str, api_key: Optional[str]) -> None:
    """Verify API key is provided if url not localhost.

    Args:
        api_url (str): The API URL.
        api_key (Optional[str]): The API key.

    Returns:
        None

    Raises:
        LangSmithUserError: If the API key is not provided when using the hosted service.
    """
    # If the domain is langchain.com, raise error if no api_key
    if not api_key:
        if _is_langchain_hosted(api_url):
            # NOTE(review): despite the docstring's "Raises", this only warns.
            warnings.warn(
                "API key must be provided when using hosted LangSmith API",
                ls_utils.LangSmithMissingAPIKeyWarning,
            )


def _format_feedback_score(score: Union[float, int, bool, None]):
    """Format a feedback score by truncating numerical values to 4 decimal places.

    Args:
        score: The score to format, can be a number or any other type

    Returns:
        The formatted score
    """
    if isinstance(score, float):
        # Truncate at 4 decimal places
        return round(score, 4)
    return score


def _get_tracing_sampling_rate(
    tracing_sampling_rate: Optional[float] = None,
) -> float | None:
    """Get the tracing sampling rate.

    Returns:
        Optional[float]: The tracing sampling rate.

    Raises:
        LangSmithUserError: If the rate is outside [0, 1].
    """
    if tracing_sampling_rate is None:
        # Fall back to the LANGSMITH_TRACING_SAMPLING_RATE env var.
        sampling_rate_str = ls_utils.get_env_var("TRACING_SAMPLING_RATE")
        if not sampling_rate_str:
            return None
    else:
        sampling_rate_str = str(tracing_sampling_rate)
    sampling_rate = float(sampling_rate_str)
    if sampling_rate < 0 or sampling_rate > 1:
        raise ls_utils.LangSmithUserError(
            "LANGSMITH_TRACING_SAMPLING_RATE must be between 0 and 1 if set."
            f" Got: {sampling_rate}"
        )
    return sampling_rate


def _get_write_api_urls(_write_api_urls: Optional[Dict[str, str]]) -> Dict[str, str]:
    # Normalize the url -> api_key mapping: strip whitespace/quotes and
    # trailing slashes, and validate keys for hosted endpoints. Falls back to
    # the LANGSMITH_RUNS_ENDPOINTS env var (JSON object) when not provided.
    _write_api_urls = _write_api_urls or json.loads(
        os.getenv("LANGSMITH_RUNS_ENDPOINTS", "{}")
    )
    processed_write_api_urls = {}
    for url, api_key in _write_api_urls.items():
        processed_url = url.strip()
        if not processed_url:
            raise ls_utils.LangSmithUserError(
                "LangSmith runs API URL within LANGSMITH_RUNS_ENDPOINTS cannot be empty"
            )
        processed_url = processed_url.strip().strip('"').strip("'").rstrip("/")
        processed_api_key = api_key.strip().strip('"').strip("'")
        _validate_api_key_if_hosted(processed_url, processed_api_key)
        processed_write_api_urls[processed_url] = processed_api_key
    return processed_write_api_urls


def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID:
    # Coerce a string (or pass through a UUID) into a uuid.UUID, raising a
    # user-facing error naming `var` on failure.
    try:
        return uuid.UUID(value) if not isinstance(value, uuid.UUID) else value
    except ValueError as e:
        var = var or "value"
        raise ls_utils.LangSmithUserError(
            f"{var} must be a valid UUID or UUID string. Got {value}"
        ) from e


@typing.overload
def _ensure_uuid(value: Optional[Union[str, uuid.UUID]]) -> uuid.UUID: ...


@typing.overload
def _ensure_uuid(
    value: Optional[Union[str, uuid.UUID]], *, accept_null: bool = True
) -> Optional[uuid.UUID]: ...


def _ensure_uuid(value: Optional[Union[str, uuid.UUID]], *, accept_null: bool = False):
    # None -> fresh uuid4 (or None when accept_null); otherwise validate/coerce.
    if value is None:
        if accept_null:
            return None
        return uuid.uuid4()
    return _as_uuid(value)


@functools.lru_cache(maxsize=1)
def _parse_url(url):
    # Extract the hostname (without port) from a URL; cached for the common
    # single-API-URL case.
    parsed_url = urllib_parse.urlparse(url)
    host = parsed_url.netloc.split(":")[0]
    return host


class _LangSmithHttpAdapter(requests_adapters.HTTPAdapter):
    # HTTPAdapter that forwards a custom socket blocksize to urllib3 pools
    # (when the installed urllib3 supports it) for larger write buffers.
    __attrs__ = [
        "max_retries",
        "config",
        "_pool_connections",
        "_pool_maxsize",
        "_pool_block",
        "_blocksize",
    ]

    def __init__(
        self,
        pool_connections: int = requests_adapters.DEFAULT_POOLSIZE,
        pool_maxsize: int = requests_adapters.DEFAULT_POOLSIZE,
        max_retries: Union[Retry, int, None] = requests_adapters.DEFAULT_RETRIES,
        pool_block: bool = requests_adapters.DEFAULT_POOLBLOCK,
        blocksize: int = 16384,  # default from urllib3.BaseHTTPSConnection
    ) -> None:
        # Store before super().__init__ since that call builds the pool manager.
        self._blocksize = blocksize
        super().__init__(pool_connections, pool_maxsize, max_retries, pool_block)

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        if URLLIB3_SUPPORTS_BLOCKSIZE:
            # urllib3 before 2.0 doesn't support blocksize
            pool_kwargs["blocksize"] = self._blocksize
        return super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
class Client:
    """Client for interacting with the LangSmith API."""

    # __slots__ fixes the attribute set (and avoids a per-instance __dict__);
    # any new instance attribute must be registered here. __weakref__ is
    # included so weakref.finalize can be used for session cleanup.
    __slots__ = [
        "__weakref__",
        "api_url",
        "api_key",
        "retry_config",
        "timeout_ms",
        "session",
        "_get_data_type_cached",
        "_web_url",
        "_tenant_id",
        "tracing_sample_rate",
        "_filtered_post_uuids",
        "tracing_queue",
        "_anonymizer",
        "_hide_inputs",
        "_hide_outputs",
        "_info",
        "_write_api_urls",
        "_settings",
        "_manual_cleanup",
        "_pyo3_client",
        "compressed_traces",
        "_data_available_event",
        "_futures",
        "otel_exporter",
    ]
[docs]def__init__(self,api_url:Optional[str]=None,*,api_key:Optional[str]=None,retry_config:Optional[Retry]=None,timeout_ms:Optional[Union[int,Tuple[int,int]]]=None,web_url:Optional[str]=None,session:Optional[requests.Session]=None,auto_batch_tracing:bool=True,anonymizer:Optional[Callable[[dict],dict]]=None,hide_inputs:Optional[Union[Callable[[dict],dict],bool]]=None,hide_outputs:Optional[Union[Callable[[dict],dict],bool]]=None,info:Optional[Union[dict,ls_schemas.LangSmithInfo]]=None,api_urls:Optional[Dict[str,str]]=None,otel_tracer_provider:Optional[TracerProvider]=None,tracing_sampling_rate:Optional[float]=None,)->None:"""Initialize a Client instance. Args: api_url (Optional[str]): URL for the LangSmith API. Defaults to the LANGCHAIN_ENDPOINT environment variable or https://api.smith.langchain.com if not set. api_key (Optional[str]): API key for the LangSmith API. Defaults to the LANGCHAIN_API_KEY environment variable. retry_config (Optional[Retry]): Retry configuration for the HTTPAdapter. timeout_ms (Optional[Union[int, Tuple[int, int]]]): Timeout for the HTTPAdapter. Can also be a 2-tuple of (connect timeout, read timeout) to set them separately. web_url (Optional[str]): URL for the LangSmith web app. Default is auto-inferred from the ENDPOINT. session (Optional[requests.Session]): The session to use for requests. If None, a new session will be created. auto_batch_tracing (bool, default=True): Whether to automatically batch tracing. anonymizer (Optional[Callable[[dict], dict]]): A function applied for masking serialized run inputs and outputs, before sending to the API. hide_inputs (Optional[Union[Callable[[dict], dict], bool]]): Whether to hide run inputs when tracing with this client. If True, hides the entire inputs. If a function, applied to all run inputs when creating runs. hide_outputs (Optional[Union[Callable[[dict], dict], bool]]): Whether to hide run outputs when tracing with this client. If True, hides the entire outputs. 
If a function, applied to all run outputs when creating runs. info (Optional[ls_schemas.LangSmithInfo]): The information about the LangSmith API. If not provided, it will be fetched from the API. api_urls (Optional[Dict[str, str]]): A dictionary of write API URLs and their corresponding API keys. Useful for multi-tenant setups. Data is only read from the first URL in the dictionary. However, ONLY Runs are written (POST and PATCH) to all URLs in the dictionary. Feedback, sessions, datasets, examples, annotation queues and evaluation results are only written to the first. otel_tracer_provider (Optional[TracerProvider]): Optional tracer provider for OpenTelemetry integration. If not provided, a LangSmith-specific tracer provider will be used. tracing_sampling_rate (Optional[float]): The sampling rate for tracing. If provided, overrides the LANGCHAIN_TRACING_SAMPLING_RATE environment variable. Should be a float between 0 and 1, where 1 means trace everything and 0 means trace nothing. Raises: LangSmithUserError: If the API key is not provided when using the hosted service. If both api_url and api_urls are provided. 
"""ifapi_urlandapi_urls:raisels_utils.LangSmithUserError("You cannot provide both api_url and api_urls.")if(os.getenv("LANGSMITH_ENDPOINT")oros.getenv("LANGCHAIN_ENDPOINT"))andos.getenv("LANGSMITH_RUNS_ENDPOINTS"):raisels_utils.LangSmithUserError("You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT ""and LANGSMITH_RUNS_ENDPOINTS.")self.tracing_sample_rate=_get_tracing_sampling_rate(tracing_sampling_rate)self._filtered_post_uuids:set[uuid.UUID]=set()self._write_api_urls:Mapping[str,Optional[str]]=_get_write_api_urls(api_urls)ifself._write_api_urls:self.api_url=next(iter(self._write_api_urls))self.api_key:Optional[str]=self._write_api_urls[self.api_url]else:self.api_url=ls_utils.get_api_url(api_url)self.api_key=ls_utils.get_api_key(api_key)_validate_api_key_if_hosted(self.api_url,self.api_key)self._write_api_urls={self.api_url:self.api_key}self.retry_config=retry_configor_default_retry_config()self.timeout_ms=((timeout_ms,timeout_ms)ifisinstance(timeout_ms,int)else(timeout_msor(10_000,90_001)))self._web_url=web_urlself._tenant_id:Optional[uuid.UUID]=None# Create a session and register a finalizer to close itsession_=sessionifsessionelserequests.Session()self.session=session_self._info=(infoifinfoisNoneorisinstance(info,ls_schemas.LangSmithInfo)elsels_schemas.LangSmithInfo(**info))weakref.finalize(self,close_session,self.session)atexit.register(close_session,session_)self.compressed_traces:Optional[CompressedTraces]=Noneself._data_available_event:Optional[threading.Event]=Noneself._futures:Optional[set[cf.Future]]=Noneself.otel_exporter:Optional[OTELExporter]=None# Initialize auto batchingifauto_batch_tracing:self.tracing_queue:Optional[PriorityQueue]=PriorityQueue()threading.Thread(target=_tracing_control_thread_func,# arg must be a weakref to self to avoid the Thread object# preventing garbage collection of the Client objectargs=(weakref.ref(self),),).start()else:self.tracing_queue=None# Mount the HTTPAdapter with the retry 
configuration.adapter=_LangSmithHttpAdapter(max_retries=self.retry_config,blocksize=_BLOCKSIZE_BYTES,# We need to set the pool_maxsize to a value greater than the# number of threads used for batch tracing, plus 1 for other# requests.pool_maxsize=_AUTO_SCALE_UP_NTHREADS_LIMIT+1,)self.session.mount("http://",adapter)self.session.mount("https://",adapter)self._get_data_type_cached=functools.lru_cache(maxsize=10)(self._get_data_type)self._anonymizer=anonymizerself._hide_inputs=(hide_inputsifhide_inputsisnotNoneelsels_utils.get_env_var("HIDE_INPUTS")=="true")self._hide_outputs=(hide_outputsifhide_outputsisnotNoneelsels_utils.get_env_var("HIDE_OUTPUTS")=="true")# To trigger this code, set the `LANGSMITH_USE_PYO3_CLIENT` env var to any value.self._pyo3_client=Noneifls_utils.get_env_var("USE_PYO3_CLIENT")isnotNone:langsmith_pyo3=Nonetry:importlangsmith_pyo3# type: ignore[import-not-found, no-redef]exceptImportErrorase:logger.warning("Failed to import `langsmith_pyo3` when PyO3 client was requested, ""falling back to Python impl: %s",repr(e),)iflangsmith_pyo3:# TODO: tweak these constants as neededqueue_capacity=1_000_000batch_size=100batch_timeout_millis=1000worker_threads=1try:self._pyo3_client=langsmith_pyo3.BlockingTracingClient(self.api_url,self.api_key,queue_capacity,batch_size,batch_timeout_millis,worker_threads,)exceptExceptionase:logger.warning("Failed to instantiate `langsmith_pyo3.BlockingTracingClient` ""when PyO3 client was requested, falling back to Python impl: %s",repr(e),)self._settings:Union[ls_schemas.LangSmithSettings,None]=Noneself._manual_cleanup=Falseifls_utils.is_truish(ls_utils.get_env_var("OTEL_ENABLED")):ifnotHAS_OTEL:warnings.warn("LANGSMITH_OTEL_ENABLED is set but OpenTelemetry packages are not installed. 
""Install with `pip install langsmith[otel]`")existing_provider=otel_trace.get_tracer_provider()tracer=existing_provider.get_tracer(__name__)ifotel_tracer_providerisNone:# Use existing global provider if availableifnot(isinstance(existing_provider,otel_trace.ProxyTracerProvider)andhasattr(tracer,"_tracer")andisinstance(cast(otel_trace.ProxyTracer,tracer,)._tracer,otel_trace.NoOpTracer,)):otel_tracer_provider=cast(TracerProvider,existing_provider)else:otel_tracer_provider=get_otlp_tracer_provider()otel_trace.set_tracer_provider(otel_tracer_provider)self.otel_exporter=OTELExporter(tracer_provider=otel_tracer_provider)
    def _repr_html_(self) -> str:
        """Return an HTML representation of the instance with a link to the URL.

        Returns:
            str: The HTML representation of the instance.
        """
        link = self._host_url
        # NOTE(review): the comma after the href attribute looks unintentional
        # (browsers tolerate it) — confirm before changing user-visible output.
        return f'<a href="{link}", target="_blank" rel="noopener">LangSmith Client</a>'

    def __repr__(self) -> str:
        """Return a string representation of the instance with a link to the URL.

        Returns:
            str: The string representation of the instance.
        """
        return f"Client (API URL: {self.api_url})"

    @property
    def _host(self) -> str:
        # Hostname (without port) of the API URL; _parse_url is lru-cached.
        return _parse_url(self.api_url)

    @property
    def _host_url(self) -> str:
        """The web host url."""
        return ls_utils.get_host_url(self._web_url, self.api_url)

    @property
    def _headers(self) -> Dict[str, str]:
        """Get the headers for the API request.

        Returns:
            Dict[str, str]: The headers for the API request.
        """
        headers = {
            "User-Agent": f"langsmith-py/{langsmith.__version__}",
            "Accept": "application/json",
        }
        if self.api_key:
            headers[X_API_KEY] = self.api_key
        return headers

    @property
    def info(self) -> ls_schemas.LangSmithInfo:
        """Get the information about the LangSmith API.

        Returns:
            ls_schemas.LangSmithInfo: The information about the LangSmith API, or None if
            the API is not available.
        """
        if self._info is None:
            try:
                response = self.request_with_retries(
                    "GET",
                    "/info",
                    headers={"Accept": "application/json"},
                    timeout=(self.timeout_ms[0] / 1000, self.timeout_ms[1] / 1000),
                )
                ls_utils.raise_for_status_with_text(response)
                self._info = ls_schemas.LangSmithInfo(**response.json())
            except BaseException as e:
                # Best-effort: fall back to an empty info object rather than
                # failing the caller when the endpoint is unreachable.
                logger.warning(
                    f"Failed to get info from {self.api_url}: {repr(e)}",
                )
                self._info = ls_schemas.LangSmithInfo()
        return self._info

    def _get_settings(self) -> ls_schemas.LangSmithSettings:
        """Get the settings for the current tenant.

        Returns:
            dict: The settings for the current tenant.
        """
        # Lazily fetched and cached on the instance.
        if self._settings is None:
            response = self.request_with_retries("GET", "/settings")
            ls_utils.raise_for_status_with_text(response)
            self._settings = ls_schemas.LangSmithSettings(**response.json())

        return self._settings

    def _content_above_size(self, content_length: Optional[int]) -> Optional[str]:
        # Return a human-readable message when content_length exceeds the
        # server-advertised batch-ingest size limit; None when unknown or OK.
        if content_length is None or self._info is None:
            return None
        info = cast(ls_schemas.LangSmithInfo, self._info)
        bic = info.batch_ingest_config
        if not bic:
            return None
        size_limit = bic.get("size_limit_bytes")
        if size_limit is None:
            return None
        if content_length > size_limit:
            return (
                f"The content length of {content_length} bytes exceeds the "
                f"maximum size limit of {size_limit} bytes."
            )
        return None
    def request_with_retries(
        self,
        /,
        method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"],
        pathname: str,
        *,
        request_kwargs: Optional[Mapping] = None,
        stop_after_attempt: int = 1,
        retry_on: Optional[Sequence[Type[BaseException]]] = None,
        to_ignore: Optional[Sequence[Type[BaseException]]] = None,
        handle_response: Optional[Callable[[requests.Response, int], Any]] = None,
        _context: str = "",
        **kwargs: Any,
    ) -> requests.Response:
        """Send a request with retries.

        Args:
            method (str): The HTTP request method.
            pathname (str): The pathname of the request URL. Will be appended to the API URL.
            request_kwargs (Mapping): Additional request parameters.
            stop_after_attempt (int, default=1): The number of attempts to make.
            retry_on (Optional[Sequence[Type[BaseException]]]): The exceptions to retry on. In addition to:
                [LangSmithConnectionError, LangSmithAPIError].
            to_ignore (Optional[Sequence[Type[BaseException]]]): The exceptions to ignore / pass on.
            handle_response (Optional[Callable[[requests.Response, int], Any]]):
                A function to handle the response and return whether to continue retrying.
            _context (str, default=""): The context of the request.
            **kwargs (Any): Additional keyword arguments to pass to the request.

        Returns:
            requests.Response: The response object.

        Raises:
            LangSmithAPIError: If a server error occurs.
            LangSmithUserError: If the request fails.
            LangSmithConnectionError: If a connection error occurs.
            LangSmithError: If the request fails.
        """
        request_kwargs = request_kwargs or {}
        # Merge defaults, caller kwargs, and headers; caller headers win.
        request_kwargs = {
            "timeout": (self.timeout_ms[0] / 1000, self.timeout_ms[1] / 1000),
            **request_kwargs,
            **kwargs,
            "headers": {
                **self._headers,
                **request_kwargs.get("headers", {}),
                **kwargs.get("headers", {}),
            },
        }
        # Default JSON content type for non-GET bodies (unless multipart/files).
        if (
            method != "GET"
            and "data" in request_kwargs
            and "files" not in request_kwargs
            and not request_kwargs["headers"].get("Content-Type")
        ):
            request_kwargs["headers"]["Content-Type"] = "application/json"
        logging_filters = [
            ls_utils.FilterLangSmithRetry(),
            ls_utils.FilterPoolFullWarning(host=str(self._host)),
        ]
        retry_on_: Tuple[Type[BaseException], ...] = (
            *(retry_on or ()),
            *(
                ls_utils.LangSmithConnectionError,
                ls_utils.LangSmithRequestTimeout,  # 408
                ls_utils.LangSmithAPIError,  # 500
            ),
        )
        to_ignore_: Tuple[Type[BaseException], ...] = (*(to_ignore or ()),)
        response = None
        for idx in range(stop_after_attempt):
            # Inner try: translate transport/HTTP errors into LangSmith errors.
            # Outer except clauses: decide to ignore, back off, or retry.
            try:
                try:
                    with ls_utils.filter_logs(_urllib3_logger, logging_filters):
                        response = self.session.request(
                            method,
                            _construct_url(self.api_url, pathname),
                            stream=False,
                            **request_kwargs,
                        )
                    ls_utils.raise_for_status_with_text(response)
                    return response
                except requests.exceptions.ReadTimeout as e:
                    # Retry read timeouts in-place with exponential backoff.
                    logger.debug("Passing on exception %s", e)
                    if idx + 1 == stop_after_attempt:
                        raise
                    sleep_time = 2**idx + (random.random() * 0.5)
                    time.sleep(sleep_time)
                    continue
                except requests.HTTPError as e:
                    if response is not None:
                        if handle_response is not None:
                            if idx + 1 < stop_after_attempt:
                                should_continue = handle_response(response, idx + 1)
                                if should_continue:
                                    continue
                        # Map status codes to typed LangSmith exceptions.
                        if response.status_code == 500:
                            raise ls_utils.LangSmithAPIError(
                                f"Server error caused failure to {method}"
                                f" {pathname} in"
                                f" LangSmith API. {repr(e)}"
                                f"{_context}"
                            )
                        elif response.status_code == 408:
                            raise ls_utils.LangSmithRequestTimeout(
                                f"Client took too long to send request to {method}"
                                f"{pathname}{_context}"
                            )
                        elif response.status_code == 429:
                            raise ls_utils.LangSmithRateLimitError(
                                f"Rate limit exceeded for {pathname}. {repr(e)}"
                                f"{_context}"
                            )
                        elif response.status_code == 401:
                            raise ls_utils.LangSmithAuthError(
                                f"Authentication failed for {pathname}. {repr(e)}"
                                f"{_context}"
                            )
                        elif response.status_code == 404:
                            raise ls_utils.LangSmithNotFoundError(
                                f"Resource not found for {pathname}. {repr(e)}"
                                f"{_context}"
                            )
                        elif response.status_code == 409:
                            raise ls_utils.LangSmithConflictError(
                                f"Conflict for {pathname}. {repr(e)}" f"{_context}"
                            )
                        else:
                            raise ls_utils.LangSmithError(
                                f"Failed to {method}{pathname} in LangSmith"
                                f" API. {repr(e)}"
                            )
                    else:
                        raise ls_utils.LangSmithUserError(
                            f"Failed to {method}{pathname} in LangSmith API."
                            f" {repr(e)}"
                        )
                except requests.ConnectionError as e:
                    recommendation = (
                        "Please confirm your LANGCHAIN_ENDPOINT."
                        if self.api_url != "https://api.smith.langchain.com"
                        else "Please confirm your internet connection."
                    )
                    try:
                        # Hint at oversized payloads when the server advertises
                        # a batch-ingest size limit.
                        content_length = int(
                            str(e.request.headers.get("Content-Length"))
                            if e.request
                            else ""
                        )
                        size_rec = self._content_above_size(content_length)
                        if size_rec:
                            recommendation = size_rec
                    except ValueError:
                        content_length = None

                    # Mask the API key in diagnostics: keep first 5 / last 2 chars.
                    api_key = (
                        e.request.headers.get("x-api-key") or "" if e.request else ""
                    )
                    prefix, suffix = api_key[:5], api_key[-2:]
                    filler = "*" * (max(0, len(api_key) - 7))
                    masked_api_key = f"{prefix}{filler}{suffix}"

                    raise ls_utils.LangSmithConnectionError(
                        f"Connection error caused failure to {method}{pathname}"
                        f" in LangSmith API. {recommendation}"
                        f" {repr(e)}"
                        f"\nContent-Length: {content_length}"
                        f"\nAPI Key: {masked_api_key}"
                        f"{_context}"
                    ) from e
                except Exception as e:
                    args = list(e.args)
                    msg = args[1] if len(args) > 1 else ""
                    # User-facing terminology: "session" is called "project".
                    msg = msg.replace("session", "session (project)")
                    if args:
                        emsg = "\n".join(
                            [str(args[0])]
                            + [msg]
                            + [str(arg) for arg in (args[2:] if len(args) > 2 else [])]
                        )
                    else:
                        emsg = msg
                    raise ls_utils.LangSmithError(
                        f"Failed to {method}{pathname} in LangSmith API. {emsg}"
                        f"{_context}"
                    ) from e
            except to_ignore_ as e:
                # Caller asked to pass on these; return the raw response if any.
                if response is not None:
                    logger.debug("Passing on exception %s", e)
                    return response
            except ls_utils.LangSmithRateLimitError:
                if idx + 1 == stop_after_attempt:
                    raise
                # NOTE(review): if response is None here, retry_after below is
                # unbound — looks like a latent bug; confirm before changing.
                if response is not None:
                    try:
                        retry_after = float(response.headers.get("retry-after", "30"))
                    except Exception as e:
                        logger.warning(
                            "Invalid retry-after header: %s",
                            repr(e),
                        )
                        retry_after = 30
                # Add exponential backoff
                retry_after = retry_after * 2**idx + random.random()
                time.sleep(retry_after)
            except retry_on_:
                # Handle other exceptions more immediately
                if idx + 1 == stop_after_attempt:
                    raise
                sleep_time = 2**idx + (random.random() * 0.5)
                time.sleep(sleep_time)
                continue
        # Else we still raise an error
        raise ls_utils.LangSmithError(f"Failed to {method}{pathname} in LangSmith API.")
    def _get_paginated_list(
        self, path: str, *, params: Optional[dict] = None
    ) -> Iterator[dict]:
        """Get a paginated list of items.

        Args:
            path (str): The path of the request URL.
            params (Optional[dict]): The query parameters.

        Yields:
            The items in the paginated list.
        """
        # Copy so the caller's dict is not mutated by offset/limit bookkeeping.
        params_ = params.copy() if params else {}
        offset = params_.get("offset", 0)
        params_["limit"] = params_.get("limit", 100)
        while True:
            params_["offset"] = offset
            response = self.request_with_retries(
                "GET",
                path,
                params=params_,
            )
            items = response.json()

            if not items:
                break
            yield from items
            if len(items) < params_["limit"]:
                # offset and limit isn't respected if we're
                # querying for specific values
                break
            offset += len(items)

    def _get_cursor_paginated_list(
        self,
        path: str,
        *,
        body: Optional[dict] = None,
        request_method: Literal["GET", "POST"] = "POST",
        data_key: str = "runs",
    ) -> Iterator[dict]:
        """Get a cursor paginated list of items.

        Args:
            path (str): The path of the request URL.
            body (Optional[dict]): The query body.
            request_method (Literal["GET", "POST"], default="POST"): The HTTP request method.
            data_key (str, default="runs"): The key in the response body that contains the items.

        Yields:
            The items in the paginated list.
        """
        params_ = body.copy() if body else {}
        while True:
            response = self.request_with_retries(
                request_method,
                path,
                request_kwargs={
                    "data": _dumps_json(params_),
                },
            )
            response_body = response.json()
            if not response_body:
                break
            if not response_body.get(data_key):
                break
            yield from response_body[data_key]
            # Follow the server-provided cursor until exhausted.
            cursors = response_body.get("cursors")
            if not cursors:
                break
            if not cursors.get("next"):
                break
            params_["cursor"] = cursors["next"]
[docs]defupload_dataframe(self,df:pd.DataFrame,name:str,input_keys:Sequence[str],output_keys:Sequence[str],*,description:Optional[str]=None,data_type:Optional[ls_schemas.DataType]=ls_schemas.DataType.kv,)->ls_schemas.Dataset:"""Upload a dataframe as individual examples to the LangSmith API. Args: df (pd.DataFrame): The dataframe to upload. name (str): The name of the dataset. input_keys (Sequence[str]): The input keys. output_keys (Sequence[str]): The output keys. description (Optional[str]): The description of the dataset. data_type (Optional[DataType]): The data type of the dataset. Returns: Dataset: The uploaded dataset. Raises: ValueError: If the csv_file is not a string or tuple. Examples: .. code-block:: python from langsmith import Client import os import pandas as pd client = Client() df = pd.read_parquet("path/to/your/myfile.parquet") input_keys = ["column1", "column2"] # replace with your input column names output_keys = ["output1", "output2"] # replace with your output column names dataset = client.upload_dataframe( df=df, input_keys=input_keys, output_keys=output_keys, name="My Parquet Dataset", description="Dataset created from a parquet file", data_type="kv", # The default ) """csv_file=io.BytesIO()df.to_csv(csv_file,index=False)csv_file.seek(0)returnself.upload_csv(("data.csv",csv_file),input_keys=input_keys,output_keys=output_keys,description=description,name=name,data_type=data_type,)
    def upload_csv(
        self,
        csv_file: Union[str, Tuple[str, io.BytesIO]],
        input_keys: Sequence[str],
        output_keys: Sequence[str],
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        data_type: Optional[ls_schemas.DataType] = ls_schemas.DataType.kv,
    ) -> ls_schemas.Dataset:
        """Upload a CSV file to the LangSmith API.

        Args:
            csv_file (Union[str, Tuple[str, io.BytesIO]]): The CSV file to upload. If a string, it should be the path
                If a tuple, it should be a tuple containing the filename and a BytesIO object.
            input_keys (Sequence[str]): The input keys.
            output_keys (Sequence[str]): The output keys.
            name (Optional[str]): The name of the dataset.
            description (Optional[str]): The description of the dataset.
            data_type (Optional[ls_schemas.DataType]): The data type of the dataset.

        Returns:
            Dataset: The uploaded dataset.

        Raises:
            ValueError: If the csv_file is not a string or tuple.

        Examples:
            .. code-block:: python

                from langsmith import Client
                import os

                client = Client()

                csv_file = "path/to/your/myfile.csv"
                input_keys = ["column1", "column2"]  # replace with your input column names
                output_keys = ["output1", "output2"]  # replace with your output column names

                dataset = client.upload_csv(
                    csv_file=csv_file,
                    input_keys=input_keys,
                    output_keys=output_keys,
                    name="My CSV Dataset",
                    description="Dataset created from a CSV file",
                    data_type="kv",  # The default
                )
        """
        data = {
            "input_keys": input_keys,
            "output_keys": output_keys,
        }
        if name:
            data["name"] = name
        if description:
            data["description"] = description
        if data_type:
            data["data_type"] = ls_utils.get_enum_value(data_type)
        # Generate the dataset id client-side so it is known before the upload.
        data["id"] = str(uuid.uuid4())
        if isinstance(csv_file, str):
            with open(csv_file, "rb") as f:
                file_ = {"file": f}
                response = self.request_with_retries(
                    "POST",
                    "/datasets/upload",
                    data=data,
                    files=file_,
                )
        elif isinstance(csv_file, tuple):
            response = self.request_with_retries(
                "POST",
                "/datasets/upload",
                data=data,
                files={"file": csv_file},
            )
        else:
            raise ValueError("csv_file must be a string or tuple")
        ls_utils.raise_for_status_with_text(response)
        result = response.json()
        # TODO: Make this more robust server-side
        if "detail" in result and "already exists" in result["detail"]:
            file_name = csv_file if isinstance(csv_file, str) else csv_file[0]
            file_name = file_name.split("/")[-1]
            raise ValueError(f"Dataset {file_name} already exists")
        return ls_schemas.Dataset(
            **result,
            _host_url=self._host_url,
            _tenant_id=self._get_optional_tenant_id(),
        )
    def _run_transform(
        self,
        run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict],
        update: bool = False,
        copy: bool = False,
    ) -> dict:
        """Transform the given run object into a dictionary representation.

        Args:
            run (Union[ls_schemas.Run, dict]): The run object to transform.
            update (Optional[bool]): Whether the payload is for an "update" event.
            copy (Optional[bool]): Whether to deepcopy run inputs/outputs.

        Returns:
            dict: The transformed run object as a dictionary.
        """
        if hasattr(run, "dict") and callable(getattr(run, "dict")):
            run_create: dict = run.dict()  # type: ignore
        else:
            run_create = cast(dict, run)

        # Ensure the run has a UUID id (generate one for creates without it).
        if "id" not in run_create:
            run_create["id"] = uuid.uuid4()
        elif isinstance(run_create["id"], str):
            run_create["id"] = uuid.UUID(run_create["id"])
        # Apply masking (and optional defensive copy) to inputs/outputs.
        if "inputs" in run_create and run_create["inputs"] is not None:
            if copy:
                run_create["inputs"] = ls_utils.deepish_copy(run_create["inputs"])
            run_create["inputs"] = self._hide_run_inputs(run_create["inputs"])
        if "outputs" in run_create and run_create["outputs"] is not None:
            if copy:
                run_create["outputs"] = ls_utils.deepish_copy(run_create["outputs"])
            run_create["outputs"] = self._hide_run_outputs(run_create["outputs"])
        if not update and not run_create.get("start_time"):
            run_create["start_time"] = datetime.datetime.now(datetime.timezone.utc)

        # Only retain LLM & Prompt manifests
        if "serialized" in run_create:
            if run_create.get("run_type") not in (
                "llm",
                "prompt",
            ):
                # Drop completely
                run_create.pop("serialized", None)
            elif run_create.get("serialized"):
                # Drop graph
                run_create["serialized"].pop("graph", None)

        return run_create

    @staticmethod
    def _insert_runtime_env(runs: Sequence[dict]) -> None:
        # Merge ambient runtime info and LANGCHAIN_* env-var metadata into each
        # run's `extra` dict, without overwriting caller-provided values.
        runtime_env = ls_env.get_runtime_environment()
        for run_create in runs:
            run_extra = cast(dict, run_create.setdefault("extra", {}))
            # update runtime
            runtime: dict = run_extra.setdefault("runtime", {})
            run_extra["runtime"] = {**runtime_env, **runtime}
            # update metadata
            metadata: dict = run_extra.setdefault("metadata", {})
            langchain_metadata = ls_env.get_langchain_env_var_metadata()
            metadata.update(
                {k: v for k, v in langchain_metadata.items() if k not in metadata}
            )

    def _should_sample(self) -> bool:
        # True when this trace should be kept, per the configured sample rate.
        if self.tracing_sample_rate is None:
            return True
        return random.random() < self.tracing_sample_rate

    def _filter_for_sampling(
        self, runs: Iterable[dict], *, patch: bool = False
    ) -> list[dict]:
        # Apply trace-level sampling: the decision is made once per trace id
        # and recorded in _filtered_post_uuids; child runs and patches follow
        # the decision made for their trace's root run.
        if self.tracing_sample_rate is None:
            return list(runs)

        if patch:
            # Patches for dropped traces are filtered out; seeing a patch for a
            # dropped run also retires its entry from the filtered set.
            sampled = []
            for run in runs:
                run_id = _as_uuid(run["id"])
                if run_id not in self._filtered_post_uuids:
                    sampled.append(run)
                else:
                    self._filtered_post_uuids.remove(run_id)
            return sampled
        else:
            sampled = []
            for run in runs:
                trace_id = run.get("trace_id") or run["id"]
                # If we've already made a decision about this trace, follow it
                if trace_id in self._filtered_post_uuids:
                    continue
                # For new traces, apply sampling
                if run["id"] == trace_id:
                    if self._should_sample():
                        sampled.append(run)
                    else:
                        self._filtered_post_uuids.add(trace_id)
                else:
                    # Child runs follow their trace's sampling decision
                    sampled.append(run)
            return sampled
def create_run(
    self,
    name: str,
    inputs: Dict[str, Any],
    run_type: RUN_TYPE_T,
    *,
    project_name: Optional[str] = None,
    revision_id: Optional[str] = None,
    dangerously_allow_filesystem: bool = False,
    **kwargs: Any,
) -> None:
    """Persist a run to the LangSmith API.

    Args:
        name (str): The name of the run.
        inputs (Dict[str, Any]): The input values for the run.
        run_type (str): The type of the run, such as tool, chain, llm,
            retriever, embedding, prompt, or parser.
        project_name (Optional[str]): The project name of the run.
        revision_id (Optional[Union[UUID, str]]): The revision ID of the run.
        dangerously_allow_filesystem (bool): Whether to allow filesystem
            ``Path`` values as attachment data.
        **kwargs (Any): Additional run fields (id, start_time, outputs,
            extra, attachments, trace_id, dotted_order, ...).

    Returns:
        None

    Raises:
        ValueError: If an attachment is a filesystem Path and
            ``dangerously_allow_filesystem`` is not set, or if run
            compression is enabled without its threading event.
    """
    project_name = project_name or kwargs.pop(
        "session_name",
        # If the project is not provided, use the environment's project.
        ls_utils.get_tracer_project(),
    )
    run_create = {
        **kwargs,
        "session_name": project_name,
        "name": name,
        "inputs": inputs,
        "run_type": run_type,
    }
    if not self._filter_for_sampling([run_create]):
        return
    if revision_id is not None:
        # Fix: previously this did run_create["extra"]["metadata"][...] and
        # raised KeyError when the caller supplied no "extra"/"metadata";
        # default the nested dicts instead.
        extra = run_create.get("extra") or {}
        metadata = extra.get("metadata") or {}
        metadata["revision_id"] = revision_id
        extra["metadata"] = metadata
        run_create["extra"] = extra
    run_create = self._run_transform(run_create, copy=False)
    self._insert_runtime_env([run_create])
    if run_create.get("attachments") is not None:
        for attachment in run_create["attachments"].values():
            if (
                isinstance(attachment, tuple)
                and isinstance(attachment[1], Path)
                and not dangerously_allow_filesystem
            ):
                raise ValueError(
                    "Must set dangerously_allow_filesystem=True to allow passing in Paths for attachments."
                )
    if (
        # Batch ingest requires trace_id and dotted_order to be set.
        run_create.get("trace_id") is not None
        and run_create.get("dotted_order") is not None
    ):
        if self._pyo3_client is not None:
            self._pyo3_client.create_run(run_create)
        elif self.compressed_traces is not None:
            if self._data_available_event is None:
                raise ValueError(
                    "Run compression is enabled but threading event is not configured"
                )
            serialized_op = serialize_run_dict("post", run_create)
            multipart_form, opened_files = (
                serialized_run_operation_to_multipart_parts_and_context(serialized_op)
            )
            with self.compressed_traces.lock:
                compress_multipart_parts_and_context(
                    multipart_form,
                    self.compressed_traces,
                    _BOUNDARY,
                )
                self.compressed_traces.trace_count += 1
            self._data_available_event.set()
            _close_files(list(opened_files.values()))
        elif self.tracing_queue is not None:
            serialized_op = serialize_run_dict("post", run_create)
            self.tracing_queue.put(
                TracingQueueItem(run_create["dotted_order"], serialized_op)
            )
        else:
            # Neither Rust nor Python batch ingestion is configured;
            # fall back to the non-batch approach.
            self._create_run(run_create)
    else:
        self._create_run(run_create)
def_create_run(self,run_create:dict):forapi_url,api_keyinself._write_api_urls.items():headers={**self._headers,X_API_KEY:api_key}self.request_with_retries("POST",f"{api_url}/runs",request_kwargs={"data":_dumps_json(run_create),"headers":headers,},to_ignore=(ls_utils.LangSmithConflictError,),)def_hide_run_inputs(self,inputs:dict):ifself._hide_inputsisTrue:return{}ifself._anonymizer:json_inputs=_orjson.loads(_dumps_json(inputs))returnself._anonymizer(json_inputs)ifself._hide_inputsisFalse:returninputsreturnself._hide_inputs(inputs)def_hide_run_outputs(self,outputs:dict):ifself._hide_outputsisTrue:return{}ifself._anonymizer:json_outputs=_orjson.loads(_dumps_json(outputs))returnself._anonymizer(json_outputs)ifself._hide_outputsisFalse:returnoutputsreturnself._hide_outputs(outputs)def_batch_ingest_run_ops(self,ops:List[SerializedRunOperation],)->None:ids_and_partial_body:dict[Literal["post","patch"],list[tuple[str,bytes]]]={"post":[],"patch":[],}# form the partial body and idsforopinops:ifisinstance(op,SerializedRunOperation):curr_dict=_orjson.loads(op._none)ifop.inputs:curr_dict["inputs"]=_orjson.Fragment(op.inputs)ifop.outputs:curr_dict["outputs"]=_orjson.Fragment(op.outputs)ifop.events:curr_dict["events"]=_orjson.Fragment(op.events)ifop.attachments:logger.warning("Attachments are not supported when use_multipart_endpoint ""is False")ids_and_partial_body[op.operation].append((f"trace={op.trace_id},id={op.id}",_orjson.dumps(curr_dict)))elifisinstance(op,SerializedFeedbackOperation):logger.warning("Feedback operations are not supported in non-multipart mode")else:logger.error("Unknown item type in tracing queue: %s",type(op))# send the requests in 
batchesinfo=self.infosize_limit_bytes=(info.batch_ingest_configor{}).get("size_limit_bytes")or_SIZE_LIMIT_BYTESbody_chunks:DefaultDict[str,list]=collections.defaultdict(list)context_ids:DefaultDict[str,list]=collections.defaultdict(list)body_size=0forkeyincast(List[Literal["post","patch"]],["post","patch"]):body_deque=collections.deque(ids_and_partial_body[key])whilebody_deque:if(body_size>0andbody_size+len(body_deque[0][1])>size_limit_bytes):self._post_batch_ingest_runs(_orjson.dumps(body_chunks),_context=f"\n{key}: {'; '.join(context_ids[key])}",)body_size=0body_chunks.clear()context_ids.clear()curr_id,curr_body=body_deque.popleft()body_size+=len(curr_body)body_chunks[key].append(_orjson.Fragment(curr_body))context_ids[key].append(curr_id)ifbody_size:context="; ".join(f"{k}: {'; '.join(v)}"fork,vincontext_ids.items())self._post_batch_ingest_runs(_orjson.dumps(body_chunks),_context="\n"+context)
def batch_ingest_runs(
    self,
    create: Optional[Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]] = None,
    update: Optional[Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]] = None,
    *,
    pre_sampled: bool = False,
) -> None:
    """Batch ingest/upsert multiple runs in the Langsmith system.

    Args:
        create: Run objects or equivalent dicts to be created / posted.
        update: Run objects or equivalent dicts for runs that already exist
            and should be updated / patched.
        pre_sampled (bool, default=False): Whether the runs have already been
            subject to sampling, and therefore should not be sampled again.

    Raises:
        LangSmithUserError: If any run lacks ``trace_id`` or ``dotted_order``.
        LangsmithAPIError: If there is an error in the API request.

    Returns:
        None

    Note:
        The run objects MUST contain the dotted_order and trace_id fields
        to be accepted by the API.
    """
    if not create and not update:
        return
    # Transform and convert to dicts.
    create_dicts = [
        self._run_transform(run, copy=False) for run in create or EMPTY_SEQ
    ]
    update_dicts = [
        self._run_transform(run, update=True, copy=False)
        for run in update or EMPTY_SEQ
    ]
    for run in create_dicts:
        if not run.get("trace_id") or not run.get("dotted_order"):
            raise ls_utils.LangSmithUserError(
                "Batch ingest requires trace_id and dotted_order to be set."
            )
    for run in update_dicts:
        if not run.get("trace_id") or not run.get("dotted_order"):
            raise ls_utils.LangSmithUserError(
                "Batch ingest requires trace_id and dotted_order to be set."
            )
    # Filter out runs that are not sampled.
    if not pre_sampled:
        create_dicts = self._filter_for_sampling(create_dicts)
        update_dicts = self._filter_for_sampling(update_dicts, patch=True)
    if not create_dicts and not update_dicts:
        return
    self._insert_runtime_env(create_dicts + update_dicts)
    # Convert to serialized ops, merging posts/patches where possible.
    serialized_ops = cast(
        List[SerializedRunOperation],
        combine_serialized_queue_operations(
            list(
                itertools.chain(
                    (serialize_run_dict("post", run) for run in create_dicts),
                    (serialize_run_dict("patch", run) for run in update_dicts),
                )
            )
        ),
    )
    self._batch_ingest_run_ops(serialized_ops)
def _post_batch_ingest_runs(self, body: bytes, *, _context: str):
    """POST a pre-serialized batch body to every write endpoint, logging failures."""
    for api_url, api_key in self._write_api_urls.items():
        try:
            self.request_with_retries(
                "POST",
                f"{api_url}/runs/batch",
                request_kwargs={
                    "data": body,
                    "headers": {
                        **self._headers,
                        X_API_KEY: api_key,
                    },
                },
                to_ignore=(ls_utils.LangSmithConflictError,),
                stop_after_attempt=3,
                _context=_context,
            )
        except Exception as e:
            # Best-effort logging: never let ingestion errors propagate.
            try:
                exc_desc_lines = traceback.format_exception_only(type(e), e)
                exc_desc = "".join(exc_desc_lines).rstrip()
                logger.warning(f"Failed to batch ingest runs: {exc_desc}")
            except Exception:
                logger.warning(f"Failed to batch ingest runs: {repr(e)}")

def _multipart_ingest_ops(
    self, ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
) -> None:
    """Convert serialized operations to multipart parts and send them."""
    parts: list[MultipartPartsAndContext] = []
    opened_files_dict: Dict[str, io.BufferedReader] = {}
    for op in ops:
        if isinstance(op, SerializedRunOperation):
            part, opened_files = (
                serialized_run_operation_to_multipart_parts_and_context(op)
            )
            parts.append(part)
            opened_files_dict.update(opened_files)
        elif isinstance(op, SerializedFeedbackOperation):
            parts.append(
                serialized_feedback_operation_to_multipart_parts_and_context(op)
            )
        else:
            logger.error("Unknown operation type in tracing queue: %s", type(op))
    acc_multipart = join_multipart_parts_and_context(parts)
    if acc_multipart:
        try:
            self._send_multipart_req(acc_multipart)
        finally:
            # Always release attachment file handles, even on failure.
            _close_files(list(opened_files_dict.values()))
def multipart_ingest(
    self,
    create: Optional[Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]] = None,
    update: Optional[Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]]] = None,
    *,
    pre_sampled: bool = False,
    dangerously_allow_filesystem: bool = False,
) -> None:
    """Batch ingest/upsert multiple runs via the multipart endpoint.

    Args:
        create: Run objects or equivalent dicts to be created / posted.
        update: Run objects or equivalent dicts for runs that already exist
            and should be updated / patched.
        pre_sampled (bool, default=False): Whether the runs have already been
            subject to sampling, and therefore should not be sampled again.
        dangerously_allow_filesystem (bool): Whether to allow filesystem
            ``Path`` values as attachment data.

    Raises:
        LangSmithUserError: If any run lacks ``trace_id`` or ``dotted_order``.
        LangsmithAPIError: If there is an error in the API request.
        ValueError: If an attachment is a filesystem Path and
            ``dangerously_allow_filesystem`` is not set.

    Returns:
        None

    Note:
        The run objects MUST contain the dotted_order and trace_id fields
        to be accepted by the API.
    """
    if not (create or update):
        return
    # Transform and convert to dicts.
    create_dicts = [self._run_transform(run) for run in create or EMPTY_SEQ]
    update_dicts = [
        self._run_transform(run, update=True) for run in update or EMPTY_SEQ
    ]
    # Require trace_id and dotted_order on every payload.
    if create_dicts:
        for run in create_dicts:
            if not run.get("trace_id") or not run.get("dotted_order"):
                raise ls_utils.LangSmithUserError(
                    "Multipart ingest requires trace_id and dotted_order"
                    " to be set in create dicts."
                )
        else:
            # Release the loop variable's reference (original behavior).
            del run
    if update_dicts:
        for run in update_dicts:
            if not run.get("trace_id") or not run.get("dotted_order"):
                raise ls_utils.LangSmithUserError(
                    "Multipart ingest requires trace_id and dotted_order"
                    " to be set in update dicts."
                )
        else:
            del run
    # Combine post and patch dicts where possible.
    if update_dicts and create_dicts:
        create_by_id = {run["id"]: run for run in create_dicts}
        standalone_updates: list[dict] = []
        for run in update_dicts:
            if run["id"] in create_by_id:
                # Fold the patch's non-null fields into the pending post.
                for k, v in run.items():
                    if v is not None:
                        create_by_id[run["id"]][k] = v
            else:
                standalone_updates.append(run)
        else:
            del run
        update_dicts = standalone_updates
    # Filter out runs that are not sampled.
    if not pre_sampled:
        create_dicts = self._filter_for_sampling(create_dicts)
        update_dicts = self._filter_for_sampling(update_dicts, patch=True)
    if not create_dicts and not update_dicts:
        return
    # Insert runtime environment.
    self._insert_runtime_env(create_dicts)
    self._insert_runtime_env(update_dicts)
    # Format as serialized operations.
    serialized_ops = combine_serialized_queue_operations(
        list(
            itertools.chain(
                (serialize_run_dict("post", run) for run in create_dicts),
                (serialize_run_dict("patch", run) for run in update_dicts),
            )
        )
    )
    for op in serialized_ops:
        if isinstance(op, SerializedRunOperation) and op.attachments:
            for attachment in op.attachments.values():
                if (
                    isinstance(attachment, tuple)
                    and isinstance(attachment[1], Path)
                    and not dangerously_allow_filesystem
                ):
                    raise ValueError(
                        "Must set dangerously_allow_filesystem=True to allow passing in Paths for attachments."
                    )
    # Send the runs in multipart requests.
    self._multipart_ingest_ops(serialized_ops)
def _send_multipart_req(self, acc: MultipartPartsAndContext, *, attempts: int = 3):
    """Send accumulated multipart parts to each write endpoint with retries.

    Retries only on connection/timeout/API errors, up to *attempts* times;
    any other exception is logged and aborts without retrying.
    """
    parts = acc.parts
    _context = acc.context
    for api_url, api_key in self._write_api_urls.items():
        for idx in range(1, attempts + 1):
            try:
                encoder = rqtb_multipart.MultipartEncoder(parts, boundary=_BOUNDARY)
                if encoder.len <= 20_000_000:  # ~20 MB
                    # Small enough to buffer in memory.
                    data = encoder.to_string()
                else:
                    # Stream large payloads.
                    data = encoder
                self.request_with_retries(
                    "POST",
                    f"{api_url}/runs/multipart",
                    request_kwargs={
                        "data": data,
                        "headers": {
                            **self._headers,
                            X_API_KEY: api_key,
                            "Content-Type": encoder.content_type,
                        },
                    },
                    stop_after_attempt=1,
                    _context=_context,
                )
                break
            except ls_utils.LangSmithConflictError:
                # Already ingested; nothing to retry.
                break
            except (
                ls_utils.LangSmithConnectionError,
                ls_utils.LangSmithRequestTimeout,
                ls_utils.LangSmithAPIError,
            ) as exc:
                if idx == attempts:
                    logger.warning(f"Failed to multipart ingest runs: {exc}")
                else:
                    continue
            except Exception as e:
                try:
                    exc_desc_lines = traceback.format_exception_only(type(e), e)
                    exc_desc = "".join(exc_desc_lines).rstrip()
                    logger.warning(f"Failed to multipart ingest runs: {exc_desc}")
                except Exception:
                    logger.warning(f"Failed to multipart ingest runs: {repr(e)}")
                # Do not retry by default.
                return

def _send_compressed_multipart_req(
    self,
    data_stream: io.BytesIO,
    compressed_traces_info: Optional[Tuple[int, int]],
    *,
    attempts: int = 3,
):
    """Send a zstd-compressed multipart form data stream to the backend."""
    _context: str = ""
    for api_url, api_key in self._write_api_urls.items():
        # Rewind for each endpoint so every one sees the full stream.
        data_stream.seek(0)
        for idx in range(1, attempts + 1):
            try:
                headers = {
                    **self._headers,
                    "X-API-KEY": api_key,
                    "Content-Type": f"multipart/form-data; boundary={_BOUNDARY}",
                    "Content-Encoding": "zstd",
                    "X-Pre-Compressed-Size": (
                        str(compressed_traces_info[0]) if compressed_traces_info else ""
                    ),
                    "X-Post-Compressed-Size": (
                        str(compressed_traces_info[1]) if compressed_traces_info else ""
                    ),
                }
                self.request_with_retries(
                    "POST",
                    f"{api_url}/runs/multipart",
                    request_kwargs={
                        "data": data_stream,
                        "headers": headers,
                    },
                    stop_after_attempt=1,
                    _context=_context,
                )
                break
            except ls_utils.LangSmithConflictError:
                break
            except (
                ls_utils.LangSmithConnectionError,
                ls_utils.LangSmithRequestTimeout,
                ls_utils.LangSmithAPIError,
            ) as exc:
                if idx == attempts:
                    logger.warning(f"Failed to send compressed multipart ingest: {exc}")
                else:
                    continue
            except Exception as e:
                try:
                    exc_desc_lines = traceback.format_exception_only(type(e), e)
                    exc_desc = "".join(exc_desc_lines).rstrip()
                    logger.warning(
                        f"Failed to send compressed multipart ingest: {exc_desc}"
                    )
                except Exception:
                    logger.warning(
                        f"Failed to send compressed multipart ingest: {repr(e)}"
                    )
                # Do not retry by default after unknown exceptions.
                return
def update_run(
    self,
    run_id: ID_TYPE,
    *,
    name: Optional[str] = None,
    end_time: Optional[datetime.datetime] = None,
    error: Optional[str] = None,
    inputs: Optional[Dict] = None,
    outputs: Optional[Dict] = None,
    events: Optional[Sequence[dict]] = None,
    extra: Optional[Dict] = None,
    tags: Optional[List[str]] = None,
    attachments: Optional[ls_schemas.Attachments] = None,
    dangerously_allow_filesystem: bool = False,
    **kwargs: Any,
) -> None:
    """Update a run in the LangSmith API.

    Args:
        run_id (Union[UUID, str]): The ID of the run to update.
        name (Optional[str]): The name of the run.
        end_time (Optional[datetime.datetime]): The end time of the run;
            defaults to the current UTC time if not given.
        error (Optional[str]): The error message of the run.
        inputs (Optional[Dict]): The input values for the run.
        outputs (Optional[Dict]): The output values for the run.
        events (Optional[Sequence[dict]]): The events for the run.
        extra (Optional[Dict]): The extra information for the run.
        tags (Optional[List[str]]): The tags for the run.
        attachments (Optional[Dict[str, Attachment]]): Attachments keyed by
            name, each with data and mime type.
        dangerously_allow_filesystem (bool): Whether to allow filesystem
            ``Path`` values as attachment data.
        **kwargs (Any): May carry trace_id / parent_run_id / dotted_order /
            session_id / session_name; other kwargs are ignored.

    Returns:
        None
    """
    data: Dict[str, Any] = {
        "id": _as_uuid(run_id, "run_id"),
        "name": name,
        "trace_id": kwargs.pop("trace_id", None),
        "parent_run_id": kwargs.pop("parent_run_id", None),
        "dotted_order": kwargs.pop("dotted_order", None),
        "tags": tags,
        "extra": extra,
        "session_id": kwargs.pop("session_id", None),
        "session_name": kwargs.pop("session_name", None),
    }
    if attachments:
        for _, attachment in attachments.items():
            if (
                isinstance(attachment, tuple)
                and isinstance(attachment[1], Path)
                and not dangerously_allow_filesystem
            ):
                raise ValueError(
                    "Must set dangerously_allow_filesystem=True to allow passing in Paths for attachments."
                )
        data["attachments"] = attachments
    use_multipart = (
        (self.tracing_queue is not None or self.compressed_traces is not None)
        # Batch ingest requires trace_id and dotted_order to be set.
        and data["trace_id"] is not None
        and data["dotted_order"] is not None
    )
    if not self._filter_for_sampling([data], patch=True):
        return
    if end_time is not None:
        data["end_time"] = end_time.isoformat()
    else:
        data["end_time"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if error is not None:
        data["error"] = error
    if inputs is not None:
        data["inputs"] = self._hide_run_inputs(inputs)
    if outputs is not None:
        if not use_multipart:
            # Copy before masking when the payload is serialized inline.
            outputs = ls_utils.deepish_copy(outputs)
        data["outputs"] = self._hide_run_outputs(outputs)
    if events is not None:
        data["events"] = events
    if data["extra"]:
        self._insert_runtime_env([data])
    if self._pyo3_client is not None:
        self._pyo3_client.update_run(data)
    elif use_multipart:
        serialized_op = serialize_run_dict(operation="patch", payload=data)
        if self.compressed_traces is not None:
            multipart_form, opened_files = (
                serialized_run_operation_to_multipart_parts_and_context(serialized_op)
            )
            with self.compressed_traces.lock:
                if self._data_available_event is None:
                    raise ValueError(
                        "Run compression is enabled but threading event is not configured"
                    )
                compress_multipart_parts_and_context(
                    multipart_form,
                    self.compressed_traces,
                    _BOUNDARY,
                )
                self.compressed_traces.trace_count += 1
            self._data_available_event.set()
            _close_files(list(opened_files.values()))
        elif self.tracing_queue is not None:
            self.tracing_queue.put(
                TracingQueueItem(data["dotted_order"], serialized_op)
            )
    else:
        self._update_run(data)
def flush_compressed_traces(self, attempts: int = 3) -> None:
    """Force flush the currently buffered compressed runs.

    Args:
        attempts: How many send attempts the compressed upload may make.

    Raises:
        ValueError: If compression is enabled but the futures pool is unset.
    """
    if self.compressed_traces is None:
        return
    if self._futures is None:
        raise ValueError("Run compression is enabled but request pool futures is not set")
    # Attempt to drain and send any remaining data.
    from langsmith._internal._background_thread import (
        HTTP_REQUEST_THREAD_POOL,
        _tracing_thread_drain_compressed_buffer,
    )

    # size limits of 1 force a drain of whatever is buffered.
    final_data_stream, compressed_traces_info = (
        _tracing_thread_drain_compressed_buffer(self, size_limit=1, size_limit_bytes=1)
    )
    if final_data_stream is not None:
        # We have data to send.
        future = None
        try:
            future = HTTP_REQUEST_THREAD_POOL.submit(
                self._send_compressed_multipart_req,
                final_data_stream,
                compressed_traces_info,
                attempts=attempts,
            )
            self._futures.add(future)
        except RuntimeError:
            # The ThreadPoolExecutor is already shut down: send inline.
            self._send_compressed_multipart_req(
                final_data_stream, compressed_traces_info, attempts=attempts
            )
    # Wait for all outstanding sends and drop the completed futures.
    if self._futures:
        done, _ = cf.wait(self._futures)
        self._futures.difference_update(done)
def flush(self) -> None:
    """Flush pending traces: compressed buffer if enabled, otherwise the queue."""
    if self.compressed_traces is not None:
        self.flush_compressed_traces()
        return
    if self.tracing_queue is not None:
        self.tracing_queue.join()
def_load_child_runs(self,run:ls_schemas.Run)->ls_schemas.Run:"""Load child runs for a given run. Args: run (Run): The run to load child runs for. Returns: Run: The run with loaded child runs. Raises: LangSmithError: If a child run has no parent. """child_runs=self.list_runs(id=run.child_run_ids)treemap:DefaultDict[uuid.UUID,List[ls_schemas.Run]]=collections.defaultdict(list)runs:Dict[uuid.UUID,ls_schemas.Run]={}forchild_runinsorted(child_runs,key=lambdar:r.dotted_order,):ifchild_run.parent_run_idisNone:raisels_utils.LangSmithError(f"Child run {child_run.id} has no parent")treemap[child_run.parent_run_id].append(child_run)runs[child_run.id]=child_runrun.child_runs=treemap.pop(run.id,[])forrun_id,childrenintreemap.items():runs[run_id].child_runs=childrenreturnrun
def read_run(self, run_id: ID_TYPE, load_child_runs: bool = False) -> ls_schemas.Run:
    """Read a run from the LangSmith API.

    Args:
        run_id (Union[UUID, str]): The ID of the run to read.
        load_child_runs (bool, default=False): Whether to load nested child runs.

    Returns:
        Run: The run read from the LangSmith API.

    Examples:
        .. code-block:: python

            from langsmith import Client

            # Existing run
            run_id = "your-run-id"

            client = Client()
            stored_run = client.read_run(run_id)
    """
    response = self.request_with_retries(
        "GET", f"/runs/{_as_uuid(run_id, 'run_id')}"
    )
    # Fix: parse the response body once instead of calling .json() twice.
    run_json = response.json()
    attachments = _convert_stored_attachments_to_attachments_dict(
        run_json, attachments_key="s3_urls"
    )
    run = ls_schemas.Run(
        attachments=attachments, **run_json, _host_url=self._host_url
    )
    if load_child_runs and run.child_run_ids:
        run = self._load_child_runs(run)
    return run
def list_runs(
    self,
    *,
    project_id: Optional[Union[ID_TYPE, Sequence[ID_TYPE]]] = None,
    project_name: Optional[Union[str, Sequence[str]]] = None,
    run_type: Optional[str] = None,
    trace_id: Optional[ID_TYPE] = None,
    reference_example_id: Optional[ID_TYPE] = None,
    query: Optional[str] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    tree_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    parent_run_id: Optional[ID_TYPE] = None,
    start_time: Optional[datetime.datetime] = None,
    error: Optional[bool] = None,
    run_ids: Optional[Sequence[ID_TYPE]] = None,
    select: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> Iterator[ls_schemas.Run]:
    """List runs from the LangSmith API.

    Args:
        project_id: The ID(s) of the project to filter by.
        project_name: The name(s) of the project to filter by.
        run_type: The type of the runs to filter by.
        trace_id: The ID of the trace to filter by.
        reference_example_id: The ID of the reference example to filter by.
        query: The query string to filter by.
        filter: The filter string to filter by.
        trace_filter: Filter applied to the ROOT run in the trace tree, used
            alongside ``filter`` to select runs by root-run attributes.
        tree_filter: Filter applied to OTHER runs in the trace tree (siblings
            and children), used alongside ``filter``.
        is_root: Whether to filter by root runs.
        parent_run_id: The ID of the parent run to filter by.
        start_time: The start time to filter by.
        error: Whether to filter by error status.
        run_ids: The IDs of the runs to filter by.
        select: The fields to select.
        limit: The maximum number of runs to return.
        **kwargs: Additional keyword arguments passed to the query body.

    Yields:
        The runs.
    """
    # Resolve project identifiers (ids and/or names) to a list of ids.
    project_ids = []
    if isinstance(project_id, (uuid.UUID, str)):
        project_ids.append(project_id)
    elif isinstance(project_id, list):
        project_ids.extend(project_id)
    if project_name is not None:
        if isinstance(project_name, str):
            project_name = [project_name]
        project_ids.extend(
            [self.read_project(project_name=name).id for name in project_name]
        )
    default_select = [
        "app_path",
        "child_run_ids",
        "completion_cost",
        "completion_tokens",
        "dotted_order",
        "end_time",
        "error",
        "events",
        "extra",
        "feedback_stats",
        "first_token_time",
        "id",
        "inputs",
        "name",
        "outputs",
        "parent_run_id",
        "parent_run_ids",
        "prompt_cost",
        "prompt_tokens",
        "reference_example_id",
        "run_type",
        "session_id",
        "start_time",
        "status",
        "tags",
        "total_cost",
        "total_tokens",
        "trace_id",
    ]
    select = select or default_select
    body_query: Dict[str, Any] = {
        "session": project_ids if project_ids else None,
        "run_type": run_type,
        "reference_example": (
            [reference_example_id] if reference_example_id else None
        ),
        "query": query,
        "filter": filter,
        "trace_filter": trace_filter,
        "tree_filter": tree_filter,
        "is_root": is_root,
        "parent_run": parent_run_id,
        "start_time": start_time.isoformat() if start_time else None,
        "error": error,
        "id": run_ids,
        "trace": trace_id,
        "select": select,
        **kwargs,
    }
    # Drop unset filters before querying.
    body_query = {k: v for k, v in body_query.items() if v is not None}
    for i, run in enumerate(
        self._get_cursor_paginated_list("/runs/query", body=body_query)
    ):
        # Should this be behind a flag?
        attachments = _convert_stored_attachments_to_attachments_dict(
            run, attachments_key="s3_urls"
        )
        yield ls_schemas.Run(attachments=attachments, **run, _host_url=self._host_url)
        if limit is not None and i + 1 >= limit:
            break
def get_run_stats(
    self,
    *,
    id: Optional[List[ID_TYPE]] = None,
    trace: Optional[ID_TYPE] = None,
    parent_run: Optional[ID_TYPE] = None,
    run_type: Optional[str] = None,
    project_names: Optional[List[str]] = None,
    project_ids: Optional[List[ID_TYPE]] = None,
    reference_example_ids: Optional[List[ID_TYPE]] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    error: Optional[bool] = None,
    query: Optional[str] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    tree_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    data_source_type: Optional[str] = None,
) -> Dict[str, Any]:
    """Get aggregate statistics over queried runs.

    Takes in similar query parameters to ``list_runs`` and returns statistics
    based on the runs that match the query.

    Args:
        id: List of run IDs to filter by.
        trace: Trace ID to filter by.
        parent_run: Parent run ID to filter by.
        run_type: Run type to filter by.
        project_names: List of project names to filter by.
        project_ids: List of project IDs to filter by.
        reference_example_ids: List of reference example IDs to filter by.
        start_time: Start time to filter by.
        end_time: End time to filter by.
        error: Filter by error status.
        query: Query string to filter by.
        filter: Filter string to apply.
        trace_filter: Trace filter string to apply.
        tree_filter: Tree filter string to apply.
        is_root: Filter by root run status.
        data_source_type: Data source type to filter by.

    Returns:
        Dict[str, Any]: A dictionary containing the run statistics.
    """  # noqa: E501
    from concurrent.futures import ThreadPoolExecutor, as_completed  # type: ignore

    # Fix: copy instead of aliasing so the caller's list is never mutated
    # by the project-name resolution below.
    project_ids = list(project_ids) if project_ids else []
    if project_names:
        # Resolve project names to ids concurrently.
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.read_project, project_name=name)
                for name in project_names
            ]
            for future in as_completed(futures):
                project_ids.append(future.result().id)
    payload = {
        "id": id,
        "trace": trace,
        "parent_run": parent_run,
        "run_type": run_type,
        "session": project_ids,
        "reference_example": reference_example_ids,
        "start_time": start_time,
        "end_time": end_time,
        "error": error,
        "query": query,
        "filter": filter,
        "trace_filter": trace_filter,
        "tree_filter": tree_filter,
        "is_root": is_root,
        "data_source_type": data_source_type,
    }
    # Remove None values from the payload.
    payload = {k: v for k, v in payload.items() if v is not None}
    response = self.request_with_retries(
        "POST",
        "/runs/stats",
        request_kwargs={
            "data": _dumps_json(payload),
        },
    )
    ls_utils.raise_for_status_with_text(response)
    return response.json()
[docs]defget_run_url(self,*,run:ls_schemas.RunBase,project_name:Optional[str]=None,project_id:Optional[ID_TYPE]=None,)->str:"""Get the URL for a run. Not recommended for use within your agent runtime. More for use interacting with runs after the fact for data analysis or ETL workloads. Args: run (RunBase): The run. project_name (Optional[str]): The name of the project. project_id (Optional[Union[UUID, str]]): The ID of the project. Returns: str: The URL for the run. """ifsession_id:=getattr(run,"session_id",None):passelifsession_name:=getattr(run,"session_name",None):session_id=self.read_project(project_name=session_name).idelifproject_idisnotNone:session_id=project_idelifproject_nameisnotNone:session_id=self.read_project(project_name=project_name).idelse:project_name=ls_utils.get_tracer_project()session_id=self.read_project(project_name=project_name).idsession_id_=_as_uuid(session_id,"session_id")return(f"{self._host_url}/o/{self._get_tenant_id()}/projects/p/{session_id_}/"f"r/{run.id}?poll=true")
[docs]defshare_run(self,run_id:ID_TYPE,*,share_id:Optional[ID_TYPE]=None)->str:"""Get a share link for a run. Args: run_id (Union[UUID, str]): The ID of the run to share. share_id (Optional[Union[UUID, str]]): Custom share ID. If not provided, a random UUID will be generated. Returns: str: The URL of the shared run. """run_id_=_as_uuid(run_id,"run_id")data={"run_id":str(run_id_),"share_token":share_idorstr(uuid.uuid4()),}response=self.request_with_retries("PUT",f"/runs/{run_id_}/share",headers=self._headers,json=data,)ls_utils.raise_for_status_with_text(response)share_token=response.json()["share_token"]returnf"{self._host_url}/public/{share_token}/r"
[docs]defunshare_run(self,run_id:ID_TYPE)->None:"""Delete share link for a run. Args: run_id (Union[UUID, str]): The ID of the run to unshare. Returns: None """response=self.request_with_retries("DELETE",f"/runs/{_as_uuid(run_id,'run_id')}/share",headers=self._headers,)ls_utils.raise_for_status_with_text(response)
[docs]defread_run_shared_link(self,run_id:ID_TYPE)->Optional[str]:"""Retrieve the shared link for a specific run. Args: run_id (Union[UUID, str]): The ID of the run. Returns: Optional[str]: The shared link for the run, or None if the link is not available. """response=self.request_with_retries("GET",f"/runs/{_as_uuid(run_id,'run_id')}/share",headers=self._headers,)ls_utils.raise_for_status_with_text(response)result=response.json()ifresultisNoneor"share_token"notinresult:returnNonereturnf"{self._host_url}/public/{result['share_token']}/r"
[docs]defrun_is_shared(self,run_id:ID_TYPE)->bool:"""Get share state for a run. Args: run_id (Union[UUID, str]): The ID of the run. Returns: bool: True if the run is shared, False otherwise. """link=self.read_run_shared_link(_as_uuid(run_id,"run_id"))returnlinkisnotNone
[docs]defread_shared_run(self,share_token:Union[ID_TYPE,str],run_id:Optional[ID_TYPE]=None)->ls_schemas.Run:"""Get shared runs. Args: share_token (Union[UUID, str]): The share token or URL of the shared run. run_id (Optional[Union[UUID, str]]): The ID of the specific run to retrieve. If not provided, the full shared run will be returned. Returns: Run: The shared run. """_,token_uuid=_parse_token_or_url(share_token,"",kind="run")path=f"/public/{token_uuid}/run"ifrun_idisnotNone:path+=f"/{_as_uuid(run_id,'run_id')}"response=self.request_with_retries("GET",path,headers=self._headers,)ls_utils.raise_for_status_with_text(response)returnls_schemas.Run(**response.json(),_host_url=self._host_url)
[docs]deflist_shared_runs(self,share_token:Union[ID_TYPE,str],run_ids:Optional[List[str]]=None)->Iterator[ls_schemas.Run]:"""Get shared runs. Args: share_token (Union[UUID, str]): The share token or URL of the shared run. run_ids (Optional[List[str]]): A list of run IDs to filter the results by. Yields: A shared run. """body={"id":run_ids}ifrun_idselse{}_,token_uuid=_parse_token_or_url(share_token,"",kind="run")forruninself._get_cursor_paginated_list(f"/public/{token_uuid}/runs/query",body=body):yieldls_schemas.Run(**run,_host_url=self._host_url)
[docs]defread_dataset_shared_schema(self,dataset_id:Optional[ID_TYPE]=None,*,dataset_name:Optional[str]=None,)->ls_schemas.DatasetShareSchema:"""Retrieve the shared schema of a dataset. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset. Either `dataset_id` or `dataset_name` must be given. dataset_name (Optional[str]): The name of the dataset. Either `dataset_id` or `dataset_name` must be given. Returns: ls_schemas.DatasetShareSchema: The shared schema of the dataset. Raises: ValueError: If neither `dataset_id` nor `dataset_name` is given. """ifdataset_idisNoneanddataset_nameisNone:raiseValueError("Either dataset_id or dataset_name must be given")ifdataset_idisNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idresponse=self.request_with_retries("GET",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/share",headers=self._headers,)ls_utils.raise_for_status_with_text(response)d=response.json()returncast(ls_schemas.DatasetShareSchema,{**d,"url":f"{self._host_url}/public/"f"{_as_uuid(d['share_token'],'response.share_token')}/d",},)
[docs]defshare_dataset(self,dataset_id:Optional[ID_TYPE]=None,*,dataset_name:Optional[str]=None,)->ls_schemas.DatasetShareSchema:"""Get a share link for a dataset. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset. Either `dataset_id` or `dataset_name` must be given. dataset_name (Optional[str]): The name of the dataset. Either `dataset_id` or `dataset_name` must be given. Returns: ls_schemas.DatasetShareSchema: The shared schema of the dataset. Raises: ValueError: If neither `dataset_id` nor `dataset_name` is given. """ifdataset_idisNoneanddataset_nameisNone:raiseValueError("Either dataset_id or dataset_name must be given")ifdataset_idisNone:dataset_id=self.read_dataset(dataset_name=dataset_name).iddata={"dataset_id":str(dataset_id),}response=self.request_with_retries("PUT",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/share",headers=self._headers,json=data,)ls_utils.raise_for_status_with_text(response)d:dict=response.json()returncast(ls_schemas.DatasetShareSchema,{**d,"url":f"{self._host_url}/public/{d['share_token']}/d"},)
[docs]defunshare_dataset(self,dataset_id:ID_TYPE)->None:"""Delete share link for a dataset. Args: dataset_id (Union[UUID, str]): The ID of the dataset to unshare. Returns: None """response=self.request_with_retries("DELETE",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/share",headers=self._headers,)ls_utils.raise_for_status_with_text(response)
[docs]defread_shared_dataset(self,share_token:str,)->ls_schemas.Dataset:"""Get shared datasets. Args: share_token (Union[UUID, str]): The share token or URL of the shared dataset. Returns: Dataset: The shared dataset. """_,token_uuid=_parse_token_or_url(share_token,self.api_url)response=self.request_with_retries("GET",f"/public/{token_uuid}/datasets",headers=self._headers,)ls_utils.raise_for_status_with_text(response)returnls_schemas.Dataset(**response.json(),_host_url=self._host_url,_public_path=f"/public/{share_token}/d",)
[docs]deflist_shared_examples(self,share_token:str,*,example_ids:Optional[List[ID_TYPE]]=None)->List[ls_schemas.Example]:"""Get shared examples. Args: share_token (Union[UUID, str]): The share token or URL of the shared dataset. example_ids (Optional[List[UUID, str]], optional): The IDs of the examples to filter by. Defaults to None. Returns: List[ls_schemas.Example]: The list of shared examples. """params={}ifexample_idsisnotNone:params["id"]=[str(id)foridinexample_ids]response=self.request_with_retries("GET",f"/public/{_as_uuid(share_token,'share_token')}/examples",headers=self._headers,params=params,)ls_utils.raise_for_status_with_text(response)return[ls_schemas.Example(**dataset,_host_url=self._host_url)fordatasetinresponse.json()]
[docs]deflist_shared_projects(self,*,dataset_share_token:str,project_ids:Optional[List[ID_TYPE]]=None,name:Optional[str]=None,name_contains:Optional[str]=None,limit:Optional[int]=None,)->Iterator[ls_schemas.TracerSessionResult]:"""List shared projects. Args: dataset_share_token (str): The share token of the dataset. project_ids (Optional[List[Union[UUID, str]]]): List of project IDs to filter the results, by default None. name (Optional[str]): Name of the project to filter the results, by default None. name_contains (Optional[str]): Substring to search for in project names, by default None. limit (Optional[int]): Maximum number of projects to return, by default None. Yields: The shared projects. """params={"id":project_ids,"name":name,"name_contains":name_contains}share_token=_as_uuid(dataset_share_token,"dataset_share_token")fori,projectinenumerate(self._get_paginated_list(f"/public/{share_token}/datasets/sessions",params=params,)):yieldls_schemas.TracerSessionResult(**project,_host_url=self._host_url)iflimitisnotNoneandi+1>=limit:break
[docs]defcreate_project(self,project_name:str,*,description:Optional[str]=None,metadata:Optional[dict]=None,upsert:bool=False,project_extra:Optional[dict]=None,reference_dataset_id:Optional[ID_TYPE]=None,)->ls_schemas.TracerSession:"""Create a project on the LangSmith API. Args: project_name (str): The name of the project. project_extra (Optional[dict]): Additional project information. metadata (Optional[dict]): Additional metadata to associate with the project. description (Optional[str]): The description of the project. upsert (bool, default=False): Whether to update the project if it already exists. reference_dataset_id (Optional[Union[UUID, str]): The ID of the reference dataset to associate with the project. Returns: TracerSession: The created project. """endpoint=f"{self.api_url}/sessions"extra=project_extraifmetadata:extra={**(extraor{}),"metadata":metadata}body:Dict[str,Any]={"name":project_name,"extra":extra,"description":description,"id":str(uuid.uuid4()),}params={}ifupsert:params["upsert"]=Trueifreference_dataset_idisnotNone:body["reference_dataset_id"]=reference_dataset_idresponse=self.request_with_retries("POST",endpoint,headers={**self._headers,"Content-Type":"application/json"},data=_dumps_json(body),)ls_utils.raise_for_status_with_text(response)returnls_schemas.TracerSession(**response.json(),_host_url=self._host_url)
[docs]defupdate_project(self,project_id:ID_TYPE,*,name:Optional[str]=None,description:Optional[str]=None,metadata:Optional[dict]=None,project_extra:Optional[dict]=None,end_time:Optional[datetime.datetime]=None,)->ls_schemas.TracerSession:"""Update a LangSmith project. Args: project_id (Union[UUID, str]): The ID of the project to update. name (Optional[str]): The new name to give the project. This is only valid if the project has been assigned an end_time, meaning it has been completed/closed. description (Optional[str]): The new description to give the project. metadata (Optional[dict]): Additional metadata to associate with the project. project_extra (Optional[dict]): Additional project information. end_time (Optional[datetime.datetime]): The time the project was completed. Returns: TracerSession: The updated project. """endpoint=f"{self.api_url}/sessions/{_as_uuid(project_id,'project_id')}"extra=project_extraifmetadata:extra={**(extraor{}),"metadata":metadata}body:Dict[str,Any]={"name":name,"extra":extra,"description":description,"end_time":end_time.isoformat()ifend_timeelseNone,}response=self.request_with_retries("PATCH",endpoint,headers={**self._headers,"Content-Type":"application/json"},data=_dumps_json(body),)ls_utils.raise_for_status_with_text(response)returnls_schemas.TracerSession(**response.json(),_host_url=self._host_url)
def_get_optional_tenant_id(self)->Optional[uuid.UUID]:ifself._tenant_idisnotNone:returnself._tenant_idtry:response=self.request_with_retries("GET","/sessions",params={"limit":1})result=response.json()ifisinstance(result,list)andlen(result)>0:tracer_session=ls_schemas.TracerSessionResult(**result[0],_host_url=self._host_url)self._tenant_id=tracer_session.tenant_idreturnself._tenant_idexceptExceptionase:logger.debug("Failed to get tenant ID from LangSmith: %s",repr(e),exc_info=True)returnNonedef_get_tenant_id(self)->uuid.UUID:tenant_id=self._get_optional_tenant_id()iftenant_idisNone:raisels_utils.LangSmithError("No tenant ID found")returntenant_id
[docs]@ls_utils.xor_args(("project_id","project_name"))defread_project(self,*,project_id:Optional[str]=None,project_name:Optional[str]=None,include_stats:bool=False,)->ls_schemas.TracerSessionResult:"""Read a project from the LangSmith API. Args: project_id (Optional[str]): The ID of the project to read. project_name (Optional[str]): The name of the project to read. Only one of project_id or project_name may be given. include_stats (bool, default=False): Whether to include a project's aggregate statistics in the response. Returns: TracerSessionResult: The project. """path="/sessions"params:Dict[str,Any]={"limit":1}ifproject_idisnotNone:path+=f"/{_as_uuid(project_id,'project_id')}"elifproject_nameisnotNone:params["name"]=project_nameelse:raiseValueError("Must provide project_name or project_id")params["include_stats"]=include_statsresponse=self.request_with_retries("GET",path,params=params)result=response.json()ifisinstance(result,list):iflen(result)==0:raisels_utils.LangSmithNotFoundError(f"Project {project_name} not found")returnls_schemas.TracerSessionResult(**result[0],_host_url=self._host_url)returnls_schemas.TracerSessionResult(**response.json(),_host_url=self._host_url)
[docs]defhas_project(self,project_name:str,*,project_id:Optional[str]=None)->bool:"""Check if a project exists. Args: project_name (str): The name of the project to check for. project_id (Optional[str]): The ID of the project to check for. Returns: bool: Whether the project exists. """try:self.read_project(project_name=project_name)exceptls_utils.LangSmithNotFoundError:returnFalsereturnTrue
[docs]defget_test_results(self,*,project_id:Optional[ID_TYPE]=None,project_name:Optional[str]=None,)->pd.DataFrame:"""Read the record-level information from an experiment into a Pandas DF. Note: this will fetch whatever data exists in the DB. Results are not immediately available in the DB upon evaluation run completion. Feedback score values will be returned as an average across all runs for the experiment. Note that non-numeric feedback scores will be omitted. Args: project_id (Optional[Union[UUID, str]]): The ID of the project. project_name (Optional[str]): The name of the project. Returns: pd.DataFrame: A dataframe containing the test results. """warnings.warn("Function get_test_results is in beta.",UserWarning,stacklevel=2)fromconcurrent.futuresimportThreadPoolExecutor,as_completed# type: ignoreimportpandasaspd# type: ignoreruns=self.list_runs(project_id=project_id,project_name=project_name,is_root=True,select=["id","reference_example_id","inputs","outputs","error","feedback_stats","start_time","end_time",],)results:list[dict]=[]example_ids=[]deffetch_examples(batch):examples=self.list_examples(example_ids=batch)return[{"example_id":example.id,**{f"reference.{k}":vfork,vin(example.outputsor{}).items()},}forexampleinexamples]batch_size=50cursor=0withThreadPoolExecutor()asexecutor:futures=[]forrinruns:row={"example_id":r.reference_example_id,**{f"input.{k}":vfork,vinr.inputs.items()},**{f"outputs.{k}":vfork,vin(r.outputsor{}).items()},"execution_time":((r.end_time-r.start_time).total_seconds()ifr.end_timeelseNone),"error":r.error,"id":r.id,}ifr.feedback_stats:row.update({f"feedback.{k}":v.get("avg")fork,vinr.feedback_stats.items()})ifr.reference_example_id:example_ids.append(r.reference_example_id)else:logger.warning(f"Run {r.id} has no reference example ID.")iflen(example_ids)%batch_size==0:# Ensure not emptyifbatch:=example_ids[cursor:cursor+batch_size]:futures.append(executor.submit(fetch_examples,batch))cursor+=batch_sizeresults.append(row)# Handle any 
remaining examplesifexample_ids[cursor:]:futures.append(executor.submit(fetch_examples,example_ids[cursor:]))result_df=pd.DataFrame(results).set_index("example_id")example_outputs=[outputforfutureinas_completed(futures)foroutputinfuture.result()]ifexample_outputs:example_df=pd.DataFrame(example_outputs).set_index("example_id")result_df=example_df.merge(result_df,left_index=True,right_index=True)# Flatten dict columns into dot syntax for easier accessreturnpd.json_normalize(result_df.to_dict(orient="records"))
[docs]deflist_projects(self,project_ids:Optional[List[ID_TYPE]]=None,name:Optional[str]=None,name_contains:Optional[str]=None,reference_dataset_id:Optional[ID_TYPE]=None,reference_dataset_name:Optional[str]=None,reference_free:Optional[bool]=None,limit:Optional[int]=None,metadata:Optional[Dict[str,Any]]=None,)->Iterator[ls_schemas.TracerSession]:"""List projects from the LangSmith API. Args: project_ids (Optional[List[Union[UUID, str]]]): A list of project IDs to filter by, by default None name (Optional[str]): The name of the project to filter by, by default None name_contains (Optional[str]): A string to search for in the project name, by default None reference_dataset_id (Optional[List[Union[UUID, str]]]): A dataset ID to filter by, by default None reference_dataset_name (Optional[str]): The name of the reference dataset to filter by, by default None reference_free (Optional[bool]): Whether to filter for only projects not associated with a dataset. limit (Optional[int]): The maximum number of projects to return, by default None metadata (Optional[Dict[str, Any]]): Metadata to filter by. Yields: The projects. Raises: ValueError: If both reference_dataset_id and reference_dataset_name are given. 
"""params:Dict[str,Any]={"limit":min(limit,100)iflimitisnotNoneelse100}ifproject_idsisnotNone:params["id"]=project_idsifnameisnotNone:params["name"]=nameifname_containsisnotNone:params["name_contains"]=name_containsifreference_dataset_idisnotNone:ifreference_dataset_nameisnotNone:raiseValueError("Only one of reference_dataset_id or"" reference_dataset_name may be given")params["reference_dataset"]=reference_dataset_idelifreference_dataset_nameisnotNone:reference_dataset_id=self.read_dataset(dataset_name=reference_dataset_name).idparams["reference_dataset"]=reference_dataset_idifreference_freeisnotNone:params["reference_free"]=reference_freeifmetadataisnotNone:params["metadata"]=json.dumps(metadata)fori,projectinenumerate(self._get_paginated_list("/sessions",params=params)):yieldls_schemas.TracerSession(**project,_host_url=self._host_url)iflimitisnotNoneandi+1>=limit:break
[docs]@ls_utils.xor_args(("project_name","project_id"))defdelete_project(self,*,project_name:Optional[str]=None,project_id:Optional[str]=None)->None:"""Delete a project from LangSmith. Args: project_name (Optional[str]): The name of the project to delete. project_id (Optional[str]): The ID of the project to delete. Returns: None Raises: ValueError: If neither project_name or project_id is provided. """ifproject_nameisnotNone:project_id=str(self.read_project(project_name=project_name).id)elifproject_idisNone:raiseValueError("Must provide project_name or project_id")response=self.request_with_retries("DELETE",f"/sessions/{_as_uuid(project_id,'project_id')}",headers=self._headers,)ls_utils.raise_for_status_with_text(response)
[docs]defcreate_dataset(self,dataset_name:str,*,description:Optional[str]=None,data_type:ls_schemas.DataType=ls_schemas.DataType.kv,inputs_schema:Optional[Dict[str,Any]]=None,outputs_schema:Optional[Dict[str,Any]]=None,transformations:Optional[List[ls_schemas.DatasetTransformation]]=None,metadata:Optional[dict]=None,)->ls_schemas.Dataset:"""Create a dataset in the LangSmith API. Args: dataset_name (str): The name of the dataset. description (Optional[str]): The description of the dataset. data_type (DataType, default=DataType.kv): The data type of the dataset. inputs_schema (Optional[Dict[str, Any]]): The schema definition for the inputs of the dataset. outputs_schema (Optional[Dict[str, Any]]): The schema definition for the outputs of the dataset. transformations (Optional[List[DatasetTransformation]]): A list of transformations to apply to the dataset. metadata (Optional[dict]): Additional metadata to associate with the dataset. Returns: Dataset: The created dataset. Raises: requests.HTTPError: If the request to create the dataset fails. """dataset:Dict[str,Any]={"name":dataset_name,"data_type":data_type.value,"transformations":transformations,"extra":{"metadata":metadata}ifmetadataelseNone,}ifdescriptionisnotNone:dataset["description"]=descriptionifinputs_schemaisnotNone:dataset["inputs_schema_definition"]=inputs_schemaifoutputs_schemaisnotNone:dataset["outputs_schema_definition"]=outputs_schemaresponse=self.request_with_retries("POST","/datasets",headers={**self._headers,"Content-Type":"application/json"},data=_orjson.dumps(dataset),)ls_utils.raise_for_status_with_text(response)returnls_schemas.Dataset(**response.json(),_host_url=self._host_url,_tenant_id=self._get_optional_tenant_id(),)
[docs]defhas_dataset(self,*,dataset_name:Optional[str]=None,dataset_id:Optional[ID_TYPE]=None,)->bool:"""Check whether a dataset exists in your tenant. Args: dataset_name (Optional[str]): The name of the dataset to check. dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to check. Returns: bool: Whether the dataset exists. """try:self.read_dataset(dataset_name=dataset_name,dataset_id=dataset_id)returnTrueexceptls_utils.LangSmithNotFoundError:returnFalse
[docs]@ls_utils.xor_args(("dataset_name","dataset_id"))defread_dataset(self,*,dataset_name:Optional[str]=None,dataset_id:Optional[ID_TYPE]=None,)->ls_schemas.Dataset:"""Read a dataset from the LangSmith API. Args: dataset_name (Optional[str]): The name of the dataset to read. dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to read. Returns: Dataset: The dataset. """path="/datasets"params:Dict[str,Any]={"limit":1}ifdataset_idisnotNone:path+=f"/{_as_uuid(dataset_id,'dataset_id')}"elifdataset_nameisnotNone:params["name"]=dataset_nameelse:raiseValueError("Must provide dataset_name or dataset_id")response=self.request_with_retries("GET",path,params=params,)result=response.json()ifisinstance(result,list):iflen(result)==0:raisels_utils.LangSmithNotFoundError(f"Dataset {dataset_name} not found")returnls_schemas.Dataset(**result[0],_host_url=self._host_url,_tenant_id=self._get_optional_tenant_id(),)returnls_schemas.Dataset(**result,_host_url=self._host_url,_tenant_id=self._get_optional_tenant_id(),)
[docs]defdiff_dataset_versions(self,dataset_id:Optional[ID_TYPE]=None,*,dataset_name:Optional[str]=None,from_version:Union[str,datetime.datetime],to_version:Union[str,datetime.datetime],)->ls_schemas.DatasetDiffInfo:"""Get the difference between two versions of a dataset. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset. dataset_name (Optional[str]): The name of the dataset. from_version (Union[str, datetime.datetime]): The starting version for the diff. to_version (Union[str, datetime.datetime]): The ending version for the diff. Returns: DatasetDiffInfo: The difference between the two versions of the dataset. Examples: .. code-block:: python # Get the difference between two tagged versions of a dataset from_version = "prod" to_version = "dev" diff = client.diff_dataset_versions( dataset_name="my-dataset", from_version=from_version, to_version=to_version, ) # Get the difference between two timestamped versions of a dataset from_version = datetime.datetime(2024, 1, 1) to_version = datetime.datetime(2024, 2, 1) diff = client.diff_dataset_versions( dataset_name="my-dataset", from_version=from_version, to_version=to_version, ) """ifdataset_idisNone:ifdataset_nameisNone:raiseValueError("Must provide either dataset name or ID")dataset_id=self.read_dataset(dataset_name=dataset_name).iddsid=_as_uuid(dataset_id,"dataset_id")response=self.request_with_retries("GET",f"/datasets/{dsid}/versions/diff",headers=self._headers,params={"from_version":(from_version.isoformat()ifisinstance(from_version,datetime.datetime)elsefrom_version),"to_version":(to_version.isoformat()ifisinstance(to_version,datetime.datetime)elseto_version),},)ls_utils.raise_for_status_with_text(response)returnls_schemas.DatasetDiffInfo(**response.json())
[docs]defread_dataset_openai_finetuning(self,dataset_id:Optional[ID_TYPE]=None,*,dataset_name:Optional[str]=None,)->list:"""Download a dataset in OpenAI Jsonl format and load it as a list of dicts. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to download. dataset_name (Optional[str]): The name of the dataset to download. Returns: list[dict]: The dataset loaded as a list of dicts. Raises: ValueError: If neither dataset_id nor dataset_name is provided. """path="/datasets"ifdataset_idisnotNone:passelifdataset_nameisnotNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idelse:raiseValueError("Must provide dataset_name or dataset_id")response=self.request_with_retries("GET",f"{path}/{_as_uuid(dataset_id,'dataset_id')}/openai_ft",)dataset=[json.loads(line)forlineinresponse.text.strip().split("\n")]returndataset
[docs]deflist_datasets(self,*,dataset_ids:Optional[List[ID_TYPE]]=None,data_type:Optional[str]=None,dataset_name:Optional[str]=None,dataset_name_contains:Optional[str]=None,metadata:Optional[Dict[str,Any]]=None,limit:Optional[int]=None,)->Iterator[ls_schemas.Dataset]:"""List the datasets on the LangSmith API. Args: dataset_ids (Optional[List[Union[UUID, str]]]): A list of dataset IDs to filter the results by. data_type (Optional[str]): The data type of the datasets to filter the results by. dataset_name (Optional[str]): The name of the dataset to filter the results by. dataset_name_contains (Optional[str]): A substring to search for in the dataset names. metadata (Optional[Dict[str, Any]]): A dictionary of metadata to filter the results by. limit (Optional[int]): The maximum number of datasets to return. Yields: The datasets. """params:Dict[str,Any]={"limit":min(limit,100)iflimitisnotNoneelse100}ifdataset_idsisnotNone:params["id"]=dataset_idsifdata_typeisnotNone:params["data_type"]=data_typeifdataset_nameisnotNone:params["name"]=dataset_nameifdataset_name_containsisnotNone:params["name_contains"]=dataset_name_containsifmetadataisnotNone:params["metadata"]=json.dumps(metadata)fori,datasetinenumerate(self._get_paginated_list("/datasets",params=params)):yieldls_schemas.Dataset(**dataset,_host_url=self._host_url,_tenant_id=self._get_optional_tenant_id(),)iflimitisnotNoneandi+1>=limit:break
[docs]@ls_utils.xor_args(("dataset_id","dataset_name"))defdelete_dataset(self,*,dataset_id:Optional[ID_TYPE]=None,dataset_name:Optional[str]=None,)->None:"""Delete a dataset from the LangSmith API. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to delete. dataset_name (Optional[str]): The name of the dataset to delete. Returns: None """ifdataset_nameisnotNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idifdataset_idisNone:raiseValueError("Must provide either dataset name or ID")response=self.request_with_retries("DELETE",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}",headers=self._headers,)ls_utils.raise_for_status_with_text(response)
[docs]defupdate_dataset_tag(self,*,dataset_id:Optional[ID_TYPE]=None,dataset_name:Optional[str]=None,as_of:datetime.datetime,tag:str,)->None:"""Update the tags of a dataset. If the tag is already assigned to a different version of this dataset, the tag will be moved to the new version. The as_of parameter is used to determine which version of the dataset to apply the new tags to. It must be an exact version of the dataset to succeed. You can use the read_dataset_version method to find the exact version to apply the tags to. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to update. dataset_name (Optional[str]): The name of the dataset to update. as_of (datetime.datetime): The timestamp of the dataset to apply the new tags to. tag (str): The new tag to apply to the dataset. Returns: None Examples: .. code-block:: python dataset_name = "my-dataset" # Get the version of a dataset <= a given timestamp dataset_version = client.read_dataset_version( dataset_name=dataset_name, as_of=datetime.datetime(2024, 1, 1) ) # Assign that version a new tag client.update_dataset_tags( dataset_name="my-dataset", as_of=dataset_version.as_of, tag="prod", ) """ifdataset_nameisnotNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idifdataset_idisNone:raiseValueError("Must provide either dataset name or ID")response=self.request_with_retries("PUT",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/tags",headers=self._headers,json={"as_of":as_of.isoformat(),"tag":tag,},)ls_utils.raise_for_status_with_text(response)
[docs]deflist_dataset_versions(self,*,dataset_id:Optional[ID_TYPE]=None,dataset_name:Optional[str]=None,search:Optional[str]=None,limit:Optional[int]=None,)->Iterator[ls_schemas.DatasetVersion]:"""List dataset versions. Args: dataset_id (Optional[Union[UUID, str]]): The ID of the dataset. dataset_name (Optional[str]): The name of the dataset. search (Optional[str]): The search query. limit (Optional[int]): The maximum number of versions to return. Yields: The dataset versions. """ifdataset_idisNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idparams={"search":search,"limit":min(limit,100)iflimitisnotNoneelse100,}fori,versioninenumerate(self._get_paginated_list(f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/versions",params=params,)):yieldls_schemas.DatasetVersion(**version)iflimitisnotNoneandi+1>=limit:break
[docs]defread_dataset_version(self,*,dataset_id:Optional[ID_TYPE]=None,dataset_name:Optional[str]=None,as_of:Optional[datetime.datetime]=None,tag:Optional[str]=None,)->ls_schemas.DatasetVersion:"""Get dataset version by as_of or exact tag. Ues this to resolve the nearest version to a given timestamp or for a given tag. Args: dataset_id (Optional[ID_TYPE]): The ID of the dataset. dataset_name (Optional[str]): The name of the dataset. as_of (Optional[datetime.datetime]): The timestamp of the dataset to retrieve. tag (Optional[str]): The tag of the dataset to retrieve. Returns: DatasetVersion: The dataset version. Examples: .. code-block:: python # Get the latest version of a dataset client.read_dataset_version(dataset_name="my-dataset", tag="latest") # Get the version of a dataset <= a given timestamp client.read_dataset_version( dataset_name="my-dataset", as_of=datetime.datetime(2024, 1, 1), ) # Get the version of a dataset with a specific tag client.read_dataset_version(dataset_name="my-dataset", tag="prod") """ifdataset_idisNone:dataset_id=self.read_dataset(dataset_name=dataset_name).idif(as_ofandtag)or(as_ofisNoneandtagisNone):raiseValueError("Exactly one of as_of and tag must be specified.")response=self.request_with_retries("GET",f"/datasets/{_as_uuid(dataset_id,'dataset_id')}/version",params={"as_of":as_of,"tag":tag},)returnls_schemas.DatasetVersion(**response.json())
def clone_public_dataset(
    self,
    token_or_url: str,
    *,
    source_api_url: Optional[str] = None,
    dataset_name: Optional[str] = None,
) -> ls_schemas.Dataset:
    """Clone a public dataset to your own langsmith tenant.

    This operation is idempotent. If you already have a dataset with the given
    name, this function will do nothing.

    Args:
        token_or_url (str): The token of the public dataset to clone.
        source_api_url (Optional[str]): The URL of the langsmith server where
            the data is hosted. Defaults to the API URL of your current client.
        dataset_name (Optional[str]): The name of the dataset to create in your
            tenant. Defaults to the name of the public dataset.

    Returns:
        Dataset: The cloned dataset.
    """
    source_api_url = source_api_url or self.api_url
    source_api_url, token_uuid = _parse_token_or_url(token_or_url, source_api_url)
    # A second, read-only client pointed at the host that actually serves the
    # shared dataset (which may differ from this client's tenant).
    source_client = Client(
        # Placeholder API key not needed anymore in most cases, but
        # some private deployments may have API key-based rate limiting
        # that would cause this to fail if we provide no value.
        api_url=source_api_url,
        api_key="placeholder",
    )
    ds = source_client.read_shared_dataset(token_uuid)
    dataset_name = dataset_name or ds.name
    # Idempotency: if a dataset with this name already exists locally, return
    # it unchanged rather than re-importing.
    try:
        ds = self.read_dataset(dataset_name=dataset_name)
        logger.info(f"Dataset {dataset_name} already exists in your tenant. Skipping.")
        return ds
    except ls_utils.LangSmithNotFoundError:
        pass
    try:
        # Fetch examples first
        examples = list(source_client.list_shared_examples(token_uuid))
        dataset = self.create_dataset(
            dataset_name=dataset_name,
            description=ds.description,
            data_type=ds.data_type or ls_schemas.DataType.kv,
            inputs_schema=ds.inputs_schema,
            outputs_schema=ds.outputs_schema,
            transformations=ds.transformations,
        )
        try:
            self.create_examples(
                inputs=[e.inputs for e in examples],
                outputs=[e.outputs for e in examples],
                dataset_id=dataset.id,
            )
        except BaseException as e:
            # Let's not do automatic clean up for now in case there might be
            # some other reasons why create_examples fails (i.e., not network issue
            # or keyboard interrupt).
            # The risk is that this is an existing dataset that has valid examples
            # populated from another source so we don't want to delete it.
            logger.error(
                f"An error occurred while creating dataset {dataset_name}. "
                "You should delete it manually."
            )
            raise e
    finally:
        # Drop the temporary source client regardless of success or failure.
        del source_client
    return dataset
@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_llm_example(
    self,
    prompt: str,
    generation: Optional[str] = None,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    created_at: Optional[datetime.datetime] = None,
) -> ls_schemas.Example:
    """Add an example (row) to an LLM-type dataset.

    Args:
        prompt (str): The input prompt for the example.
        generation (Optional[str]): The output generation for the example.
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset.
        dataset_name (Optional[str]): The name of the dataset.
        created_at (Optional[datetime.datetime]): The creation timestamp
            of the example.

    Returns:
        Example: The created example
    """
    # LLM datasets store a single prompt/generation pair under fixed keys.
    payload = {
        "inputs": {"input": prompt},
        "outputs": {"output": generation},
        "dataset_id": dataset_id,
        "dataset_name": dataset_name,
        "created_at": created_at,
    }
    return self.create_example(**payload)
@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_chat_example(
    self,
    messages: List[Union[Mapping[str, Any], ls_schemas.BaseMessageLike]],
    generations: Optional[Union[Mapping[str, Any], ls_schemas.BaseMessageLike]] = None,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    created_at: Optional[datetime.datetime] = None,
) -> ls_schemas.Example:
    """Add an example (row) to a Chat-type dataset.

    Args:
        messages (List[Union[Mapping[str, Any], BaseMessageLike]]):
            The input messages for the example.
        generations (Optional[Union[Mapping[str, Any], BaseMessageLike]]):
            The output messages for the example.
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset.
        dataset_name (Optional[str]): The name of the dataset.
        created_at (Optional[datetime.datetime]): The creation timestamp
            of the example.

    Returns:
        Example: The created example
    """
    # Normalize LangChain message objects into plain dicts; anything already
    # dict-shaped passes through untouched.
    final_input = [
        ls_utils.convert_langchain_message(cast(ls_schemas.BaseMessageLike, msg))
        if ls_utils.is_base_message_like(msg)
        else cast(dict, msg)
        for msg in messages
    ]
    final_generations = None
    if generations is not None:
        if ls_utils.is_base_message_like(generations):
            final_generations = ls_utils.convert_langchain_message(
                cast(ls_schemas.BaseMessageLike, generations)
            )
        else:
            final_generations = cast(dict, generations)
    # Only attach outputs when a generation was actually supplied.
    outputs = {"output": final_generations} if final_generations is not None else None
    return self.create_example(
        inputs={"input": final_input},
        outputs=outputs,
        dataset_id=dataset_id,
        dataset_name=dataset_name,
        created_at=created_at,
    )
def create_example_from_run(
    self,
    run: ls_schemas.Run,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    created_at: Optional[datetime.datetime] = None,
) -> ls_schemas.Example:
    """Add an example (row) to a dataset from a run.

    The run's inputs/outputs are converted to the shape required by the
    target dataset's data type (llm, chat, or kv).

    Args:
        run (Run): The run to create an example from.
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset.
        dataset_name (Optional[str]): The name of the dataset.
        created_at (Optional[datetime.datetime]): The creation timestamp
            of the example.

    Returns:
        Example: The created example

    Raises:
        ValueError: If the run type is incompatible with the dataset type,
            or if the run's inputs/outputs cannot be converted.
    """
    if dataset_id is None:
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
        dataset_name = None  # Nested call expects only 1 defined
    dataset_type = self._get_data_type_cached(dataset_id)
    if dataset_type == ls_schemas.DataType.llm:
        # LLM datasets only accept llm-type runs, reduced to prompt/generation.
        if run.run_type != "llm":
            raise ValueError(
                f"Run type {run.run_type} is not supported"
                " for dataset of type 'LLM'"
            )
        try:
            prompt = ls_utils.get_prompt_from_inputs(run.inputs)
        except ValueError:
            raise ValueError(
                "Error converting LLM run inputs to prompt for run"
                f" {run.id} with inputs {run.inputs}"
            )
        inputs: Dict[str, Any] = {"input": prompt}
        if not run.outputs:
            outputs: Optional[Dict[str, Any]] = None
        else:
            try:
                generation = ls_utils.get_llm_generation_from_outputs(run.outputs)
            except ValueError:
                raise ValueError(
                    "Error converting LLM run outputs to generation for run"
                    f" {run.id} with outputs {run.outputs}"
                )
            outputs = {"output": generation}
    elif dataset_type == ls_schemas.DataType.chat:
        # Chat datasets also require llm-type runs, but keep message lists.
        if run.run_type != "llm":
            raise ValueError(
                f"Run type {run.run_type} is not supported"
                " for dataset of type 'chat'"
            )
        try:
            inputs = {"input": ls_utils.get_messages_from_inputs(run.inputs)}
        except ValueError:
            raise ValueError(
                "Error converting LLM run inputs to chat messages for run"
                f" {run.id} with inputs {run.inputs}"
            )
        if not run.outputs:
            outputs = None
        else:
            try:
                outputs = {
                    "output": ls_utils.get_message_generation_from_outputs(run.outputs)
                }
            except ValueError:
                raise ValueError(
                    "Error converting LLM run outputs to chat generations"
                    f" for run {run.id} with outputs {run.outputs}"
                )
    elif dataset_type == ls_schemas.DataType.kv:
        # Anything goes
        inputs = run.inputs
        outputs = run.outputs
    else:
        raise ValueError(f"Dataset type {dataset_type} not recognized.")
    return self.create_example(
        inputs=inputs,
        outputs=outputs,
        dataset_id=dataset_id,
        dataset_name=dataset_name,
        created_at=created_at,
    )
def _prepare_multipart_data(
    self,
    examples: Union[
        List[ls_schemas.ExampleCreate]
        | List[ls_schemas.ExampleUpsertWithAttachments]
        | List[ls_schemas.ExampleUpdate],
    ],
    include_dataset_id: bool = False,
    dangerously_allow_filesystem: bool = False,
) -> tuple[Any, bytes, Dict[str, io.BufferedReader]]:
    """Build the multipart payload for example create/update/upsert requests.

    Each example contributes several parts keyed by its id: a JSON body part,
    ``<id>.inputs``, ``<id>.outputs``, one part per attachment, and (for
    updates) ``<id>.attachments_operations``.

    Args:
        examples: The examples to serialize. All must be the same schema kind.
        include_dataset_id: If True, embed the dataset_id from the first
            example into every example body (upsert endpoint requires it).
        dangerously_allow_filesystem: If True, ``Path`` attachment values are
            opened and streamed from disk.

    Returns:
        A tuple of (encoder, data, opened_files_dict). The caller is
        responsible for closing the files in ``opened_files_dict`` after the
        request completes.

    Raises:
        ValueError: On a schema-kind mismatch, or on a ``Path`` attachment
            when ``dangerously_allow_filesystem`` is False.
    """
    parts: List[MultipartPart] = []
    # Files opened for Path-based attachments; returned to the caller so they
    # can be closed after the request is sent.
    opened_files_dict: Dict[str, io.BufferedReader] = {}
    if include_dataset_id:
        if not isinstance(examples[0], ls_schemas.ExampleUpsertWithAttachments):
            raise ValueError(
                "The examples must be of type ExampleUpsertWithAttachments"
                " if include_dataset_id is True"
            )
        dataset_id = examples[0].dataset_id
    for example in examples:
        if (
            not isinstance(example, ls_schemas.ExampleCreate)
            and not isinstance(example, ls_schemas.ExampleUpsertWithAttachments)
            and not isinstance(example, ls_schemas.ExampleUpdate)
        ):
            raise ValueError(
                "The examples must be of type ExampleCreate"
                " or ExampleUpsertWithAttachments"
                " or ExampleUpdate"
            )
        # Server needs a stable id to correlate the parts of each example.
        if example.id is not None:
            example_id = str(example.id)
        else:
            example_id = str(uuid.uuid4())
        if isinstance(example, ls_schemas.ExampleUpdate):
            created_at = None
        else:
            created_at = example.created_at
        if isinstance(example, ls_schemas.ExampleCreate):
            use_source_run_io = example.use_source_run_io
            use_source_run_attachments = example.use_source_run_attachments
            source_run_id = example.source_run_id
        else:
            use_source_run_io, use_source_run_attachments, source_run_id = (
                None,
                None,
                None,
            )
        # Only include keys that carry a meaningful value.
        example_body = {
            **({"dataset_id": dataset_id} if include_dataset_id else {}),
            **({"created_at": created_at} if created_at is not None else {}),
            **({"use_source_run_io": use_source_run_io} if use_source_run_io else {}),
            **(
                {"use_source_run_attachments": use_source_run_attachments}
                if use_source_run_attachments
                else {}
            ),
            **({"source_run_id": source_run_id} if source_run_id else {}),
        }
        if example.metadata is not None:
            example_body["metadata"] = example.metadata
        if example.split is not None:
            example_body["split"] = example.split
        valb = _dumps_json(example_body)
        parts.append(
            (
                f"{example_id}",
                (
                    None,
                    valb,
                    "application/json",
                    {},
                ),
            )
        )
        inputsb = _dumps_json(example.inputs or {})
        parts.append(
            (
                f"{example_id}.inputs",
                (
                    None,
                    inputsb,
                    "application/json",
                    {},
                ),
            )
        )
        outputsb = _dumps_json(example.outputs or {})
        parts.append(
            (
                f"{example_id}.outputs",
                (
                    None,
                    outputsb,
                    "application/json",
                    {},
                ),
            )
        )
        if example.attachments:
            for name, attachment in example.attachments.items():
                # Attachments may be (mime_type, data) tuples or dicts with
                # explicit "mime_type"/"data" keys.
                if isinstance(attachment, dict):
                    mime_type = attachment["mime_type"]
                    attachment_data = attachment["data"]
                else:
                    mime_type, attachment_data = attachment
                if isinstance(attachment_data, Path):
                    if dangerously_allow_filesystem:
                        file_size = os.path.getsize(attachment_data)
                        file = open(attachment_data, "rb")
                        # Unique key so identical paths don't collide.
                        opened_files_dict[str(attachment_data) + str(uuid.uuid4())] = file
                        parts.append(
                            (
                                f"{example_id}.attachment.{name}",
                                (
                                    None,
                                    file,  # type: ignore[arg-type]
                                    f"{mime_type}; length={file_size}",
                                    {},
                                ),
                            )
                        )
                    else:
                        raise ValueError(
                            "dangerously_allow_filesystem must be True to upload files from the filesystem"
                        )
                else:
                    parts.append(
                        (
                            f"{example_id}.attachment.{name}",
                            (
                                None,
                                attachment_data,
                                f"{mime_type}; length={len(attachment_data)}",
                                {},
                            ),
                        )
                    )
        if (
            isinstance(example, ls_schemas.ExampleUpdate)
            and example.attachments_operations
        ):
            attachments_operationsb = _dumps_json(example.attachments_operations)
            parts.append(
                (
                    f"{example_id}.attachments_operations",
                    (
                        None,
                        attachments_operationsb,
                        "application/json",
                        {},
                    ),
                )
            )
    encoder = rqtb_multipart.MultipartEncoder(parts, boundary=_BOUNDARY)
    # Small payloads are materialized to bytes; large ones keep the streaming
    # encoder to avoid holding the whole body in memory.
    if encoder.len <= 20_000_000:  # ~20 MB
        data = encoder.to_string()
    else:
        data = encoder
    return encoder, data, opened_files_dict
def update_examples_multipart(
    self,
    *,
    dataset_id: ID_TYPE,
    updates: Optional[List[ls_schemas.ExampleUpdate]] = None,
    dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
    """Update examples using multipart.

    .. deprecated:: 0.3.9
        Use Client.update_examples instead. Will be removed in 0.4.0.
    """
    # Deprecated public shim; all logic lives in the private helper.
    return self._update_examples_multipart(
        dataset_id=dataset_id,
        updates=updates,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
def _update_examples_multipart(
    self,
    *,
    dataset_id: ID_TYPE,
    updates: Optional[List[ls_schemas.ExampleUpdate]] = None,
    dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
    """Update examples using multipart.

    Args:
        dataset_id (Union[UUID, str]): The ID of the dataset to update.
        updates (Optional[List[ExampleUpdate]]): The updates to apply to the examples.
        dangerously_allow_filesystem (bool): Whether to allow attachments
            sourced from filesystem paths.

    Returns:
        ls_schemas.UpsertExamplesResponse: The parsed server response.

    Raises:
        ValueError: If the multipart examples endpoint is not enabled.
    """
    # Server-side feature flag gate: older LangSmith deployments lack the
    # multipart examples endpoint entirely.
    if not (self.info.instance_flags or {}).get(
        "dataset_examples_multipart_enabled", False
    ):
        raise ValueError(
            "Your LangSmith version does not allow using the latest examples "
            "endpoints, please update to the latest version or downgrade your SDK "
            "to langsmith<0.3.9."
        )
    if updates is None:
        updates = []
    encoder, data, opened_files_dict = self._prepare_multipart_data(
        updates,
        include_dataset_id=False,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
    try:
        response = self.request_with_retries(
            "PATCH",
            _dataset_examples_path(self.api_url, dataset_id),
            request_kwargs={
                "data": data,
                "headers": {
                    **self._headers,
                    "Content-Type": encoder.content_type,
                },
            },
        )
        ls_utils.raise_for_status_with_text(response)
    finally:
        # Close any attachment file handles opened by _prepare_multipart_data,
        # even if the request failed.
        _close_files(list(opened_files_dict.values()))
    return response.json()
def upload_examples_multipart(
    self,
    *,
    dataset_id: ID_TYPE,
    uploads: Optional[List[ls_schemas.ExampleCreate]] = None,
    dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
    """Upload examples using multipart.

    .. deprecated:: 0.3.9
        Use Client.create_examples instead. Will be removed in 0.4.0.
    """
    # Deprecated public shim; all logic lives in the private helper.
    return self._upload_examples_multipart(
        dataset_id=dataset_id,
        uploads=uploads,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
def _upload_examples_multipart(
    self,
    *,
    dataset_id: ID_TYPE,
    uploads: Optional[List[ls_schemas.ExampleCreate]] = None,
    dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
    """Upload examples using multipart.

    Args:
        dataset_id (Union[UUID, str]): The ID of the dataset to upload to.
        uploads (Optional[List[ExampleCreate]]): The examples to upload.
        dangerously_allow_filesystem (bool): Whether to allow uploading files
            from the filesystem.

    Returns:
        ls_schemas.UpsertExamplesResponse: The count and ids of the successfully
            uploaded examples

    Raises:
        ValueError: If the multipart examples endpoint is not enabled.
    """
    # Server-side feature flag gate: older LangSmith deployments lack the
    # multipart examples endpoint entirely.
    if not (self.info.instance_flags or {}).get(
        "dataset_examples_multipart_enabled", False
    ):
        raise ValueError(
            "Your LangSmith version does not allow using the multipart examples endpoint, please update to the latest version."
        )
    if uploads is None:
        uploads = []
    encoder, data, opened_files_dict = self._prepare_multipart_data(
        uploads,
        include_dataset_id=False,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
    try:
        response = self.request_with_retries(
            "POST",
            _dataset_examples_path(self.api_url, dataset_id),
            request_kwargs={
                "data": data,
                "headers": {
                    **self._headers,
                    "Content-Type": encoder.content_type,
                },
            },
        )
        ls_utils.raise_for_status_with_text(response)
    finally:
        # Close any attachment file handles opened by _prepare_multipart_data,
        # even if the request failed.
        _close_files(list(opened_files_dict.values()))
    return response.json()
def upsert_examples_multipart(
    self,
    *,
    upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None,
    dangerously_allow_filesystem: bool = False,
) -> ls_schemas.UpsertExamplesResponse:
    """Upsert examples.

    .. deprecated:: 0.3.9
        Use Client.create_examples and Client.update_examples instead. Will be
        removed in 0.4.0.

    Raises:
        ValueError: If the multipart examples endpoint is not enabled.
    """
    if not (self.info.instance_flags or {}).get("examples_multipart_enabled", False):
        raise ValueError(
            "Your LangSmith version does not allow using the multipart examples endpoint, please update to the latest version."
        )
    if upserts is None:
        upserts = []
    # dataset_id is embedded per-example here (taken from the first upsert),
    # unlike the create/update paths which pass it in the URL.
    encoder, data, opened_files_dict = self._prepare_multipart_data(
        upserts,
        include_dataset_id=True,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
    try:
        response = self.request_with_retries(
            "POST",
            # Avoid a duplicated "/v1" segment when the configured api_url
            # already ends with it.
            (
                "/v1/platform/examples/multipart"
                if self.api_url[-3:] != "/v1" and self.api_url[-4:] != "/v1/"
                else "/platform/examples/multipart"
            ),
            request_kwargs={
                "data": data,
                "headers": {
                    **self._headers,
                    "Content-Type": encoder.content_type,
                },
            },
        )
        ls_utils.raise_for_status_with_text(response)
    finally:
        # Close any attachment file handles opened by _prepare_multipart_data,
        # even if the request failed.
        _close_files(list(opened_files_dict.values()))
    return response.json()
@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_examples(
    self,
    *,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[ID_TYPE] = None,
    examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None,
    dangerously_allow_filesystem: bool = False,
    **kwargs: Any,
) -> ls_schemas.UpsertExamplesResponse:
    """Create examples in a dataset.

    Args:
        dataset_name (str | None): The name of the dataset to create the examples in.
            Must specify exactly one of dataset_name or dataset_id.
        dataset_id (UUID | str | None): The ID of the dataset to create the examples
            in. Must specify exactly one of dataset_name or dataset_id
        examples (Sequence[ExampleCreate | dict]): The examples to create.
        dangerously_allow_filesystem (bool): Whether to allow uploading files
            from the filesystem.
        **kwargs (Any): Legacy keyword args. Should not be specified if 'examples'
            is specified.

            - inputs (Sequence[Mapping[str, Any]]): The input values for the examples.
            - outputs (Optional[Sequence[Optional[Mapping[str, Any]]]]):
              The output values for the examples.
            - metadata (Optional[Sequence[Optional[Mapping[str, Any]]]]):
              The metadata for the examples.
            - splits (Optional[Sequence[Optional[str | List[str]]]]): The splits
              for the examples.
            - source_run_ids (Optional[Sequence[Optional[Union[UUID, str]]]]):
              The IDs of the source runs associated with the examples.
            - ids (Optional[Sequence[Union[UUID, str]]]): The IDs of the examples.

    Raises:
        ValueError: If 'examples' and legacy args are both provided, or if an
            unsupported legacy kwarg is passed, or if legacy arg lengths differ.

    Returns:
        The LangSmith JSON response. Includes 'count' and 'example_ids'.

    .. versionchanged:: 0.3.11
        Updated to take argument 'examples', a single list where each element
        is the full example to create. This should be used instead of the
        legacy 'inputs', 'outputs', etc. arguments which split each examples
        attributes across arguments. Updated to support creating examples with
        attachments.

    Example:
        .. code-block:: python

            from langsmith import Client

            client = Client()
            dataset = client.create_dataset("agent-qa")
            examples = [
                {
                    "inputs": {"question": "what's an agent"},
                    "outputs": {"answer": "an agent is..."},
                    "metadata": {"difficulty": "easy"},
                },
                # more examples...
            ]
            response = client.create_examples(dataset_name="agent-qa", examples=examples)
            # -> {"example_ids": [...
    """  # noqa: E501
    # Modern ('examples') and legacy (per-field) argument styles are mutually
    # exclusive.
    if kwargs and examples:
        kwarg_keys = ", ".join([f"'{k}'" for k in kwargs])
        raise ValueError(f"Cannot specify {kwarg_keys} when 'examples' is specified.")
    supported_kwargs = {
        "inputs",
        "outputs",
        "metadata",
        "splits",
        "ids",
        "source_run_ids",
    }
    if kwargs and (unsupported := set(kwargs).difference(supported_kwargs)):
        raise ValueError(
            f"Received unsupported keyword arguments: {tuple(unsupported)}."
        )
    if not (dataset_id or dataset_name):
        raise ValueError("Either dataset_id or dataset_name must be provided.")
    elif not dataset_id:
        dataset_id = self.read_dataset(dataset_name=dataset_name).id

    if examples:
        uploads = [
            ls_schemas.ExampleCreate(**x) if isinstance(x, dict) else x
            for x in examples
        ]
    # For backwards compatibility
    else:
        inputs = kwargs.get("inputs")
        if not inputs:
            raise ValueError("Must specify either 'examples' or 'inputs.'")
        # Since inputs are required, we will check against them
        input_len = len(inputs)
        for arg_name, arg_value in kwargs.items():
            if arg_value is not None and len(arg_value) != input_len:
                raise ValueError(
                    f"Length of {arg_name} ({len(arg_value)}) does not match"
                    f" length of inputs ({input_len})"
                )
        # Zip the parallel legacy sequences into one ExampleCreate per row,
        # substituting None streams for any omitted field.
        uploads = [
            ls_schemas.ExampleCreate(
                **{
                    "inputs": in_,
                    "outputs": out_,
                    "metadata": metadata_,
                    "split": split_,
                    "id": id_ or str(uuid.uuid4()),
                    "source_run_id": source_run_id_,
                }
            )
            for in_, out_, metadata_, split_, id_, source_run_id_ in zip(
                inputs,
                kwargs.get("outputs") or (None for _ in range(input_len)),
                kwargs.get("metadata") or (None for _ in range(input_len)),
                kwargs.get("splits") or (None for _ in range(input_len)),
                kwargs.get("ids") or (None for _ in range(input_len)),
                kwargs.get("source_run_ids") or (None for _ in range(input_len)),
            )
        ]
    return self._upload_examples_multipart(
        dataset_id=cast(uuid.UUID, dataset_id),
        uploads=uploads,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_example(
    self,
    inputs: Optional[Mapping[str, Any]] = None,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    created_at: Optional[datetime.datetime] = None,
    outputs: Optional[Mapping[str, Any]] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    split: Optional[str | List[str]] = None,
    example_id: Optional[ID_TYPE] = None,
    source_run_id: Optional[ID_TYPE] = None,
    use_source_run_io: bool = False,
    use_source_run_attachments: Optional[List[str]] = None,
    attachments: Optional[ls_schemas.Attachments] = None,
) -> ls_schemas.Example:
    """Create a dataset example in the LangSmith API.

    Examples are rows in a dataset, containing the inputs
    and expected outputs (or other reference information)
    for a model or chain.

    Args:
        inputs (Mapping[str, Any]): The input values for the example.
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to
            create the example in.
        dataset_name (Optional[str]): The name of the dataset to create the
            example in.
        created_at (Optional[datetime.datetime]): The creation timestamp of the
            example.
        outputs (Optional[Mapping[str, Any]]): The output values for the example.
        metadata (Optional[Mapping[str, Any]]): The metadata for the example.
        split (Optional[str | List[str]]): The splits for the example, which are
            divisions of your dataset such as 'train', 'test', or 'validation'.
        example_id (Optional[Union[UUID, str]]): The ID of the example to
            create. If not provided, a new example will be created.
        source_run_id (Optional[Union[UUID, str]]): The ID of the source run
            associated with this example.
        use_source_run_io (bool): Whether to use the inputs, outputs, and
            attachments from the source run.
        use_source_run_attachments (Optional[List[str]]): Which attachments to
            use from the source run. If use_source_run_io is True, all
            attachments will be used regardless of this param.
        attachments (Optional[Attachments]): The attachments for the example.

    Returns:
        Example: The created example.

    Raises:
        ValueError: If neither inputs nor use_source_run_io is provided.
    """
    # Inputs may be omitted only when they are to be copied from a source run.
    if inputs is None and not use_source_run_io:
        raise ValueError("Must provide either inputs or use_source_run_io")
    if dataset_id is None:
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
    data = ls_schemas.ExampleCreate(
        **{
            "inputs": inputs,
            "outputs": outputs,
            "metadata": metadata,
            "split": split,
            "source_run_id": source_run_id,
            "use_source_run_io": use_source_run_io,
            "use_source_run_attachments": use_source_run_attachments,
            "attachments": attachments,
        }
    )
    if created_at:
        data.created_at = created_at
    # Pin an id up front so the example can be read back after upload.
    data.id = (
        (uuid.UUID(example_id) if isinstance(example_id, str) else example_id)
        if example_id
        else uuid.uuid4()
    )
    self._upload_examples_multipart(dataset_id=dataset_id, uploads=[data])
    return self.read_example(example_id=data.id)
def read_example(
    self, example_id: ID_TYPE, *, as_of: Optional[datetime.datetime] = None
) -> ls_schemas.Example:
    """Read an example from the LangSmith API.

    Args:
        example_id (Union[UUID, str]): The ID of the example to read.
        as_of (Optional[datetime.datetime]): The dataset version tag OR
            timestamp to retrieve the example as of. Response examples will
            only be those that were present at the time of the tagged
            (or timestamped) version.

    Returns:
        Example: The example.
    """
    query_params = {"as_of": as_of.isoformat() if as_of else None}
    response = self.request_with_retries(
        "GET",
        f"/examples/{_as_uuid(example_id, 'example_id')}",
        params=query_params,
    )
    payload = response.json()
    # Resolve stored attachment URLs into reader objects before constructing
    # the Example; the raw "attachment_urls" key is dropped from the payload.
    resolved_attachments = _convert_stored_attachments_to_attachments_dict(
        payload, attachments_key="attachment_urls"
    )
    fields = {key: value for key, value in payload.items() if key != "attachment_urls"}
    return ls_schemas.Example(
        **fields,
        attachments=resolved_attachments,
        _host_url=self._host_url,
        _tenant_id=self._get_optional_tenant_id(),
    )
def list_examples(
    self,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    example_ids: Optional[Sequence[ID_TYPE]] = None,
    as_of: Optional[Union[datetime.datetime, str]] = None,
    splits: Optional[Sequence[str]] = None,
    inline_s3_urls: bool = True,
    *,
    offset: int = 0,
    limit: Optional[int] = None,
    metadata: Optional[dict] = None,
    filter: Optional[str] = None,
    include_attachments: bool = False,
    **kwargs: Any,
) -> Iterator[ls_schemas.Example]:
    r"""Retrieve the example rows of the specified dataset.

    Args:
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to filter by.
            Defaults to None.
        dataset_name (Optional[str]): The name of the dataset to filter by.
            Defaults to None.
        example_ids (Optional[Sequence[Union[UUID, str]]): The IDs of the
            examples to filter by. Defaults to None.
        as_of (Optional[Union[datetime.datetime, str]]): The dataset version
            tag OR timestamp to retrieve the examples as of. Response examples
            will only be those that were present at the time of the tagged
            (or timestamped) version.
        splits (Optional[Sequence[str]]): A list of dataset splits, which are
            divisions of your dataset such as 'train', 'test', or 'validation'.
            Returns examples only from the specified splits.
        inline_s3_urls (bool, default=True): Whether to inline S3 URLs.
            Defaults to True.
        offset (int, default=0): The offset to start from. Defaults to 0.
        limit (Optional[int]): The maximum number of examples to return.
        metadata (Optional[dict]): A dictionary of metadata to filter by.
        filter (Optional[str]): A structured filter string to apply to
            the examples.
        include_attachments (bool, default=False): Whether to include the
            attachments in the response. Defaults to False.
        **kwargs (Any): Additional keyword arguments are ignored.

    Yields:
        The examples.

    Examples:
        .. code-block:: python

            from langsmith import Client

            client = Client()
            # By Dataset ID
            examples = client.list_examples(
                dataset_id="c9ace0d8-a82c-4b6c-13d2-83401d68e9ab"
            )
            # By Dataset Name
            examples = client.list_examples(dataset_name="My Test Dataset")
            # By example IDs
            examples = client.list_examples(example_ids=example_ids)
            # By metadata
            examples = client.list_examples(
                dataset_name=dataset_name, metadata={"foo": "bar"}
            )
            # By structured filter
            examples = client.list_examples(
                dataset_name=dataset_name,
                filter='and(not(has(metadata, \'{"foo": "bar"}\')), exists(metadata, "tenant_id"))',
            )
    """
    # Page size is capped at 100; the overall 'limit' is enforced client-side
    # in the loop below.
    params: Dict[str, Any] = {
        **kwargs,
        "offset": offset,
        "id": example_ids,
        "as_of": (
            as_of.isoformat() if isinstance(as_of, datetime.datetime) else as_of
        ),
        "splits": splits,
        "inline_s3_urls": inline_s3_urls,
        "limit": min(limit, 100) if limit is not None else 100,
        "filter": filter,
    }
    if metadata is not None:
        params["metadata"] = _dumps_json(metadata)
    if dataset_id is not None:
        params["dataset"] = dataset_id
    elif dataset_name is not None:
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
        params["dataset"] = dataset_id
    else:
        pass
    if include_attachments:
        params["select"] = ["attachment_urls", "outputs", "metadata"]
    for i, example in enumerate(self._get_paginated_list("/examples", params=params)):
        # Resolve stored attachment URLs; the raw key is stripped from the
        # constructor kwargs.
        attachments = _convert_stored_attachments_to_attachments_dict(
            example, attachments_key="attachment_urls"
        )
        yield ls_schemas.Example(
            **{k: v for k, v in example.items() if k != "attachment_urls"},
            attachments=attachments,
            _host_url=self._host_url,
            _tenant_id=self._get_optional_tenant_id(),
        )
        if limit is not None and i + 1 >= limit:
            break
@warn_beta
def index_dataset(
    self,
    *,
    dataset_id: ID_TYPE,
    tag: str = "latest",
    **kwargs: Any,
) -> None:
    """Enable dataset indexing. Examples are indexed by their inputs.

    This enables searching for similar examples by inputs with
    ``client.similar_examples()``.

    Args:
        dataset_id (Union[UUID, str]): The ID of the dataset to index.
        tag (Optional[str]): The version of the dataset to index. If 'latest'
            then any updates to the dataset (additions, updates, deletions of
            examples) will be reflected in the index.
        **kwargs (Any): Additional keyword arguments to pass as part of request body.

    Returns:
        None
    """  # noqa: E501
    validated_id = _as_uuid(dataset_id, "dataset_id")
    body = {"tag": tag, **kwargs}
    result = self.request_with_retries(
        "POST",
        f"/datasets/{validated_id}/index",
        headers=self._headers,
        data=json.dumps(body),
    )
    ls_utils.raise_for_status_with_text(result)
# NOTE: dataset_name arg explicitly not supported to avoid extra API calls.
@warn_beta
def similar_examples(
    self,
    inputs: dict,
    /,
    *,
    limit: int,
    dataset_id: ID_TYPE,
    filter: Optional[str] = None,
    **kwargs: Any,
) -> List[ls_schemas.ExampleSearch]:
    r"""Retrieve the dataset examples whose inputs best match the current inputs.

    **Note**: Must have few-shot indexing enabled for the dataset. See
    `client.index_dataset()`.

    Args:
        inputs (dict): The inputs to use as a search query. Must match the
            dataset input schema. Must be JSON serializable.
        limit (int): The maximum number of examples to return.
        dataset_id (Union[UUID, str]): The ID of the dataset to search over.
        filter (Optional[str]): A filter string to apply to the search results.
            Uses the same syntax as the `filter` parameter in `list_runs()`.
            Only a subset of operations are supported. Defaults to None.

            For example, you can use
            ``and(eq(metadata.some_tag, 'some_value'), neq(metadata.env, 'dev'))``
            to filter only examples where some_tag has some_value, and the
            environment is not dev.
        **kwargs (Any): Additional keyword arguments to pass as part of request body.

    Returns:
        list[ExampleSearch]: List of ExampleSearch objects.

    Examples:
        .. code-block:: python

            from langsmith import Client

            client = Client()
            client.similar_examples(
                {"question": "When would i use the runnable generator"},
                limit=3,
                dataset_id="...",
            )
    """
    dataset_id = _as_uuid(dataset_id, "dataset_id")
    req = {
        "inputs": inputs,
        "limit": limit,
        **kwargs,
    }
    # Only send a filter key when one was provided.
    if filter is not None:
        req["filter"] = filter
    resp = self.request_with_retries(
        "POST",
        f"/datasets/{dataset_id}/search",
        headers=self._headers,
        data=json.dumps(req),
    )
    ls_utils.raise_for_status_with_text(resp)
    examples = []
    # The search response omits dataset_id per example; re-attach it.
    for ex in resp.json()["examples"]:
        examples.append(ls_schemas.ExampleSearch(**ex, dataset_id=dataset_id))
    return examples
def update_example(
    self,
    example_id: ID_TYPE,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    outputs: Optional[Mapping[str, Any]] = None,
    metadata: Optional[Dict] = None,
    split: Optional[str | List[str]] = None,
    dataset_id: Optional[ID_TYPE] = None,
    attachments_operations: Optional[ls_schemas.AttachmentsOperations] = None,
    attachments: Optional[ls_schemas.Attachments] = None,
) -> Dict[str, Any]:
    """Update a specific example.

    Args:
        example_id (Union[UUID, str]): The ID of the example to update.
        inputs (Optional[Dict[str, Any]]): The input values to update.
        outputs (Optional[Mapping[str, Any]]): The output values to update.
        metadata (Optional[Dict]): The metadata to update.
        split (Optional[str | List[str]]): The dataset split to update, such as
            'train', 'test', or 'validation'.
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to update.
        attachments_operations (Optional[AttachmentsOperations]): The attachments
            operations to perform.
        attachments (Optional[Attachments]): The attachments to add to the example.

    Returns:
        Dict[str, Any]: The updated example.

    Raises:
        ValueError: If attachment operations are requested but the server does
            not support them.
    """
    if attachments_operations is not None:
        # Attachment operations require server-side multipart support.
        if not (self.info.instance_flags or {}).get(
            "dataset_examples_multipart_enabled", False
        ):
            raise ValueError(
                "Your LangSmith version does not allow using the attachment operations, please update to the latest version."
            )
    candidate_fields = {
        "inputs": inputs,
        "outputs": outputs,
        "id": example_id,
        "metadata": metadata,
        "split": split,
        "attachments_operations": attachments_operations,
        "attachments": attachments,
    }
    # Only send fields that were actually supplied; None means "leave as-is".
    update = ls_schemas.ExampleUpdate(
        **{name: value for name, value in candidate_fields.items() if value is not None}
    )
    if dataset_id is None:
        # Resolve the dataset from the example itself when not supplied.
        dataset_id = self.read_example(example_id).dataset_id
    return dict(
        self._update_examples_multipart(dataset_id=dataset_id, updates=[update])
    )
def update_examples(
    self,
    *,
    dataset_name: str | None = None,
    dataset_id: ID_TYPE | None = None,
    updates: Optional[Sequence[ls_schemas.ExampleUpdate | dict]] = None,
    dangerously_allow_filesystem: bool = False,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Update multiple examples.

    Examples are expected to all be part of the same dataset.

    Args:
        dataset_name (str | None): The name of the dataset to update.
            Should specify exactly one of 'dataset_name' or 'dataset_id'.
        dataset_id (UUID | str | None): The ID of the dataset to update.
            Should specify exactly one of 'dataset_name' or 'dataset_id'.
        updates (Sequence[ExampleUpdate | dict] | None): The example updates.
            Overwrites any specified fields and does not update any
            unspecified fields.
        dangerously_allow_filesystem (bool): Whether to allow using filesystem
            paths as attachments.
        **kwargs (Any): Legacy keyword args. Should not be specified if
            'updates' is specified.

            - example_ids (Sequence[UUID | str]): The IDs of the examples to update.
            - inputs (Sequence[dict | None] | None): The input values for the examples.
            - outputs (Sequence[dict | None] | None): The output values for the examples.
            - metadata (Sequence[dict | None] | None): The metadata for the examples.
            - splits (Sequence[str | list[str] | None] | None): The splits for the
              examples, which are divisions of your dataset such as 'train',
              'test', or 'validation'.
            - attachments_operations (Sequence[AttachmentsOperations | None] | None):
              The operations to perform on the attachments.
            - dataset_ids (Sequence[UUID | str] | None): The IDs of the datasets to
              move the examples to.

    Returns:
        The LangSmith JSON response. Includes 'message', 'count', and 'example_ids'.

    .. versionchanged:: 0.3.9
        Updated to accept the 'updates' argument in place of the legacy kwargs.
    """  # noqa: E501
    # The new-style 'updates' arg and the legacy kwargs are mutually exclusive.
    if kwargs and updates:
        raise ValueError(
            f"Must pass in either 'updates' or args {tuple(kwargs)}, not both."
        )
    if not (kwargs or updates):
        raise ValueError("Please pass in a non-empty sequence for arg 'updates'.")
    if dataset_name and dataset_id:
        raise ValueError("Must pass in exactly one of 'dataset_name' or 'dataset_id'.")
    elif dataset_name:
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
    if updates:
        # Normalize dict updates into ExampleUpdate objects.
        updates_obj = [
            ls_schemas.ExampleUpdate(**x) if isinstance(x, dict) else x
            for x in updates
        ]
        if not dataset_id:
            # Fall back to the dataset referenced by the first update.
            if updates_obj[0].dataset_id:
                dataset_id = updates_obj[0].dataset_id
            else:
                raise ValueError(
                    "Must pass in (exactly) one of 'dataset_name' or 'dataset_id'."
                )
    # For backwards compatibility
    else:
        example_ids = kwargs.get("example_ids", None)
        if not example_ids:
            raise ValueError(
                "Must pass in (exactly) one of 'updates' or 'example_ids'."
            )
        if not dataset_id:
            if "dataset_ids" not in kwargs:
                # Assume all examples belong to same dataset
                dataset_id = self.read_example(example_ids[0]).dataset_id
            elif len(set(kwargs["dataset_ids"])) > 1:
                raise ValueError("Dataset IDs must be the same for all examples")
            elif not kwargs["dataset_ids"][0]:
                raise ValueError("If specified, dataset_ids must be non-null.")
            else:
                dataset_id = kwargs["dataset_ids"][0]
        multipart_enabled = (self.info.instance_flags or {}).get(
            "dataset_examples_multipart_enabled"
        )
        if (
            not multipart_enabled
            and (kwargs.get("attachments_operations") or kwargs.get("attachments"))
            is not None
        ):
            raise ValueError(
                "Your LangSmith version does not allow using the attachment "
                "operations, please update to the latest version."
            )
        # Since ids are required, we will check against them
        examples_len = len(example_ids)
        for arg_name, arg_value in kwargs.items():
            if arg_value is not None and len(arg_value) != examples_len:
                raise ValueError(
                    f"Length of {arg_name} ({len(arg_value)}) does not match"
                    f" length of examples ({examples_len})"
                )
        # Zip the parallel legacy sequences into one ExampleUpdate per example;
        # missing kwargs default to all-None generators of matching length.
        updates_obj = [
            ls_schemas.ExampleUpdate(
                **{
                    "id": id_,
                    "inputs": in_,
                    "outputs": out_,
                    "dataset_id": dataset_id_,
                    "metadata": metadata_,
                    "split": split_,
                    "attachments": attachments_,
                    "attachments_operations": attachments_operations_,
                }
            )
            for id_, in_, out_, metadata_, split_, dataset_id_, attachments_, attachments_operations_ in zip(
                example_ids,
                kwargs.get("inputs", (None for _ in range(examples_len))),
                kwargs.get("outputs", (None for _ in range(examples_len))),
                kwargs.get("metadata", (None for _ in range(examples_len))),
                kwargs.get("splits", (None for _ in range(examples_len))),
                kwargs.get("dataset_ids", (None for _ in range(examples_len))),
                kwargs.get("attachments", (None for _ in range(examples_len))),
                kwargs.get(
                    "attachments_operations", (None for _ in range(examples_len))
                ),
            )
        ]
    response = self._update_examples_multipart(
        dataset_id=cast(uuid.UUID, dataset_id),
        updates=updates_obj,
        dangerously_allow_filesystem=dangerously_allow_filesystem,
    )
    return {"message": f"{response.get('count', 0)} examples updated", **response}
def delete_example(self, example_id: ID_TYPE) -> None:
    """Delete an example by ID.

    Args:
        example_id (Union[UUID, str]): The ID of the example to delete.

    Returns:
        None
    """
    endpoint = f"/examples/{_as_uuid(example_id, 'example_id')}"
    resp = self.request_with_retries(
        "DELETE",
        endpoint,
        headers=self._headers,
    )
    ls_utils.raise_for_status_with_text(resp)
def delete_examples(self, example_ids: Sequence[ID_TYPE]) -> None:
    """Delete multiple examples by ID.

    Parameters
    ----------
    example_ids : Sequence[ID_TYPE]
        The IDs of the examples to delete.
    """
    # Validate every ID up front so a bad entry fails before the request.
    validated_ids = [
        str(_as_uuid(id_, f"example_ids[{i}]"))
        for i, id_ in enumerate(example_ids)
    ]
    resp = self.request_with_retries(
        "DELETE",
        "/examples",
        headers={**self._headers, "Content-Type": "application/json"},
        params={"example_ids": validated_ids},
    )
    ls_utils.raise_for_status_with_text(resp)
def list_dataset_splits(
    self,
    *,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    as_of: Optional[Union[str, datetime.datetime]] = None,
) -> List[str]:
    """Get the splits for a dataset.

    Args:
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset.
        dataset_name (Optional[str]): The name of the dataset.
        as_of (Optional[Union[str, datetime.datetime]]): The version of the
            dataset to retrieve splits for. Can be a timestamp or a string tag.
            Defaults to "latest".

    Returns:
        List[str]: The names of this dataset's splits.
    """
    if dataset_id is None:
        if dataset_name is None:
            raise ValueError("Must provide dataset name or ID")
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
    query: dict = {}
    if as_of is not None:
        # Datetimes are serialized to ISO-8601; string tags pass through.
        version = (
            as_of.isoformat() if isinstance(as_of, datetime.datetime) else as_of
        )
        query["as_of"] = version
    resp = self.request_with_retries(
        "GET",
        f"/datasets/{_as_uuid(dataset_id, 'dataset_id')}/splits",
        params=query,
    )
    ls_utils.raise_for_status_with_text(resp)
    return resp.json()
def update_dataset_splits(
    self,
    *,
    dataset_id: Optional[ID_TYPE] = None,
    dataset_name: Optional[str] = None,
    split_name: str,
    example_ids: List[ID_TYPE],
    remove: bool = False,
) -> None:
    """Update the splits for a dataset.

    Args:
        dataset_id (Optional[Union[UUID, str]]): The ID of the dataset to update.
        dataset_name (Optional[str]): The name of the dataset to update.
        split_name (str): The name of the split to update.
        example_ids (List[Union[UUID, str]]): The IDs of the examples to add to
            or remove from the split.
        remove (Optional[bool]): If True, remove the examples from the split.
            If False, add the examples to the split. Defaults to False.

    Returns:
        None
    """
    if dataset_id is None:
        if dataset_name is None:
            raise ValueError("Must provide dataset name or ID")
        dataset_id = self.read_dataset(dataset_name=dataset_name).id
    validated_examples = [
        str(_as_uuid(id_, f"example_ids[{i}]"))
        for i, id_ in enumerate(example_ids)
    ]
    payload = {
        "split_name": split_name,
        "examples": validated_examples,
        "remove": remove,
    }
    resp = self.request_with_retries(
        "PUT",
        f"/datasets/{_as_uuid(dataset_id, 'dataset_id')}/splits",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(resp)
def _resolve_run_id(
    self,
    run: Union[ls_schemas.Run, ls_schemas.RunBase, str, uuid.UUID],
    load_child_runs: bool,
) -> ls_schemas.Run:
    """Resolve the run ID.

    Args:
        run (Union[Run, RunBase, str, UUID]): The run to resolve.
        load_child_runs (bool): Whether to load child runs.

    Returns:
        Run: The resolved run.

    Raises:
        TypeError: If the run type is invalid.
    """
    # IDs are fetched from the API; run objects are used as-is.
    if isinstance(run, (str, uuid.UUID)):
        run_ = self.read_run(run, load_child_runs=load_child_runs)
    else:
        run_ = cast(ls_schemas.Run, run)
    return run_

def _resolve_example_id(
    self,
    example: Union[ls_schemas.Example, str, uuid.UUID, dict, None],
    run: ls_schemas.Run,
) -> Optional[ls_schemas.Example]:
    """Resolve the example ID.

    Args:
        example (Optional[Union[Example, str, UUID, dict]]): The example to resolve.
        run (Run): The run associated with the example.

    Returns:
        Optional[Example]: The resolved example, or None if neither the
            argument nor the run references one.
    """
    if isinstance(example, (str, uuid.UUID)):
        reference_example_ = self.read_example(example)
    elif isinstance(example, ls_schemas.Example):
        reference_example_ = example
    elif isinstance(example, dict):
        reference_example_ = ls_schemas.Example(
            **example,
            _host_url=self._host_url,
            _tenant_id=self._get_optional_tenant_id(),
        )
    elif run.reference_example_id is not None:
        # No example given explicitly; fall back to the run's own reference.
        reference_example_ = self.read_example(run.reference_example_id)
    else:
        reference_example_ = None
    return reference_example_

def _select_eval_results(
    self,
    results: Union[
        ls_evaluator.EvaluationResult, ls_evaluator.EvaluationResults, dict
    ],
    *,
    fn_name: Optional[str] = None,
) -> List[ls_evaluator.EvaluationResult]:
    # Normalize evaluator output into a flat list of EvaluationResult objects.
    from langsmith.evaluation import evaluator as ls_evaluator  # noqa: F811

    def _cast_result(
        single_result: Union[ls_evaluator.EvaluationResult, dict],
    ) -> ls_evaluator.EvaluationResult:
        # Dicts are promoted to EvaluationResult; a 'reasoning' field becomes
        # the comment and fn_name supplies a default key, both overridable.
        if isinstance(single_result, dict):
            return ls_evaluator.EvaluationResult(
                **{
                    "key": fn_name,
                    "comment": single_result.get("reasoning"),
                    **single_result,
                }
            )
        return single_result

    def _is_eval_results(results: Any) -> TypeGuard[ls_evaluator.EvaluationResults]:
        # An EvaluationResults payload is a dict with a 'results' key.
        return isinstance(results, dict) and "results" in results

    if isinstance(results, ls_evaluator.EvaluationResult):
        results_ = [results]
    elif _is_eval_results(results):
        results_ = [_cast_result(r) for r in results["results"]]
    elif isinstance(results, dict):
        results_ = [_cast_result(cast(dict, results))]
    else:
        raise ValueError(
            f"Invalid evaluation results type: {type(results)}."
            " Must be EvaluationResult, EvaluationResults."
        )
    return results_
def evaluate_run(
    self,
    run: Union[ls_schemas.Run, ls_schemas.RunBase, str, uuid.UUID],
    evaluator: ls_evaluator.RunEvaluator,
    *,
    source_info: Optional[Dict[str, Any]] = None,
    reference_example: Optional[
        Union[ls_schemas.Example, str, dict, uuid.UUID]
    ] = None,
    load_child_runs: bool = False,
) -> ls_evaluator.EvaluationResult:
    """Evaluate a run.

    Args:
        run (Union[Run, RunBase, str, UUID]): The run to evaluate.
        evaluator (RunEvaluator): The evaluator to use.
        source_info (Optional[Dict[str, Any]]): Additional information about
            the source of the evaluation to log as feedback metadata.
        reference_example (Optional[Union[Example, str, dict, UUID]]): The
            example to use as a reference for the evaluation. If not provided,
            the run's reference example will be used.
        load_child_runs (bool, default=False): Whether to load child runs when
            resolving the run ID.

    Returns:
        EvaluationResult: The feedback object created by the evaluation.
    """
    run_ = self._resolve_run_id(run, load_child_runs=load_child_runs)
    reference_example_ = self._resolve_example_id(reference_example, run_)
    evaluator_response = evaluator.evaluate_run(
        run_,
        example=reference_example_,
    )
    # Persist the evaluator's output as feedback on the run.
    results = self._log_evaluation_feedback(
        evaluator_response,
        run_,
        source_info=source_info,
    )
    # TODO: Return all results
    return results[0]
async def aevaluate_run(
    self,
    run: Union[ls_schemas.Run, str, uuid.UUID],
    evaluator: ls_evaluator.RunEvaluator,
    *,
    source_info: Optional[Dict[str, Any]] = None,
    reference_example: Optional[
        Union[ls_schemas.Example, str, dict, uuid.UUID]
    ] = None,
    load_child_runs: bool = False,
) -> ls_evaluator.EvaluationResult:
    """Evaluate a run asynchronously.

    Args:
        run (Union[Run, str, UUID]): The run to evaluate.
        evaluator (RunEvaluator): The evaluator to use.
        source_info (Optional[Dict[str, Any]]): Additional information about
            the source of the evaluation to log as feedback metadata.
        reference_example (Optional[Union[Example, str, dict, UUID]]): The
            example to use as a reference for the evaluation. If not provided,
            the run's reference example will be used.
        load_child_runs (bool, default=False): Whether to load child runs when
            resolving the run ID.

    Returns:
        EvaluationResult: The evaluation result object created by the evaluation.
    """
    run_ = self._resolve_run_id(run, load_child_runs=load_child_runs)
    reference_example_ = self._resolve_example_id(reference_example, run_)
    # Only the evaluator call is awaited; feedback logging below is sync.
    evaluator_response = await evaluator.aevaluate_run(
        run_,
        example=reference_example_,
    )
    # TODO: Return all results and use async API
    results = self._log_evaluation_feedback(
        evaluator_response,
        run_,
        source_info=source_info,
    )
    return results[0]
def create_feedback(
    self,
    run_id: Optional[ID_TYPE],
    key: str,
    *,
    score: Union[float, int, bool, None] = None,
    value: Union[str, dict, None] = None,
    correction: Union[dict, None] = None,
    comment: Union[str, None] = None,
    source_info: Optional[Dict[str, Any]] = None,
    feedback_source_type: Union[
        ls_schemas.FeedbackSourceType, str
    ] = ls_schemas.FeedbackSourceType.API,
    source_run_id: Optional[ID_TYPE] = None,
    feedback_id: Optional[ID_TYPE] = None,
    feedback_config: Optional[ls_schemas.FeedbackConfig] = None,
    stop_after_attempt: int = 10,
    project_id: Optional[ID_TYPE] = None,
    comparative_experiment_id: Optional[ID_TYPE] = None,
    feedback_group_id: Optional[ID_TYPE] = None,
    extra: Optional[Dict] = None,
    trace_id: Optional[ID_TYPE] = None,
    error: Optional[bool] = None,
    **kwargs: Any,
) -> ls_schemas.Feedback:
    """Create a feedback in the LangSmith API.

    Args:
        run_id (Optional[Union[UUID, str]]): The ID of the run to provide
            feedback for. Either the run_id OR the project_id must be provided.
        key (str): The name of the metric or 'aspect' this feedback is about.
        score (Optional[Union[float, int, bool]]): The score to rate this run
            on the metric or aspect.
        value (Optional[Union[str, dict]]): The display value or non-numeric
            value for this feedback.
        correction (Optional[dict]): The proper ground truth for this run.
        comment (Optional[str]): A comment about this feedback, such as a
            justification for the score or chain-of-thought trajectory for an
            LLM judge.
        source_info (Optional[Dict[str, Any]]): Information about the source of
            this feedback.
        feedback_source_type (Union[FeedbackSourceType, str]): The type of
            feedback source, such as model (for model-generated feedback) or API.
        source_run_id (Optional[Union[UUID, str]]): The ID of the run that
            generated this feedback, if a "model" type.
        feedback_id (Optional[Union[UUID, str]]): The ID of the feedback to
            create. If not provided, a random UUID will be generated.
        feedback_config (Optional[FeedbackConfig]): The configuration specifying
            how to interpret feedback with this key. Examples include continuous
            (with min/max bounds), categorical, or freeform.
        stop_after_attempt (int, default=10): The number of times to retry the
            request before giving up.
        project_id (Optional[Union[UUID, str]]): The ID of the project to
            provide feedback on. One - and only one - of this and run_id must
            be provided.
        comparative_experiment_id (Optional[Union[UUID, str]]): If this feedback
            was logged as a part of a comparative experiment, this associates
            the feedback with that experiment.
        feedback_group_id (Optional[Union[UUID, str]]): When logging preferences,
            ranking runs, or other comparative feedback, this is used to group
            feedback together.
        extra (Optional[Dict]): Metadata for the feedback.
        trace_id (Optional[Union[UUID, str]]): The trace ID of the run to
            provide feedback for. Enables batch ingestion.
        error (Optional[bool]): Whether the feedback represents an error.
        **kwargs (Any): Deprecated keyword arguments; a warning is emitted
            if any are supplied.

    Returns:
        Feedback: The created feedback object.
    """
    if run_id is None and project_id is None:
        raise ValueError("One of run_id and project_id must be provided")
    if run_id is not None and project_id is not None:
        raise ValueError("Only one of run_id and project_id must be provided")
    if kwargs:
        warnings.warn(
            "The following arguments are no longer used in the create_feedback"
            f" endpoint: {sorted(kwargs)}",
            DeprecationWarning,
        )
    try:
        if not isinstance(feedback_source_type, ls_schemas.FeedbackSourceType):
            feedback_source_type = ls_schemas.FeedbackSourceType(
                feedback_source_type
            )
        if feedback_source_type == ls_schemas.FeedbackSourceType.API:
            feedback_source: ls_schemas.FeedbackSourceBase = (
                ls_schemas.APIFeedbackSource(metadata=source_info)
            )
        elif feedback_source_type == ls_schemas.FeedbackSourceType.MODEL:
            feedback_source = ls_schemas.ModelFeedbackSource(metadata=source_info)
        else:
            raise ValueError(
                f"Unknown feedback source type {feedback_source_type}"
            )
        feedback_source.metadata = (
            feedback_source.metadata
            if feedback_source.metadata is not None
            else {}
        )
        # Link this feedback to the run that generated it, if any.
        if source_run_id is not None and "__run" not in feedback_source.metadata:
            feedback_source.metadata["__run"] = {"run_id": str(source_run_id)}
        if feedback_source.metadata and "__run" in feedback_source.metadata:
            # Validate that the linked run ID is a valid UUID
            # Run info may be a base model or dict.
            _run_meta: Union[dict, Any] = feedback_source.metadata["__run"]
            if hasattr(_run_meta, "dict") and callable(_run_meta):
                _run_meta = _run_meta.dict()
            if "run_id" in _run_meta:
                _run_meta["run_id"] = str(
                    _as_uuid(
                        feedback_source.metadata["__run"]["run_id"],
                        "feedback_source.metadata['__run']['run_id']",
                    )
                )
            feedback_source.metadata["__run"] = _run_meta
        feedback = ls_schemas.FeedbackCreate(
            id=_ensure_uuid(feedback_id),
            # If run_id is None, this is interpreted as session-level
            # feedback.
            run_id=_ensure_uuid(run_id, accept_null=True),
            trace_id=_ensure_uuid(trace_id, accept_null=True),
            key=key,
            score=_format_feedback_score(score),
            value=value,
            correction=correction,
            comment=comment,
            feedback_source=feedback_source,
            created_at=datetime.datetime.now(datetime.timezone.utc),
            modified_at=datetime.datetime.now(datetime.timezone.utc),
            feedback_config=feedback_config,
            session_id=_ensure_uuid(project_id, accept_null=True),
            comparative_experiment_id=_ensure_uuid(
                comparative_experiment_id, accept_null=True
            ),
            feedback_group_id=_ensure_uuid(feedback_group_id, accept_null=True),
            extra=extra,
            error=error,
        )
        use_multipart = (self.info.batch_ingest_config or {}).get(
            "use_multipart_endpoint", False
        )
        # Prefer background/batched ingestion when the server supports it and a
        # queue or compressed-trace buffer is available; otherwise POST inline.
        if (
            use_multipart
            and self.info.version
            # TODO: Remove version check once versions have updated
            and ls_utils.is_version_greater_or_equal(self.info.version, "0.8.10")
            and (
                self.tracing_queue is not None
                or self.compressed_traces is not None
            )
            and feedback.trace_id is not None
            and self.otel_exporter is None
        ):
            serialized_op = serialize_feedback_dict(feedback)
            if self.compressed_traces is not None:
                multipart_form = (
                    serialized_feedback_operation_to_multipart_parts_and_context(
                        serialized_op
                    )
                )
                with self.compressed_traces.lock:
                    compress_multipart_parts_and_context(
                        multipart_form,
                        self.compressed_traces,
                        _BOUNDARY,
                    )
                    self.compressed_traces.trace_count += 1
                # Wake the background flusher, if one is listening.
                if self._data_available_event:
                    self._data_available_event.set()
            elif self.tracing_queue is not None:
                self.tracing_queue.put(
                    TracingQueueItem(str(feedback.id), serialized_op)
                )
        else:
            feedback_block = _dumps_json(feedback.dict(exclude_none=True))
            self.request_with_retries(
                "POST",
                "/feedback",
                request_kwargs={
                    "data": feedback_block,
                },
                stop_after_attempt=stop_after_attempt,
                retry_on=(ls_utils.LangSmithNotFoundError,),
            )
        return ls_schemas.Feedback(**feedback.dict())
    except Exception as e:
        logger.error("Error creating feedback", exc_info=True)
        raise e
def update_feedback(
    self,
    feedback_id: ID_TYPE,
    *,
    score: Union[float, int, bool, None] = None,
    value: Union[float, int, bool, str, dict, None] = None,
    correction: Union[dict, None] = None,
    comment: Union[str, None] = None,
) -> None:
    """Update a feedback in the LangSmith API.

    Args:
        feedback_id (Union[UUID, str]): The ID of the feedback to update.
        score (Optional[Union[float, int, bool]]): The score to update the
            feedback with.
        value (Optional[Union[float, int, bool, str, dict]]): The value to
            update the feedback with.
        correction (Optional[dict]): The correction to update the feedback with.
        comment (Optional[str]): The comment to update the feedback with.

    Returns:
        None
    """
    patch_body: Dict[str, Any] = {}
    # Score goes through normalization; the other fields pass straight through.
    if score is not None:
        patch_body["score"] = _format_feedback_score(score)
    for field_name, field_value in (
        ("value", value),
        ("correction", correction),
        ("comment", comment),
    ):
        if field_value is not None:
            patch_body[field_name] = field_value
    resp = self.request_with_retries(
        "PATCH",
        f"/feedback/{_as_uuid(feedback_id, 'feedback_id')}",
        headers={**self._headers, "Content-Type": "application/json"},
        data=_dumps_json(patch_body),
    )
    ls_utils.raise_for_status_with_text(resp)
def read_feedback(self, feedback_id: ID_TYPE) -> ls_schemas.Feedback:
    """Read a feedback from the LangSmith API.

    Args:
        feedback_id (Union[UUID, str]): The ID of the feedback to read.

    Returns:
        Feedback: The feedback.
    """
    endpoint = f"/feedback/{_as_uuid(feedback_id, 'feedback_id')}"
    resp = self.request_with_retries("GET", endpoint)
    payload = resp.json()
    return ls_schemas.Feedback(**payload)
def list_feedback(
    self,
    *,
    run_ids: Optional[Sequence[ID_TYPE]] = None,
    feedback_key: Optional[Sequence[str]] = None,
    feedback_source_type: Optional[Sequence[ls_schemas.FeedbackSourceType]] = None,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> Iterator[ls_schemas.Feedback]:
    """List the feedback objects on the LangSmith API.

    Args:
        run_ids (Optional[Sequence[Union[UUID, str]]]): The IDs of the runs to
            filter by.
        feedback_key (Optional[Sequence[str]]): The feedback key(s) to filter
            by. Example: 'correctness'. The query performs a union of all
            feedback keys.
        feedback_source_type (Optional[Sequence[FeedbackSourceType]]): The type
            of feedback source, such as model or API.
        limit (Optional[int]): The maximum number of feedback to return.
        **kwargs (Any): Additional keyword arguments.

    Yields:
        The feedback objects.
    """
    # Page size is capped at 100 regardless of the requested limit.
    page_size = 100 if limit is None else min(limit, 100)
    params: dict = {"run": run_ids, "limit": page_size, **kwargs}
    if feedback_key is not None:
        params["key"] = feedback_key
    if feedback_source_type is not None:
        params["source"] = feedback_source_type
    yielded = 0
    for feedback in self._get_paginated_list("/feedback", params=params):
        yield ls_schemas.Feedback(**feedback)
        yielded += 1
        if limit is not None and yielded >= limit:
            break
def delete_feedback(self, feedback_id: ID_TYPE) -> None:
    """Delete a feedback by ID.

    Args:
        feedback_id (Union[UUID, str]): The ID of the feedback to delete.

    Returns:
        None
    """
    endpoint = f"/feedback/{_as_uuid(feedback_id, 'feedback_id')}"
    resp = self.request_with_retries(
        "DELETE",
        endpoint,
        headers=self._headers,
    )
    ls_utils.raise_for_status_with_text(resp)
def create_feedback_from_token(
    self,
    token_or_url: Union[str, uuid.UUID],
    score: Union[float, int, bool, None] = None,
    *,
    value: Union[float, int, bool, str, dict, None] = None,
    correction: Union[dict, None] = None,
    comment: Union[str, None] = None,
    metadata: Optional[dict] = None,
) -> None:
    """Create feedback from a presigned token or URL.

    Args:
        token_or_url (Union[str, uuid.UUID]): The token or URL from which to
            create feedback.
        score (Optional[Union[float, int, bool]]): The score of the feedback.
            Defaults to None.
        value (Optional[Union[float, int, bool, str, dict]]): The value of the
            feedback. Defaults to None.
        correction (Optional[dict]): The correction of the feedback.
            Defaults to None.
        comment (Optional[str]): The comment of the feedback. Defaults to None.
        metadata (Optional[dict]): Additional metadata for the feedback.
            Defaults to None.

    Raises:
        ValueError: If the source API URL is invalid.

    Returns:
        None
    """
    source_api_url, token_uuid = _parse_token_or_url(
        token_or_url, self.api_url, num_parts=1
    )
    # Presigned tokens are only honored against the client's own API host.
    if source_api_url != self.api_url:
        raise ValueError(f"Invalid source API URL. {source_api_url}")
    response = self.request_with_retries(
        "POST",
        f"/feedback/tokens/{_as_uuid(token_uuid)}",
        data=_dumps_json(
            {
                "score": score,
                "value": value,
                "correction": correction,
                "comment": comment,
                "metadata": metadata,
                # TODO: Add ID once the API supports it.
            }
        ),
        headers=self._headers,
    )
    ls_utils.raise_for_status_with_text(response)
def create_presigned_feedback_token(
    self,
    run_id: ID_TYPE,
    feedback_key: str,
    *,
    expiration: Optional[datetime.datetime | datetime.timedelta] = None,
    feedback_config: Optional[ls_schemas.FeedbackConfig] = None,
    feedback_id: Optional[ID_TYPE] = None,
) -> ls_schemas.FeedbackIngestToken:
    """Create a pre-signed URL to send feedback data to.

    This is useful for giving browser-based clients a way to upload feedback
    data directly to LangSmith without accessing the API key.

    Args:
        run_id (Union[UUID, str]): The ID of the run.
        feedback_key (str): The key of the feedback to create.
        expiration (Optional[datetime.datetime | datetime.timedelta]): The
            expiration time of the pre-signed URL. Either a datetime or a
            timedelta offset from now. Default to 3 hours.
        feedback_config (Optional[FeedbackConfig]): If creating a feedback_key
            for the first time, this defines how the metric should be
            interpreted, such as a continuous score (w/ optional bounds), or
            distribution over categorical values.
        feedback_id (Optional[Union[UUID, str]]): The ID of the feedback to
            create. If not provided, a new feedback will be created.

    Returns:
        FeedbackIngestToken: The pre-signed URL for uploading feedback data.
    """
    body: Dict[str, Any] = {
        "run_id": run_id,
        "feedback_key": feedback_key,
        "feedback_config": feedback_config,
        "id": feedback_id or str(uuid.uuid4()),
    }
    # Exactly one of expires_in / expires_at is sent, depending on input type.
    if expiration is None:
        body["expires_in"] = ls_schemas.TimeDeltaInput(
            days=0,
            hours=3,
            minutes=0,
        )
    elif isinstance(expiration, datetime.datetime):
        body["expires_at"] = expiration.isoformat()
    elif isinstance(expiration, datetime.timedelta):
        body["expires_in"] = ls_schemas.TimeDeltaInput(
            days=expiration.days,
            hours=expiration.seconds // 3600,
            minutes=(expiration.seconds // 60) % 60,
        )
    else:
        raise ValueError(f"Unknown expiration type: {type(expiration)}")
    response = self.request_with_retries(
        "POST",
        "/feedback/tokens",
        data=_dumps_json(body),
    )
    ls_utils.raise_for_status_with_text(response)
    return ls_schemas.FeedbackIngestToken(**response.json())
def create_presigned_feedback_tokens(
    self,
    run_id: ID_TYPE,
    feedback_keys: Sequence[str],
    *,
    expiration: Optional[datetime.datetime | datetime.timedelta] = None,
    feedback_configs: Optional[
        Sequence[Optional[ls_schemas.FeedbackConfig]]
    ] = None,
) -> Sequence[ls_schemas.FeedbackIngestToken]:
    """Create a pre-signed URL to send feedback data to.

    This is useful for giving browser-based clients a way to upload feedback
    data directly to LangSmith without accessing the API key.

    Args:
        run_id (Union[UUID, str]): The ID of the run.
        feedback_keys (Sequence[str]): The key of the feedback to create.
        expiration (Optional[datetime.datetime | datetime.timedelta]): The
            expiration time of the pre-signed URL. Either a datetime or a
            timedelta offset from now. Default to 3 hours.
        feedback_configs (Optional[Sequence[Optional[FeedbackConfig]]]): If
            creating a feedback_key for the first time, this defines how the
            metric should be interpreted, such as a continuous score
            (w/ optional bounds), or distribution over categorical values.

    Returns:
        Sequence[FeedbackIngestToken]: The pre-signed URL for uploading
        feedback data.
    """
    # validate
    if feedback_configs is not None and len(feedback_keys) != len(
        feedback_configs
    ):
        raise ValueError(
            "The length of feedback_keys and feedback_configs must be the same."
        )
    if not feedback_configs:
        feedback_configs = [None] * len(feedback_keys)
    # build expiry option
    expires_in, expires_at = None, None
    if expiration is None:
        expires_in = ls_schemas.TimeDeltaInput(
            days=0,
            hours=3,
            minutes=0,
        )
    elif isinstance(expiration, datetime.datetime):
        expires_at = expiration.isoformat()
    elif isinstance(expiration, datetime.timedelta):
        expires_in = ls_schemas.TimeDeltaInput(
            days=expiration.days,
            hours=expiration.seconds // 3600,
            minutes=(expiration.seconds // 60) % 60,
        )
    else:
        raise ValueError(f"Unknown expiration type: {type(expiration)}")
    # assemble body, one entry per key
    body = _dumps_json(
        [
            {
                "run_id": run_id,
                "feedback_key": feedback_key,
                "feedback_config": feedback_config,
                "expires_in": expires_in,
                "expires_at": expires_at,
            }
            for feedback_key, feedback_config in zip(
                feedback_keys, feedback_configs
            )
        ]
    )

    def req(api_url: str, api_key: Optional[str]) -> list:
        # POST the batch to one write endpoint and return the created tokens.
        response = self.request_with_retries(
            "POST",
            f"{api_url}/feedback/tokens",
            request_kwargs={
                "data": body,
                "headers": {
                    **self._headers,
                    X_API_KEY: api_key or self.api_key,
                },
            },
        )
        ls_utils.raise_for_status_with_text(response)
        return response.json()

    tokens = []
    # Fan out to every configured write endpoint in parallel.
    with cf.ThreadPoolExecutor(max_workers=len(self._write_api_urls)) as executor:
        futs = [
            executor.submit(req, api_url, api_key)
            for api_url, api_key in self._write_api_urls.items()
        ]
        for fut in cf.as_completed(futs):
            response = fut.result()
            tokens.extend(
                [ls_schemas.FeedbackIngestToken(**part) for part in response]
            )
    return tokens
def list_presigned_feedback_tokens(
    self,
    run_id: ID_TYPE,
    *,
    limit: Optional[int] = None,
) -> Iterator[ls_schemas.FeedbackIngestToken]:
    """List the feedback ingest tokens for a run.

    Args:
        run_id (Union[UUID, str]): The ID of the run to filter by.
        limit (Optional[int]): The maximum number of tokens to return.

    Yields:
        The feedback ingest tokens.
    """
    # Page size is capped at 100 regardless of the requested limit.
    page_size = 100 if limit is None else min(limit, 100)
    params = {"run_id": _as_uuid(run_id, "run_id"), "limit": page_size}
    emitted = 0
    for token in self._get_paginated_list("/feedback/tokens", params=params):
        yield ls_schemas.FeedbackIngestToken(**token)
        emitted += 1
        if limit is not None and emitted >= limit:
            break
# Annotation Queue API
def list_annotation_queues(
    self,
    *,
    queue_ids: Optional[List[ID_TYPE]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: Optional[int] = None,
) -> Iterator[ls_schemas.AnnotationQueue]:
    """List the annotation queues on the LangSmith API.

    Args:
        queue_ids (Optional[List[Union[UUID, str]]]): The IDs of the queues to
            filter by.
        name (Optional[str]): The name of the queue to filter by.
        name_contains (Optional[str]): The substring that the queue name should
            contain.
        limit (Optional[int]): The maximum number of queues to return.

    Yields:
        The annotation queues.
    """
    # Validate queue IDs up front; None means "no filter".
    validated_ids = None
    if queue_ids is not None:
        validated_ids = [
            _as_uuid(id_, f"queue_ids[{i}]") for i, id_ in enumerate(queue_ids)
        ]
    params: dict = {
        "ids": validated_ids,
        "name": name,
        "name_contains": name_contains,
        "limit": 100 if limit is None else min(limit, 100),
    }
    seen = 0
    for queue in self._get_paginated_list("/annotation-queues", params=params):
        yield ls_schemas.AnnotationQueue(**queue)
        seen += 1
        if limit is not None and seen >= limit:
            break
def create_annotation_queue(
    self,
    *,
    name: str,
    description: Optional[str] = None,
    queue_id: Optional[ID_TYPE] = None,
) -> ls_schemas.AnnotationQueue:
    """Create an annotation queue on the LangSmith API.

    Args:
        name (str): The name of the annotation queue.
        description (Optional[str]): The description of the annotation queue.
        queue_id (Optional[Union[UUID, str]]): The ID of the annotation queue.

    Returns:
        AnnotationQueue: The created annotation queue object.
    """
    # A fresh UUID is minted when the caller does not supply one.
    queue_uuid = str(queue_id) if queue_id is not None else str(uuid.uuid4())
    body = {
        "name": name,
        "description": description,
        "id": queue_uuid,
    }
    payload = {k: v for k, v in body.items() if v is not None}
    resp = self.request_with_retries(
        "POST",
        "/annotation-queues",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(resp)
    return ls_schemas.AnnotationQueue(**resp.json())
def read_annotation_queue(self, queue_id: ID_TYPE) -> ls_schemas.AnnotationQueue:
    """Read an annotation queue with the specified queue ID.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to read.

    Returns:
        AnnotationQueue: The annotation queue object.
    """
    # TODO: Replace when actual endpoint is added
    matches = self.list_annotation_queues(queue_ids=[queue_id])
    return next(matches)
def update_annotation_queue(
    self, queue_id: ID_TYPE, *, name: str, description: Optional[str] = None
) -> None:
    """Update an annotation queue with the specified queue_id.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to update.
        name (str): The new name for the annotation queue.
        description (Optional[str]): The new description for the annotation
            queue. Defaults to None.

    Returns:
        None
    """
    payload = {
        "name": name,
        "description": description,
    }
    resp = self.request_with_retries(
        "PATCH",
        f"/annotation-queues/{_as_uuid(queue_id, 'queue_id')}",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(resp)
def delete_annotation_queue(self, queue_id: ID_TYPE) -> None:
    """Delete an annotation queue with the specified queue ID.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue to delete.

    Returns:
        None
    """
    endpoint = f"/annotation-queues/{_as_uuid(queue_id, 'queue_id')}"
    resp = self.request_with_retries(
        "DELETE",
        endpoint,
        headers={"Accept": "application/json", **self._headers},
    )
    ls_utils.raise_for_status_with_text(resp)
def add_runs_to_annotation_queue(
    self, queue_id: ID_TYPE, *, run_ids: List[ID_TYPE]
) -> None:
    """Add runs to an annotation queue with the specified queue ID.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue.
        run_ids (List[Union[UUID, str]]): The IDs of the runs to be added to
            the annotation queue.

    Returns:
        None
    """
    # Validate every run ID before sending the batch.
    payload = [
        str(_as_uuid(id_, f"run_ids[{i}]")) for i, id_ in enumerate(run_ids)
    ]
    resp = self.request_with_retries(
        "POST",
        f"/annotation-queues/{_as_uuid(queue_id, 'queue_id')}/runs",
        json=payload,
    )
    ls_utils.raise_for_status_with_text(resp)
def delete_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, run_id: ID_TYPE
) -> None:
    """Delete a run from an annotation queue with the specified queue ID and run ID.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue.
        run_id (Union[UUID, str]): The ID of the run to be removed from the
            annotation queue.

    Returns:
        None
    """
    queue_uuid = _as_uuid(queue_id, "queue_id")
    run_uuid = _as_uuid(run_id, "run_id")
    resp = self.request_with_retries(
        "DELETE",
        f"/annotation-queues/{queue_uuid}/runs/{run_uuid}",
    )
    ls_utils.raise_for_status_with_text(resp)
def get_run_from_annotation_queue(
    self, queue_id: ID_TYPE, *, index: int
) -> ls_schemas.RunWithAnnotationQueueInfo:
    """Get a run from an annotation queue at the specified index.

    Args:
        queue_id (Union[UUID, str]): The ID of the annotation queue.
        index (int): The index of the run to retrieve.

    Returns:
        RunWithAnnotationQueueInfo: The run at the specified index.

    Raises:
        LangSmithNotFoundError: If the run is not found at the given index.
        LangSmithError: For other API-related errors.
    """
    queue_path = f"/annotation-queues/{_as_uuid(queue_id, 'queue_id')}/run"
    resp = self.request_with_retries(
        "GET",
        f"{queue_path}/{index}",
        headers=self._headers,
    )
    ls_utils.raise_for_status_with_text(resp)
    return ls_schemas.RunWithAnnotationQueueInfo(**resp.json())
def create_comparative_experiment(
    self,
    name: str,
    experiments: Sequence[ID_TYPE],
    *,
    reference_dataset: Optional[ID_TYPE] = None,
    description: Optional[str] = None,
    created_at: Optional[datetime.datetime] = None,
    metadata: Optional[Dict[str, Any]] = None,
    id: Optional[ID_TYPE] = None,
) -> ls_schemas.ComparativeExperiment:
    """Create a comparative experiment on the LangSmith API.

    These experiments compare 2 or more experiment results over a
    shared dataset.

    Args:
        name (str): The name of the comparative experiment.
        experiments (Sequence[Union[UUID, str]]): The IDs of the experiments
            to compare.
        reference_dataset (Optional[Union[UUID, str]]): The ID of the dataset
            these experiments are compared on. If None, the reference dataset
            of the first experiment is used.
        description (Optional[str]): The description of the comparative
            experiment.
        created_at (Optional[datetime.datetime]): The creation time of the
            comparative experiment. Defaults to the current UTC time.
        metadata (Optional[Dict[str, Any]]): Additional metadata for the
            comparative experiment.
        id (Optional[Union[UUID, str]]): The ID of the comparative experiment.
            Defaults to a new random UUID.

    Returns:
        ComparativeExperiment: The created comparative experiment object.

    Raises:
        ValueError: If no experiments are given, or no reference dataset
            can be resolved.
    """
    if not experiments:
        raise ValueError("At least one experiment is required.")
    if reference_dataset is None:
        # Fall back to the reference dataset of the first experiment.
        reference_dataset = self.read_project(
            project_id=experiments[0]
        ).reference_dataset_id
    if not reference_dataset:
        raise ValueError("A reference dataset is required.")
    body: Dict[str, Any] = {
        "id": id or str(uuid.uuid4()),
        "name": name,
        "experiment_ids": experiments,
        "reference_dataset_id": reference_dataset,
        "description": description,
        "created_at": created_at or datetime.datetime.now(datetime.timezone.utc),
        "extra": {},
    }
    if metadata is not None:
        body["extra"]["metadata"] = metadata
    # Serialize the payload directly. (Previously this went through a no-op
    # dict comprehension left over from a disabled None-filter.)
    ser = _dumps_json(body)
    response = self.request_with_retries(
        "POST",
        "/datasets/comparative",
        request_kwargs={
            "data": ser,
        },
    )
    ls_utils.raise_for_status_with_text(response)
    response_d = response.json()
    return ls_schemas.ComparativeExperiment(**response_d)
async def arun_on_dataset(
    self,
    dataset_name: str,
    llm_or_chain_factory: Any,
    *,
    evaluation: Optional[Any] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    dataset_version: Optional[Union[datetime.datetime, str]] = None,
    verbose: bool = False,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Asynchronously run the Chain or language model on a dataset.

    .. deprecated:: 0.1.0
        This method is deprecated. Use :func:`langsmith.aevaluate` instead.
    """  # noqa: E501
    warnings.warn(
        # Fixed: concatenation previously produced "version.Please" with no
        # separating space.
        "The `arun_on_dataset` method is deprecated and"
        " will be removed in a future version."
        " Please use the `aevaluate` method instead.",
        DeprecationWarning,
    )
    try:
        from langchain.smith import arun_on_dataset as _arun_on_dataset
    except ImportError:
        # Fixed: concatenation previously produced "langchainpackage" with no
        # separating space.
        raise ImportError(
            "The client.arun_on_dataset function requires the langchain "
            "package to run.\nInstall with pip install langchain"
        )
    return await _arun_on_dataset(
        dataset_name=dataset_name,
        llm_or_chain_factory=llm_or_chain_factory,
        client=self,
        evaluation=evaluation,
        concurrency_level=concurrency_level,
        project_name=project_name,
        project_metadata=project_metadata,
        verbose=verbose,
        input_mapper=input_mapper,
        revision_id=revision_id,
        dataset_version=dataset_version,
        **kwargs,
    )
def run_on_dataset(
    self,
    dataset_name: str,
    llm_or_chain_factory: Any,
    *,
    evaluation: Optional[Any] = None,
    concurrency_level: int = 5,
    project_name: Optional[str] = None,
    project_metadata: Optional[Dict[str, Any]] = None,
    dataset_version: Optional[Union[datetime.datetime, str]] = None,
    verbose: bool = False,
    input_mapper: Optional[Callable[[Dict], Any]] = None,
    revision_id: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Run the Chain or language model on a dataset.

    .. deprecated:: 0.1.0
        This method is deprecated. Use :func:`langsmith.evaluate` instead.
    """  # noqa: E501
    # Docstring fixed: this sync method previously pointed at `aevaluate`;
    # the correct replacement (matching the warning below) is `evaluate`.
    warnings.warn(
        "The `run_on_dataset` method is deprecated and"
        " will be removed in a future version."
        " Please use the `evaluate` method instead.",
        DeprecationWarning,
    )
    try:
        from langchain.smith import (
            run_on_dataset as _run_on_dataset,  # type: ignore
        )
    except ImportError:
        # Fixed: concatenation previously produced "langchainpackage" with no
        # separating space.
        raise ImportError(
            "The client.run_on_dataset function requires the langchain "
            "package to run.\nInstall with pip install langchain"
        )
    return _run_on_dataset(
        dataset_name=dataset_name,
        llm_or_chain_factory=llm_or_chain_factory,
        concurrency_level=concurrency_level,
        client=self,
        evaluation=evaluation,
        project_name=project_name,
        project_metadata=project_metadata,
        verbose=verbose,
        input_mapper=input_mapper,
        revision_id=revision_id,
        dataset_version=dataset_version,
        **kwargs,
    )
def_current_tenant_is_owner(self,owner:str)->bool:"""Check if the current workspace has the same handle as owner. Args: owner (str): The owner to check against. Returns: bool: True if the current tenant is the owner, False otherwise. """settings=self._get_settings()returnowner=="-"orsettings.tenant_handle==ownerdef_owner_conflict_error(self,action:str,owner:str)->ls_utils.LangSmithUserError:returnls_utils.LangSmithUserError(f"Cannot {action} for another tenant.\n"f"Current tenant: {self._get_settings().tenant_handle},\n"f"Requested tenant: {owner}")def_get_latest_commit_hash(self,prompt_owner_and_name:str,limit:int=1,offset:int=0)->Optional[str]:"""Get the latest commit hash for a prompt. Args: prompt_owner_and_name (str): The owner and name of the prompt. limit (int, default=1): The maximum number of commits to fetch. Defaults to 1. offset (int, default=0): The number of commits to skip. Defaults to 0. Returns: Optional[str]: The latest commit hash, or None if no commits are found. """response=self.request_with_retries("GET",f"/commits/{prompt_owner_and_name}/",params={"limit":limit,"offset":offset},)commits=response.json()["commits"]returncommits[0]["commit_hash"]ifcommitselseNonedef_like_or_unlike_prompt(self,prompt_identifier:str,like:bool)->Dict[str,int]:"""Like or unlike a prompt. Args: prompt_identifier (str): The identifier of the prompt. like (bool): True to like the prompt, False to unlike it. Returns: A dictionary with the key 'likes' and the count of likes as the value. Raises: requests.exceptions.HTTPError: If the prompt is not found or another error occurs. """owner,prompt_name,_=ls_utils.parse_prompt_identifier(prompt_identifier)response=self.request_with_retries("POST",f"/likes/{owner}/{prompt_name}",json={"like":like})response.raise_for_status()returnresponse.json()def_get_prompt_url(self,prompt_identifier:str)->str:"""Get a URL for a prompt. Args: prompt_identifier (str): The identifier of the prompt. Returns: str: The URL for the prompt. 
"""owner,prompt_name,commit_hash=ls_utils.parse_prompt_identifier(prompt_identifier)ifnotself._current_tenant_is_owner(owner):returnf"{self._host_url}/hub/{owner}/{prompt_name}:{commit_hash[:8]}"settings=self._get_settings()return(f"{self._host_url}/prompts/{prompt_name}/{commit_hash[:8]}"f"?organizationId={settings.id}")def_prompt_exists(self,prompt_identifier:str)->bool:"""Check if a prompt exists. Args: prompt_identifier (str): The identifier of the prompt. Returns: bool: True if the prompt exists, False otherwise. """prompt=self.get_prompt(prompt_identifier)returnTrueifpromptelseFalse
def like_prompt(self, prompt_identifier: str) -> Dict[str, int]:
    """Like a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    # Delegate to the shared helper with the like flag enabled.
    result = self._like_or_unlike_prompt(prompt_identifier, like=True)
    return result
def unlike_prompt(self, prompt_identifier: str) -> Dict[str, int]:
    """Unlike a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.

    Returns:
        Dict[str, int]: A dictionary with the key 'likes' and the count of
            likes as the value.
    """
    # Delegate to the shared helper with the like flag disabled.
    result = self._like_or_unlike_prompt(prompt_identifier, like=False)
    return result
def list_prompts(
    self,
    *,
    limit: int = 100,
    offset: int = 0,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = False,
    sort_field: ls_schemas.PromptSortField = ls_schemas.PromptSortField.updated_at,
    sort_direction: Literal["desc", "asc"] = "desc",
    query: Optional[str] = None,
) -> ls_schemas.ListPromptsResponse:
    """List prompts with pagination.

    Args:
        limit (int, default=100): The maximum number of prompts to return.
        offset (int, default=0): The number of prompts to skip.
        is_public (Optional[bool]): Filter prompts by whether they are public.
            None applies no filter.
        is_archived (Optional[bool]): Filter prompts by whether they are
            archived.
        sort_field (PromptSortField): The field to sort by.
            Defaults to "updated_at".
        sort_direction (Literal["desc", "asc"], default="desc"): The order
            to sort by.
        query (Optional[str]): Filter prompts by a search query.

    Returns:
        ListPromptsResponse: A response object containing the list of prompts.
    """
    # Tri-state visibility filter: None means "don't filter" and is
    # omitted from the query string.
    if is_public is None:
        public_param = None
    else:
        public_param = "true" if is_public else "false"
    params = {
        "limit": limit,
        "offset": offset,
        "is_public": public_param,
        "is_archived": "true" if is_archived else "false",
        "sort_field": sort_field,
        "sort_direction": sort_direction,
        "query": query,
        # Prefix matching only makes sense when a query is supplied.
        "match_prefix": "true" if query else None,
    }
    resp = self.request_with_retries("GET", "/repos/", params=params)
    return ls_schemas.ListPromptsResponse(**resp.json())
def get_prompt(self, prompt_identifier: str) -> Optional[ls_schemas.Prompt]:
    """Get a specific prompt by its identifier.

    Args:
        prompt_identifier (str): The identifier of the prompt.
            The identifier should be in the format "prompt_name" or
            "owner/prompt_name".

    Returns:
        Optional[Prompt]: The prompt object, or None if not found.

    Raises:
        requests.exceptions.HTTPError: If an error other than not-found
            occurs.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    try:
        resp = self.request_with_retries("GET", f"/repos/{owner}/{prompt_name}")
    except ls_utils.LangSmithNotFoundError:
        # A missing prompt is an expected outcome, not an error.
        return None
    return ls_schemas.Prompt(**resp.json()["repo"])
def create_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: bool = False,
) -> ls_schemas.Prompt:
    """Create a new prompt.

    Does not attach prompt object, just creates an empty prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt. The identifier
            should be in the format of owner/name:hash, name:hash, owner/name,
            or name.
        description (Optional[str]): A description of the prompt.
        readme (Optional[str]): A readme for the prompt.
        tags (Optional[Sequence[str]]): A list of tags for the prompt.
        is_public (bool): Whether the prompt should be public.
            Defaults to False.

    Returns:
        Prompt: The created prompt object.

    Raises:
        ValueError: If the current tenant is not the owner.
        HTTPError: If the server request fails.
    """
    settings = self._get_settings()
    # Public prompts require a hub handle on the workspace.
    if is_public and not settings.tenant_handle:
        raise ls_utils.LangSmithUserError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    if not self._current_tenant_is_owner(owner=owner):
        raise self._owner_conflict_error("create a prompt", owner)
    # Note: renamed from `json` to avoid shadowing the json module.
    payload: Dict[str, Union[str, bool, Sequence[str]]] = {
        "repo_handle": prompt_name,
        "description": description or "",
        "readme": readme or "",
        "tags": tags or [],
        "is_public": is_public,
    }
    resp = self.request_with_retries("POST", "/repos/", json=payload)
    resp.raise_for_status()
    return ls_schemas.Prompt(**resp.json()["repo"])
def create_commit(
    self,
    prompt_identifier: str,
    object: Any,
    *,
    parent_commit_hash: Optional[str] = None,
) -> str:
    """Create a commit for an existing prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Any): The LangChain object to commit.
        parent_commit_hash (Optional[str]): The hash of the parent commit.
            None or "latest" means the latest commit.

    Returns:
        str: The url of the prompt commit.

    Raises:
        HTTPError: If the server request fails.
        LangSmithNotFoundError: If the prompt does not exist.
    """
    if not self._prompt_exists(prompt_identifier):
        raise ls_utils.LangSmithNotFoundError(
            "Prompt does not exist, you must create it first."
        )
    try:
        from langchain_core.load.dump import dumps
    except ImportError:
        # Fixed: concatenation previously produced "langchain_corepackage"
        # with no separating space.
        raise ImportError(
            "The client.create_commit function requires the langchain_core "
            "package to run.\nInstall with `pip install langchain_core`"
        )
    # Round-trip through langchain_core serialization to get a plain dict.
    json_object = dumps(object)
    manifest_dict = json.loads(json_object)
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    prompt_owner_and_name = f"{owner}/{prompt_name}"
    if parent_commit_hash == "latest" or parent_commit_hash is None:
        parent_commit_hash = self._get_latest_commit_hash(prompt_owner_and_name)
    request_dict = {"parent_commit": parent_commit_hash, "manifest": manifest_dict}
    response = self.request_with_retries(
        "POST", f"/commits/{prompt_owner_and_name}", json=request_dict
    )
    commit_hash = response.json()["commit"]["commit_hash"]
    return self._get_prompt_url(f"{prompt_owner_and_name}:{commit_hash}")
def update_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
) -> Dict[str, Any]:
    """Update a prompt's metadata.

    To update the content of a prompt, use push_prompt or create_commit
    instead.

    Args:
        prompt_identifier (str): The identifier of the prompt to update.
        description (Optional[str]): New description for the prompt.
        readme (Optional[str]): New readme for the prompt.
        tags (Optional[Sequence[str]]): New list of tags for the prompt.
        is_public (Optional[bool]): New public status for the prompt.
        is_archived (Optional[bool]): New archived status for the prompt.

    Returns:
        Dict[str, Any]: The updated prompt data as returned by the server.

    Raises:
        ValueError: If making the prompt public without a hub handle.
        HTTPError: If the server request fails.
    """
    settings = self._get_settings()
    # Making a prompt public requires a hub handle on the workspace.
    if is_public and not settings.tenant_handle:
        raise ValueError(
            "Cannot create a public prompt without first\n"
            "creating a LangChain Hub handle. "
            "You can add a handle by creating a public prompt at:\n"
            "https://smith.langchain.com/prompts"
        )
    # Only fields explicitly supplied are sent, so unset fields keep
    # their current server-side values.
    payload: Dict[str, Union[str, bool, Sequence[str]]] = {}
    if description is not None:
        payload["description"] = description
    if readme is not None:
        payload["readme"] = readme
    if is_public is not None:
        payload["is_public"] = is_public
    if is_archived is not None:
        payload["is_archived"] = is_archived
    if tags is not None:
        payload["tags"] = tags
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    resp = self.request_with_retries(
        "PATCH", f"/repos/{owner}/{prompt_name}", json=payload
    )
    resp.raise_for_status()
    return resp.json()
def delete_prompt(self, prompt_identifier: str) -> None:
    """Delete a prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt to delete.

    Returns:
        None

    Raises:
        ValueError: If the current tenant is not the owner of the prompt.
        HTTPError: If the server request fails.
    """
    # Docstring fixed: previously claimed a bool return, but the method
    # returns None (failures raise instead).
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    if not self._current_tenant_is_owner(owner):
        raise self._owner_conflict_error("delete a prompt", owner)
    response = self.request_with_retries("DELETE", f"/repos/{owner}/{prompt_name}")
    response.raise_for_status()
def pull_prompt_commit(
    self,
    prompt_identifier: str,
    *,
    include_model: Optional[bool] = False,
) -> ls_schemas.PromptCommit:
    """Pull a prompt object from the LangSmith API.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        include_model (Optional[bool], default=False): Whether to include
            model information in the response.

    Returns:
        PromptCommit: The prompt object.

    Raises:
        ValueError: If no commits are found for the prompt.
    """
    owner, prompt_name, commit_hash = ls_utils.parse_prompt_identifier(
        prompt_identifier
    )
    query = "?include_model=true" if include_model else ""
    resp = self.request_with_retries(
        "GET",
        f"/commits/{owner}/{prompt_name}/{commit_hash}{query}",
    )
    # The API payload lacks owner/repo, so merge them in explicitly.
    return ls_schemas.PromptCommit(
        **{"owner": owner, "repo": prompt_name, **resp.json()}
    )
def list_prompt_commits(
    self,
    prompt_identifier: str,
    *,
    limit: Optional[int] = None,
    offset: int = 0,
    include_model: bool = False,
) -> Iterator[ls_schemas.ListedPromptCommit]:
    """List commits for a given prompt.

    Args:
        prompt_identifier (str): The identifier of the prompt in the format
            'owner/repo_name'.
        limit (Optional[int]): The maximum number of commits to return.
            If None, returns all commits. Defaults to None.
        offset (int, default=0): The number of commits to skip before
            starting to return results.
        include_model (bool, default=False): Whether to include the model
            information in the commit data.

    Yields:
        A ListedPromptCommit object for each commit.

    Note:
        This method uses pagination to retrieve commits. It will make
        multiple API calls if necessary to retrieve all commits or up to
        the specified limit.
    """
    owner, prompt_name, _ = ls_utils.parse_prompt_identifier(prompt_identifier)
    # Request at most 100 per page; None means no per-page cap requested.
    params = {
        "limit": min(100, limit) if limit is not None else limit,
        "offset": offset,
        "include_model": include_model,
    }
    yielded = 0
    while True:
        params["offset"] = offset
        resp = self.request_with_retries(
            "GET",
            f"/commits/{owner}/{prompt_name}/",
            params=params,
        )
        payload = resp.json()
        commits = payload["commits"]
        total = payload["total"]
        if not commits:
            break
        for commit in commits:
            if limit is not None and yielded >= limit:
                # Reached the caller-requested cap; stop iterating.
                return
            yield ls_schemas.ListedPromptCommit(
                **{"owner": owner, "repo": prompt_name, **commit}
            )
            yielded += 1
        offset += len(commits)
        if offset >= total:
            break
def pull_prompt(
    self, prompt_identifier: str, *, include_model: Optional[bool] = False
) -> Any:
    """Pull a prompt and return it as a LangChain PromptTemplate.

    This method requires `langchain_core`.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        include_model (Optional[bool], default=False): Whether to include
            the model information in the prompt data.

    Returns:
        Any: The prompt object in the specified format.
    """
    try:
        from langchain_core.language_models.base import BaseLanguageModel
        from langchain_core.load.load import loads
        from langchain_core.output_parsers import BaseOutputParser
        from langchain_core.prompts import BasePromptTemplate
        from langchain_core.prompts.structured import StructuredPrompt
        from langchain_core.runnables.base import RunnableBinding, RunnableSequence
    except ImportError:
        raise ImportError(
            "The client.pull_prompt function requires the langchain_core"
            "package to run.\nInstall with `pip install langchain_core`"
        )
    try:
        from langchain_core._api import suppress_langchain_beta_warning
    except ImportError:
        # Older langchain_core versions lack this helper; substitute a no-op
        # context manager so the `with` below works either way.
        @contextlib.contextmanager
        def suppress_langchain_beta_warning():
            yield

    prompt_object = self.pull_prompt_commit(
        prompt_identifier, include_model=include_model
    )
    with suppress_langchain_beta_warning():
        # Deserialize the stored manifest into a live LangChain object.
        prompt = loads(json.dumps(prompt_object.manifest))

    # Tag the underlying prompt template (either the object itself or the
    # first step of a sequence) with hub provenance metadata.
    if (
        isinstance(prompt, BasePromptTemplate)
        or isinstance(prompt, RunnableSequence)
        and isinstance(prompt.first, BasePromptTemplate)
    ):
        prompt_template = (
            prompt
            if isinstance(prompt, BasePromptTemplate)
            else (
                prompt.first
                if isinstance(prompt, RunnableSequence)
                and isinstance(prompt.first, BasePromptTemplate)
                else None
            )
        )
        if prompt_template is None:
            raise ls_utils.LangSmithError(
                "Prompt object is not a valid prompt template."
            )
        if prompt_template.metadata is None:
            prompt_template.metadata = {}
        prompt_template.metadata.update(
            {
                "lc_hub_owner": prompt_object.owner,
                "lc_hub_repo": prompt_object.repo,
                "lc_hub_commit_hash": prompt_object.commit_hash,
            }
        )

    if (
        include_model
        and isinstance(prompt, RunnableSequence)
        and isinstance(prompt.first, StructuredPrompt)
        # Make forward-compatible in case we let update the response type
        and (
            len(prompt.steps) == 2 and not isinstance(prompt.last, BaseOutputParser)
        )
    ):
        if isinstance(prompt.last, RunnableBinding) and isinstance(
            prompt.last.bound, BaseLanguageModel
        ):
            # Piping a StructuredPrompt into a bare model may append an
            # output parser; rebuild the sequence so the model keeps the
            # original binding kwargs.
            seq = cast(RunnableSequence, prompt.first | prompt.last.bound)
            if len(seq.steps) == 3:  # prompt | bound llm | output parser
                rebound_llm = seq.steps[1]
                prompt = RunnableSequence(
                    prompt.first,
                    rebound_llm.bind(**{**prompt.last.kwargs}),
                    seq.last,
                )
            else:
                prompt = seq  # Not sure
        elif isinstance(prompt.last, BaseLanguageModel):
            prompt: RunnableSequence = prompt.first | prompt.last  # type: ignore[no-redef, assignment]
        else:
            pass

    return prompt
def push_prompt(
    self,
    prompt_identifier: str,
    *,
    object: Optional[Any] = None,
    parent_commit_hash: str = "latest",
    is_public: Optional[bool] = None,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[Sequence[str]] = None,
) -> str:
    """Push a prompt to the LangSmith API.

    Can be used to update prompt metadata or prompt content.

    If the prompt does not exist, it will be created.
    If the prompt exists, it will be updated.

    Args:
        prompt_identifier (str): The identifier of the prompt.
        object (Optional[Any]): The LangChain object to push.
        parent_commit_hash (str): The parent commit hash.
            Defaults to "latest".
        is_public (Optional[bool]): Whether the prompt should be public.
            If None (default), the current visibility status is maintained
            for existing prompts. For new prompts, None defaults to private.
        description (Optional[str]): A description of the prompt.
            Defaults to an empty string.
        readme (Optional[str]): A readme for the prompt.
            Defaults to an empty string.
        tags (Optional[Sequence[str]]): A list of tags for the prompt.
            Defaults to an empty list.

    Returns:
        str: The URL of the prompt.
    """
    if self._prompt_exists(prompt_identifier):
        # Only issue a metadata update when at least one field was supplied.
        metadata_args = (is_public, description, readme, tags)
        if any(arg is not None for arg in metadata_args):
            self.update_prompt(
                prompt_identifier,
                description=description,
                readme=readme,
                tags=tags,
                is_public=is_public,
            )
    else:
        self.create_prompt(
            prompt_identifier,
            is_public=is_public if is_public is not None else False,
            description=description,
            readme=readme,
            tags=tags,
        )
    if object is None:
        # Metadata-only push: nothing to commit.
        return self._get_prompt_url(prompt_identifier=prompt_identifier)
    # Create a commit with the new manifest and return its URL.
    return self.create_commit(
        prompt_identifier,
        object,
        parent_commit_hash=parent_commit_hash,
    )
def cleanup(self) -> None:
    """Manually trigger cleanup of the background thread."""
    # NOTE(review): presumably this flag is observed by the background
    # tracing machinery, which performs the actual teardown — confirm.
    self._manual_cleanup = True
def evaluate(
    self,
    target: Union[TARGET_T, Runnable, EXPERIMENT_T, Tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[
        Union[Sequence[EVALUATOR_T], Sequence[COMPARATIVE_EVALUATOR_T]]
    ] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> Union[ExperimentResults, ComparativeExperimentResults]:
    r"""Evaluate a target system on a given dataset.

    Args:
        target: The target system or experiment(s) to evaluate. Can be a
            function that takes a dict and returns a dict, a langchain
            Runnable, an existing experiment ID, or a two-tuple of
            experiment IDs (for comparative evaluation).
        data: The dataset to evaluate on. Can be a dataset name, a list of
            examples, or a generator of examples.
        evaluators: A list of evaluators to run on each example. The
            evaluator signature depends on the target type. Defaults to None.
        summary_evaluators: A list of summary evaluators to run on the
            entire dataset. Should not be specified if comparing two
            existing experiments. Defaults to None.
        metadata: Metadata to attach to the experiment. Defaults to None.
        experiment_prefix: A prefix to provide for your experiment name.
            Defaults to None.
        description: A free-form text description for the experiment.
        max_concurrency: The maximum number of concurrent evaluations to
            run. If None then no limit is set. If 0 then no concurrency.
            Defaults to 0.
        num_repetitions: The number of times to run the evaluation. Each
            item in the dataset will be run and evaluated this many times.
            Defaults to 1.
        blocking: Whether to block until the evaluation is complete.
            Defaults to True.
        experiment: An existing experiment to extend. If provided,
            ``experiment_prefix`` is ignored. For advanced usage only.
            Should not be specified if target is an existing experiment or
            two-tuple of experiments.
        upload_results: Whether to upload the results to LangSmith.
            Defaults to True.
        **kwargs: Additional keyword arguments to pass to the evaluator.

    Returns:
        ExperimentResults: If target is a function, Runnable, or existing
            experiment.
        ComparativeExperimentResults: If target is a two-tuple of existing
            experiments.

    Examples:
        Basic usage:

        .. code-block:: python

            from langsmith import Client

            client = Client()


            def accuracy(outputs: dict, reference_outputs: dict) -> dict:
                pred = outputs["response"]
                expected = reference_outputs["answer"]
                return {"score": expected.lower() == pred.lower()}


            def predict(inputs: dict) -> dict:
                # This can be any function or just an API call to your app.
                return {"response": "Yes"}


            results = client.evaluate(
                predict,
                data="Evaluate Examples",
                evaluators=[accuracy],
                experiment_prefix="My Experiment",
            )

    .. versionadded:: 0.2.0
    """  # noqa: E501
    # Imported here rather than at module top level.
    from langsmith.evaluation._runner import evaluate as evaluate_

    # The ignores are needed because the runner's many union types and
    # overloads are hard for the type checker to reconcile.
    return evaluate_(  # type: ignore[misc]
        target,  # type: ignore[arg-type]
        data=data,
        evaluators=evaluators,  # type: ignore[arg-type]
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        experiment_prefix=experiment_prefix,
        description=description,
        max_concurrency=max_concurrency,
        num_repetitions=num_repetitions,
        client=self,
        blocking=blocking,
        experiment=experiment,
        upload_results=upload_results,
        **kwargs,
    )
async def aevaluate(
    self,
    target: Union[
        ATARGET_T,
        AsyncIterable[dict],
        Runnable,
        str,
        uuid.UUID,
        schemas.TracerSession,
    ],
    /,
    data: Union[
        DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example], None
    ] = None,
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> AsyncExperimentResults:
    r"""Evaluate an async target system on a given dataset.

    Args:
        target: The target system or experiment(s) to evaluate. Can be an
            async function that takes a dict and returns a dict, a langchain
            Runnable, an existing experiment ID, or a two-tuple of
            experiment IDs.
        data: The dataset to evaluate on. Can be a dataset name, a list of
            examples, an async generator of examples, or an async iterable
            of examples.
        evaluators: A list of evaluators to run on each example.
            Defaults to None.
        summary_evaluators: A list of summary evaluators to run on the
            entire dataset. Defaults to None.
        metadata: Metadata to attach to the experiment. Defaults to None.
        experiment_prefix: A prefix to provide for your experiment name.
            Defaults to None.
        description: A description of the experiment.
        max_concurrency: The maximum number of concurrent evaluations to
            run. If None then no limit is set. If 0 then no concurrency.
            Defaults to 0.
        num_repetitions: The number of times to run the evaluation. Each
            item in the dataset will be run and evaluated this many times.
            Defaults to 1.
        blocking: Whether to block until the evaluation is complete.
            Defaults to True.
        experiment: An existing experiment to extend. If provided,
            ``experiment_prefix`` is ignored. For advanced usage only.
        upload_results: Whether to upload the results to LangSmith.
            Defaults to True.
        **kwargs: Additional keyword arguments to pass to the evaluator.

    Returns:
        AsyncExperimentResults: An async iterator over the experiment
            results.

    Environment:
        - LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to
          save time and cost during testing. Recommended to commit the
          cache files to your repository for faster CI/CD runs. Requires
          the 'langsmith[vcr]' package to be installed.

    Examples:
        Basic usage:

        .. code-block:: python

            import asyncio
            from langsmith import Client

            client = Client()


            def accuracy(outputs: dict, reference_outputs: dict) -> dict:
                # Row-level evaluator for accuracy.
                # (Typo fixed: this example previously read outputs["resposen"].)
                pred = outputs["response"]
                expected = reference_outputs["answer"]
                return {"score": expected.lower() == pred.lower()}


            async def apredict(inputs: dict) -> dict:
                # This can be any async function or an API call to your app.
                await asyncio.sleep(0.1)
                return {"response": "Yes"}


            results = asyncio.run(
                client.aevaluate(
                    apredict,
                    data="Evaluate Examples",
                    evaluators=[accuracy],
                    experiment_prefix="My Experiment",
                )
            )

        Evaluate an existing experiment:

        .. code-block:: python

            results = asyncio.run(
                client.aevaluate(
                    target="419dcab2-1d66-4b94-8901-0357ead390df",
                    evaluators=[accuracy],
                )
            )

    .. versionadded:: 0.2.0
    """  # noqa: E501
    # Imported here rather than at module top level.
    from langsmith.evaluation._arunner import aevaluate as aevaluate_

    return await aevaluate_(
        target,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        experiment_prefix=experiment_prefix,
        description=description,
        max_concurrency=max_concurrency,
        num_repetitions=num_repetitions,
        client=self,
        blocking=blocking,
        experiment=experiment,
        upload_results=upload_results,
        **kwargs,
    )
defconvert_prompt_to_openai_format(messages:Any,model_kwargs:Optional[Dict[str,Any]]=None,)->dict:"""Convert a prompt to OpenAI format. Requires the `langchain_openai` package to be installed. Args: messages (Any): The messages to convert. model_kwargs (Optional[Dict[str, Any]]): Model configuration arguments including `stop` and any other required arguments. Defaults to None. Returns: dict: The prompt in OpenAI format. Raises: ImportError: If the `langchain_openai` package is not installed. ls_utils.LangSmithError: If there is an error during the conversion process. """try:fromlangchain_openaiimportChatOpenAI# type: ignoreexceptImportError:raiseImportError("The convert_prompt_to_openai_format function requires the langchain_openai""package to run.\nInstall with `pip install langchain_openai`")openai=ChatOpenAI()model_kwargs=model_kwargsor{}stop=model_kwargs.pop("stop",None)try:returnopenai._get_request_payload(messages,stop=stop,**model_kwargs)exceptExceptionase:raisels_utils.LangSmithError(f"Error converting to OpenAI format: {e}")defconvert_prompt_to_anthropic_format(messages:Any,model_kwargs:Optional[Dict[str,Any]]=None,)->dict:"""Convert a prompt to Anthropic format. Requires the `langchain_anthropic` package to be installed. Args: messages (Any): The messages to convert. model_kwargs (Optional[Dict[str, Any]]): Model configuration arguments including `model_name` and `stop`. Defaults to None. Returns: dict: The prompt in Anthropic format. 
"""try:fromlangchain_anthropicimportChatAnthropic# type: ignoreexceptImportError:raiseImportError("The convert_prompt_to_anthropic_format function requires the ""langchain_anthropic package to run.\n""Install with `pip install langchain_anthropic`")model_kwargs=model_kwargsor{}model_name=model_kwargs.pop("model_name","claude-3-haiku-20240307")stop=model_kwargs.pop("stop",None)timeout=model_kwargs.pop("timeout",None)anthropic=ChatAnthropic(model_name=model_name,timeout=timeout,stop=stop,**model_kwargs)try:returnanthropic._get_request_payload(messages,stop=stop)exceptExceptionase:raisels_utils.LangSmithError(f"Error converting to Anthropic format: {e}")def_convert_stored_attachments_to_attachments_dict(data:dict,*,attachments_key:str)->dict[str,AttachmentInfo]:"""Convert attachments from the backend database format to the user facing format."""attachments_dict={}ifattachments_keyindataanddata[attachments_key]:forkey,valueindata[attachments_key].items():ifnotkey.startswith("attachment."):continueresponse=requests.get(value["presigned_url"],stream=True)response.raise_for_status()reader=io.BytesIO(response.content)attachments_dict[key.removeprefix("attachment.")]=AttachmentInfo(**{"presigned_url":value["presigned_url"],"reader":reader,"mime_type":value.get("mime_type"),})returnattachments_dictdef_close_files(files:List[io.BufferedReader])->None:"""Close all opened files used in multipart requests."""forfileinfiles:try:file.close()exceptException:logger.debug("Could not close file: %s",file.name)passdef_dataset_examples_path(api_url:str,dataset_id:ID_TYPE)->str:ifapi_url.rstrip("/").endswith("/v1"):returnf"/platform/datasets/{dataset_id}/examples"else:returnf"/v1/platform/datasets/{dataset_id}/examples"def_construct_url(api_url:str,pathname:str)->str:ifpathname.startswith("http"):returnpathnameelifpathname.startswith("/"):returnapi_url.rstrip("/")+pathnameelse:returnapi_url.rstrip("/")+"/"+pathname