Version: 0.1 Date: January 2026 Status: Implementation Ready
1. Overview
Hermes is a simulation orchestration platform: it loads simulation modules, routes signals between them, and serves telemetry to visualization clients. It is not a physics engine; it delegates computation to modules such as Icarus.
Goals
- Load and manage multiple simulation modules via adapters
- Route signals between modules based on wiring configuration
- Execute synchronous simulation loops at configurable rates
- Serve telemetry to Daedalus clients via WebSocket
- Enable scripted test scenarios in Python
Non-Goals (Phase 1)
- Real-time execution with hard deadlines
- Multi-threaded parallel module execution
- Distributed simulation across multiple hosts
2. Project Structure
hermes/
├── pyproject.toml              # Project metadata, dependencies
├── README.md
├── src/
│   └── hermes/
│       ├── __init__.py         # Public API exports
│       ├── py.typed            # PEP 561 marker
│       │
│       ├── core/               # Core abstractions
│       │   ├── __init__.py
│       │   ├── module.py       # ModuleAdapter protocol
│       │   ├── signal.py       # SignalDescriptor, SignalBus
│       │   ├── scheduler.py    # Synchronous scheduler
│       │   └── config.py       # Configuration dataclasses
│       │
│       ├── adapters/           # Module adapters
│       │   ├── __init__.py
│       │   ├── icarus.py       # IcarusAdapter (pybind11)
│       │   ├── script.py       # ScriptAdapter (Python modules)
│       │   └── injection.py    # InjectionAdapter (signal injection)
│       │
│       ├── server/             # WebSocket server
│       │   ├── __init__.py
│       │   ├── protocol.py     # Message types, serialization
│       │   ├── telemetry.py    # Binary telemetry encoder
│       │   └── websocket.py    # WebSocket server (asyncio)
│       │
│       └── cli/                # Command-line interface
│           ├── __init__.py
│           └── main.py         # Entry point
│
├── tests/
│   ├── conftest.py             # Pytest fixtures
│   ├── test_signal_bus.py
│   ├── test_scheduler.py
│   ├── test_icarus_adapter.py
│   ├── test_protocol.py
│   └── integration/
│       └── test_full_loop.py
│
├── examples/
│   ├── basic_sim.yaml          # Minimal configuration
│   ├── multi_module.yaml       # Multiple modules with wiring
│   └── scripts/
│       └── test_attitude.py    # Example test script
│
└── docs/
    ├── implementation_plan.md  # This document
    └── protocol.md             # Wire protocol specification
3. Dependencies
Runtime Dependencies
[project]
dependencies = [
"websockets>=12.0", # Async WebSocket server
"pyyaml>=6.0", # Configuration parsing
"pydantic>=2.5", # Config validation & dataclasses
"structlog>=24.1", # Structured logging
"numpy>=1.26", # Signal array operations
"click>=8.1", # CLI framework
]
# icarus Python bindings (pybind11) provided by nix environment
Development Dependencies
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"pytest-cov>=4.1",
"ruff>=0.1", # Linting + formatting
"mypy>=1.8", # Type checking
"pre-commit>=3.6",
]
4. Core Abstractions
4.1 ModuleAdapter Protocol
from typing import Protocol, runtime_checkable

from hermes.core.signal import SignalDescriptor


@runtime_checkable
class ModuleAdapter(Protocol):
    """Interface for simulation modules."""

    @property
    def name(self) -> str:
        """Unique module identifier."""
        ...

    @property
    def signals(self) -> dict[str, SignalDescriptor]:
        """Available signals with metadata."""
        ...

    def stage(self) -> None:
        """Prepare for execution. Called once before run loop."""
        ...

    def step(self, dt: float) -> None:
        """Advance module by dt seconds."""
        ...

    def reset(self) -> None:
        """Return to initial conditions."""
        ...

    def get(self, signal: str) -> float:
        """Get signal value by local name (without module prefix)."""
        ...

    def set(self, signal: str, value: float) -> None:
        """Set signal value by local name."""
        ...

    def get_bulk(self, signals: list[str]) -> list[float]:
        """Get multiple signal values efficiently."""
        # Default applies only to classes that inherit from ModuleAdapter;
        # adapters that match structurally must define get_bulk themselves.
        return [self.get(s) for s in signals]

    def close(self) -> None:
        """Release resources."""
        ...
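Because ModuleAdapter is a runtime_checkable Protocol, an adapter can satisfy it by shape alone, without inheriting from it. The sketch below uses a trimmed-down, hypothetical protocol (`MiniAdapter`, with only `get` and `get_bulk`) to illustrate one consequence: a class that does not inherit from the protocol never receives the default `get_bulk` body, so an `isinstance` check rejects it unless it defines the method itself.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class MiniAdapter(Protocol):
    """Hypothetical two-method stand-in for ModuleAdapter."""

    def get(self, signal: str) -> float: ...

    def get_bulk(self, signals: list[str]) -> list[float]:
        # Default body: only inherited by explicit subclasses.
        return [self.get(s) for s in signals]


class Structural:
    """Matches by shape only; defines get() but not get_bulk()."""

    def get(self, signal: str) -> float:
        return 1.0


class Explicit(MiniAdapter):
    """Inherits from the protocol, so it picks up the default get_bulk()."""

    def get(self, signal: str) -> float:
        return 2.0


# isinstance() on a runtime_checkable protocol checks that every member exists.
structural_ok = isinstance(Structural(), MiniAdapter)
explicit_ok = isinstance(Explicit(), MiniAdapter)
bulk = Explicit().get_bulk(["a", "b"])
```

Under this reading, adapters either subclass ModuleAdapter explicitly or implement `get_bulk` on their own; which of the two Hermes prefers is a design choice this plan leaves open.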
4.2 SignalDescriptor
from dataclasses import dataclass
from enum import Enum


class SignalType(Enum):
    SCALAR = "f64"
    VEC3 = "vec3"
    QUAT = "quat"


@dataclass(frozen=True, slots=True)
class SignalDescriptor:
    """Metadata for a signal."""

    name: str
    type: SignalType = SignalType.SCALAR
    unit: str = ""
    writable: bool = True
    description: str = ""
4.3 SignalBus
from __future__ import annotations  # must be the first statement in signal.py

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type hints only: module.py imports from this file, so a runtime
    # import here would be circular.
    from hermes.core.module import ModuleAdapter


@dataclass
class Wire:
    """Connection between two signals."""

    src_module: str
    src_signal: str
    dst_module: str
    dst_signal: str
    gain: float = 1.0
    offset: float = 0.0


class SignalBus:
    """Routes signals between modules."""

    def __init__(self) -> None:
        self._modules: dict[str, ModuleAdapter] = {}
        self._wires: list[Wire] = []

    def register_module(self, module: ModuleAdapter) -> None:
        """Add a module to the bus."""
        self._modules[module.name] = module

    def add_wire(self, wire: Wire) -> None:
        """Add a signal wire."""
        self._validate_wire(wire)
        self._wires.append(wire)

    def route(self) -> None:
        """Transfer all wired signals (src → dst)."""
        for wire in self._wires:
            src = self._modules[wire.src_module]
            dst = self._modules[wire.dst_module]
            value = src.get(wire.src_signal)
            dst.set(wire.dst_signal, value * wire.gain + wire.offset)

    def get(self, qualified_name: str) -> float:
        """Get signal by qualified name (module.signal)."""
        module_name, signal_name = self._parse_qualified(qualified_name)
        return self._modules[module_name].get(signal_name)

    def set(self, qualified_name: str, value: float) -> None:
        """Set signal by qualified name."""
        module_name, signal_name = self._parse_qualified(qualified_name)
        self._modules[module_name].set(signal_name, value)

    def get_schema(self) -> dict:
        """Return full schema for all modules."""
        return {
            "modules": {
                name: {
                    "signals": {
                        sig.name: {"type": sig.type.value, "unit": sig.unit}
                        for sig in mod.signals.values()
                    }
                }
                for name, mod in self._modules.items()
            },
            "wiring": [
                {
                    "src": f"{w.src_module}.{w.src_signal}",
                    "dst": f"{w.dst_module}.{w.dst_signal}",
                }
                for w in self._wires
            ],
        }
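SignalBus calls `_parse_qualified` and `_validate_wire`, but this plan does not show them. A minimal sketch of what they might do, written as standalone functions: it assumes the module name is everything before the first dot (so dotted Icarus signals such as `Ball.position.z` stay intact) and that validation means both endpoints exist and the destination is writable. The function names, the `Sig` stand-in, the error type, and the `Ball.force.z` and `inject.thrust` signals are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Sig:
    """Hypothetical stand-in for SignalDescriptor (only the writable flag)."""

    writable: bool = True


def parse_qualified(qualified_name: str) -> tuple[str, str]:
    """Split 'module.signal' at the first dot; the signal may contain dots."""
    module_name, sep, signal_name = qualified_name.partition(".")
    if not sep or not module_name or not signal_name:
        raise ValueError(f"Expected 'module.signal', got {qualified_name!r}")
    return module_name, signal_name


def validate_wire(modules: dict[str, dict[str, Sig]], src: str, dst: str) -> None:
    """Raise if either endpoint is unknown or the destination is read-only."""
    for role, qualified in (("source", src), ("destination", dst)):
        module_name, signal_name = parse_qualified(qualified)
        if module_name not in modules:
            raise ValueError(f"Unknown {role} module: {module_name}")
        if signal_name not in modules[module_name]:
            raise ValueError(f"Unknown {role} signal: {qualified}")
    dst_module, dst_signal = parse_qualified(dst)
    if not modules[dst_module][dst_signal].writable:
        raise ValueError(f"Destination is not writable: {dst}")


# Example registry (made-up signals): module name -> signal name -> descriptor.
registry = {
    "icarus": {"Ball.position.z": Sig(writable=False), "Ball.force.z": Sig()},
    "inject": {"thrust": Sig()},
}
validate_wire(registry, "inject.thrust", "icarus.Ball.force.z")  # passes silently
```

Validating at `add_wire` time surfaces wiring mistakes during configuration loading instead of as a KeyError in the middle of a run.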
4.4 Scheduler
import asyncio
import time
from dataclasses import dataclass
from enum import Enum

from hermes.core.signal import SignalBus


class ExecutionMode(Enum):
    AS_FAST_AS_POSSIBLE = "afap"
    REAL_TIME = "realtime"
    PAUSED = "paused"


@dataclass
class SchedulerConfig:
    dt: float = 0.01
    mode: ExecutionMode = ExecutionMode.AS_FAST_AS_POSSIBLE
    end_time: float | None = None


class Scheduler:
    """Synchronous simulation scheduler."""

    def __init__(self, bus: SignalBus, config: SchedulerConfig) -> None:
        self._bus = bus
        self._config = config
        self._time: float = 0.0
        self._frame: int = 0
        self._running: bool = False

    @property
    def time(self) -> float:
        return self._time

    @property
    def frame(self) -> int:
        return self._frame

    def stage(self) -> None:
        """Stage all modules."""
        for module in self._bus._modules.values():
            module.stage()
        self._time = 0.0
        self._frame = 0

    def step(self) -> None:
        """Execute one simulation frame."""
        dt = self._config.dt
        for module in self._bus._modules.values():
            module.step(dt)
        self._bus.route()
        self._time += dt
        self._frame += 1

    def reset(self) -> None:
        """Reset all modules to initial state."""
        for module in self._bus._modules.values():
            module.reset()
        self._time = 0.0
        self._frame = 0

    async def run(self, callback=None) -> None:
        """Run simulation loop until end_time or stopped."""
        self._running = True
        wall_start = time.perf_counter()
        sim_start = self._time  # nonzero when resuming after a pause

        while self._running:
            # Compare against None so that end_time=0.0 is honored.
            end_time = self._config.end_time
            if end_time is not None and self._time >= end_time:
                break

            self.step()

            if callback:
                await callback(self._frame, self._time)

            sleep_time = 0.0
            if self._config.mode == ExecutionMode.REAL_TIME:
                target_wall = wall_start + (self._time - sim_start)
                sleep_time = target_wall - time.perf_counter()
            # Always yield, even with no delay, so stop() and client I/O
            # get a turn on the event loop in AFAP mode.
            await asyncio.sleep(max(sleep_time, 0.0))

        self._running = False

    def stop(self) -> None:
        """Stop the run loop."""
        self._running = False
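The REAL_TIME branch of `run()` paces the loop by comparing elapsed simulated time with elapsed wall-clock time. A minimal sketch of that arithmetic in isolation, with the clock readings passed in so the result is deterministic (`pacing_delay` is a hypothetical helper, not part of the Scheduler API):

```python
def pacing_delay(wall_start: float, sim_elapsed: float, wall_now: float) -> float:
    """Seconds to sleep so wall-clock time catches up with simulated time.

    wall_start:  wall-clock reading when the run loop started
    sim_elapsed: simulated seconds advanced since the loop started
    wall_now:    current wall-clock reading
    """
    target_wall = wall_start + sim_elapsed
    return max(target_wall - wall_now, 0.0)


# Simulation is 30 ms ahead of the wall clock: sleep to let real time catch up.
ahead = pacing_delay(wall_start=100.0, sim_elapsed=0.50, wall_now=100.47)

# Simulation is behind (a frame overran): do not sleep, and never go negative.
behind = pacing_delay(wall_start=100.0, sim_elapsed=0.50, wall_now=100.60)
```

This is soft pacing only: a frame that overruns is not dropped or compensated for, which is consistent with hard real-time deadlines being a Phase 1 non-goal.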
5. Icarus Adapter
5.1 Using pybind11 Bindings
Icarus provides Python bindings via pybind11 (built with BUILD_INTERFACES=ON). The adapter wraps these bindings to provide the ModuleAdapter interface.
from pathlib import Path

from hermes.core.signal import SignalDescriptor, SignalType


class IcarusAdapter:
    """Adapter for Icarus 6DOF simulation via pybind11 bindings."""

    def __init__(self, name: str, config_path: str | Path) -> None:
        self._name = name
        self._config_path = Path(config_path)

        import icarus

        self._sim = icarus.Simulator(str(self._config_path))
        self._icarus = icarus

        self._signals: dict[str, SignalDescriptor] = {}
        for signal_name in self._sim.signals:
            self._signals[signal_name] = SignalDescriptor(
                name=signal_name,
                type=SignalType.SCALAR,
            )

    @property
    def name(self) -> str:
        return self._name

    @property
    def signals(self) -> dict[str, SignalDescriptor]:
        return self._signals

    def stage(self) -> None:
        self._sim.stage()

    def step(self, dt: float) -> None:
        if dt == 0 or dt == self._sim.dt:
            self._sim.step()
        else:
            self._sim.step(dt)

    def reset(self) -> None:
        self._sim.reset()

    def get(self, signal: str) -> float:
        try:
            return self._sim.get(signal)
        except self._icarus.SignalNotFoundError as e:
            raise KeyError(f"Signal not found: {signal}") from e

    def set(self, signal: str, value: float) -> None:
        try:
            self._sim.set(signal, value)
        except self._icarus.SignalNotFoundError as e:
            raise KeyError(f"Signal not found: {signal}") from e

    def get_time(self) -> float:
        return self._sim.time

    def close(self) -> None:
        self._sim = None
Benefits of pybind11 over cffi:
- No need to maintain C API definitions in Python
- Automatic numpy integration for state vectors
- Python exceptions mapped from C++ exceptions
- Better type safety and IDE support
5.2 InjectionAdapter (Signal Injection)
from hermes.core.module import ModuleAdapter
from hermes.core.signal import SignalDescriptor


class InjectionAdapter:
    """Adapter for injecting test signals."""

    def __init__(self, name: str, signals: list[str]) -> None:
        self._name = name
        self._values: dict[str, float] = {s: 0.0 for s in signals}
        self._signals = {
            s: SignalDescriptor(name=s, writable=True) for s in signals
        }

    @property
    def name(self) -> str:
        return self._name

    @property
    def signals(self) -> dict[str, SignalDescriptor]:
        return self._signals

    def stage(self) -> None:
        pass

    def step(self, dt: float) -> None:
        pass

    def reset(self) -> None:
        self._values = {s: 0.0 for s in self._values}

    def get(self, signal: str) -> float:
        return self._values[signal]

    def set(self, signal: str, value: float) -> None:
        self._values[signal] = value

    def close(self) -> None:
        pass
6. WebSocket Server
6.1 Protocol Messages
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any


class MessageType(Enum):
    SCHEMA = "schema"
    EVENT = "event"
    ERROR = "error"
    ACK = "ack"
    COMMAND = "cmd"


@dataclass
class Command:
    """Client command."""

    action: str
    params: dict[str, Any]

    @classmethod
    def from_json(cls, data: str) -> "Command":
        obj = json.loads(data)
        return cls(
            action=obj["action"],
            params={k: v for k, v in obj.items() if k not in ("type", "action")},
        )


def make_schema_message(schema: dict, version: str = "0.2") -> str:
    """Create schema message JSON."""
    return json.dumps({
        "type": "schema",
        "version": version,
        **schema,
    })


def make_event_message(name: str, **data) -> str:
    """Create event message JSON."""
    return json.dumps({"type": "event", "name": name, **data})


def make_error_message(message: str, code: str = "ERROR") -> str:
    """Create error message JSON."""
    return json.dumps({"type": "error", "code": code, "message": message})


def make_ack_message(action: str) -> str:
    """Create acknowledgment message JSON."""
    return json.dumps({"type": "ack", "action": action})
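To make the command format concrete, here is a minimal client-side sketch. It assumes commands carry `"type": "cmd"` (taken from `MessageType.COMMAND`) and that parameters sit at the top level of the JSON object, which is what `Command.from_json` expects. `make_command`, `parse_command`, and the `inject.thrust` signal name are hypothetical.

```python
import json
from typing import Any


def make_command(action: str, **params: Any) -> str:
    """Build a client command; the 'cmd' type tag is an assumption."""
    return json.dumps({"type": "cmd", "action": action, **params})


def parse_command(data: str) -> tuple[str, dict[str, Any]]:
    """Mirror Command.from_json: everything except type/action becomes params."""
    obj = json.loads(data)
    params = {k: v for k, v in obj.items() if k not in ("type", "action")}
    return obj["action"], params


# A "set" command as a client would send it, then as the server would see it.
wire_text = make_command("set", signal="inject.thrust", value=12.5)
action, params = parse_command(wire_text)
```

Keeping parameters flat avoids a nested `params` object on the wire, at the cost of reserving the keys `type` and `action`.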
6.2 Binary Telemetry
import struct
from dataclasses import dataclass


@dataclass
class TelemetryConfig:
    """Telemetry streaming configuration."""

    rate_hz: float = 60.0
    subscribed_signals: list[str] | None = None


class TelemetryEncoder:
    """Encodes telemetry frames as binary."""

    # Little-endian, unpadded: frame (u32), time (f64), count (u16), reserved (u16).
    HEADER_FORMAT = "<I d H H"
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

    def __init__(self, signals: list[str]) -> None:
        self._signals = signals
        self._signal_format = f"<{len(signals)}d"

    @property
    def signals(self) -> list[str]:
        return self._signals

    def encode(self, frame: int, time: float, values: list[float]) -> bytes:
        """Encode a telemetry frame as binary."""
        header = struct.pack(
            self.HEADER_FORMAT,
            frame,
            time,
            len(values),
            0,  # reserved
        )
        # One f64 per subscribed signal, in subscription order.
        payload = struct.pack(self._signal_format, *values)
        return header + payload

    @classmethod
    def decode_header(cls, data: bytes) -> tuple[int, float, int]:
        """Decode header, return (frame, time, count)."""
        frame, time, count, _ = struct.unpack(cls.HEADER_FORMAT, data[:cls.HEADER_SIZE])
        return frame, time, count
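The encoder only provides `decode_header`, so a client still has to unpack the payload. A minimal sketch of a full-frame decoder, assuming the layout above (a 16-byte little-endian header followed by `count` float64 values); `decode_frame` is a hypothetical name, not part of the planned API.

```python
import struct

HEADER_FORMAT = "<I d H H"  # frame (u32), time (f64), signal count (u16), reserved (u16)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def decode_frame(data: bytes) -> tuple[int, float, list[float]]:
    """Decode one binary telemetry frame into (frame, time, values)."""
    if len(data) < HEADER_SIZE:
        raise ValueError("Frame shorter than header")
    frame, sim_time, count, _reserved = struct.unpack_from(HEADER_FORMAT, data, 0)
    expected = HEADER_SIZE + 8 * count  # 8 bytes per float64 value
    if len(data) != expected:
        raise ValueError(f"Expected {expected} bytes, got {len(data)}")
    values = list(struct.unpack_from(f"<{count}d", data, HEADER_SIZE))
    return frame, sim_time, values


# Round trip using the same layout as TelemetryEncoder.encode().
sample = struct.pack(HEADER_FORMAT, 7, 0.07, 2, 0) + struct.pack("<2d", 1.5, -9.81)
decoded = decode_frame(sample)
```

The values carry no names; a client maps them back to signals using the order of its most recent subscription.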
6.3 WebSocket Server
import asyncio

import structlog
from websockets.server import serve, WebSocketServerProtocol

from hermes.core.scheduler import Scheduler
from hermes.core.signal import SignalBus
from hermes.server.protocol import (
    Command, make_schema_message, make_event_message,
    make_error_message, make_ack_message,
)
from hermes.server.telemetry import TelemetryConfig, TelemetryEncoder

log = structlog.get_logger()


class HermesServer:
    """WebSocket server for Hermes protocol."""

    def __init__(
        self,
        bus: SignalBus,
        scheduler: Scheduler,
        host: str = "0.0.0.0",
        port: int = 8765,
        telemetry_config: TelemetryConfig | None = None,
    ) -> None:
        self._bus = bus
        self._scheduler = scheduler
        self._host = host
        self._port = port
        self._telemetry_config = telemetry_config or TelemetryConfig()
        self._clients: set[WebSocketServerProtocol] = set()
        self._encoder: TelemetryEncoder | None = None
        self._run_task: asyncio.Task | None = None
        self._running = False

    async def start(self) -> None:
        """Start the WebSocket server."""
        self._running = True
        async with serve(self._handler, self._host, self._port) as server:
            log.info("Server started", host=self._host, port=self._port)
            await server.wait_closed()

    async def _handler(self, websocket: WebSocketServerProtocol) -> None:
        """Handle a client connection."""
        self._clients.add(websocket)
        log.info("Client connected", remote=websocket.remote_address)
        try:
            schema = self._bus.get_schema()
            await websocket.send(make_schema_message(schema))
            async for message in websocket:
                await self._handle_message(websocket, message)
        except Exception as e:
            log.error("Client error", error=str(e))
        finally:
            self._clients.discard(websocket)
            log.info("Client disconnected", remote=websocket.remote_address)

    async def _handle_message(
        self, websocket: WebSocketServerProtocol, message: str
    ) -> None:
        """Process a client message."""
        try:
            cmd = Command.from_json(message)
            await self._execute_command(websocket, cmd)
        except Exception as e:
            await websocket.send(make_error_message(str(e)))

    async def _execute_command(
        self, websocket: WebSocketServerProtocol, cmd: Command
    ) -> None:
        """Execute a command."""
        match cmd.action:
            case "pause":
                self._scheduler.stop()
                await self._broadcast(make_event_message("state_changed", state="paused"))
            case "resume":
                # Keep a reference so the task is not garbage-collected mid-run,
                # and ignore a resume while a run loop is already active.
                if self._run_task is None or self._run_task.done():
                    self._run_task = asyncio.create_task(self._run_with_telemetry())
                await self._broadcast(make_event_message("state_changed", state="running"))
            case "reset":
                self._scheduler.reset()
                await self._broadcast(make_event_message("state_changed", state="reset"))
            case "step":
                count = cmd.params.get("count", 1)
                for _ in range(count):
                    self._scheduler.step()
                await self._send_telemetry_frame()
            case "set":
                signal = cmd.params["signal"]
                value = cmd.params["value"]
                self._bus.set(signal, value)
                await websocket.send(make_ack_message("set"))
            case "subscribe":
                signals = cmd.params["signals"]
                self._setup_telemetry(signals)
                await websocket.send(make_ack_message("subscribe"))
            case _:
                await websocket.send(make_error_message(f"Unknown action: {cmd.action}"))

    async def _run_with_telemetry(self) -> None:
        """Run simulation with telemetry streaming."""
        telemetry_interval = 1.0 / self._telemetry_config.rate_hz
        last_telemetry = 0.0

        async def callback(frame: int, time: float) -> None:
            nonlocal last_telemetry
            if time - last_telemetry >= telemetry_interval:
                await self._send_telemetry_frame()
                last_telemetry = time

        await self._scheduler.run(callback=callback)

    async def _send_telemetry_frame(self) -> None:
        """Send binary telemetry to all clients."""
        if not self._encoder or not self._clients:
            return
        values = [self._bus.get(s) for s in self._encoder.signals]
        data = self._encoder.encode(
            self._scheduler.frame,
            self._scheduler.time,
            values,
        )
        await asyncio.gather(
            *[client.send(data) for client in self._clients],
            return_exceptions=True,
        )

    async def _broadcast(self, message: str) -> None:
        """Broadcast text message to all clients."""
        if self._clients:
            await asyncio.gather(
                *[client.send(message) for client in self._clients],
                return_exceptions=True,
            )

    def _setup_telemetry(self, signals: list[str]) -> None:
        """Configure telemetry encoder for specified signals."""
        expanded = self._expand_signal_patterns(signals)
        self._encoder = TelemetryEncoder(expanded)

    def _expand_signal_patterns(self, patterns: list[str]) -> list[str]:
        """Expand wildcard patterns to concrete signal names."""
        import fnmatch

        all_signals = []
        schema = self._bus.get_schema()
        for module_name, module_data in schema["modules"].items():
            for signal_name in module_data["signals"]:
                all_signals.append(f"{module_name}.{signal_name}")

        result = []
        for pattern in patterns:
            if "*" in pattern:
                result.extend(fnmatch.filter(all_signals, pattern))
            else:
                result.append(pattern)
        return result
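Subscription patterns are expanded with shell-style globbing, where `*` also matches dots, so `icarus.*` selects every signal of the `icarus` module. The sketch below replays the expansion as a standalone function and adds one variation that is an assumption, not part of the plan: it drops duplicates, so that overlapping patterns do not encode the same signal twice in a frame. It uses `fnmatch.fnmatchcase` to keep matching case-sensitive on every platform; all signal names other than `Ball.position.z` are made up.

```python
import fnmatch


def expand_patterns(patterns: list[str], known: list[str]) -> list[str]:
    """Expand glob patterns against known signals, keeping first-seen order."""
    result: list[str] = []
    seen: set[str] = set()
    for pattern in patterns:
        if any(ch in pattern for ch in "*?["):
            matches = [s for s in known if fnmatch.fnmatchcase(s, pattern)]
        else:
            matches = [pattern]  # literal names pass through unvalidated
        for name in matches:
            if name not in seen:
                seen.add(name)
                result.append(name)
    return result


known = [
    "icarus.Ball.position.x",
    "icarus.Ball.position.z",
    "icarus.Ball.velocity.z",
    "inject.thrust",
]
# The second pattern overlaps the first on icarus.Ball.position.z.
expanded = expand_patterns(["icarus.Ball.position.*", "icarus.*.z"], known)
```

Literal names are passed through without a lookup, as in the server code, so a misspelled signal only fails later when the first telemetry frame is built.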
7. Configuration
7.1 Config Schema
from pathlib import Path

from pydantic import BaseModel, Field


class ModuleConfig(BaseModel):
    """Configuration for a single module."""

    adapter: str
    config: str | None = None
    lib_path: str | None = None
    script: str | None = None
    signals: list[str] | None = None


class WireConfig(BaseModel):
    """Configuration for a signal wire."""

    src: str
    dst: str
    gain: float = 1.0
    offset: float = 0.0


class ExecutionConfig(BaseModel):
    """Execution settings."""

    mode: str = "afap"
    rate_hz: float = 100.0
    end_time: float | None = None


class ServerConfig(BaseModel):
    """WebSocket server settings."""

    host: str = "0.0.0.0"
    port: int = 8765
    telemetry_hz: float = 60.0


class HermesConfig(BaseModel):
    """Root configuration."""

    version: str = "0.2"
    modules: dict[str, ModuleConfig]
    wiring: list[WireConfig] = Field(default_factory=list)
    execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
    server: ServerConfig = Field(default_factory=ServerConfig)

    @classmethod
    def from_yaml(cls, path: Path) -> "HermesConfig":
        import yaml

        with open(path) as f:
            data = yaml.safe_load(f)
        return cls(**data)
7.2 Example Configuration
# examples/basic_sim.yaml
version: "0.2"

modules:
  icarus:
    adapter: icarus
    config: ./icarus_config.yaml

execution:
  mode: afap
  rate_hz: 100.0
  end_time: 60.0

server:
  host: "0.0.0.0"
  port: 8765
  telemetry_hz: 60.0
8. CLI Entry Point
import asyncio
from pathlib import Path

import click
import structlog

from hermes.adapters.icarus import IcarusAdapter
from hermes.adapters.injection import InjectionAdapter
from hermes.core.config import HermesConfig
from hermes.core.scheduler import ExecutionMode, Scheduler, SchedulerConfig
from hermes.core.signal import SignalBus, Wire
from hermes.server.telemetry import TelemetryConfig
from hermes.server.websocket import HermesServer

structlog.configure(
    processors=[
        structlog.dev.ConsoleRenderer(),
    ],
)
log = structlog.get_logger()

ADAPTER_REGISTRY = {
    "icarus": IcarusAdapter,
    "injection": InjectionAdapter,
}


@click.group()
def cli():
    """Hermes - Simulation Orchestration Platform"""
    pass


@cli.command()
@click.argument("config_path", type=click.Path(exists=True, path_type=Path))
@click.option("--no-server", is_flag=True, help="Run without WebSocket server")
def run(config_path: Path, no_server: bool):
    """Run simulation from configuration file."""
    log.info("Loading configuration", path=config_path)
    config = HermesConfig.from_yaml(config_path)

    bus = SignalBus()
    for name, mod_config in config.modules.items():
        adapter_cls = ADAPTER_REGISTRY.get(mod_config.adapter)
        if adapter_cls is None:
            raise click.ClickException(f"Unknown adapter: {mod_config.adapter}")

        if mod_config.adapter == "icarus":
            # IcarusAdapter takes (name, config_path); the pybind11 bindings
            # need no library path.
            adapter = adapter_cls(name, mod_config.config)
        elif mod_config.adapter == "injection":
            adapter = adapter_cls(name, mod_config.signals or [])
        else:
            raise click.ClickException(f"Adapter not implemented: {mod_config.adapter}")

        bus.register_module(adapter)
        log.info("Registered module", name=name, adapter=mod_config.adapter)

    for wire_config in config.wiring:
        # Split at the first dot: the module name comes first, and signal
        # names such as Ball.position.z contain dots themselves.
        src_module, src_signal = wire_config.src.split(".", 1)
        dst_module, dst_signal = wire_config.dst.split(".", 1)
        bus.add_wire(Wire(
            src_module=src_module,
            src_signal=src_signal,
            dst_module=dst_module,
            dst_signal=dst_signal,
            gain=wire_config.gain,
            offset=wire_config.offset,
        ))

    scheduler_config = SchedulerConfig(
        dt=1.0 / config.execution.rate_hz,
        mode=ExecutionMode(config.execution.mode),
        end_time=config.execution.end_time,
    )
    scheduler = Scheduler(bus, scheduler_config)

    log.info("Staging simulation")
    scheduler.stage()

    if no_server:
        log.info("Running simulation (no server)")
        asyncio.run(scheduler.run())
    else:
        telemetry_config = TelemetryConfig(rate_hz=config.server.telemetry_hz)
        server = HermesServer(
            bus, scheduler,
            host=config.server.host,
            port=config.server.port,
            telemetry_config=telemetry_config,
        )
        log.info("Starting server", host=config.server.host, port=config.server.port)
        asyncio.run(server.start())


@cli.command()
@click.argument("config_path", type=click.Path(exists=True, path_type=Path))
def schema(config_path: Path):
    """Print schema JSON for a configuration."""
    import json

    config = HermesConfig.from_yaml(config_path)
    bus = SignalBus()
    for name, mod_config in config.modules.items():
        pass
    print(json.dumps(bus.get_schema(), indent=2))


def main():
    cli()


if __name__ == "__main__":
    main()
9. Implementation Phases
Phase 1: Foundation
Goal: Minimal working system with Icarus adapter
| Task | Description | Deliverable |
|------|-------------|-------------|
| 1.1 | Project setup | pyproject.toml, dev dependencies, ruff/mypy config |
| 1.2 | Core abstractions | ModuleAdapter, SignalDescriptor, SignalBus |
| 1.3 | Icarus adapter | pybind11 bindings, schema parsing, signal get/set |
| 1.4 | Synchronous scheduler | Basic step loop, time tracking |
| 1.5 | CLI skeleton | hermes run config.yaml --no-server |
| 1.6 | Tests | Unit tests for bus, scheduler, adapter |
Exit Criteria: hermes run steps Icarus and prints telemetry to console.
Phase 2: WebSocket Server
Goal: Daedalus can connect and receive telemetry
| Task | Description | Deliverable |
|------|-------------|-------------|
| 2.1 | Protocol messages | JSON schema, command, event types |
| 2.2 | Binary telemetry | Encoder/decoder with header + payload |
| 2.3 | WebSocket server | asyncio server, client management |
| 2.4 | Command handling | pause, resume, reset, step, set |
| 2.5 | Telemetry streaming | Decimation, subscription |
| 2.6 | Integration test | Python client connecting and receiving |
Exit Criteria: External WebSocket client receives binary telemetry at 60 Hz.
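The 60 Hz figure interacts with the simulation step size. The decimation check in `_run_with_telemetry` runs once per simulation step, so sends can only land on whole multiples of dt. A minimal sketch replaying that check in isolation (the function name is hypothetical, and time here is simulated seconds, so in AFAP mode the wall-clock rate will differ again) suggests that the default 100 Hz simulation with 60 Hz telemetry sends on every second frame:

```python
def telemetry_frames(sim_hz: float, telemetry_hz: float, steps: int) -> list[int]:
    """Return the frame numbers at which a telemetry frame would be sent."""
    dt = 1.0 / sim_hz
    interval = 1.0 / telemetry_hz
    last_sent = 0.0
    sent: list[int] = []
    for frame in range(1, steps + 1):
        sim_time = frame * dt
        # Same test as the server callback: send once the interval has elapsed.
        if sim_time - last_sent >= interval:
            sent.append(frame)
            last_sent = sim_time
    return sent


# One simulated second at the default rates (100 Hz simulation, 60 Hz telemetry).
sent = telemetry_frames(sim_hz=100.0, telemetry_hz=60.0, steps=100)
effective_hz = len(sent) / 1.0
```

If an exact 60 Hz stream matters for the exit criterion, the options are a simulation rate that is a multiple of the telemetry rate (120 Hz, for example) or accepting the quantized rate; this plan does not say which is intended.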
Phase 3: Multi-Module & Wiring
Goal: Multiple modules with signal routing
| Task | Description | Deliverable |
|------|-------------|-------------|
| 3.1 | Injection adapter | Simple value store for test signals |
| 3.2 | Wire configuration | YAML parsing, validation |
| 3.3 | Signal routing | bus.route() with gain/offset |
| 3.4 | Qualified names | module.signal parsing throughout |
| 3.5 | Schema generation | Combined schema from all modules |
| 3.6 | Multi-module test | Icarus + injection with wiring |
Exit Criteria: Injection adapter can override Icarus inputs via wiring.
Phase 4: Polish & Documentation
Goal: Production-ready for Daedalus development
| Task | Description | Deliverable |
|------|-------------|-------------|
| 4.1 | Error handling | Proper exceptions, logging |
| 4.2 | Configuration validation | Pydantic schema, helpful errors |
| 4.3 | Protocol documentation | docs/protocol.md with examples |
| 4.4 | Example configurations | examples/ directory |
| 4.5 | CI setup | GitHub Actions for tests, lint, type check |
Exit Criteria: Hermes is documented and tested enough for Daedalus to start development.
10. Testing Strategy
Unit Tests
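from hermes.core.signal import SignalBus, Wire


def test_register_module():
    bus = SignalBus()
    adapter = MockAdapter("test")
    bus.register_module(adapter)
    assert "test" in bus._modules


def test_wire_routing():
    bus = SignalBus()
    src = MockAdapter("src", signals={"out": 42.0})
    dst = MockAdapter("dst", signals={"in": 0.0})
    bus.register_module(src)
    bus.register_module(dst)
    bus.add_wire(Wire("src", "out", "dst", "in"))
    bus.route()
    assert dst.get("in") == 42.0


def test_wire_with_gain():
    bus = SignalBus()
    src = MockAdapter("src", signals={"out": 21.0})
    dst = MockAdapter("dst", signals={"in": 0.0})
    bus.register_module(src)
    bus.register_module(dst)
    bus.add_wire(Wire("src", "out", "dst", "in", gain=2.0))
    bus.route()
    assert dst.get("in") == 42.0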
Integration Tests
import pytest

from hermes.adapters.icarus import IcarusAdapter
from hermes.core.scheduler import Scheduler, SchedulerConfig
from hermes.core.signal import SignalBus


@pytest.mark.asyncio
async def test_icarus_step_loop():
    """Full integration: load Icarus, step, verify state changes."""
    adapter = IcarusAdapter("icarus", "tests/fixtures/ball_drop.yaml")
    bus = SignalBus()
    bus.register_module(adapter)

    scheduler = Scheduler(bus, SchedulerConfig(dt=0.01))
    scheduler.stage()

    z0 = bus.get("icarus.Ball.position.z")
    for _ in range(100):
        scheduler.step()
    z1 = bus.get("icarus.Ball.position.z")

    assert z1 < z0
WebSocket Protocol Tests
import json

import pytest
import websockets


@pytest.mark.asyncio
async def test_client_receives_schema():
    """Client receives schema on connect."""
    async with websockets.connect("ws://localhost:8765") as ws:
        msg = await ws.recv()
        data = json.loads(msg)
        assert data["type"] == "schema"
        assert "modules" in data
11. Nix Development Workflow
Hermes uses Nix flakes for reproducible development environments.
Setup
cd hermes
# Enter development shell (includes Icarus bindings)
nix develop
# Or with direnv
echo "use flake" > .envrc
direnv allow
Flake Structure
{
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
    icarus.url = "path:/home/tanged/sources/icarus";  # Local dev
    # icarus.url = "github:tanged123/icarus";         # CI/prod
  };

  outputs = { self, nixpkgs, icarus, ... }: {
    devShells.default = mkShell {
      packages = [ pythonEnv icarusPackage ruff ];
      shellHook = ''
        export PYTHONPATH="${icarusPackage}/lib/python3.12/site-packages:$PYTHONPATH"
      '';
    };
  };
}
Building
# Build Hermes package
nix build
# Run tests
nix develop -c pytest
# Format code
nix fmt
12. Open Questions for Implementation
- Signal type handling: Should we support Vec3/Quat in the protocol, or flatten to scalars?
  - Recommendation: Flatten for Phase 1, add structured types later.
- Module ordering: Should we implement topological sort for step order, or trust config order?
  - Recommendation: Config order for Phase 1, topo sort in Phase 3.
- Async vs sync step: Should module.step() be async-capable for I/O-bound adapters?
  - Recommendation: Keep sync for Phase 1, revisit for ProcessAdapter.
- Recording: Should Hermes handle recording, or delegate to Icarus?
  - Recommendation: Icarus records its own data; Hermes records cross-module signals separately.
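For the module-ordering question, a topological sort over the wiring could look roughly like the sketch below. It uses the standard-library `graphlib` module and treats each wire as "the destination module depends on the source module". The function name and the `autopilot` and `inject` module names are hypothetical. Step order would only affect data flow if routing were interleaved with stepping, since the scheduler above routes once after all modules have stepped.

```python
from graphlib import CycleError, TopologicalSorter


def step_order(modules: list[str], wires: list[tuple[str, str]]) -> list[str]:
    """Order modules so each wire's source steps before its destination.

    wires holds (src_module, dst_module) pairs. Raises ValueError on a cycle.
    """
    sorter = TopologicalSorter()
    for name in modules:
        sorter.add(name)  # include modules that have no wires
    for src, dst in wires:
        if src != dst:  # a self-wire imposes no ordering constraint
            sorter.add(dst, src)  # dst depends on src
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        raise ValueError(f"Wiring contains a cycle: {exc.args[1]}") from exc


order = step_order(
    modules=["icarus", "autopilot", "inject"],
    wires=[("inject", "autopilot"), ("autopilot", "icarus")],
)
```

Feedback loops (a controller reading from and writing to the same plant) form cycles, so a topo-sort approach would need a rule for breaking them, such as falling back to config order.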