Grpc server booster
AgentUniverseService
¶
Bases: AgentUniverseService
Implementation class of grpc service.
Source code in agentuniverse/agent_serve/web/rpc/grpc/grpc_server_booster.py
class AgentUniverseService(agentuniverse_service_pb2_grpc.AgentUniverseService):
"""Implementation class of grpc service."""
def service_run(self, request, context):
"""
Synchronous invocation of an agent service. Used in rpc implementation.
Request Args:
request(`agentuniverse_service_pb2.AgentServiceRequest`):
service_id(`str`): The id of the agent service.
params(`str`): A Json String contains agent params passed to service.
saved(`bool`): Save the request and result into database.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse:
success: This key holds a boolean value indicating the task was
successfully or not.
result: This key points to a nested dictionary that includes the
result of the task.
"""
service_result = service_run(
saved=request.saved,
params=request.params,
service_id=request.service_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
def service_run_async(self, request, context):
"""
Async invocation of an agent service, return the request id used to
get result later. Used in rpc implementation.
Request Args:
request(`agentuniverse_service_pb2.AgentServiceRequest`):
service_id(`str`): The id of the agent service.
params(`str`): A Json String contains agent params passed to service.
saved(`bool`): Save the request and result into database.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse:
success: This key holds a boolean value indicating the task was
successfully or not.
request_id: Stand for a single request taski, can be used in
service_run_result api to get the result of async task.
"""
service_result = service_run_async(
saved=request.saved,
params=request.params,
service_id=request.service_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
def service_run_result(self, request, context):
"""
Get the async service result.
Request Args:
request(`agentuniverse_service_pb2.AgentResultRequest`):
request_id(`str`): Request id returned by async run api.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse if
request_id exists in database.
success: This key holds a boolean value indicating the task was
successfully or not.
result: This key points to a nested dictionary that includes the
result of the task.
"""
service_result = service_run_result(
request_id=request.request_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
service_run(request, context)
¶
Synchronous invocation of an agent service. Used in rpc implementation.
Request Args
request(agentuniverse_service_pb2.AgentServiceRequest):
service_id(str): The id of the agent service.
params(str): A Json String contains agent params passed to service.
saved(bool): Save the request and result into database.
context: grpc context, needn't pass anything.
Return
Returns agentuniverse_service_pb2.AgentServiceResponse: success: This key holds a boolean value indicating the task was successfully or not. result: This key points to a nested dictionary that includes the result of the task.
Source code in agentuniverse/agent_serve/web/rpc/grpc/grpc_server_booster.py
def service_run(self, request, context):
"""
Synchronous invocation of an agent service. Used in rpc implementation.
Request Args:
request(`agentuniverse_service_pb2.AgentServiceRequest`):
service_id(`str`): The id of the agent service.
params(`str`): A Json String contains agent params passed to service.
saved(`bool`): Save the request and result into database.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse:
success: This key holds a boolean value indicating the task was
successfully or not.
result: This key points to a nested dictionary that includes the
result of the task.
"""
service_result = service_run(
saved=request.saved,
params=request.params,
service_id=request.service_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
service_run_async(request, context)
¶
Async invocation of an agent service, return the request id used to get result later. Used in rpc implementation.
Request Args
request(agentuniverse_service_pb2.AgentServiceRequest):
service_id(str): The id of the agent service.
params(str): A Json String contains agent params passed to service.
saved(bool): Save the request and result into database.
context: grpc context, needn't pass anything.
Return
Returns agentuniverse_service_pb2.AgentServiceResponse: success: This key holds a boolean value indicating the task was successfully or not. request_id: Stand for a single request taski, can be used in service_run_result api to get the result of async task.
Source code in agentuniverse/agent_serve/web/rpc/grpc/grpc_server_booster.py
def service_run_async(self, request, context):
"""
Async invocation of an agent service, return the request id used to
get result later. Used in rpc implementation.
Request Args:
request(`agentuniverse_service_pb2.AgentServiceRequest`):
service_id(`str`): The id of the agent service.
params(`str`): A Json String contains agent params passed to service.
saved(`bool`): Save the request and result into database.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse:
success: This key holds a boolean value indicating the task was
successfully or not.
request_id: Stand for a single request taski, can be used in
service_run_result api to get the result of async task.
"""
service_result = service_run_async(
saved=request.saved,
params=request.params,
service_id=request.service_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
service_run_result(request, context)
¶
Get the async service result.
Request Args
request(agentuniverse_service_pb2.AgentResultRequest):
request_id(str): Request id returned by async run api.
context: grpc context, needn't pass anything.
Return
Returns agentuniverse_service_pb2.AgentServiceResponse if request_id exists in database. success: This key holds a boolean value indicating the task was successfully or not. result: This key points to a nested dictionary that includes the result of the task.
Source code in agentuniverse/agent_serve/web/rpc/grpc/grpc_server_booster.py
def service_run_result(self, request, context):
"""
Get the async service result.
Request Args:
request(`agentuniverse_service_pb2.AgentResultRequest`):
request_id(`str`): Request id returned by async run api.
context: grpc context, needn't pass anything.
Return:
Returns agentuniverse_service_pb2.AgentServiceResponse if
request_id exists in database.
success: This key holds a boolean value indicating the task was
successfully or not.
result: This key points to a nested dictionary that includes the
result of the task.
"""
service_result = service_run_result(
request_id=request.request_id
)
return agentuniverse_service_pb2.AgentServiceResponse(
result=service_result['result'],
request_id=service_result['request_id'],
success=service_result['success'],
message=service_result['message']
)
start_grpc_server(configer=None)
¶
Used to start a grpc server, use configer to read grpc server config if applied, or use default config of 10 workers and 50051 port.
Source code in agentuniverse/agent_serve/web/rpc/grpc/grpc_server_booster.py
def start_grpc_server(configer=None):
"""Used to start a grpc server, use configer to read grpc server config if
applied, or use default config of 10 workers and 50051 port."""
if configer:
server_port = configer.value.get('GRPC', {}).get('server_port')
max_workers = configer.value.get('GRPC', {}).get('max_workers')
else:
max_workers = 10
server_port = 50051
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
agentuniverse_service_pb2_grpc.add_AgentUniverseServiceServicer_to_server(
AgentUniverseService(), server
)
server.add_insecure_port(f'[::]:{str(server_port)}')
server.start()
print(f"AgentUniverse grpc server start at port {str(server_port)}.")
server.wait_for_termination()