Wenxin llm
WenXinLLM
¶
Bases: LLM
WenXin LLM wrapper for LangChain. This is a wrapper around the Qianfan Chat Completion API. It uses the QianfanChatEndpoint class from the langchain_community package. It also supports streaming and token counting. It can be used with any other LangChain tools or agents.
Attributes:
| Name | Type | Description |
|---|---|---|
| `qianfan_ak` | `str` | The access key of the Qianfan API. Defaults to the value of the QIANFAN_AK environment variable. |
| `qianfan_sk` | `str` | The secret key of the Qianfan API. Defaults to the value of the QIANFAN_SK environment variable. |
Source code in agentuniverse/llm/default/wenxin_llm.py
Python
class WenXinLLM(LLM):
    """WenXin LLM wrapper backed by the Qianfan Chat Completion API.

    Requests are sent through a ``qianfan.ChatCompletion`` client built from
    the configured access/secret key pair. The wrapper offers blocking and
    streaming calls (sync and async), token counting, and conversion to a
    LangChain model through ``as_langchain``.

    Attributes:
        qianfan_ak (str): The access key of the Qianfan API.
            Defaults to the value of the QIANFAN_AK environment variable.
        qianfan_sk (str): The secret key of the Qianfan API.
            Defaults to the value of the QIANFAN_SK environment variable.
    """

    qianfan_ak: str = Field(default_factory=lambda: get_from_env("QIANFAN_AK"))
    qianfan_sk: str = Field(default_factory=lambda: get_from_env("QIANFAN_SK"))

    def _new_client(self):
        """Create a new Qianfan chat-completion client from the stored keys."""
        return qianfan.ChatCompletion(ak=self.qianfan_ak, sk=self.qianfan_sk)

    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
        """Run the WenXin (Qianfan) LLM.

        Args:
            messages (list): The messages to send to the LLM.
            **kwargs: Arbitrary keyword arguments. ``model``, ``temperature``,
                ``max_tokens``, ``streaming`` and ``stream`` override the
                instance defaults; anything else is forwarded to the client.
                When both ``stream`` and ``streaming`` are given, ``stream``
                wins.

        Returns:
            The parsed output for a non-streaming request (``None`` when the
            response carries no ``result`` text), or an iterator of parsed
            chunks for a streaming request.
        """
        streaming = kwargs.pop("streaming", self.streaming)
        # Resolve the flag that is actually sent to the API once, and branch
        # on the same value so the response is always parsed the way it was
        # requested.
        stream = kwargs.pop("stream", streaming)
        self.client = self._new_client()
        chat_completion = self.client.do(
            messages=messages,
            model=kwargs.pop("model", self.model_name),
            temperature=kwargs.pop("temperature", self.temperature),
            stream=stream,
            max_tokens=kwargs.pop("max_tokens", self.max_tokens),
            **kwargs,
        )
        if stream:
            return self.generate_stream_result(chat_completion)
        return self.parse_result(chat_completion)

    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
        """Asynchronously run the WenXin (Qianfan) LLM.

        Args:
            messages (list): The messages to send to the LLM.
            **kwargs: Arbitrary keyword arguments. ``model``, ``temperature``,
                ``max_tokens``, ``streaming`` and ``stream`` override the
                instance defaults; anything else is forwarded to the client.
                When both ``stream`` and ``streaming`` are given, ``stream``
                wins.

        Returns:
            The parsed output for a non-streaming request (``None`` when the
            response carries no ``result`` text), or an async iterator of
            parsed chunks for a streaming request.
        """
        streaming = kwargs.pop("streaming", self.streaming)
        # Branch on the same flag that is sent to the API (see ``call``).
        stream = kwargs.pop("stream", streaming)
        self.async_client = self._new_client()
        chat_completion = await self.async_client.ado(
            messages=messages,
            model=kwargs.pop("model", self.model_name),
            temperature=kwargs.pop("temperature", self.temperature),
            stream=stream,
            max_tokens=kwargs.pop("max_tokens", self.max_tokens),
            **kwargs,
        )
        if stream:
            return self.agenerate_stream_result(chat_completion)
        return self.parse_result(chat_completion)

    def max_context_length(self) -> int:
        """Return the maximum context length for the configured model.

        A truthy value from the base class takes precedence. Otherwise the
        Qianfan model info is queried, preferring ``max_input_tokens`` and
        falling back to ``max_input_chars``.
        """
        configured = super().max_context_length()
        if configured:
            return configured
        info = self._new_client().get_model_info(self.model_name)
        return info.max_input_tokens or info.max_input_chars

    def get_num_tokens(self, text: str) -> int:
        """Count the tokens in ``text`` with the Qianfan tokenizer in remote mode.

        The lower-cased model name is passed only when it appears in
        ``TokenModelList``; otherwise an empty model name is sent.
        NOTE(review): how the tokenizer treats an empty model name is not
        visible here - confirm against the Qianfan SDK.
        """
        lowered = self.model_name.lower()
        model_name = lowered if lowered in TokenModelList else ''
        return tokenizer.Tokenizer().count_tokens(
            text=text,
            mode='remote',
            model=model_name,
        )

    @staticmethod
    def parse_result(chunk: QfResponse):
        """Convert a Qianfan response (or stream chunk) into an ``LLMOutput``.

        Returns ``None`` when the response body has no non-empty ``result``.
        """
        text = chunk.body.get('result')
        if not text:
            return None
        return LLMOutput(text=text, raw=chunk)

    def generate_stream_result(self, chat_completion) -> Iterator[LLMOutput]:
        """Yield parsed outputs from a streaming response, skipping empty chunks."""
        for chunk in chat_completion:
            data = self.parse_result(chunk)
            if data:
                yield data

    async def agenerate_stream_result(self, chat_completion: AsyncIterator) -> AsyncIterator[LLMOutput]:
        """Asynchronously yield parsed outputs from a streaming response, skipping empty chunks."""
        async for chunk in chat_completion:
            data = self.parse_result(chunk)
            if data:
                yield data

    def as_langchain(self) -> BaseLanguageModel:
        """Return an instance of the LangChain `BaseLanguageModel` class."""
        return WenXinLangChainInstance(llm=self)
acall(messages, **kwargs)
async
¶
Asynchronously run the WenXin (Qianfan) LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `list` | The messages to send to the LLM. | required |
| `**kwargs` | `Any` | Arbitrary keyword arguments. | `{}` |
Source code in agentuniverse/llm/default/wenxin_llm.py
Python
async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
    """Asynchronously run the WenXin (Qianfan) LLM.

    Args:
        messages (list): The messages to send to the LLM.
        **kwargs: Arbitrary keyword arguments. ``model``, ``temperature``,
            ``max_tokens``, ``streaming`` and ``stream`` override the
            instance defaults; anything else is forwarded to the client.
            When both ``stream`` and ``streaming`` are given, ``stream``
            wins.

    Returns:
        Whatever ``parse_result`` produces for a non-streaming request, or
        the async iterator from ``agenerate_stream_result`` for a streaming
        request.
    """
    streaming = kwargs.pop("streaming", self.streaming)
    # Resolve the flag that is actually sent to the API once, and branch on
    # the same value so the response is always parsed the way it was requested.
    stream = kwargs.pop("stream", streaming)
    self.async_client = self._new_client()
    chat_completion = await self.async_client.ado(
        messages=messages,
        model=kwargs.pop("model", self.model_name),
        temperature=kwargs.pop("temperature", self.temperature),
        stream=stream,
        max_tokens=kwargs.pop("max_tokens", self.max_tokens),
        **kwargs,
    )
    if stream:
        return self.agenerate_stream_result(chat_completion)
    return self.parse_result(chat_completion)
as_langchain()
¶
call(messages, **kwargs)
¶
Run the WenXin (Qianfan) LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `messages` | `list` | The messages to send to the LLM. | required |
| `**kwargs` | `Any` | Arbitrary keyword arguments. | `{}` |
Source code in agentuniverse/llm/default/wenxin_llm.py
Python
def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
    """Run the WenXin (Qianfan) LLM.

    Args:
        messages (list): The messages to send to the LLM.
        **kwargs: Arbitrary keyword arguments. ``model``, ``temperature``,
            ``max_tokens``, ``streaming`` and ``stream`` override the
            instance defaults; anything else is forwarded to the client.
            When both ``stream`` and ``streaming`` are given, ``stream``
            wins.

    Returns:
        Whatever ``parse_result`` produces for a non-streaming request, or
        the iterator from ``generate_stream_result`` for a streaming request.
    """
    streaming = kwargs.pop("streaming", self.streaming)
    # Resolve the flag that is actually sent to the API once, and branch on
    # the same value so the response is always parsed the way it was requested.
    stream = kwargs.pop("stream", streaming)
    self.client = self._new_client()
    chat_completion = self.client.do(
        messages=messages,
        model=kwargs.pop("model", self.model_name),
        temperature=kwargs.pop("temperature", self.temperature),
        stream=stream,
        max_tokens=kwargs.pop("max_tokens", self.max_tokens),
        **kwargs,
    )
    if stream:
        return self.generate_stream_result(chat_completion)
    return self.parse_result(chat_completion)