Skip to content

Request library

RequestLibrary

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
@singleton
class RequestLibrary:
    def __init__(self, configer: Configer = None):
        """Init the database connection. Use uri in config file or use sqlite
        as default database."""
        system_db_uri = None
        if Configer:
            system_db_uri = configer.get('DB', {}).get('system_db_uri')
            if not system_db_uri:
                system_db_uri = configer.get('DB', {}).get('mysql_uri')
        if system_db_uri and system_db_uri.strip():
            db_uri = system_db_uri
        else:
            db_path = get_project_root_path() / 'DB' / 'agent_universe.db'
            db_path.parent.mkdir(parents=True, exist_ok=True)
            db_uri = f'sqlite:////{db_path}'

        self.db_uri = db_uri
        self.Session = None

    def query_request_by_request_id(self, request_id: str) -> RequestDO | None:
        """Get a RequestDO with given request_id.

        Args:
            request_id(`str`): The unique request id of request task.

        Return:
            The target RequestDO or none when no such data.
        """
        session = self.__get_session()
        try:
            result = session.execute(
                select(RequestORM).where(RequestORM.request_id == request_id)
            ).scalars().first()
            if not result:
                return None
            return self.__request_orm_to_do(result)
        finally:
            session.close()

    def add_request(self, request_do: RequestDO) -> int:
        """Add the given RequestDO to database.

        Args:
            request_do(`RequestDO`): A new RequestDO to be added.

        Return:
            A int stands unique data id in table.
        """
        session = self.__get_session()
        try:
            request_orm = RequestORM(**request_do.model_dump())
            session.add(request_orm)
            session.commit()
            return request_orm.id
        finally:
            session.close()

    def update_request(self, request_do: RequestDO):
        """Update the request data with same request id as the given
        RequestDO."""
        session = self.__get_session()
        try:
            db_request_do = session.query(RequestORM).filter(
                RequestORM.request_id == request_do.request_id).first()
            if db_request_do:
                update_data = request_do.model_dump(exclude_unset=True)
                for key, value in update_data.items():
                    setattr(db_request_do, key, value)
                session.commit()
                session.refresh(db_request_do)
        finally:
            session.close()

    def update_gmt_modified(self, request_id: str):
        """Update the request task latest active time."""
        session = self.__get_session()
        try:
            db_request_do = session.query(RequestORM).filter(
                RequestORM.request_id == request_id).first()
            if db_request_do:
                setattr(db_request_do, "gmt_modified", datetime.datetime.now())
                session.commit()
                session.refresh(db_request_do)
        finally:
            session.close()

    def __get_session(self):
        if self.Session:
            return self.Session()
        # Create database engine
        self.engine = create_engine(
            self.db_uri,
            json_serializer=lambda x: json.dumps(x, ensure_ascii=False)
        )
        self.Session = sessionmaker(bind=self.engine)

        with self.engine.connect() as conn:
            if not conn.dialect.has_table(conn, REQUEST_TABLE_NAME):
                Base.metadata.create_all(self.engine)
        return self.Session()

    def __request_orm_to_do(self, request_orm: RequestORM) -> RequestDO:
        """Transfer a RequestORM to RequestDO."""
        request_obj = RequestDO(
            request_id='',
            session_id="",
            query='',
            state='',
            result=dict(),
            steps=[],
            additional_args=dict(),
            gmt_create=datetime.datetime.now(),
            gmt_modified=datetime.datetime.now(),
        )
        for column in request_orm.__table__.columns:
            setattr(request_obj, column.name,
                    getattr(request_orm, column.name))
        return request_obj

__init__(configer=None)

Init the database connection. Use uri in config file or use sqlite as default database.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def __init__(self, configer: Configer = None):
    """Init the database connection. Use uri in config file or use sqlite
    as default database."""
    system_db_uri = None
    if Configer:
        system_db_uri = configer.get('DB', {}).get('system_db_uri')
        if not system_db_uri:
            system_db_uri = configer.get('DB', {}).get('mysql_uri')
    if system_db_uri and system_db_uri.strip():
        db_uri = system_db_uri
    else:
        db_path = get_project_root_path() / 'DB' / 'agent_universe.db'
        db_path.parent.mkdir(parents=True, exist_ok=True)
        db_uri = f'sqlite:////{db_path}'

    self.db_uri = db_uri
    self.Session = None

__request_orm_to_do(request_orm)

Transfer a RequestORM to RequestDO.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def __request_orm_to_do(self, request_orm: RequestORM) -> RequestDO:
    """Transfer a RequestORM to RequestDO."""
    request_obj = RequestDO(
        request_id='',
        session_id="",
        query='',
        state='',
        result=dict(),
        steps=[],
        additional_args=dict(),
        gmt_create=datetime.datetime.now(),
        gmt_modified=datetime.datetime.now(),
    )
    for column in request_orm.__table__.columns:
        setattr(request_obj, column.name,
                getattr(request_orm, column.name))
    return request_obj

add_request(request_do)

Add the given RequestDO to database.

Parameters:

Name Type Description Default
request_do(`RequestDO`)

A new RequestDO to be added.

required
Return

A int stands unique data id in table.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def add_request(self, request_do: RequestDO) -> int:
    """Add the given RequestDO to database.

    Args:
        request_do(`RequestDO`): A new RequestDO to be added.

    Return:
        A int stands unique data id in table.
    """
    session = self.__get_session()
    try:
        request_orm = RequestORM(**request_do.model_dump())
        session.add(request_orm)
        session.commit()
        return request_orm.id
    finally:
        session.close()

query_request_by_request_id(request_id)

Get a RequestDO with given request_id.

Parameters:

Name Type Description Default
request_id(`str`)

The unique request id of request task.

required
Return

The target RequestDO or none when no such data.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def query_request_by_request_id(self, request_id: str) -> RequestDO | None:
    """Get a RequestDO with given request_id.

    Args:
        request_id(`str`): The unique request id of request task.

    Return:
        The target RequestDO or none when no such data.
    """
    session = self.__get_session()
    try:
        result = session.execute(
            select(RequestORM).where(RequestORM.request_id == request_id)
        ).scalars().first()
        if not result:
            return None
        return self.__request_orm_to_do(result)
    finally:
        session.close()

update_gmt_modified(request_id)

Update the request task latest active time.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def update_gmt_modified(self, request_id: str):
    """Update the request task latest active time."""
    session = self.__get_session()
    try:
        db_request_do = session.query(RequestORM).filter(
            RequestORM.request_id == request_id).first()
        if db_request_do:
            setattr(db_request_do, "gmt_modified", datetime.datetime.now())
            session.commit()
            session.refresh(db_request_do)
    finally:
        session.close()

update_request(request_do)

Update the request data with same request id as the given RequestDO.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
def update_request(self, request_do: RequestDO):
    """Update the request data with same request id as the given
    RequestDO."""
    session = self.__get_session()
    try:
        db_request_do = session.query(RequestORM).filter(
            RequestORM.request_id == request_do.request_id).first()
        if db_request_do:
            update_data = request_do.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                setattr(db_request_do, key, value)
            session.commit()
            session.refresh(db_request_do)
    finally:
        session.close()

RequestORM

Bases: Base

SQLAlchemy ORM Model for RequestDO.

Source code in agentuniverse/agent_serve/web/dal/request_library.py
Python
class RequestORM(Base):
    """SQLAlchemy ORM Model for RequestDO."""
    __tablename__ = REQUEST_TABLE_NAME
    id = Column(Integer, primary_key=True, autoincrement=True)
    request_id = Column(String(20), nullable=False)
    query = Column(Text)
    session_id = Column(String(50))
    state = Column(String(20))
    result = Column(JSON)
    steps = Column(JSON)
    additional_args = Column(JSON)
    gmt_create = Column(DateTime, default=datetime.datetime.now)
    gmt_modified = Column(DateTime, default=datetime.datetime.now,
                          onupdate=datetime.datetime.now)