This document describes the core classes implemented in Hermes Phase 1 and how they work together.
Module Layout
src/hermes/
├── backplane/ # IPC infrastructure
│ ├── shm.py # Shared memory management
│ ├── signals.py # Signal types and registry
│ └── sync.py # Synchronization primitives
├── core/ # Orchestration logic
│ ├── config.py # YAML configuration models
│ ├── process.py # Process lifecycle management
│ └── scheduler.py # Execution scheduling
├── scripting/ # Runtime API
│ └── api.py # Python inspection/injection
└── cli/ # Command-line interface
    └── main.py # CLI commands
Backplane Layer
The backplane provides the low-level inter-process communication (IPC) primitives: shared memory, signal metadata, and synchronization.
SignalType & SignalFlags
SignalType.F64
SignalType.F32
SignalType.I64
SignalType.I32
SignalType.BOOL
SignalFlags.NONE
SignalFlags.WRITABLE
SignalFlags.PUBLISHED
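One plausible way to model these two enums is with Python's `enum` module: an `IntEnum` for the value type and an `IntFlag` for flags that combine with `|`. This is a sketch only. The numeric codes and the `size_of` helper are assumptions for illustration, not the values Hermes uses.

```python
from enum import IntEnum, IntFlag


class SignalType(IntEnum):
    """Value type of a signal (numeric codes are illustrative)."""
    F64 = 0
    F32 = 1
    I64 = 2
    I32 = 3
    BOOL = 4


class SignalFlags(IntFlag):
    """Bit flags that can be combined with `|`."""
    NONE = 0
    WRITABLE = 1
    PUBLISHED = 2


# Hypothetical helper: bytes needed to store one value of each type.
_SIZES = {
    SignalType.F64: 8,
    SignalType.F32: 4,
    SignalType.I64: 8,
    SignalType.I32: 4,
    SignalType.BOOL: 1,
}


def size_of(signal_type: SignalType) -> int:
    return _SIZES[signal_type]


flags = SignalFlags.WRITABLE | SignalFlags.PUBLISHED
is_writable = SignalFlags.WRITABLE in flags
```

Using `IntFlag` keeps the combined value a plain integer, which is convenient if the flags are later stored in a fixed-width field.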
SignalDescriptor
Immutable metadata about a signal:
desc = SignalDescriptor(
    name="position.x",
    type=SignalType.F64,
    flags=SignalFlags.WRITABLE | SignalFlags.PUBLISHED,
    unit="m",
    description="X position in world frame",
)
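Immutability of this kind is commonly achieved with a frozen dataclass. The sketch below uses a hypothetical `Descriptor` class with a reduced field set. It is not the actual `SignalDescriptor` definition, only an illustration of what "immutable metadata" means in practice.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Descriptor:
    """Hypothetical stand-in for SignalDescriptor."""
    name: str
    unit: str = ""
    description: str = ""


desc = Descriptor(name="position.x", unit="m")

try:
    desc.unit = "ft"  # any attribute assignment is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False
```

A frozen dataclass is also hashable, so descriptors of this shape can be used as dictionary keys or set members.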
SignalRegistry
Central registry mapping qualified signal names to descriptors:
registry = SignalRegistry()
registry.register("vehicle", SignalDescriptor(name="position.x"))
registry.register("vehicle", SignalDescriptor(name="velocity.x"))
desc = registry.get("vehicle.position.x")
vehicle_signals = registry.list_module("vehicle")
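The qualified-name lookup can be pictured as a dictionary keyed by `"<module>.<signal>"`. The `Registry` class below is a hypothetical stand-in that stores plain signal names rather than descriptors. What the real `list_module` returns is not stated here, so this sketch assumes a list of qualified names.

```python
from typing import Dict, List


class Registry:
    """Hypothetical stand-in for SignalRegistry; stores plain signal names."""

    def __init__(self) -> None:
        self._by_qualified_name: Dict[str, str] = {}

    def register(self, module: str, signal_name: str) -> str:
        qualified = f"{module}.{signal_name}"
        if qualified in self._by_qualified_name:
            raise ValueError(f"duplicate signal: {qualified}")
        self._by_qualified_name[qualified] = signal_name
        return qualified

    def get(self, qualified: str) -> str:
        return self._by_qualified_name[qualified]

    def list_module(self, module: str) -> List[str]:
        # Match on "module." so that "vehicle" does not also match "vehicle2"
        prefix = module + "."
        return [q for q in self._by_qualified_name if q.startswith(prefix)]


registry = Registry()
registry.register("vehicle", "position.x")
registry.register("vehicle", "velocity.x")
registry.register("controller", "thrust_cmd")
vehicle_signals = registry.list_module("vehicle")
```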
SharedMemoryManager
Manages the POSIX shared memory segment containing all signal values:
shm = SharedMemoryManager("/hermes_sim")

# Create the segment with one entry per signal
shm.create([
    SignalDescriptor(name="position.x"),
    SignalDescriptor(name="velocity.x"),
])

# Write and read a signal value
shm.set_signal("position.x", 100.0)
value = shm.get_signal("position.x")

# Frame counter and simulation time (float seconds or integer nanoseconds)
shm.set_frame(42)
shm.set_time(0.42)
shm.set_time_ns(420_000_000)
frame = shm.get_frame()
time = shm.get_time()
time_ns = shm.get_time_ns()

shm.destroy()
Shared Memory Layout:
┌─────────────────────────────────────────────────────────────┐
│ Header (64 bytes) │
│ - magic: u32 ("HERM") │
│ - version: u32 (currently 3) │
│ - frame: u64 │
│ - time_ns: u64 (nanoseconds for determinism) │
│ - signal_count: u32 │
├─────────────────────────────────────────────────────────────┤
│ Signal Directory │
│ - [SignalEntry] × signal_count │
├─────────────────────────────────────────────────────────────┤
│ String Table (signal names) │
├─────────────────────────────────────────────────────────────┤
│ Data Region (signal values) │
└─────────────────────────────────────────────────────────────┘
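To make the header layout concrete, the sketch below packs the five listed fields with Python's `struct` module and zero-fills the result to 64 bytes. The field order, little-endian byte order, and the use of trailing zero padding are assumptions. Only the field names, widths, magic string, and header size come from the diagram.

```python
import struct

# Assumed field order: magic, version, frame, time_ns, signal_count.
# "<" selects little-endian with no implicit alignment padding.
HEADER_FIELDS = struct.Struct("<IIQQI")
HEADER_SIZE = 64
MAGIC = int.from_bytes(b"HERM", "little")


def pack_header(version: int, frame: int, time_ns: int, signal_count: int) -> bytes:
    body = HEADER_FIELDS.pack(MAGIC, version, frame, time_ns, signal_count)
    # Zero-fill the remainder so the header occupies exactly 64 bytes.
    return body.ljust(HEADER_SIZE, b"\x00")


def unpack_header(buf: bytes) -> dict:
    magic, version, frame, time_ns, signal_count = HEADER_FIELDS.unpack_from(buf, 0)
    if magic != MAGIC:
        raise ValueError("bad magic: not a Hermes segment")
    return {
        "version": version,
        "frame": frame,
        "time_ns": time_ns,
        "signal_count": signal_count,
    }


header = pack_header(version=3, frame=42, time_ns=420_000_000, signal_count=2)
fields = unpack_header(header)
```

The five fields occupy 28 bytes, so under these assumptions the remaining 36 bytes of the header are unused.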
Deterministic Time Tracking:
Time is stored as integer nanoseconds (u64) rather than floating-point seconds, so the clock is bit-exact across runs and platforms. For rates that do not divide evenly into 1 billion (e.g., 600 Hz), the timestep is rounded to the nearest nanosecond. The rounding error is at most 0.5 ns per step, so simulated time drifts from the nominal rate by a small, fixed amount (about 0.72 ms per hour at 600 Hz). That drift is identical on every run, and no floating-point error accumulates.
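The 600 Hz figure can be checked with integer arithmetic. The helper names below are hypothetical, and the sketch assumes an integer `rate_hz`.

```python
NS_PER_SECOND = 1_000_000_000


def timestep_ns(rate_hz: int) -> int:
    """Nearest-nanosecond timestep for an integer rate, using integer math."""
    return (NS_PER_SECOND + rate_hz // 2) // rate_hz


def drift_ns_per_hour(rate_hz: int) -> int:
    """Simulated time minus nominal time after one nominal hour of frames."""
    frames = rate_hz * 3600
    return frames * timestep_ns(rate_hz) - 3600 * NS_PER_SECOND


dt_600 = timestep_ns(600)            # 1_666_667 ns (exact value is 1_666_666.67)
drift_600 = drift_ns_per_hour(600)   # 720_000 ns, i.e. 0.72 ms
drift_1000 = drift_ns_per_hour(1000) # 0 ns, since 1000 divides 1e9 evenly
```

Each 600 Hz step is about a third of a nanosecond too long, and 2,160,000 steps per hour add up to 720,000 ns.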
FrameBarrier
Semaphore-based synchronization for coordinating module execution:
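# Scheduler side: create the barrier for 3 modules
barrier = FrameBarrier("/hermes_barrier", count=3)
barrier.create()

# Module side: attach to the existing barrier
module_barrier = FrameBarrier("/hermes_barrier", count=3)
module_barrier.attach()

# Scheduler: release all modules for one frame
barrier.signal_step()
# Module: wait for the step signal, do the work, then report completion
module_barrier.wait_step(timeout=5.0)
module_barrier.signal_done()
# Scheduler: wait until every module has finished, then clean up
barrier.wait_all_done(timeout=5.0)
barrier.destroy()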
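The step/done handshake can be tried in a single process by substituting threads for module processes and `threading.Semaphore` for named POSIX semaphores. The `ThreadBarrier` class below is a hypothetical stand-in, not the `FrameBarrier` implementation. In particular, the per-module `index` argument is an assumption, added so that a fast module cannot consume another module's release within the same frame.

```python
import threading
from typing import List


class ThreadBarrier:
    """In-process stand-in for a semaphore-based frame barrier."""

    def __init__(self, count: int) -> None:
        self.count = count
        # One "step" semaphore per module, one shared "done" semaphore.
        self._step = [threading.Semaphore(0) for _ in range(count)]
        self._done = threading.Semaphore(0)

    def signal_step(self) -> None:
        for sem in self._step:
            sem.release()

    def wait_step(self, index: int, timeout: float) -> bool:
        return self._step[index].acquire(timeout=timeout)

    def signal_done(self) -> None:
        self._done.release()

    def wait_all_done(self, timeout: float) -> bool:
        # Collect one "done" token per module; stop at the first timeout.
        return all(self._done.acquire(timeout=timeout) for _ in range(self.count))


def run_frames(module_count: int, frames: int) -> List[int]:
    barrier = ThreadBarrier(module_count)
    steps_run = [0] * module_count

    def module(index: int) -> None:
        for _ in range(frames):
            if not barrier.wait_step(index, timeout=5.0):
                return
            steps_run[index] += 1  # stand-in for the module's real work
            barrier.signal_done()

    workers = [threading.Thread(target=module, args=(i,)) for i in range(module_count)]
    for worker in workers:
        worker.start()
    for _ in range(frames):
        barrier.signal_step()
        if not barrier.wait_all_done(timeout=5.0):
            raise TimeoutError("a module did not finish its frame")
    for worker in workers:
        worker.join()
    return steps_run


counts = run_frames(module_count=3, frames=5)
```

Every module runs exactly once per frame, and the orchestrating loop never starts the next frame until all modules have reported completion.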
Core Layer
The core layer implements orchestration logic.
Configuration Models
Pydantic models for YAML configuration:
from hermes.core.config import (
    HermesConfig,
    ModuleConfig,
    ModuleType,
    ExecutionConfig,
    ExecutionMode,
    WireConfig,
    SignalConfig,
)

# Load a configuration file
config = HermesConfig.from_yaml("simulation.yaml")

# Inspect the configured modules
for name, module in config.modules.items():
    print(f"Module: {name}, Type: {module.type}")

dt = config.get_dt()
order = config.get_module_names()
Module Types:
| Type | Description |
|------|-------------|
| ModuleType.PROCESS | External executable (C, C++, Rust) |
| ModuleType.SCRIPT | Python script as subprocess |
| ModuleType.INPROC | In-process (future: pybind11) |
Execution Modes:
| Mode | Description |
|------|-------------|
| ExecutionMode.REALTIME | Paced to wall-clock |
| ExecutionMode.AFAP | As fast as possible |
| ExecutionMode.SINGLE_FRAME | Manual stepping |
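As an illustration of what "paced to wall-clock" can mean, the sketch below sleeps until each frame's absolute deadline, so that sleep inaccuracy in one frame does not push later frames back. The function names are hypothetical and this is not the Hermes scheduler loop. An as-fast-as-possible mode would skip the sleep.

```python
import time


def sleep_needed_ns(start_ns: int, now_ns: int, frame: int, dt_ns: int) -> int:
    """Nanoseconds to wait before `frame` is due; 0 if it is already late."""
    deadline_ns = start_ns + frame * dt_ns
    return max(0, deadline_ns - now_ns)


def run_paced(frames: int, dt_ns: int) -> int:
    """Run `frames` empty frames paced to the wall clock; return elapsed ns."""
    start_ns = time.monotonic_ns()
    for frame in range(1, frames + 1):
        # A real loop would step the simulation here, before sleeping.
        wait_ns = sleep_needed_ns(start_ns, time.monotonic_ns(), frame, dt_ns)
        time.sleep(wait_ns / 1_000_000_000)
    return time.monotonic_ns() - start_ns


elapsed_ns = run_paced(frames=5, dt_ns=2_000_000)
```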
ModuleProcess
Manages a single module subprocess:
module.load()
module.stage()
module.terminate()
module.kill()
state = module.state
pid = module.pid
alive = module.is_alive
Module Lifecycle:
load() stage() step()... terminate()
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ INIT │──▶│ STAGED │──▶│ RUNNING │────▶│ DONE │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
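The lifecycle diagram can be read as a small state machine. The sketch below encodes only the transitions drawn in it. The `ModuleState` enum and `advance` function are hypothetical names, and whether `terminate()` is also accepted from earlier states is not specified here.

```python
from enum import Enum


class ModuleState(Enum):
    INIT = "init"
    STAGED = "staged"
    RUNNING = "running"
    DONE = "done"


# Allowed transitions, read off the lifecycle diagram.
_NEXT = {
    ("stage", ModuleState.INIT): ModuleState.STAGED,
    ("step", ModuleState.STAGED): ModuleState.RUNNING,
    ("step", ModuleState.RUNNING): ModuleState.RUNNING,
    ("terminate", ModuleState.RUNNING): ModuleState.DONE,
}


def advance(state: ModuleState, action: str) -> ModuleState:
    """Return the state after `action`, or raise if the diagram forbids it."""
    try:
        return _NEXT[(action, state)]
    except KeyError:
        raise RuntimeError(f"cannot {action}() while {state.name}") from None


state = ModuleState.INIT  # state after load()
for action in ("stage", "step", "step", "terminate"):
    state = advance(state, action)
```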
ProcessManager
Coordinates all module processes and IPC resources:
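config = HermesConfig.from_yaml("sim.yaml")
dt = config.get_dt()
with ProcessManager(config) as pm:
    pm.load_all()
    pm.stage_all()
    for frame in range(100):
        # Publish the frame number and simulation time, then step all modules
        pm.update_time(frame, frame * dt)
        pm.step_all()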
ProcessManager responsibilities:
- Create shared memory segment with all signals
- Create synchronization barrier
- Spawn and manage module processes
- Coordinate frame stepping via barrier
- Clean up resources on shutdown
Scheduler
High-level simulation control:
config = HermesConfig.from_yaml("sim.yaml")
with ProcessManager(config) as pm:
    pm.load_all()
    scheduler = Scheduler(pm, config.execution)
    scheduler.stage()

    # Step manually
    scheduler.step(10)

    # Run with a per-frame callback.
    # `await` requires an enclosing coroutine, e.g. one started with asyncio.run().
    async def telemetry(frame: int, time: float) -> None:
        if frame % 100 == 0:
            print(f"Frame {frame}, Time {time:.3f}s")
    await scheduler.run(callback=telemetry)

    # Control
    scheduler.pause()
    scheduler.resume()
    scheduler.stop()
Scheduler properties:
| Property | Description |
|----------|-------------|
| frame | Current frame number |
| time | Current simulation time (float seconds, derived from time_ns) |
| time_ns | Current simulation time (integer nanoseconds, authoritative) |
| dt | Timestep (float seconds, derived from dt_ns) |
| dt_ns | Timestep (integer nanoseconds, authoritative) |
| running | Whether run loop is active |
| paused | Whether simulation is paused |
| mode | Current execution mode |
Deterministic Time:
The scheduler uses integer nanoseconds internally for determinism. Any positive rate_hz is allowed; for rates that do not divide evenly into 1 billion, the timestep is rounded to the nearest nanosecond.
Scripting Layer
SimulationAPI
Python API for runtime inspection and injection:
with SimulationAPI("/hermes_sim") as sim:
    # Read and write individual signals
    x = sim.get("vehicle.position.x")
    sim.set("controller.thrust_cmd", 1000.0)

    # Write several signals at once
    sim.inject({
        "controller.thrust_cmd": 1000.0,
        "controller.pitch_cmd": 0.1,
    })

    # Read several signals at once
    values = sim.sample([
        "vehicle.position.x",
        "vehicle.position.y",
    ])

    # Wait for a frame, then query frame and time
    sim.wait_frame(100, timeout=10.0)
    frame = sim.get_frame()
    time = sim.get_time()
    time_ns = sim.get_time_ns()
    sim.wait_time_ns(1_000_000_000, timeout=10.0)
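Calls such as `wait_frame` and `wait_time_ns` take a timeout. One simple way to build that kind of wait is to poll a condition until a deadline passes. The `wait_until` helper and `FakeSim` class below are hypothetical. How `SimulationAPI` actually waits is not described here.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float,
               poll_interval: float = 0.001) -> bool:
    """Poll `predicate` until it is true or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


class FakeSim:
    """Stand-in whose frame counter advances by one on every read."""

    def __init__(self) -> None:
        self.frame = 0

    def get_frame(self) -> int:
        self.frame += 1
        return self.frame


sim = FakeSim()
reached = wait_until(lambda: sim.get_frame() >= 5, timeout=1.0)
timed_out = wait_until(lambda: False, timeout=0.01)
```

The function returns `True` when the condition is met and `False` on timeout, so the caller decides whether a timeout is an error.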
CLI Layer
Commands
# Run simulation
hermes run config.yaml
hermes run config.yaml --verbose
hermes run config.yaml --quiet
# Validate configuration
hermes validate config.yaml
# List signals from running simulation
hermes list-signals --shm-name /hermes_sim
Data Flow
Here's how data flows through the system during a simulation frame:
1. Scheduler.step() called
│
▼
2. ProcessManager.update_time()
- Writes frame/time to shared memory
│
▼
3. ProcessManager.step_all()
- FrameBarrier.signal_step() releases all modules
│
▼
4. Each module:
- FrameBarrier.wait_step() returns
- Reads inputs from shared memory
- Executes physics/logic
- Writes outputs to shared memory
- FrameBarrier.signal_done()
│
▼
5. ProcessManager waits:
- FrameBarrier.wait_all_done()
│
▼
6. Scheduler increments frame/time
│
▼
7. Repeat for next frame
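The frame loop above can be mimicked in one process with a dictionary standing in for shared memory and plain functions standing in for modules. This is a sketch under simplifying assumptions: the modules run sequentially instead of being released in parallel through the barrier, and the module functions and signal values are invented for illustration.

```python
from typing import Callable, Dict, List, Tuple

Signals = Dict[str, float]
Module = Callable[[Signals], None]


def controller(signals: Signals) -> None:
    # Hypothetical module: constant thrust command
    signals["controller.thrust_cmd"] = 2.0


def vehicle(signals: Signals) -> None:
    # Hypothetical module: integrate velocity and position with dt = 0.01 s
    dt = 0.01
    signals["vehicle.velocity.x"] += signals["controller.thrust_cmd"] * dt
    signals["vehicle.position.x"] += signals["vehicle.velocity.x"] * dt


def run(modules: List[Module], signals: Signals, header: Dict[str, int],
        frames: int, dt_ns: int) -> Tuple[int, int]:
    frame, time_ns = 0, 0
    for _ in range(frames):
        # Step 2: publish frame and time before the modules run
        header["frame"], header["time_ns"] = frame, time_ns
        # Steps 3 to 5: every module reads inputs and writes outputs
        for module in modules:
            module(signals)
        # Step 6: advance frame and time using integer nanoseconds
        frame += 1
        time_ns += dt_ns
    return frame, time_ns


signals: Signals = {
    "controller.thrust_cmd": 0.0,
    "vehicle.velocity.x": 0.0,
    "vehicle.position.x": 0.0,
}
header: Dict[str, int] = {"frame": 0, "time_ns": 0}
final_frame, final_time_ns = run([controller, vehicle], signals, header,
                                 frames=100, dt_ns=10_000_000)
```

After 100 frames at a 10 ms timestep, the loop has advanced exactly one simulated second, while the header still holds the values published for the last executed frame.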
Thread/Process Safety
- SharedMemoryManager: Thread-safe for concurrent reads; writes should be synchronized externally
- FrameBarrier: Designed for multi-process synchronization
- Scheduler: Single-threaded; use async/await for non-blocking operation
- SignalRegistry: Not thread-safe; populate before starting simulation