Skip to content

OpenAI-style LLM

OpenAIStyleLLM

Bases: LLM

A wrapper around the OpenAI API that implements a chat interface for the LLM. It uses the chat endpoint of the OpenAI API and also supports using the completion endpoint instead of the chat endpoint. Both sync and async modes are supported. Attributes: api_key (Optional[str]): the API key to use for authentication; organization (Optional[str]): the organization ID to use for authentication; api_base (Optional[str]): the base URL to use for the API requests; proxy (Optional[str]): the proxy to use for the API requests; client_args (Optional[dict]): additional arguments to pass to the client.

Source code in agentuniverse/llm/openai_style_llm.py
Python
class OpenAIStyleLLM(LLM):
    """Chat wrapper around OpenAI-compatible APIs.

    Each `call`/`acall` builds a fresh client from the attributes below, sends
    the messages to the `chat.completions` endpoint and closes the client once
    the response (or the whole stream) has been consumed.
    Both sync and async modes are supported.

    Besides the attributes declared here, the methods read `model_name`,
    `temperature`, `max_tokens`, `streaming`, `request_timeout` and
    `max_retries` from the instance (not declared in this class).

    Attributes:
        api_key (Optional[str]): The API key to use for authentication.
        organization (Optional[str]): The organization ID to use for authentication.
        api_base (Optional[str]): The base URL to use for the API requests.
        proxy (Optional[str]): The proxy to use for the API requests.
        client_args (Optional[dict]): Additional arguments to pass to the client.
    """

    api_key: Optional[str] = None
    organization: Optional[str] = None
    api_base: Optional[str] = None
    proxy: Optional[str] = None
    client_args: Optional[dict] = None

    def _new_client(self):
        """Build a sync OpenAI client from the current configuration."""
        return OpenAI(
            api_key=self.api_key,
            organization=self.organization,
            base_url=self.api_base,
            timeout=self.request_timeout,
            max_retries=self.max_retries,
            # Use the declared `proxy` attribute, exactly like the async client does.
            http_client=httpx.Client(proxy=self.proxy) if self.proxy else None,
            **(self.client_args or {}),
        )

    def _new_async_client(self):
        """Build an async OpenAI client from the current configuration."""
        return AsyncOpenAI(
            api_key=self.api_key,
            organization=self.organization,
            base_url=self.api_base,
            timeout=self.request_timeout,
            max_retries=self.max_retries,
            http_client=httpx.AsyncClient(proxy=self.proxy) if self.proxy else None,
            **(self.client_args or {}),
        )

    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
        """Run the OpenAI LLM.

        Args:
            messages (list): The messages to send to the LLM.
            **kwargs: Arbitrary keyword arguments. `streaming`/`stream`, `model`,
                `temperature` and `max_tokens` override the instance defaults;
                everything else is forwarded to `chat.completions.create`.

        Returns:
            A single `LLMOutput` for a non-streaming request, otherwise a
            generator of `LLMOutput` chunks.
        """
        streaming = kwargs.pop("streaming", self.streaming)
        # An explicit `stream` kwarg wins; the same flag must drive both the
        # request and the way the response is interpreted.
        streaming = kwargs.pop("stream", streaming)
        self.client = self._new_client()
        try:
            chat_completion = self.client.chat.completions.create(
                messages=messages,
                model=kwargs.pop('model', self.model_name),
                temperature=kwargs.pop('temperature', self.temperature),
                max_tokens=kwargs.pop('max_tokens', self.max_tokens),
                stream=streaming,
                **kwargs,
            )
        except BaseException:
            # Do not leak the freshly created client when the request fails.
            self.close()
            raise
        if streaming:
            # The generator closes the client once the stream is exhausted.
            return self.generate_stream_result(chat_completion)
        try:
            text = chat_completion.choices[0].message.content
            return LLMOutput(text=text, raw=chat_completion.model_dump())
        finally:
            self.close()

    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
        """Asynchronously run the OpenAI LLM.

        Args:
            messages (list): The messages to send to the LLM.
            **kwargs: Arbitrary keyword arguments. `streaming`/`stream`, `model`,
                `temperature` and `max_tokens` override the instance defaults;
                everything else is forwarded to `chat.completions.create`.

        Returns:
            A single `LLMOutput` for a non-streaming request, otherwise an
            async generator of `LLMOutput` chunks.
        """
        streaming = kwargs.pop("streaming", self.streaming)
        # An explicit `stream` kwarg wins; the same flag must drive both the
        # request and the way the response is interpreted.
        streaming = kwargs.pop("stream", streaming)
        self.async_client = self._new_async_client()
        try:
            chat_completion = await self.async_client.chat.completions.create(
                messages=messages,
                model=kwargs.pop('model', self.model_name),
                temperature=kwargs.pop('temperature', self.temperature),
                max_tokens=kwargs.pop('max_tokens', self.max_tokens),
                stream=streaming,
                **kwargs,
            )
        except BaseException:
            # Do not leak the freshly created client when the request fails
            # or the task is cancelled.
            await self.aclose()
            raise
        if streaming:
            # The async generator closes the client once the stream is exhausted.
            return self.agenerate_stream_result(chat_completion)
        try:
            text = chat_completion.choices[0].message.content
            return LLMOutput(text=text, raw=chat_completion.model_dump())
        finally:
            await self.aclose()

    def as_langchain(self) -> BaseLanguageModel:
        """Convert the AgentUniverse(AU) openai llm class to the langchain openai llm class."""
        return LangchainOpenAIStyleInstance(self)

    def set_by_agent_model(self, **kwargs) -> None:
        """Assign values of parameters to the OpenAIStyleLLM model in the agent configuration.

        Only keys that are present with a truthy value overwrite the current
        attribute; missing or empty values leave the attribute untouched.
        """
        super().set_by_agent_model(**kwargs)
        # `organization` is a declared attribute too, so it is configurable
        # the same way as the other connection settings.
        for field_name in ('api_key', 'organization', 'api_base', 'proxy', 'client_args'):
            value = kwargs.get(field_name)
            if value:
                setattr(self, field_name, value)

    @staticmethod
    def parse_result(chunk):
        """Convert one streamed chunk into an `LLMOutput`.

        Args:
            chunk: Either a plain dict or an object exposing `model_dump()`.

        Returns:
            An `LLMOutput` carrying the delta text and the raw chunk dict, or
            `None` when the chunk has no choices or no text content.
        """
        # Dict chunks are already "raw"; only model objects need dumping.
        raw = chunk if isinstance(chunk, dict) else chunk.model_dump()
        choices = raw.get("choices")
        if not choices:
            return None
        # Some chunks carry no delta at all; treat that as "no text".
        delta = choices[0].get("delta") or {}
        text = delta.get("content")
        if not text:
            return None
        return LLMOutput(text=text, raw=raw)

    def generate_stream_result(self, stream: openai.Stream):
        """Yield an `LLMOutput` for every chunk of `stream` that carries text.

        The sync client is closed when the stream ends, fails, or the
        generator is closed before being exhausted.
        """
        try:
            for chunk in stream:
                llm_output = self.parse_result(chunk)
                if llm_output:
                    yield llm_output
        finally:
            self.close()

    async def agenerate_stream_result(self, stream: AsyncIterator) -> AsyncIterator[LLMOutput]:
        """Asynchronously yield an `LLMOutput` for every chunk of `stream` that carries text.

        The async client is closed when the stream ends, fails, or the
        generator is closed before being exhausted.
        """
        try:
            async for chunk in stream:
                llm_output = self.parse_result(chunk)
                if llm_output:
                    yield llm_output
        finally:
            await self.aclose()

    def get_num_tokens(self, text: str) -> int:
        """Get the number of tokens present in the text.

        Useful for checking if an input will fit in an openai model's context window.

        Args:
            text: The string input to tokenize.

        Returns:
            The integer number of tokens in the text.
        """
        try:
            encoding = tiktoken.encoding_for_model(self.model_name)
        except KeyError:
            # No encoding registered for this model name: fall back to cl100k_base.
            encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))

    def close(self):
        """Close the sync client if one has been created."""
        if hasattr(self, 'client') and self.client:
            self.client.close()

    async def aclose(self):
        """Close the async client if one has been created."""
        if hasattr(self, 'async_client') and self.async_client:
            await self.async_client.close()

acall(messages, **kwargs) async

Asynchronously run the OpenAI LLM.

Parameters:

Name Type Description Default
messages list

The messages to send to the LLM.

required
**kwargs Any

Arbitrary keyword arguments.

{}
Source code in agentuniverse/llm/openai_style_llm.py
Python
async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
    """Asynchronously run the OpenAI LLM.

    Args:
        messages (list): The messages to send to the LLM.
        **kwargs: Arbitrary keyword arguments. `streaming`/`stream`, `model`,
            `temperature` and `max_tokens` override the instance defaults;
            everything else is forwarded to `chat.completions.create`.

    Returns:
        A single `LLMOutput` for a non-streaming request, otherwise whatever
        `agenerate_stream_result` returns for the raw stream.
    """
    streaming = kwargs.pop("streaming", self.streaming)
    # An explicit `stream` kwarg wins; the same flag must drive both the
    # request and the way the response is interpreted.
    streaming = kwargs.pop("stream", streaming)
    self.async_client = self._new_async_client()
    try:
        chat_completion = await self.async_client.chat.completions.create(
            messages=messages,
            model=kwargs.pop('model', self.model_name),
            temperature=kwargs.pop('temperature', self.temperature),
            max_tokens=kwargs.pop('max_tokens', self.max_tokens),
            stream=streaming,
            **kwargs,
        )
    except BaseException:
        # Do not leak the freshly created client when the request fails
        # or the task is cancelled.
        await self.aclose()
        raise
    if streaming:
        # Closing the client is left to the stream consumer.
        return self.agenerate_stream_result(chat_completion)
    try:
        text = chat_completion.choices[0].message.content
        return LLMOutput(text=text, raw=chat_completion.model_dump())
    finally:
        await self.aclose()

aclose() async

Async close the client.

Source code in agentuniverse/llm/openai_style_llm.py
Python
async def aclose(self):
    """Shut down the async client when one exists and is truthy."""
    active_client = getattr(self, 'async_client', None)
    if not active_client:
        # Nothing was ever created (or it was reset): nothing to close.
        return
    await active_client.close()

agenerate_stream_result(stream) async

Generate the result of the stream.

Source code in agentuniverse/llm/openai_style_llm.py
Python
async def agenerate_stream_result(self, stream: AsyncIterator) -> AsyncIterator[LLMOutput]:
    """Asynchronously yield the parsed output of every chunk that carries text.

    Each chunk is passed through `self.parse_result`; falsy results are
    skipped. `self.aclose()` is awaited when the stream ends, when iteration
    raises, or when the generator is closed before being exhausted.
    """
    try:
        async for chunk in stream:
            llm_output = self.parse_result(chunk)
            if llm_output:
                yield llm_output
    finally:
        # Runs on normal completion and on failure, so the client never leaks.
        await self.aclose()

as_langchain()

Convert the AgentUniverse(AU) openai llm class to the langchain openai llm class.

Source code in agentuniverse/llm/openai_style_llm.py
Python
def as_langchain(self) -> BaseLanguageModel:
    """Wrap this LLM in a `LangchainOpenAIStyleInstance` and return the wrapper."""
    langchain_llm = LangchainOpenAIStyleInstance(self)
    return langchain_llm

call(messages, **kwargs)

Run the OpenAI LLM.

Parameters:

Name Type Description Default
messages list

The messages to send to the LLM.

required
**kwargs Any

Arbitrary keyword arguments.

{}
Source code in agentuniverse/llm/openai_style_llm.py
Python
def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
    """Run the OpenAI LLM.

    Args:
        messages (list): The messages to send to the LLM.
        **kwargs: Arbitrary keyword arguments. `streaming`/`stream`, `model`,
            `temperature` and `max_tokens` override the instance defaults;
            everything else is forwarded to `chat.completions.create`.

    Returns:
        A single `LLMOutput` for a non-streaming request, otherwise whatever
        `generate_stream_result` returns for the raw stream.
    """
    streaming = kwargs.pop("streaming", self.streaming)
    # An explicit `stream` kwarg wins; the same flag must drive both the
    # request and the way the response is interpreted.
    streaming = kwargs.pop("stream", streaming)
    self.client = self._new_client()
    try:
        chat_completion = self.client.chat.completions.create(
            messages=messages,
            model=kwargs.pop('model', self.model_name),
            temperature=kwargs.pop('temperature', self.temperature),
            max_tokens=kwargs.pop('max_tokens', self.max_tokens),
            stream=streaming,
            **kwargs,
        )
    except BaseException:
        # Do not leak the freshly created client when the request fails.
        self.close()
        raise
    if streaming:
        # Closing the client is left to the stream consumer.
        return self.generate_stream_result(chat_completion)
    try:
        text = chat_completion.choices[0].message.content
        return LLMOutput(text=text, raw=chat_completion.model_dump())
    finally:
        self.close()

close()

Close the client.

Source code in agentuniverse/llm/openai_style_llm.py
Python
def close(self):
    """Shut down the sync client when one exists and is truthy."""
    active_client = getattr(self, 'client', None)
    if not active_client:
        # Nothing was ever created (or it was reset): nothing to close.
        return
    active_client.close()

generate_stream_result(stream)

Generate the result of the stream.

Source code in agentuniverse/llm/openai_style_llm.py
Python
def generate_stream_result(self, stream: openai.Stream):
    """Yield the parsed output of every chunk that carries text.

    Each chunk is passed through `self.parse_result`; falsy results are
    skipped. `self.close()` is called when the stream ends, when iteration
    raises, or when the generator is closed before being exhausted.
    """
    try:
        for chunk in stream:
            llm_output = self.parse_result(chunk)
            if llm_output:
                yield llm_output
    finally:
        # Runs on normal completion and on failure, so the client never leaks.
        self.close()

get_num_tokens(text)

Get the number of tokens present in the text.

Useful for checking if an input will fit in an openai model's context window.

Parameters:

Name Type Description Default
text str

The string input to tokenize.

required

Returns:

Type Description
int

The integer number of tokens in the text.

Source code in agentuniverse/llm/openai_style_llm.py
Python
def get_num_tokens(self, text: str) -> int:
    """Count the tokens that `text` encodes to.

    Useful for checking if an input will fit in an openai model's context window.

    Args:
        text: The string input to tokenize.

    Returns:
        The integer number of tokens in the text.
    """
    try:
        tokenizer = tiktoken.encoding_for_model(self.model_name)
    except KeyError:
        # No encoding is registered for this model name; use cl100k_base instead.
        tokenizer = tiktoken.get_encoding("cl100k_base")
    token_ids = tokenizer.encode(text)
    return len(token_ids)

parse_result(chunk) staticmethod

Generate the result of the stream.

Source code in agentuniverse/llm/openai_style_llm.py
Python
@staticmethod
def parse_result(chunk):
    """Convert one streamed chunk into an `LLMOutput`.

    Args:
        chunk: Either a plain dict or an object exposing `model_dump()`.

    Returns:
        An `LLMOutput` carrying the delta text and the raw chunk dict, or
        `None` when the chunk has no choices or no text content.
    """
    # Dict chunks are already "raw"; only model objects need dumping.
    raw = chunk if isinstance(chunk, dict) else chunk.model_dump()
    choices = raw.get("choices")
    if not choices:
        return None
    # Some chunks carry no delta at all; treat that as "no text".
    delta = choices[0].get("delta") or {}
    text = delta.get("content")
    if not text:
        return None
    return LLMOutput(text=text, raw=raw)

set_by_agent_model(**kwargs)

Assign values of parameters to the OpenAILLM model in the agent configuration.

Source code in agentuniverse/llm/openai_style_llm.py
Python
def set_by_agent_model(self, **kwargs) -> None:
    """Assign values of parameters to the OpenAIStyleLLM model in the agent configuration.

    The parent implementation runs first. Afterwards every connection
    setting that is present in `kwargs` with a truthy value overwrites the
    matching attribute; missing or empty values leave it untouched.
    """
    super().set_by_agent_model(**kwargs)
    # `organization` is passed to the client like the other settings, so it
    # is configurable the same way.
    for field_name in ('api_key', 'organization', 'api_base', 'proxy', 'client_args'):
        value = kwargs.get(field_name)
        if value:
            setattr(self, field_name, value)