Skip to content

Web util

make_standard_response(success, result=None, message=None, request_id=None, status_code=200)

Construct a standard flask response.

Source code in agentuniverse/agent_serve/web/web_util.py
Python
def make_standard_response(success: bool,
                           result=None,
                           message: str = None,
                           request_id: str = None,
                           status_code=200):
    """Construct a standard flask response."""
    response_data = {
        "success": success,
        "result": result,
        "message": message,
        "request_id": request_id
    }
    return make_response(jsonify(response_data), status_code)

request_param(func)

An annotation used to parse the flask request params.

Source code in agentuniverse/agent_serve/web/web_util.py
Python
def request_param(func):
    """An annotation used to parse the flask request params."""
    def wrapper(*args, **kwargs):
        if request.method == "GET":
            req_data = request.args.to_dict()
        # Get the post params from body according to different content type.
        else:
            if "application/json" in request.headers.get("Content-Type"):
                raw_data = request.data.decode('utf-8')
                req_data = json.loads(raw_data)
            else:
                req_data = request.form.to_dict()
        # Get the func arguments name and type.
        sig = inspect.signature(func)
        for name, param in sig.parameters.items():
            if name == "kwargs":
                for key in req_data:
                    if key not in kwargs:
                        kwargs[key] = req_data[key]
                continue
            if name == "saved":
                if "saved" in req_data:
                    kwargs['saved'] = req_data['saved']
                else:
                    kwargs['saved'] = sig.parameters['saved'].default
                continue
            if name == "session_id":
                kwargs[name] = request.headers.get("X-Session-Id")
            elif param.annotation in (str, int, dict, list):
                kwargs[name] = req_data.get(name)
            else:
                kwargs[name] = param.annotation(**req_data)
        return func(*args, **kwargs)

    wrapper.__name__ = func.__name__
    return wrapper

service_run_queue(service_id, **kwargs)

The func used in a separate thread to run an agent service. The result will be saved in a queue if one is provided.

Source code in agentuniverse/agent_serve/web/web_util.py
Python
def service_run_queue(service_id, **kwargs):
    """The func used in a separate thread to run an agent service. The result
    will be saved in a queue if one is provided."""
    stream: queue.Queue = kwargs.get('output_stream')
    try:
        res = ServiceInstance(service_id).run(**kwargs)
        return res
    finally:
        if stream:
            stream.put_nowait('{"type": "EOF"}')