Hermes
Simulation Orchestration Platform for Aerospace
Loading...
Searching...
No Matches
hermes.backplane.shm.SharedMemoryManager Class Reference

Public Member Functions

None __init__ (self, str name)
str name (self)
bool is_attached (self)
None create (self, list[SignalDescriptor] signals)
None attach (self)
None detach (self)
None destroy (self)
float get_signal (self, str name)
None set_signal (self, str name, float value)
int get_frame (self)
None set_frame (self, int frame)
int get_time_ns (self)
None set_time_ns (self, int time_ns)
float get_time (self)
None set_time (self, float time)
list[str] signal_names (self)
SharedMemoryManager __enter__ (self)
None __exit__ (self, *object args)

Static Public Attributes

int MAGIC = 0x4845524D
int VERSION = 3
int HEADER_SIZE = 64
str HEADER_FORMAT = "<I I Q Q I"
 HEADER_STRUCT_SIZE = struct.calcsize(HEADER_FORMAT)
int NANOSECONDS_PER_SECOND = 1_000_000_000

Protected Attributes

str _name = name
posix_ipc.SharedMemory|None _shm = None
mmap.mmap|None _mmap = None
dict _signal_offsets = {}
int _signal_count = 0
int _data_offset = 0

Detailed Description

Manages a shared memory segment for signal data.

This class handles creation, attachment, and access to a POSIX
shared memory segment containing simulation signals. The segment
can be accessed by multiple processes for efficient IPC.

Example:
    # Creator (main process)
    shm = SharedMemoryManager("/hermes_sim")
    shm.create(signals)

    # Attacher (module process)
    shm = SharedMemoryManager("/hermes_sim")
    shm.attach()
    value = shm.get_signal("module.signal")

Constructor & Destructor Documentation

◆ __init__()

None hermes.backplane.shm.SharedMemoryManager.__init__ ( self,
str name )
Initialize shared memory manager.

Args:
    name: Shared memory segment name (e.g., "/hermes_sim")

Member Function Documentation

◆ __enter__()

SharedMemoryManager hermes.backplane.shm.SharedMemoryManager.__enter__ ( self)

◆ __exit__()

None hermes.backplane.shm.SharedMemoryManager.__exit__ ( self,
*object args )

◆ attach()

None hermes.backplane.shm.SharedMemoryManager.attach ( self)
Attach to existing shared memory segment.

Raises:
    SharedMemoryError: If already attached
    posix_ipc.ExistentialError: If segment doesn't exist

◆ create()

None hermes.backplane.shm.SharedMemoryManager.create ( self,
list[SignalDescriptor] signals )
Create and initialize shared memory segment.

Args:
    signals: List of signal descriptors to allocate space for

Raises:
    SharedMemoryError: If already attached
    posix_ipc.ExistentialError: If segment already exists

◆ destroy()

None hermes.backplane.shm.SharedMemoryManager.destroy ( self)
Destroy the shared memory segment.

Should only be called by the creator after all attachers detach.

◆ detach()

None hermes.backplane.shm.SharedMemoryManager.detach ( self)
Detach from shared memory segment.

◆ get_frame()

int hermes.backplane.shm.SharedMemoryManager.get_frame ( self)
Get current frame number from header.

◆ get_signal()

float hermes.backplane.shm.SharedMemoryManager.get_signal ( self,
str name )
Read a signal value from shared memory.

Args:
    name: Signal name

Returns:
    Signal value as float

Raises:
    SharedMemoryError: If not attached
    SignalError: If signal not found

◆ get_time()

float hermes.backplane.shm.SharedMemoryManager.get_time ( self)
Get current simulation time in seconds from header.

This is derived from `get_time_ns()` for API convenience.
For deterministic comparisons, use `get_time_ns()` instead.

◆ get_time_ns()

int hermes.backplane.shm.SharedMemoryManager.get_time_ns ( self)
Get current simulation time in nanoseconds from header.

This is the authoritative time value for deterministic simulations.

◆ is_attached()

bool hermes.backplane.shm.SharedMemoryManager.is_attached ( self)
Whether currently attached to shared memory.

◆ name()

str hermes.backplane.shm.SharedMemoryManager.name ( self)
Shared memory segment name.

◆ set_frame()

None hermes.backplane.shm.SharedMemoryManager.set_frame ( self,
int frame )
Set frame number in header.

◆ set_signal()

None hermes.backplane.shm.SharedMemoryManager.set_signal ( self,
str name,
float value )
Write a signal value to shared memory.

Args:
    name: Signal name
    value: Value to write

Raises:
    SharedMemoryError: If not attached
    SignalError: If signal not found

◆ set_time()

None hermes.backplane.shm.SharedMemoryManager.set_time ( self,
float time )
Set simulation time in seconds in header.

Converts to nanoseconds internally. For precise control,
use `set_time_ns()` instead.

◆ set_time_ns()

None hermes.backplane.shm.SharedMemoryManager.set_time_ns ( self,
int time_ns )
Set simulation time in nanoseconds in header.

◆ signal_names()

list[str] hermes.backplane.shm.SharedMemoryManager.signal_names ( self)
Get list of all signal names.

Member Data Documentation

◆ _data_offset

hermes.backplane.shm.SharedMemoryManager._data_offset = 0
protected

◆ _mmap

mmap.mmap | None hermes.backplane.shm.SharedMemoryManager._mmap = None
protected

◆ _name

hermes.backplane.shm.SharedMemoryManager._name = name
protected

◆ _shm

posix_ipc.SharedMemory | None hermes.backplane.shm.SharedMemoryManager._shm = None
protected

◆ _signal_count

int hermes.backplane.shm.SharedMemoryManager._signal_count = 0
protected

◆ _signal_offsets

dict hermes.backplane.shm.SharedMemoryManager._signal_offsets = {}
protected

◆ HEADER_FORMAT

str hermes.backplane.shm.SharedMemoryManager.HEADER_FORMAT = "<I I Q Q I"
static

◆ HEADER_SIZE

hermes.backplane.shm.SharedMemoryManager.HEADER_SIZE = 64
static

◆ HEADER_STRUCT_SIZE

hermes.backplane.shm.SharedMemoryManager.HEADER_STRUCT_SIZE = struct.calcsize(HEADER_FORMAT)
static

◆ MAGIC

int hermes.backplane.shm.SharedMemoryManager.MAGIC = 0x4845524D
static

◆ NANOSECONDS_PER_SECOND

hermes.backplane.shm.SharedMemoryManager.NANOSECONDS_PER_SECOND = 1_000_000_000
static

◆ VERSION

int hermes.backplane.shm.SharedMemoryManager.VERSION = 3
static

The documentation for this class was generated from the following file: