[docs]classLangSmithConnectionError(LangSmithError):"""Couldn't connect to the LangSmith API."""
## Warning classes
[docs]classLangSmithWarning(UserWarning):"""Base class for warnings."""
[docs]classLangSmithMissingAPIKeyWarning(LangSmithWarning):"""Warning for missing API key."""
deftracing_is_enabled(ctx:Optional[dict]=None)->Union[bool,Literal["local"]]:"""Return True if tracing is enabled."""fromlangsmith.run_helpersimportget_current_run_tree,get_tracing_contexttc=ctxorget_tracing_context()# You can manually override the environment using context vars.# Check that first.# Doing this before checking the run tree lets us# disable a branch within a trace.iftc["enabled"]isnotNone:returntc["enabled"]# Next check if we're mid-traceifget_current_run_tree():returnTrue# Finally, check the global environmentvar_result=get_env_var("TRACING_V2",default=get_env_var("TRACING",default=""))returnvar_result=="true"deftest_tracking_is_disabled()->bool:"""Return True if testing is enabled."""returnget_env_var("TEST_TRACKING",default="")=="false"defxor_args(*arg_groups:Tuple[str,...])->Callable:"""Validate specified keyword args are mutually exclusive."""defdecorator(func:Callable)->Callable:@functools.wraps(func)defwrapper(*args:Any,**kwargs:Any)->Any:"""Validate exactly one arg in each group is not None."""counts=[sum(1forarginarg_groupifkwargs.get(arg)isnotNone)forarg_groupinarg_groups]invalid_groups=[ifori,countinenumerate(counts)ifcount!=1]ifinvalid_groups:invalid_group_names=[", ".join(arg_groups[i])foriininvalid_groups]raiseValueError("Exactly one argument in each of the following"" groups must be defined:"f" {', '.join(invalid_group_names)}")returnfunc(*args,**kwargs)returnwrapperreturndecoratordefraise_for_status_with_text(response:Union[requests.Response,httpx.Response],)->None:"""Raise an error with the response text."""try:response.raise_for_status()exceptrequests.HTTPErrorase:raiserequests.HTTPError(str(e),response.text)frome# type: ignore[call-arg]defget_enum_value(enu:Union[enum.Enum,str])->str:"""Get the value of a string enum."""ifisinstance(enu,enum.Enum):returnenu.valuereturnenu@functools.lru_cache(maxsize=1)deflog_once(level:int,message:str)->None:"""Log a message at the specified level, but only once."""_LOGGER.log(level,message)def_get_message_type(message:Mapping[str,Any])->str:ifnotmessage:raiseValueError("Message is empty.")if"lc"inmessage:if"id"notinmessage:raiseValueError(f"Unexpected format for serialized message: {message}"" Message does not have an id.")returnmessage["id"][-1].replace("Message","").lower()else:if"type"notinmessage:raiseValueError(f"Unexpected format for stored message: {message}"" Message does not have a type.")returnmessage["type"]def_get_message_fields(message:Mapping[str,Any])->Mapping[str,Any]:ifnotmessage:raiseValueError("Message is empty.")if"lc"inmessage:if"kwargs"notinmessage:raiseValueError(f"Unexpected format for serialized message: {message}"" Message does not have kwargs.")returnmessage["kwargs"]else:if"data"notinmessage:raiseValueError(f"Unexpected format for stored message: {message}"" Message does not have data.")returnmessage["data"]def_convert_message(message:Mapping[str,Any])->Dict[str,Any]:"""Extract message from a message object."""message_type=_get_message_type(message)message_data=_get_message_fields(message)return{"type":message_type,"data":message_data}defget_messages_from_inputs(inputs:Mapping[str,Any])->List[Dict[str,Any]]:"""Extract messages from the given inputs dictionary. Args: inputs (Mapping[str, Any]): The inputs dictionary. Returns: List[Dict[str, Any]]: A list of dictionaries representing the extracted messages. Raises: ValueError: If no message(s) are found in the inputs dictionary. """if"messages"ininputs:return[_convert_message(message)formessageininputs["messages"]]if"message"ininputs:return[_convert_message(inputs["message"])]raiseValueError(f"Could not find message(s) in run with inputs {inputs}.")defget_message_generation_from_outputs(outputs:Mapping[str,Any])->Dict[str,Any]:"""Retrieve the message generation from the given outputs. Args: outputs (Mapping[str, Any]): The outputs dictionary. Returns: Dict[str, Any]: The message generation. Raises: ValueError: If no generations are found or if multiple generations are present. """if"generations"notinoutputs:raiseValueError(f"No generations found in in run with output: {outputs}.")generations=outputs["generations"]iflen(generations)!=1:raiseValueError("Chat examples expect exactly one generation."f" Found {len(generations)} generations: {generations}.")first_generation=generations[0]if"message"notinfirst_generation:raiseValueError(f"Unexpected format for generation: {first_generation}."" Generation does not have a message.")return_convert_message(first_generation["message"])defget_prompt_from_inputs(inputs:Mapping[str,Any])->str:"""Retrieve the prompt from the given inputs. Args: inputs (Mapping[str, Any]): The inputs dictionary. Returns: str: The prompt. Raises: ValueError: If the prompt is not found or if multiple prompts are present. """if"prompt"ininputs:returninputs["prompt"]if"prompts"ininputs:prompts=inputs["prompts"]iflen(prompts)==1:returnprompts[0]raiseValueError(f"Multiple prompts in run with inputs {inputs}."" Please create example manually.")raiseValueError(f"Could not find prompt in run with inputs {inputs}.")defget_llm_generation_from_outputs(outputs:Mapping[str,Any])->str:"""Get the LLM generation from the outputs."""if"generations"notinoutputs:raiseValueError(f"No generations found in in run with output: {outputs}.")generations=outputs["generations"]iflen(generations)!=1:raiseValueError(f"Multiple generations in run: {generations}")first_generation=generations[0]if"text"notinfirst_generation:raiseValueError(f"No text in generation: {first_generation}")returnfirst_generation["text"]@functools.lru_cache(maxsize=1)defget_docker_compose_command()->List[str]:"""Get the correct docker compose command for this system."""try:subprocess.check_call(["docker","compose","--version"],stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL,)return["docker","compose"]except(subprocess.CalledProcessError,FileNotFoundError):try:subprocess.check_call(["docker-compose","--version"],stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL,)return["docker-compose"]except(subprocess.CalledProcessError,FileNotFoundError):raiseValueError("Neither 'docker compose' nor 'docker-compose'"" commands are available. Please install the Docker"" server following the instructions for your operating"" system at https://docs.docker.com/engine/install/")defconvert_langchain_message(message:ls_schemas.BaseMessageLike)->dict:"""Convert a LangChain message to an example."""converted:Dict[str,Any]={"type":message.type,"data":{"content":message.content},}# Check for presence of keys in additional_kwargsifmessage.additional_kwargsandlen(message.additional_kwargs)>0:converted["data"]["additional_kwargs"]={**message.additional_kwargs}returnconverteddefis_base_message_like(obj:object)->bool:"""Check if the given object is similar to BaseMessage. Args: obj (object): The object to check. Returns: bool: True if the object is similar to BaseMessage, False otherwise. """returnall([isinstance(getattr(obj,"content",None),str),isinstance(getattr(obj,"additional_kwargs",None),dict),hasattr(obj,"type")andisinstance(getattr(obj,"type"),str),])@functools.lru_cache(maxsize=100)defget_env_var(name:str,default:Optional[str]=None,*,namespaces:Tuple=("LANGSMITH","LANGCHAIN"),)->Optional[str]:"""Retrieve an environment variable from a list of namespaces. Args: name (str): The name of the environment variable. default (Optional[str], optional): The default value to return if the environment variable is not found. Defaults to None. namespaces (Tuple, optional): A tuple of namespaces to search for the environment variable. Defaults to ("LANGSMITH", "LANGCHAINs"). Returns: Optional[str]: The value of the environment variable if found, otherwise the default value. """names=[f"{namespace}_{name}"fornamespaceinnamespaces]fornameinnames:value=os.environ.get(name)ifvalueisnotNone:returnvaluereturndefault@functools.lru_cache(maxsize=1)defget_tracer_project(return_default_value=True)->Optional[str]:"""Get the project name for a LangSmith tracer."""returnos.environ.get(# Hosted LangServe projects get precedence over all other defaults.# This is to make sure that we always use the associated project# for a hosted langserve deployment even if the customer sets some# other project name in their environment."HOSTED_LANGSERVE_PROJECT_NAME",get_env_var("PROJECT",# This is the legacy name for a LANGCHAIN_PROJECT, so it# has lower precedence than LANGCHAIN_PROJECTdefault=get_env_var("SESSION",default="default"ifreturn_default_valueelseNone),),)classFilterPoolFullWarning(logging.Filter):"""Filter urrllib3 warnings logged when the connection pool isn't reused."""def__init__(self,name:str="",host:str="")->None:"""Initialize the FilterPoolFullWarning filter. Args: name (str, optional): The name of the filter. Defaults to "". host (str, optional): The host to filter. Defaults to "". """super().__init__(name)self._host=hostdeffilter(self,record)->bool:"""urllib3.connectionpool:Connection pool is full, discarding connection: ..."""msg=record.getMessage()if"Connection pool is full, discarding connection"notinmsg:returnTruereturnself._hostnotinmsgclassFilterLangSmithRetry(logging.Filter):"""Filter for retries from this lib."""deffilter(self,record)->bool:"""Filter retries from this library."""# We re-raise/log manually.msg=record.getMessage()return"LangSmithRetry"notinmsg
[docs]classLangSmithRetry(Retry):"""Wrapper to filter logs with this name."""
_FILTER_LOCK=threading.RLock()@contextlib.contextmanagerdeffilter_logs(logger:logging.Logger,filters:Sequence[logging.Filter])->Generator[None,None,None]:"""Temporarily adds specified filters to a logger. Parameters: - logger: The logger to which the filters will be added. - filters: A sequence of logging.Filter objects to be temporarily added to the logger. """with_FILTER_LOCK:forfilterinfilters:logger.addFilter(filter)# Not actually perfectly thread-safe, but it's only log filterstry:yieldfinally:with_FILTER_LOCK:forfilterinfilters:try:logger.removeFilter(filter)exceptBaseException:_LOGGER.warning("Failed to remove filter")defget_cache_dir(cache:Optional[str])->Optional[str]:"""Get the testing cache directory. Args: cache (Optional[str]): The cache path. Returns: Optional[str]: The cache path if provided, otherwise the value from the LANGSMITH_TEST_CACHE environment variable. """ifcacheisnotNone:returncachereturnget_env_var("TEST_CACHE",default=None)@contextlib.contextmanagerdefwith_cache(path:Union[str,pathlib.Path],ignore_hosts:Optional[Sequence[str]]=None)->Generator[None,None,None]:"""Use a cache for requests."""try:importvcr# type: ignore[import-untyped]exceptImportError:raiseImportError("vcrpy is required to use caching. Install with:"'pip install -U "langsmith[vcr]"')# Fix concurrency issue in vcrpy's patchingfromlangsmith._internalimport_patchaspatch_urllib3patch_urllib3.patch_urllib3()def_filter_request_headers(request:Any)->Any:ifignore_hostsandany(request.url.startswith(host)forhostinignore_hosts):returnNonerequest.headers={}returnrequestcache_dir,cache_file=os.path.split(path)ls_vcr=vcr.VCR(serializer=("yaml"ifcache_file.endswith(".yaml")orcache_file.endswith(".yml")else"json"),cassette_library_dir=cache_dir,# Replay previous requests, record new ones# TODO: Support other modesrecord_mode="new_episodes",match_on=["uri","method","path","body"],filter_headers=["authorization","Set-Cookie"],before_record_request=_filter_request_headers,)withls_vcr.use_cassette(cache_file):yield@contextlib.contextmanagerdefwith_optional_cache(path:Optional[Union[str,pathlib.Path]],ignore_hosts:Optional[Sequence[str]]=None,)->Generator[None,None,None]:"""Use a cache for requests."""ifpathisnotNone:withwith_cache(path,ignore_hosts):yieldelse:yielddef_format_exc()->str:# Used internally to format exceptions without cluttering the tracebacktb_lines=traceback.format_exception(*sys.exc_info())filtered_lines=[lineforlineintb_linesif"langsmith/"notinline]return"".join(filtered_lines)T=TypeVar("T")def_middle_copy(val:T,memo:Dict[int,Any],max_depth:int=4,_depth:int=0)->T:cls=type(val)copier=getattr(cls,"__deepcopy__",None)ifcopierisnotNone:try:returncopier(memo)exceptBaseException:passif_depth>=max_depth:returnvalifisinstance(val,dict):return{# type: ignore[return-value]_middle_copy(k,memo,max_depth,_depth+1):_middle_copy(v,memo,max_depth,_depth+1)fork,vinval.items()}ifisinstance(val,list):return[_middle_copy(item,memo,max_depth,_depth+1)foriteminval]# type: ignore[return-value]ifisinstance(val,tuple):returntuple(_middle_copy(item,memo,max_depth,_depth+1)foriteminval)# type: ignore[return-value]ifisinstance(val,set):return{_middle_copy(item,memo,max_depth,_depth+1)foriteminval}# type: ignore[return-value]returnvaldefdeepish_copy(val:T)->T:"""Deep copy a value with a compromise for uncopyable objects. Args: val: The value to be deep copied. Returns: The deep copied value. """memo:Dict[int,Any]={}try:returncopy.deepcopy(val,memo)exceptBaseExceptionase:# Generators, locks, etc. cannot be copied# and raise a TypeError (mentioning pickling, since the dunder methods)# are re-used for copying. We'll try to do a compromise and copy# what we can_LOGGER.debug("Failed to deepcopy input: %s",repr(e))return_middle_copy(val,memo)defis_version_greater_or_equal(current_version:str,target_version:str)->bool:"""Check if the current version is greater or equal to the target version."""frompackagingimportversioncurrent=version.parse(current_version)target=version.parse(target_version)returncurrent>=targetdefparse_prompt_identifier(identifier:str)->Tuple[str,str,str]:"""Parse a string in the format of owner/name:hash, name:hash, owner/name, or name. Args: identifier (str): The prompt identifier to parse. Returns: Tuple[str, str, str]: A tuple containing (owner, name, hash). Raises: ValueError: If the identifier doesn't match the expected formats. """if(notidentifieroridentifier.count("/")>1oridentifier.startswith("/")oridentifier.endswith("/")):raiseValueError(f"Invalid identifier format: {identifier}")parts=identifier.split(":",1)owner_name=parts[0]commit=parts[1]iflen(parts)>1else"latest"if"/"inowner_name:owner,name=owner_name.split("/",1)ifnotownerornotname:raiseValueError(f"Invalid identifier format: {identifier}")returnowner,name,commitelse:ifnotowner_name:raiseValueError(f"Invalid identifier format: {identifier}")return"-",owner_name,commitP=ParamSpec("P")
[docs]classContextThreadPoolExecutor(ThreadPoolExecutor):"""ThreadPoolExecutor that copies the context to the child thread."""
[docs]defsubmit(# type: ignore[override]self,func:Callable[P,T],*args:P.args,**kwargs:P.kwargs,)->Future[T]:"""Submit a function to the executor. Args: func (Callable[..., T]): The function to submit. *args (Any): The positional arguments to the function. **kwargs (Any): The keyword arguments to the function. Returns: Future[T]: The future for the function. """returnsuper().submit(cast(Callable[...,T],functools.partial(contextvars.copy_context().run,func,*args,**kwargs),))
[docs]defmap(self,fn:Callable[...,T],*iterables:Iterable[Any],timeout:Optional[float]=None,chunksize:int=1,)->Iterator[T]:"""Return an iterator equivalent to stdlib map. Each function will receive its own copy of the context from the parent thread. Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. chunksize: The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. """contexts=[contextvars.copy_context()for_inrange(len(iterables[0]))]# type: ignore[arg-type]def_wrapped_fn(*args:Any)->T:returncontexts.pop().run(fn,*args)returnsuper().map(_wrapped_fn,*iterables,timeout=timeout,chunksize=chunksize,)
defget_api_url(api_url:Optional[str])->str:"""Get the LangSmith API URL from the environment or the given value."""_api_url=api_urlorcast(str,get_env_var("ENDPOINT",default="https://api.smith.langchain.com",),)ifnot_api_url.strip():raiseLangSmithUserError("LangSmith API URL cannot be empty")return_api_url.strip().strip('"').strip("'").rstrip("/")defget_api_key(api_key:Optional[str])->Optional[str]:"""Get the API key from the environment or the given value."""api_key_=api_keyifapi_keyisnotNoneelseget_env_var("API_KEY",default=None)ifapi_key_isNoneornotapi_key_.strip():returnNonereturnapi_key_.strip().strip('"').strip("'")def_is_localhost(url:str)->bool:"""Check if the URL is localhost. Parameters ---------- url : str The URL to check. Returns: ------- bool True if the URL is localhost, False otherwise. """try:netloc=urllib_parse.urlsplit(url).netloc.split(":")[0]ip=socket.gethostbyname(netloc)returnip=="127.0.0.1"orip.startswith("0.0.0.0")orip.startswith("::")exceptsocket.gaierror:returnFalse@functools.lru_cache(maxsize=2)defget_host_url(web_url:Optional[str],api_url:str):"""Get the host URL based on the web URL or API URL."""ifweb_url:returnweb_urlparsed_url=urllib_parse.urlparse(api_url)if_is_localhost(api_url):link="http://localhost"elifstr(parsed_url.path).endswith("/api"):new_path=str(parsed_url.path).rsplit("/api",1)[0]link=urllib_parse.urlunparse(parsed_url._replace(path=new_path))elifstr(parsed_url.path).endswith("/api/v1"):new_path=str(parsed_url.path).rsplit("/api/v1",1)[0]link=urllib_parse.urlunparse(parsed_url._replace(path=new_path))elifstr(parsed_url.netloc).startswith("eu."):link="https://eu.smith.langchain.com"elifstr(parsed_url.netloc).startswith("dev."):link="https://dev.smith.langchain.com"elifstr(parsed_url.netloc).startswith("beta."):link="https://beta.smith.langchain.com"else:link="https://smith.langchain.com"returnlinkdef_get_function_name(fn:Callable,depth:int=0)->str:ifdepth>2ornotcallable(fn):returnstr(fn)ifhasattr(fn,"__name__"):returnfn.__name__ifisinstance(fn,functools.partial):return_get_function_name(fn.func,depth+1)ifhasattr(fn,"__call__"):ifhasattr(fn,"__class__")andhasattr(fn.__class__,"__name__"):returnfn.__class__.__name__return_get_function_name(fn.__call__,depth+1)returnstr(fn)defis_truish(val:Any)->bool:"""Check if the value is truish. Args: val (Any): The value to check. Returns: bool: True if the value is truish, False otherwise. """ifisinstance(val,str):returnval.lower()=="true"orval=="1"returnbool(val)