Ngiler SH3LL 360
Home
Information
Create File
Create Folder
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
ssa
/
Information Server
MySQL :
OFF
Perl :
OFF
CURL :
ON
WGET :
OFF
PKEXEC :
OFF
Directive
Local Value
IP Address
63.250.38.10
System
Linux premium90.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
User
likhxapw
PHP Version
8.3.30
Software
LiteSpeed
Doc root
Writable
close
Edit File :
agent.py
| Size :
4.59
KB
Copy
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains classes implementing SSA Agent behaviour
"""
import atexit
import json
import logging
import socket as socket_module
import struct
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

from .internal.constants import agent_sock
from .internal.exceptions import SSAError
from .internal.utils import create_socket
from .modules.processor import RequestProcessor

# Maximum number of concurrent worker threads for handling requests.
# Limits memory usage on high-traffic servers.
MAX_WORKERS = 50


class SimpleAgent:
    """
    SSA Simple Agent class.

    Accepts connections on the agent socket and hands each one to a
    thread pool worker, which reads a JSON payload, validates its keys and
    passes it to the request processor.

    NOTE: constructing an instance starts the accept loop, so
    ``SimpleAgent()`` does not return while the loop is running.
    """

    def __init__(self):
        self.logger = logging.getLogger('agent')
        self.request_processor = RequestProcessor()
        self.executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        # make sure the pool is shut down when the interpreter exits
        atexit.register(self._shutdown)
        # start serving incoming connections
        self.listen()

    def _shutdown(self):
        """Shut down the thread pool executor without waiting for tasks."""
        self.executor.shutdown(wait=False)

    def listen(self) -> None:
        """
        Start listening socket.

        Loops forever: every accepted connection is submitted to the
        thread pool to be processed by :meth:`handle`.
        """
        _socket = create_socket(agent_sock)
        while True:
            # the peer address is not used
            connection, _address = _socket.accept()
            self.executor.submit(self.handle, connection)
            self.logger.debug('[ThreadPool] Submitted task')

    # Fields that the PHP extension sends (see dump.c: ssa_agent_dump)
    _REQUIRED_FIELDS = frozenset({
        'timestamp',
        'url',
        'duration',
        'hitting_limits',
        'throttled_time',
        'io_throttled_time',
        'wordpress',
    })

    @staticmethod
    def _get_peer_uid(connection: 'socket object') -> int:
        """
        Get the UID of the peer process using SO_PEERCRED.

        :param connection: socket object
        :return: UID of the connecting process
        :raises OSError: if the socket option cannot be read
        :raises struct.error: if the returned buffer does not unpack
            into three C ints
        """
        cred = connection.getsockopt(
            socket_module.SOL_SOCKET,
            socket_module.SO_PEERCRED,
            struct.calcsize('3i'),
        )
        # the credentials buffer is unpacked as (pid, uid, gid)
        _pid, uid, _gid = struct.unpack('3i', cred)
        return uid

    @classmethod
    def _validate_input(cls, data: dict) -> bool:
        """
        Validate that input data contains exactly the expected metric fields.

        The PHP extension always sends all 7 fields (see dump.c), so we
        require an exact match to reject both unknown and partial payloads.

        :param data: decoded payload
        :return: True only for a non-empty dict whose key set equals
            ``_REQUIRED_FIELDS``
        """
        if not isinstance(data, dict) or not data:
            return False
        return set(data.keys()) == cls._REQUIRED_FIELDS

    def handle(self, connection: 'socket object') -> None:
        """
        Handle incoming connection.

        Reads the payload, validates it and forwards it to the request
        processor. Errors are logged here and never propagate to the
        caller; the connection is closed on every exit path.

        :param connection: socket object usable to
            send and receive data on the connection
        """
        try:
            peer_uid = self._get_peer_uid(connection)
        except (OSError, struct.error) as e:
            self.logger.error(
                '[%s] Failed to get peer credentials: %s',
                current_thread().name, str(e))
            connection.close()
            return

        # makefile() is called inside the try block so that a failure
        # there is logged and still goes through the cleanup below
        # instead of leaking the accepted connection.
        fileobj = None
        try:
            fileobj = connection.makefile(errors='ignore')
            input_data = self.read_input(fileobj)
            if not self._validate_input(input_data):
                self.logger.warning(
                    '[%s] Rejected invalid payload from UID=%d: keys=%s',
                    current_thread().name, peer_uid,
                    sorted(input_data.keys()) if isinstance(input_data, dict)
                    else type(input_data).__name__)
                return
            self.request_processor.handle(input_data)
        except (SSAError, json.JSONDecodeError, ValueError) as e:
            self.logger.error('Handled exception in [%s]: %s',
                              current_thread().name, str(e))
        except Exception as e:
            self.logger.exception('Unexpected exception in [%s]: %s',
                                  current_thread().name, str(e))
        finally:
            # close the connection even if closing the file object fails
            try:
                if fileobj is not None:
                    fileobj.close()
            finally:
                connection.close()

    def read_input(self, fileio: 'file object') -> dict:
        """
        Read input data and return decoded json.

        :param fileio: a file-like object providing read method
        :return: the decoded JSON document, or an empty dict when nothing
            was read; the caller validates that the result is a dict
        :raises json.JSONDecodeError: if the data is not valid JSON
        """
        data = fileio.read()
        self.logger.info('[%s] I received %i bytes: %s',
                         current_thread().name, len(data.encode()), data)
        if data:
            # strict=False lets control characters appear inside strings
            return json.loads(data.strip(), strict=False)
        else:
            return {}
Back