Web util
make_standard_response(success, result=None, message=None, request_id=None, status_code=200)
¶
Construct a standard flask response.
Source code in agentuniverse/agent_serve/web/web_util.py
Python
def make_standard_response(success: bool,
result=None,
message: str = None,
request_id: str = None,
status_code=200):
"""Construct a standard flask response."""
response_data = {
"success": success,
"result": result,
"message": message,
"request_id": request_id
}
return make_response(jsonify(response_data), status_code)
request_param(func)
¶
An annotation used to parse the flask request params.
Source code in agentuniverse/agent_serve/web/web_util.py
Python
def request_param(func):
"""An annotation used to parse the flask request params."""
def wrapper(*args, **kwargs):
if request.method == "GET":
req_data = request.args.to_dict()
# Get the post params from body according to different content type.
else:
if "application/json" in request.headers.get("Content-Type"):
raw_data = request.data.decode('utf-8')
req_data = json.loads(raw_data)
else:
req_data = request.form.to_dict()
# Get the func arguments name and type.
sig = inspect.signature(func)
for name, param in sig.parameters.items():
if name == "kwargs":
for key in req_data:
if key not in kwargs:
kwargs[key] = req_data[key]
continue
if name == "saved":
if "saved" in req_data:
kwargs['saved'] = req_data['saved']
else:
kwargs['saved'] = sig.parameters['saved'].default
continue
if name == "session_id":
kwargs[name] = request.headers.get("X-Session-Id")
elif param.annotation in (str, int, dict, list):
kwargs[name] = req_data.get(name)
else:
kwargs[name] = param.annotation(**req_data)
return func(*args, **kwargs)
wrapper.__name__ = func.__name__
return wrapper
service_run_queue(service_id, **kwargs)
¶
The func used in a separate thread to run an agent service. The result will be saved in a queue if one is provided.
Source code in agentuniverse/agent_serve/web/web_util.py
Python
def service_run_queue(service_id, **kwargs):
"""The func used in a separate thread to run an agent service. The result
will be saved in a queue if one is provided."""
stream: queue.Queue = kwargs.get('output_stream')
try:
res = ServiceInstance(service_id).run(**kwargs)
return res
finally:
if stream:
stream.put_nowait('{"type": "EOF"}')