Skip to content

Store

Store

Bases: BaseModel

The basic class for the knowledge store.

The Store class is used to store knowledge and provide retrieval capabilities. It can be backed by vector storage, such as a ChromaDB store, or by non-vector storage, such as a Redis store.

Attributes:

Name Type Description
client Any

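The client of the store, usually used to connect to the storage and provide knowledge insertion, update, deletion, and query operations.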

async_client Any

The async client of the store, which serves the same function as `client` in asynchronous mode.

embedding_model Embedding

The embedding model of the store, used to provide embedding operations on texts to generate a list of floats.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
class Store(BaseModel):
    """Base class for knowledge stores.

    A store keeps knowledge and provides retrieval capabilities. It may be
    backed by vector storage (such as a ChromaDB store) or by non-vector
    storage (such as a Redis store).

    Attributes:
        client (Any): The client of the store. It is usually used to connect
            to the storage and provides knowledge insertion, update,
            deletion, and query operations.

        async_client (Any): The async client of the store. It serves the same
            function as `client`, in asynchronous mode.

        embedding_model (Embedding): The embedding model of the store, used
            to provide embedding operations on texts to generate a list of
            floats.
    """

    client: Any = None
    async_client: Any = None
    embedding_model: Optional[Embedding] = None

    class Config:
        """Pydantic configuration for this model."""
        arbitrary_types_allowed = True

    def __init__(self, **kwargs):
        """Build the store and create any client that was not supplied.

        Args:
            **kwargs: Field values forwarded to the parent initializer.
        """
        super().__init__(**kwargs)
        # The factory hooks only run for clients that are still None
        # once the fields have been initialized.
        if self.client is None:
            sync_client = self._new_client()
            self.client = sync_client
        if self.async_client is None:
            aio_client = self._new_async_client()
            self.async_client = aio_client

    def _new_client(self) -> Any:
        """Create the client.

        The base implementation creates nothing and returns None.
        """
        return None

    def _new_async_client(self) -> Any:
        """Create the async client.

        The base implementation creates nothing and returns None.
        """
        return None

    def query(self, query: Query, **kwargs) -> List[Document]:
        """Query documents from the store.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_query(self, query: Query, **kwargs) -> List[Document]:
        """Asynchronous counterpart of `query`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def insert_documents(self, documents: List[Document], **kwargs):
        """Insert a list of documents into the store.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_insert_documents(self, documents: List[Document], **kwargs):
        """Asynchronous counterpart of `insert_documents`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def insert_document(self, document: Document, **kwargs):
        """Insert a single document into the store.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_insert_document(self, document: Document, **kwargs):
        """Asynchronous counterpart of `insert_document`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def delete_document(self, document_id: str, **kwargs):
        """Delete the document identified by `document_id`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_delete_document(self, document_id: str, **kwargs):
        """Asynchronous counterpart of `delete_document`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def upsert_document(self, documents: List[Document], **kwargs):
        """Upsert a list of documents into the store.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_upsert_document(self, documents: List[Document], **kwargs):
        """Asynchronous counterpart of `upsert_document`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def update_document(self, documents: List[Document], **kwargs):
        """Update a list of documents in the store.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def async_update_document(self, documents: List[Document], **kwargs):
        """Asynchronous counterpart of `update_document`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

Config

Configuration for this pydantic object.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
class Config:
    """Pydantic configuration for this model."""
    # Permit field types that pydantic has no built-in validator for.
    arbitrary_types_allowed = True

__init__(**kwargs)

Initialize the store class.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def __init__(self, **kwargs):
    """Build the store and create any client that was not supplied.

    Args:
        **kwargs: Field values forwarded to the parent initializer.
    """
    super().__init__(**kwargs)
    # The factory hooks only run for clients that are still None
    # once the parent initializer has finished.
    if self.client is None:
        sync_client = self._new_client()
        self.client = sync_client
    if self.async_client is None:
        aio_client = self._new_async_client()
        self.async_client = aio_client

async_delete_document(document_id, **kwargs) async

Asynchronously delete the specific document by the document id.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_delete_document(self, document_id: str, **kwargs):
    """Asynchronously delete the document identified by `document_id`.

    Args:
        document_id: The id of the document to delete.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation deletes nothing.
    """
    raise NotImplementedError()

async_insert_document(document, **kwargs) async

Asynchronously insert document into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_insert_document(self, document: Document, **kwargs):
    """Asynchronously insert a single document into the store.

    Args:
        document: The document to insert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation inserts nothing.
    """
    raise NotImplementedError()

async_insert_documents(documents, **kwargs) async

Asynchronously insert documents into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_insert_documents(self, documents: List[Document], **kwargs):
    """Asynchronously insert a list of documents into the store.

    Args:
        documents: The documents to insert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation inserts nothing.
    """
    raise NotImplementedError()

async_query(query, **kwargs) async

Asynchronously query documents.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_query(self, query: Query, **kwargs) -> List[Document]:
    """Asynchronously query documents from the store.

    Args:
        query: The query to run.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation runs no query.
    """
    raise NotImplementedError()

async_update_document(documents, **kwargs) async

Asynchronously update documents into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_update_document(self, documents: List[Document], **kwargs):
    """Asynchronously update a list of documents in the store.

    Args:
        documents: The documents to update.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation updates nothing.
    """
    raise NotImplementedError()

async_upsert_document(documents, **kwargs) async

Asynchronously upsert documents into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
async def async_upsert_document(self, documents: List[Document], **kwargs):
    """Asynchronously upsert a list of documents into the store.

    Args:
        documents: The documents to upsert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation upserts nothing.
    """
    raise NotImplementedError()

delete_document(document_id, **kwargs)

Delete the specific document by the document id.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def delete_document(self, document_id: str, **kwargs):
    """Delete the document identified by `document_id`.

    Args:
        document_id: The id of the document to delete.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation deletes nothing.
    """
    raise NotImplementedError()

insert_document(document, **kwargs)

Insert document into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def insert_document(self, document: Document, **kwargs):
    """Insert a single document into the store.

    Args:
        document: The document to insert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation inserts nothing.
    """
    raise NotImplementedError()

insert_documents(documents, **kwargs)

Insert documents into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def insert_documents(self, documents: List[Document], **kwargs):
    """Insert a list of documents into the store.

    Args:
        documents: The documents to insert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation inserts nothing.
    """
    raise NotImplementedError()

query(query, **kwargs)

Query documents.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def query(self, query: Query, **kwargs) -> List[Document]:
    """Query documents from the store.

    Args:
        query: The query to run.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation runs no query.
    """
    raise NotImplementedError()

update_document(documents, **kwargs)

Update documents in the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def update_document(self, documents: List[Document], **kwargs):
    """Update a list of documents in the store.

    Args:
        documents: The documents to update.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation updates nothing.
    """
    raise NotImplementedError()

upsert_document(documents, **kwargs)

Upsert documents into the store.

Source code in agentuniverse/agent/action/knowledge/store/store.py
Python
def upsert_document(self, documents: List[Document], **kwargs):
    """Upsert a list of documents into the store.

    Args:
        documents: The documents to upsert.
        **kwargs: Additional options; not used by this implementation.

    Raises:
        NotImplementedError: Always; this implementation upserts nothing.
    """
    raise NotImplementedError()