Hermes
Simulation Orchestration Platform for Aerospace
hermes.server.telemetry.TelemetryEncoder Class Reference

Public Member Functions

None __init__ (self, SharedMemoryManager shm, list[str] signals)
list[str] signals (self)
int signal_count (self)
bytes encode (self)
tuple[int, float, list[float]] decode (cls, bytes data)
int frame_size (self)

Static Public Attributes

int MAGIC = 0x48455254
str HEADER_FORMAT = "<I Q d I"
int HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
float NANOSECONDS_PER_SECOND = 1_000_000_000.0

Protected Attributes

 _shm = shm
 _signals = list(signals)

Detailed Description

Encodes telemetry frames from shared memory to binary format.

Reads signal values from shared memory and packs them into a compact
binary format suitable for WebSocket transmission.

Example:
    encoder = TelemetryEncoder(shm, ["sensor.x", "sensor.y", "controller.output"])
    binary_frame = encoder.encode()
    await websocket.send(binary_frame)

Constructor & Destructor Documentation

◆ __init__()

None hermes.server.telemetry.TelemetryEncoder.__init__(self, SharedMemoryManager shm, list[str] signals)
Initialize telemetry encoder.

Args:
    shm: Shared memory manager to read from
    signals: List of signal names to include in frames

Member Function Documentation

◆ decode()

tuple[int, float, list[float]] hermes.server.telemetry.TelemetryEncoder.decode(cls, bytes data)
Decode a binary frame into its frame number, simulation time, and signal values.

This is primarily for testing and debugging.

Args:
    data: Binary frame data

Returns:
    Tuple of (frame_number, time_seconds, signal_values)

Raises:
    ValueError: If data is invalid or corrupted
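
A minimal sketch of what such a decoder could look like, not the library's actual implementation. It assumes the four header fields of HEADER_FORMAT are, in order, magic, frame number, time in seconds, and signal count, and that the payload is one little-endian float64 per signal. The function name decode_frame is hypothetical.

```python
import struct

HEADER_FORMAT = "<I Q d I"                    # value from the class reference
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
MAGIC = 0x48455254                            # value from the class reference


def decode_frame(data: bytes) -> tuple[int, float, list[float]]:
    """Hypothetical decoder; field order and float64 payload are assumptions."""
    if len(data) < HEADER_SIZE:
        raise ValueError("frame is shorter than the header")
    magic, frame_number, time_seconds, count = struct.unpack_from(HEADER_FORMAT, data, 0)
    if magic != MAGIC:
        raise ValueError(f"unexpected magic number: {magic:#010x}")
    expected = HEADER_SIZE + 8 * count        # 8 bytes per assumed float64 value
    if len(data) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(data)}")
    values = list(struct.unpack_from(f"<{count}d", data, HEADER_SIZE))
    return frame_number, time_seconds, values


# Build a two-signal frame by hand and decode it again.
raw = struct.pack(HEADER_FORMAT, MAGIC, 7, 0.5, 2) + struct.pack("<2d", 1.0, -2.5)
decoded = decode_frame(raw)
```

Validating both the magic number and the total length before unpacking is one way to satisfy the documented ValueError contract for invalid or corrupted data.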

◆ encode()

bytes hermes.server.telemetry.TelemetryEncoder.encode(self)
Encode the current shared-memory state into a binary frame.

Reads the current frame number, simulation time, and all subscribed
signal values from shared memory and packs them into binary format.

Returns:
    Binary frame data ready for WebSocket transmission

Raises:
    RuntimeError: If shared memory is not attached
    KeyError: If any subscribed signal is not found
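
The packing step might look roughly like the sketch below. Everything about the shared memory side is an assumption: FakeShm is a hypothetical stand-in and does not reflect the real SharedMemoryManager API. The header field order, the nanosecond time source, and the float64 payload are likewise assumed.

```python
import struct

HEADER_FORMAT = "<I Q d I"                  # value from the class reference
MAGIC = 0x48455254                          # value from the class reference
NANOSECONDS_PER_SECOND = 1_000_000_000.0    # value from the class reference


class FakeShm:
    """Hypothetical stand-in for shared memory; not the real SharedMemoryManager."""

    def __init__(self, frame: int, time_ns: int, values: dict[str, float]):
        self.frame = frame
        self.time_ns = time_ns
        self.values = values


def encode_frame(shm: FakeShm, signals: list[str]) -> bytes:
    """Pack header plus one float64 per signal (assumed layout)."""
    # A missing signal name raises KeyError, mirroring the documented behavior.
    values = [shm.values[name] for name in signals]
    time_seconds = shm.time_ns / NANOSECONDS_PER_SECOND
    header = struct.pack(HEADER_FORMAT, MAGIC, shm.frame, time_seconds, len(values))
    return header + struct.pack(f"<{len(values)}d", *values)


shm = FakeShm(frame=42, time_ns=1_500_000_000, values={"sensor.x": 1.0, "sensor.y": 2.0})
binary_frame = encode_frame(shm, ["sensor.x", "sensor.y"])
```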

◆ frame_size()

int hermes.server.telemetry.TelemetryEncoder.frame_size(self)
Calculate the size of encoded frames.

Returns:
    Size in bytes of frames produced by this encoder
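
If each signal value is stored as an 8-byte float64 after the header (an assumption; this page does not state the payload type), the frame size is a linear function of the signal count. The helper name below is hypothetical.

```python
import struct

HEADER_FORMAT = "<I Q d I"                  # value from the class reference
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
VALUE_SIZE = struct.calcsize("<d")          # assumed: one float64 per signal


def estimated_frame_size(signal_count: int) -> int:
    """Header plus fixed-width payload, under the float64 assumption."""
    return HEADER_SIZE + VALUE_SIZE * signal_count


# Three signals, as in the class-level example.
size_for_three = estimated_frame_size(3)
```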

◆ signal_count()

int hermes.server.telemetry.TelemetryEncoder.signal_count(self)
Number of signals per frame.

◆ signals()

list[str] hermes.server.telemetry.TelemetryEncoder.signals(self)
Signal names included in telemetry frames.

Member Data Documentation

◆ _shm

hermes.server.telemetry.TelemetryEncoder._shm = shm
protected

◆ _signals

hermes.server.telemetry.TelemetryEncoder._signals = list(signals)
protected

◆ HEADER_FORMAT

str hermes.server.telemetry.TelemetryEncoder.HEADER_FORMAT = "<I Q d I"
static

◆ HEADER_SIZE

int hermes.server.telemetry.TelemetryEncoder.HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
static
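
The header format string can be inspected with the standard struct module. The "<" prefix selects little-endian byte order with standard sizes and no alignment padding, and whitespace between format characters is ignored. The sketch below only examines the format string and makes no claim about what each field means.

```python
import struct

HEADER_FORMAT = "<I Q d I"  # value from the class reference

# Size of each field when packed with standard (unpadded) sizes.
field_codes = HEADER_FORMAT.lstrip("<").split()
field_sizes = {code_index: struct.calcsize("<" + code)
               for code_index, code in enumerate(field_codes)}

HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
print(field_codes, list(field_sizes.values()), HEADER_SIZE)
```

With standard sizes, the fields occupy 4, 8, 8, and 4 bytes, so HEADER_SIZE evaluates to 24.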

◆ MAGIC

int hermes.server.telemetry.TelemetryEncoder.MAGIC = 0x48455254
static
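
The magic value reads as the ASCII characters "HERT" when written most significant byte first. Packed as a little-endian uint32, the bytes appear in reverse order. The short sketch below shows both views. That MAGIC is the first header field is an assumption, not something this page states.

```python
import struct

MAGIC = 0x48455254  # value from the class reference

as_ascii = MAGIC.to_bytes(4, "big")   # most significant byte first
on_wire = struct.pack("<I", MAGIC)    # little-endian uint32, as "<I" would pack it
print(as_ascii, on_wire)
```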

◆ NANOSECONDS_PER_SECOND

float hermes.server.telemetry.TelemetryEncoder.NANOSECONDS_PER_SECOND = 1_000_000_000.0
static

The documentation for this class was generated from the following file: