Diseño de Arquitectura

Conoce en profundidad los principios de diseño, arquitectura y detalles de implementación de Peeka.

Tabla de Contenidos

  1. Objetivos de Diseño
    1. Baja Intrusión
    2. Alta Confiabilidad
    3. Tiempo Real
    4. Extensibilidad
  2. Arquitectura General
  3. Componentes Centrales
    1. 1. Adjuntar Proceso (attach.py)
      1. Python 3.14+
      2. Python 3.8.1-3.13
    2. 2. Núcleo del Agente (agent.py)
      1. Servidor Socket
      2. Distribución de Comandos
      3. Gestión de Recursos
    3. 3. Inyector de Decoradores (injector.py)
      1. Envoltorio de Función
      2. Restauración Segura
    4. 4. Cliente (client.py)
      1. Cliente Sincrónico
      2. Cliente de Streaming
    5. 5. Sistema de Comandos (commands/)
      1. Clase Base
      2. Registro de Comandos
  4. Protocolo de Comunicación
    1. Formato de Mensaje
    2. Formato de Solicitud
    3. Formato de Respuesta
      1. Respuesta Exitosa
      2. Respuesta de Error
      3. Observación en Streaming
  5. Mecanismos de Seguridad
    1. 1. Seguridad de Expresiones (safeeval/)
      1. Lista Blanca AST
      2. Protección de Atributos
      3. Lista Negra de Funciones
    2. 2. Límites de Recursos
      1. Búfer de Memoria
      2. Control de Tiempo de Espera
    3. 3. Captura de Excepciones
      1. Tres Capas de Protección
  6. Optimizaciones de Rendimiento
    1. 1. Inyección de Decoradores
    2. 2. API sys.monitoring (Python 3.12+)
    3. 3. Transmisión en Streaming
  7. Desarrollo de Extensiones
    1. Agregar un Nuevo Comando
    2. Agregar una Vista TUI
  8. Referencias

Objetivos de Diseño

El diseño del Agente Peeka sigue estos objetivos centrales:

Baja Intrusión

La ejecución del Agente no debe afectar significativamente el rendimiento y la funcionalidad del proceso objetivo. Según la experiencia de la industria, el sobrecoste de rendimiento de una herramienta de diagnóstico en producción debe controlarse dentro del 5%. Peeka garantiza que el impacto de las operaciones de diagnóstico en el proceso objetivo se minimiza a través de un mecanismo de inyección de decoradores cuidadosamente diseñado y una estrategia de almacenamiento en búfer de datos de observación.

Alta Confiabilidad

El Agente debe mantener un funcionamiento estable en diversas situaciones anómalas y no debe causar que el proceso objetivo se bloquee o se vuelva anormal debido a sus propios errores. Durante el diseño se presta especial atención a la gestión de recursos, incluida la correcta liberación de recursos del sistema como memoria, descriptores de archivos e hilos, para evitar problemas de estabilidad a largo plazo causados por fugas de recursos.

Tiempo Real

Los datos de diagnóstico se pueden transmitir al cliente en tiempo real, permitiendo a los desarrolladores observar inmediatamente los cambios en el comportamiento del proceso objetivo. Esto es especialmente importante para localizar problemas intermitentes. Peeka adopta un protocolo de comunicación de flujo basado en Unix Domain Socket, logrando una latencia de transmisión de datos de nivel de milisegundos.

Extensibilidad

La arquitectura del Agente puede admitir convenientemente nuevos comandos de diagnóstico y extensiones de funcionalidad sin necesidad de refactorizar a gran escala el código existente. Adopta un diseño modular, separando preocupaciones como comunicación, ejecución de comandos y observación, e interactúa a través de interfaces claramente definidas.


Arquitectura General

┌─────────────────────────────────────────────────────────────┐
│                        User Space                            │
│                                                               │
│  ┌──────────────┐                    ┌──────────────┐       │
│  │  CLI/TUI     │                    │ Target       │       │
│  │              │                    │ Process      │       │
│  │  - peeka-cli │                    │              │       │
│  │  - peeka     │                    │ ┌──────────┐ │       │
│  └──────┬───────┘                    │ │  Agent   │ │       │
│         │                            │ │ (injected)│ │       │
│         │ Unix Domain Socket         │ └────┬─────┘ │       │
│         │ /tmp/peeka_<pid>.sock      │      │       │       │
│         └────────────────────────────┼──────┘       │       │
│                                      │              │       │
│  ┌─────────────────┐                 │ ┌──────────┐ │       │
│  │ AgentClient     │←────JSON────────┤ │ Commands │ │       │
│  │                 │                 │ │          │ │       │
│  │ - send_command  │                 │ │ - watch  │ │       │
│  │ - recv_response │                 │ │ - trace  │ │       │
│  └─────────────────┘                 │ │ - stack  │ │       │
│                                      │ │ - monitor│ │       │
│                                      │ │ - logger │ │       │
│                                      │ │ - memory │ │       │
│                                      │ │ - inspect│ │       │
│                                      │ │ - thread │ │       │
│                                      │ │ - top    │ │       │
│                                      │ │ - sc/sm  │ │       │
│                                      │ │ - reset  │ │       │
│                                      │ │ - detach │ │       │
│                                      │ └──────────┘ │       │
│                                      │              │       │
│                                      │ ┌──────────┐ │       │
│                                      │ │ Injector │ │       │
│                                      │ │          │ │       │
│                                      │ │ Function │ │       │
│                                      │ │ Wrapping │ │       │
│                                      │ └──────────┘ │       │
│                                      └──────────────┘       │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                      Kernel Space                            │
│                                                               │
│  ┌──────────────────────────────────────────┐               │
│  │  Process Attachment                       │               │
│  │                                           │               │
│  │  Python 3.14+:  sys.remote_exec()        │               │
│  │  Python < 3.14: GDB/LLDB fallback        │               │
│  └──────────────────────────────────────────┘               │
└─────────────────────────────────────────────────────────────┘

Componentes Centrales

1. Adjuntar Proceso (attach.py)

Responsable de inyectar el código del Agente en el proceso objetivo.

Python 3.14+

Usa la API sys.remote_exec() de PEP 768:

import sys

# Inyectar y ejecutar código del Agente
sys.remote_exec(pid, agent_script_path)

Ventajas:

  • Soporte oficial, seguro y confiable
  • Sin dependencia de depurador externo en Python 3.14+
  • Compatible multiplataforma

Python 3.8.1-3.13

Usa una alternativa con depurador: GDB + ptrace en Linux y LLDB + dlopen en macOS. El ejemplo siguiente muestra la ruta GDB heredada de Linux:

# 1. Adjuntar GDB al proceso
gdb -p <pid>

# 2. Obtener el GIL
call PyGILState_Ensure()

# 3. Ejecutar código Python
call PyRun_SimpleString("exec(open('/path/agent.py').read())")

# 4. Liberar el GIL
call PyGILState_Release(gil_state)

# 5. Desadjuntar
detach
quit

Requisitos:

  • Linux: GDB 7.3+, símbolos de depuración de Python, permiso CAP_SYS_PTRACE
  • macOS: Xcode Command Line Tools (incluye LLDB)

2. Núcleo del Agente (agent.py)

Código central inyectado en el proceso objetivo, responsable de:

Servidor Socket

import socket
import threading

class PeekaAgent:
    def __init__(self):
        self.socket_path = f"/tmp/peeka_{os.getpid()}.sock"
        self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server.bind(self.socket_path)
        self.server.listen(1)

    def start(self):
        # Ejecutar en un hilo en segundo plano, no bloquea el hilo principal
        thread = threading.Thread(target=self._serve, daemon=True)
        thread.start()

Distribución de Comandos

def _handle_command(self, command: dict) -> dict:
    cmd_type = command.get("command")
    handler = self.handlers.get(cmd_type)

    if handler:
        return handler.execute(command.get("params", {}))
    else:
        return {"status": "error", "error": f"Unknown command: {cmd_type}"}

Gestión de Recursos

  • Búfer de datos de observación de tamaño fijo (10000 entradas por defecto)
  • Limpieza automática de observaciones expiradas
  • Apagado elegante de hilos

3. Inyector de Decoradores (injector.py)

Envuelve dinámicamente funciones objetivo, agregando lógica de observación.

Envoltorio de Función

class DecoratorInjector:
    def inject_function(self, func, observer):
        original_func = func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Observación en la entrada
            if observer.at_enter:
                observer.on_enter(args, kwargs)

            try:
                # Ejecutar la función original
                result = original_func(*args, **kwargs)

                # Observación exitosa
                if observer.at_exit:
                    observer.on_exit(result)

                return result
            except Exception as e:
                # Observación de excepción
                if observer.at_exception:
                    observer.on_exception(e)
                raise

        return wrapper

Restauración Segura

def restore_function(self, target):
    # Restaurar la función original
    if hasattr(target, '__wrapped__'):
        original = target.__wrapped__
        # Reemplazo seguro (maneja varios casos límite)
        ...

4. Cliente (client.py)

Biblioteca cliente para comunicarse con el Agente.

Cliente Sincrónico

class AgentClient:
    def send_command(self, command: dict) -> dict:
        # Enviar comando (prefijo de longitud + JSON)
        msg = json.dumps(command).encode()
        self.sock.sendall(len(msg).to_bytes(4, 'big') + msg)

        # Recibir respuesta
        length = int.from_bytes(self.sock.recv(4), 'big')
        response = self.sock.recv(length)
        return json.loads(response)

Cliente de Streaming

class StreamingAgentClient(AgentClient):
    def stream_observations(self):
        # Generador, recibe datos de observación en tiempo real
        while True:
            msg = self._recv_message()
            if msg.get("type") == "observation":
                yield msg
            elif msg.get("type") == "event":
                if msg["event"] == "watch_stopped":
                    break

5. Sistema de Comandos (commands/)

Implementación modular de comandos.

Clase Base

class BaseCommand(ABC):
    def __init__(self, agent: "PeekaAgent"):
        self.agent = agent

    @abstractmethod
    def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Ejecutar comando y devolver resultado"""
        pass

Registro de Comandos

def _register_handlers(self):
    from peeka.commands.watch import WatchCommand
    from peeka.commands.trace import TraceCommand
    # ... importación retardada para evitar dependencias circulares

    self.handlers = {
        "watch": WatchCommand(self),
        "trace": TraceCommand(self),
        # ...
    }

Protocolo de Comunicación

Formato de Mensaje

Todos los mensajes usan el formato prefijo de longitud + JSON:

┌─────────────┬─────────────────────────┐
│  Length     │   JSON Payload          │
│  (4 bytes)  │   (variable length)     │
└─────────────┴─────────────────────────┘

Formato de Solicitud

{
  "command": "watch",
  "params": {
    "pattern": "module.func",
    "times": 10,
    "condition": "cost > 100"
  }
}

Formato de Respuesta

Respuesta Exitosa

{
  "status": "success",
  "data": {
    "watch_id": "watch_001",
    "pattern": "module.func"
  }
}

Respuesta de Error

{
  "status": "error",
  "error": "Cannot find target: invalid.pattern",
  "traceback": "..."
}

Observación en Streaming

{
  "type": "observation",
  "watch_id": "watch_001",
  "timestamp": 1705586200.123,
  "func_name": "module.func",
  "args": [1, 2],
  "result": 3,
  "duration_ms": 0.123
}

Mecanismos de Seguridad

1. Seguridad de Expresiones (safeeval/)

Evaluación segura de expresiones basada en la biblioteca simpleeval.

Lista Blanca AST

Solo permite nodos AST seguros:

SAFE_NODES = {
    ast.Expression, ast.Compare, ast.BinOp, ast.UnaryOp,
    ast.Num, ast.Str, ast.NameConstant, ast.Name,
    ast.List, ast.Tuple, ast.Dict, ast.Subscript,
    # ... otros nodos seguros
}

Protección de Atributos

Bloquea acceso a atributos peligrosos:

UNSAFE_ATTRS = {
    '__class__', '__subclasses__', '__bases__',
    '__import__', '__builtins__', '__globals__'
}

Lista Negra de Funciones

Deshabilita funciones peligrosas:

UNSAFE_FUNCTIONS = {
    'eval', 'exec', 'compile', 'open',
    '__import__', 'input', 'globals', 'locals'
}

2. Límites de Recursos

Búfer de Memoria

class ObservationManager:
    def __init__(self, max_size=10000):
        self.buffer = collections.deque(maxlen=max_size)

Control de Tiempo de Espera

@timeout(seconds=30)
def execute_command(command):
    # La ejecución de comandos tiene protección por timeout
    pass

3. Captura de Excepciones

Tres Capas de Protección

  1. Módulo central: Lanza excepciones, falla rápido
  2. Capa de comandos: Captura y devuelve diccionario de error
  3. Capa de Agente: Protección final, agrega traceback
try:
    result = command.execute(params)
except Exception as e:
    result = {
        "status": "error",
        "error": str(e),
        "traceback": traceback.format_exc()
    }

Optimizaciones de Rendimiento

1. Inyección de Decoradores

  • Usa functools.wraps para preservar metadatos
  • Minimiza el sobrecoste del envoltorio (< 1%)
  • Compilación condicional (solo inyecta cuando es necesario)

2. API sys.monitoring (Python 3.12+)

El comando trace usa la API sys.monitoring:

import sys

def trace_callback(code, instruction_offset, args):
    # Callback de bajo costo
    pass

sys.monitoring.use_tool_id(sys.monitoring.PROFILER_ID, "peeka")
sys.monitoring.register_callback(
    sys.monitoring.PROFILER_ID,
    sys.monitoring.events.CALL,
    trace_callback
)

Comparación de Rendimiento:

  • sys.monitoring: < 5% de sobrecoste
  • sys.settrace: < 20% de sobrecoste (Python 3.8.1-3.11)

3. Transmisión en Streaming

  • Usa generadores para evitar acumulación de memoria
  • Baja latencia de transmisión con Unix Domain Socket
  • Parseo JSON en streaming

Desarrollo de Extensiones

Agregar un Nuevo Comando

  1. Crear clase de comando:
# peeka/commands/mycommand.py
from peeka.commands.base import BaseCommand
from typing import Dict, Any

class MyCommand(BaseCommand):
    def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Implementar lógica del comando
            result = self._do_work(params)
            return {"status": "success", "data": result}
        except Exception as e:
            return {"status": "error", "error": str(e)}
  1. Registrar en el Agente:
# peeka/core/agent.py
def _register_handlers(self):
    from peeka.commands.mycommand import MyCommand
    self.handlers["mycommand"] = MyCommand(self)
  1. Agregar interfaz CLI:
# peeka/cli/main.py
parser.add_subcommand("mycommand", help="My new command")

Agregar una Vista TUI

  1. Crear clase de vista:
# peeka/tui/views/myview.py
from textual.widgets import Static

class MyView(Static):
    def compose(self):
        yield Label("My View")
  1. Registrar atajo de teclado:
# peeka/tui/screens/main.py
BINDINGS = [
    ("5", "show_myview", "My View"),
]

Referencias


Volver al inicio

Copyright © 2026 Peeka contributors. Distributed under the Apache License 2.0.

This site uses Just the Docs, a documentation theme for Jekyll.