Skip to content

Sls sink

SlsSender

A class to send log to aliyun simple log server.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
class SlsSender:
    """A class to send log to aliyun simple log server."""

    def __init__(
            self,
            project: str,
            log_store: str,
            endpoint: str,
            access_key_id: str,
            access_key_secret: str,
            queue_max_size: int,
            send_interval: float):
        """Initialize a sls sender.

        Args:
            project (`str`):
                Project name of aliyun sls.
            log_store (`str`):
                Log store of aliyun sls.
            endpoint (`str`):
                Endpoint of aliyun sls.
            access_key_id (`str`):
                Project name of aliyun sls.
            access_key_secret (`str`):
                Project name of aliyun sls.
            queue_max_size (`int`):
                Log queue max size, sls sender use a queue to save the logs
                to be sent, a separate thread will upload logs to aliyun sls
                periodically.
            send_interval (`float`):
                Interval of the separate thread sending logs to aliyun sls.
        """
        self.project = project
        self.log_store = log_store
        self.client = LogClient(endpoint, access_key_id, access_key_secret)
        self.log_queue = queue.Queue(queue_max_size)
        self.send_interval = send_interval

        self.send_thread_stop_event = threading.Event()
        self.send_thread = None
        self._logger = loguru.logger

    def _send_put_logs_request(self,
                               log_item_list: List[LogItem],
                               topic: str = "",
                               source: str = "") -> Optional[PutLogsResponse]:
        """Send a batch of logs to aliyun sls.

        Args:
            log_item_list (`List[LogItem]`):
                A list of log items to be sent to aliyun sls.
            topic (`str`, defaults to `""`):
                An attribute used to identify a group of logs in aliyun sls.
            source (`str`, defaults to `""`):
                An identifier that allows to discern the source of the log.

        Returns:
            If logs sent successfully, returns a PutLogsResponse, else returns
            none and log the error in local log file.
        """
        try:
            put_request = PutLogsRequest(self.project, self.log_store, topic,
                                         source, log_item_list)
            put_response = self.client.put_logs(put_request)
        except Exception as e:
            self._logger.error(
                f"send single log to sls failed: {str(e)}", )
            return None
        return put_response

    def send_single_log(self,
                        message: str,
                        topic: str = "",
                        source: str = "") -> Optional[PutLogsResponse]:
        """Send a single log to aliyun sls.

        Args:
            message (`str`):
                The message to be sent to aliyun sls.
            topic (`str`, defaults to `""`):
                An attribute used to identify a group of logs in aliyun sls.
            source (`str`, defaults to `""`):
                An identifier that allows to discern the source of the log.

        Returns:
            If logs sent successfully, returns a PutLogsResponse, else returns
            none and log the error in local log file.
        """
        log_item_list = list()
        log_item = LogItem()
        log_item.set_contents(message)
        log_item_list.append(log_item)
        return self._send_put_logs_request(log_item_list, topic, source)

    def put_log_queue(self, log_item: LogItem):
        """Put a single log item into the waiting queue.

        Args:
            log_item (`LogItem`):
                The log item to be put into the queue.
        """
        try:
            self.log_queue.put(log_item, block=False)
        except queue.Full:
            self._logger.error("sls log queue is full, discard new log")

    def batch_send(self,
                   topic: str = "",
                   source: str = "") -> Optional[PutLogsResponse]:
        """Send all log items in waiting queue to aliyun sls.

        Args:
            topic (`str`, defaults to `""`):
                An attribute used to identify a group of logs in aliyun sls.
            source (`str`, defaults to `""`):
                An identifier that allows to discern the source of the log.

        Returns:
            If logs sent successfully, returns a PutLogsResponse, else returns
            none and log the error in local log file.
        """

        # Get all log items in waiting queue.
        size = self.log_queue.qsize()
        log_item_list = []
        if self.log_queue is not None and size > 0:
            for i in range(size):
                try:
                    log_item = self.log_queue.get(block=False)
                except queue.Empty:
                    self._logger.error(
                        "sls log queue shorter than expected, "
                        "all logs have been sent")
                    break
                log_item_list.append(log_item)

        # Send all log items to aliyun sls.
        length = len(log_item_list)
        if length > 0:
            return self._send_put_logs_request(log_item_list, topic, source)
        return None

    def start_batch_send_thread(self):
        """Start the log sending thread."""
        if self.send_thread is None or not self.send_thread.is_alive():
            self.send_thread_stop_event.clear()
            self.send_thread = threading.Thread(
                target=self._schedule_send_log,
                name="loop_send_log_thread", daemon=True)
            self.send_thread.start()

    def stop_batch_send_thread(self):
        """Stop the log sending thread."""
        if self.send_thread is not None:
            self.send_thread_stop_event.set()
            self.send_thread.join()
            self.send_thread = None

    def _schedule_send_log(self):
        """Create an infinite loop uploading logs in queue to aliyun sls."""
        while not self.send_thread_stop_event.is_set():
            self.batch_send()
            time.sleep(self.send_interval)

__init__(project, log_store, endpoint, access_key_id, access_key_secret, queue_max_size, send_interval)

Initialize a sls sender.

Parameters:

Name Type Description Default
project `str`

Project name of aliyun sls.

required
log_store `str`

Log store of aliyun sls.

required
endpoint `str`

Endpoint of aliyun sls.

required
access_key_id `str`

Project name of aliyun sls.

required
access_key_secret `str`

Project name of aliyun sls.

required
queue_max_size `int`

Log queue max size, sls sender use a queue to save the logs to be sent, a separate thread will upload logs to aliyun sls periodically.

required
send_interval `float`

Interval of the separate thread sending logs to aliyun sls.

required
Source code in agentuniverse_extension/logger/sls_sink.py
Python
def __init__(
        self,
        project: str,
        log_store: str,
        endpoint: str,
        access_key_id: str,
        access_key_secret: str,
        queue_max_size: int,
        send_interval: float):
    """Initialize a sls sender.

    Args:
        project (`str`):
            Project name of aliyun sls.
        log_store (`str`):
            Log store of aliyun sls.
        endpoint (`str`):
            Endpoint of aliyun sls.
        access_key_id (`str`):
            Project name of aliyun sls.
        access_key_secret (`str`):
            Project name of aliyun sls.
        queue_max_size (`int`):
            Log queue max size, sls sender use a queue to save the logs
            to be sent, a separate thread will upload logs to aliyun sls
            periodically.
        send_interval (`float`):
            Interval of the separate thread sending logs to aliyun sls.
    """
    self.project = project
    self.log_store = log_store
    self.client = LogClient(endpoint, access_key_id, access_key_secret)
    self.log_queue = queue.Queue(queue_max_size)
    self.send_interval = send_interval

    self.send_thread_stop_event = threading.Event()
    self.send_thread = None
    self._logger = loguru.logger

batch_send(topic='', source='')

Send all log items in waiting queue to aliyun sls.

Parameters:

Name Type Description Default
topic `str`, defaults to `""`

An attribute used to identify a group of logs in aliyun sls.

''
source `str`, defaults to `""`

An identifier that allows to discern the source of the log.

''

Returns:

Type Description
Optional[PutLogsResponse]

If logs sent successfully, returns a PutLogsResponse, else returns

Optional[PutLogsResponse]

none and log the error in local log file.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def batch_send(self,
               topic: str = "",
               source: str = "") -> Optional[PutLogsResponse]:
    """Send all log items in waiting queue to aliyun sls.

    Args:
        topic (`str`, defaults to `""`):
            An attribute used to identify a group of logs in aliyun sls.
        source (`str`, defaults to `""`):
            An identifier that allows to discern the source of the log.

    Returns:
        If logs sent successfully, returns a PutLogsResponse, else returns
        none and log the error in local log file.
    """

    # Get all log items in waiting queue.
    size = self.log_queue.qsize()
    log_item_list = []
    if self.log_queue is not None and size > 0:
        for i in range(size):
            try:
                log_item = self.log_queue.get(block=False)
            except queue.Empty:
                self._logger.error(
                    "sls log queue shorter than expected, "
                    "all logs have been sent")
                break
            log_item_list.append(log_item)

    # Send all log items to aliyun sls.
    length = len(log_item_list)
    if length > 0:
        return self._send_put_logs_request(log_item_list, topic, source)
    return None

put_log_queue(log_item)

Put a single log item into the waiting queue.

Parameters:

Name Type Description Default
log_item `LogItem`

The log item to be put into the queue.

required
Source code in agentuniverse_extension/logger/sls_sink.py
Python
def put_log_queue(self, log_item: LogItem):
    """Put a single log item into the waiting queue.

    Args:
        log_item (`LogItem`):
            The log item to be put into the queue.
    """
    try:
        self.log_queue.put(log_item, block=False)
    except queue.Full:
        self._logger.error("sls log queue is full, discard new log")

send_single_log(message, topic='', source='')

Send a single log to aliyun sls.

Parameters:

Name Type Description Default
message `str`

The message to be sent to aliyun sls.

required
topic `str`, defaults to `""`

An attribute used to identify a group of logs in aliyun sls.

''
source `str`, defaults to `""`

An identifier that allows to discern the source of the log.

''

Returns:

Type Description
Optional[PutLogsResponse]

If logs sent successfully, returns a PutLogsResponse, else returns

Optional[PutLogsResponse]

none and log the error in local log file.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def send_single_log(self,
                    message: str,
                    topic: str = "",
                    source: str = "") -> Optional[PutLogsResponse]:
    """Send a single log to aliyun sls.

    Args:
        message (`str`):
            The message to be sent to aliyun sls.
        topic (`str`, defaults to `""`):
            An attribute used to identify a group of logs in aliyun sls.
        source (`str`, defaults to `""`):
            An identifier that allows to discern the source of the log.

    Returns:
        If logs sent successfully, returns a PutLogsResponse, else returns
        none and log the error in local log file.
    """
    log_item_list = list()
    log_item = LogItem()
    log_item.set_contents(message)
    log_item_list.append(log_item)
    return self._send_put_logs_request(log_item_list, topic, source)

start_batch_send_thread()

Start the log sending thread.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def start_batch_send_thread(self):
    """Start the log sending thread."""
    if self.send_thread is None or not self.send_thread.is_alive():
        self.send_thread_stop_event.clear()
        self.send_thread = threading.Thread(
            target=self._schedule_send_log,
            name="loop_send_log_thread", daemon=True)
        self.send_thread.start()

stop_batch_send_thread()

Stop the log sending thread.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def stop_batch_send_thread(self):
    """Stop the log sending thread."""
    if self.send_thread is not None:
        self.send_thread_stop_event.set()
        self.send_thread.join()
        self.send_thread = None

SlsSink

A custom loguru sink used to send logs to aliyun sls.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
class SlsSink:
    """A custom loguru sink used to send logs to aliyun sls."""

    def __init__(self, sls_sender: SlsSender):
        """Initialize the sls sink."""
        self.sls_sender = sls_sender

    def __call__(self, message):
        """Construct the message to a sls log item and put it to the queue
         waiting for sls sender to send to the aliyun sls."""
        log_item = LogItem()
        log_item.set_time(int(message.record["time"].timestamp()))
        log_content = list()
        log_content.append(("content", str(message)))
        log_item.set_contents(log_content)
        self.sls_sender.put_log_queue(log_item)

__call__(message)

Construct the message to a sls log item and put it to the queue waiting for sls sender to send to the aliyun sls.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def __call__(self, message):
    """Construct the message to a sls log item and put it to the queue
     waiting for sls sender to send to the aliyun sls."""
    log_item = LogItem()
    log_item.set_time(int(message.record["time"].timestamp()))
    log_content = list()
    log_content.append(("content", str(message)))
    log_item.set_contents(log_content)
    self.sls_sender.put_log_queue(log_item)

__init__(sls_sender)

Initialize the sls sink.

Source code in agentuniverse_extension/logger/sls_sink.py
Python
def __init__(self, sls_sender: SlsSender):
    """Initialize the sls sink."""
    self.sls_sender = sls_sender