Goal: Daedalus can connect and receive telemetry
Status: Not Started
Blocked By: Phase 1 Complete
Exit Criteria: External WebSocket client receives binary telemetry at 60 Hz
Overview
Phase 2 adds the WebSocket server that enables Daedalus (and other clients) to connect, receive telemetry streams, and send control commands. This transforms Hermes from a console application into a network service.
The WebSocket server reads signal data from the shared memory backplane established in Phase 1, enabling efficient telemetry streaming without copying data through the scheduler.
Architecture Context
┌─────────────────────────────────────────────────────────────────┐
│                             HERMES                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                 Shared Memory Backplane                 │   │
│   │    ┌───────────┐     ┌───────────┐     ┌───────────┐    │   │
│   │    │  Header   │     │  Signals  │     │   Data    │    │   │
│   │    │frame/time │     │ Directory │     │  Region   │    │   │
│   │    └───────────┘     └───────────┘     └───────────┘    │   │
│   └──────────────────────┬──────────────────────────────────┘   │
│                          │                                      │
│          ┌───────────────┼───────────────┐                      │
│          │               │               │                      │
│          ▼               ▼               ▼                      │
│     ┌──────────┐    ┌──────────┐  ┌──────────────┐              │
│     │ Module A │    │ Module B │  │  WebSocket   │◄─── Daedalus │
│     │ Process  │    │ Process  │  │    Server    │              │
│     └──────────┘    └──────────┘  └──────────────┘              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Dependencies
- Phase 1 complete (backplane, process manager, scheduler)
- websockets>=12.0
Task 2.1: Protocol Messages
Issue ID: (create after Phase 1)
Priority: Critical (P0)
Blocked By: Phase 1
Objective
Define the JSON message protocol for client-server communication.
Deliverables
Message Types
Server → Client:
| Type | Description |
| schema | Full signal schema on connect |
| event | State changes (paused, running, reset) |
| error | Error responses |
| ack | Command acknowledgments |
Client → Server:
| Type | Description |
| cmd | Control commands with action and params |
Message Format
from dataclasses import dataclass
from enum import Enum
import json


class MessageType(str, Enum):
    SCHEMA = "schema"
    EVENT = "event"
    ERROR = "error"
    ACK = "ack"
    CMD = "cmd"


@dataclass
class ServerMessage:
    type: MessageType
    payload: dict

    def to_json(self) -> str:
        # Payload keys are merged into the top-level object next to "type".
        return json.dumps({"type": self.type.value, **self.payload})


@dataclass
class Command:
    action: str
    params: dict

    @classmethod
    def from_json(cls, data: str) -> "Command":
        # Raises json.JSONDecodeError on invalid JSON and KeyError if
        # "action" is missing; "params" defaults to an empty dict.
        parsed = json.loads(data)
        return cls(action=parsed["action"], params=parsed.get("params", {}))
Acceptance Criteria
- All message types defined
- JSON serialization/deserialization works
- Unit tests for protocol parsing
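Command.from_json raises on malformed input (json.JSONDecodeError for invalid JSON, KeyError when action is missing). The following is a minimal sketch of a defensive wrapper, assuming the server prefers to answer with an error message rather than let the exception escape the handler. The name parse_command and the error strings are hypothetical and not part of the protocol defined above.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field


@dataclass
class Command:
    action: str
    params: dict = field(default_factory=dict)


def parse_command(data: str) -> tuple[Command | None, str | None]:
    """Return (command, None) on success or (None, reason) on failure.

    Hypothetical helper: the name and the reason strings are assumptions.
    """
    try:
        parsed = json.loads(data)
    except json.JSONDecodeError as exc:
        return None, f"Invalid JSON: {exc.msg}"
    if not isinstance(parsed, dict):
        return None, "Command must be a JSON object"
    action = parsed.get("action")
    if not isinstance(action, str):
        return None, "Missing or non-string 'action'"
    params = parsed.get("params", {})
    if not isinstance(params, dict):
        return None, "'params' must be an object"
    return Command(action=action, params=params), None
```

Returning a reason string instead of raising keeps the calling handler free of try/except blocks; the reason can be passed straight into an error message.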
Task 2.2: Binary Telemetry
Issue ID: (create after Phase 1)
Priority: Critical (P0)
Blocked By: Task 2.1
Objective
Implement efficient binary telemetry encoding that reads from shared memory.
Deliverables
Binary Format
Header (24 bytes):
- magic: u32 (4 bytes) - 0x48455254 ("HERT")
- frame: u64 (8 bytes)
- time: f64 (8 bytes)
- count: u32 (4 bytes)
Payload:
- values: f64[] (8 bytes * count)
Total: 24 + 8*N bytes per frame
Implementation
import struct

# SharedMemoryManager is the Phase 1 backplane interface.


class TelemetryEncoder:
    MAGIC = 0x48455254
    HEADER_FORMAT = "<I Q d I"  # little-endian, unpadded: u32, u64, f64, u32
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 24 bytes

    def __init__(self, shm: SharedMemoryManager, signals: list[str]) -> None:
        self._shm = shm
        self._signals = signals

    def encode(self) -> bytes:
        """Encode current state from shared memory to a binary frame."""
        frame = self._shm.get_frame()
        time = self._shm.get_time()
        count = len(self._signals)
        header = struct.pack(
            self.HEADER_FORMAT,
            self.MAGIC, frame, time, count
        )
        # Values are packed in subscription order; count == 0 yields b"".
        values = [self._shm.get_signal(s) for s in self._signals]
        payload = struct.pack(f"<{count}d", *values)
        return header + payload
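For the round-trip test and for client authors, the matching decoder might look like the sketch below. It follows the header layout given in the binary format above; the function name decode_frame and the validation messages are assumptions.

```python
from __future__ import annotations

import struct

MAGIC = 0x48455254  # same constant as TelemetryEncoder.MAGIC
HEADER_FORMAT = "<IQdI"  # little-endian, unpadded: 4 + 8 + 8 + 4 = 24 bytes
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def decode_frame(data: bytes) -> tuple[int, float, list[float]]:
    """Decode one telemetry frame into (frame, time, values)."""
    if len(data) < HEADER_SIZE:
        raise ValueError("Frame shorter than header")
    magic, frame, time, count = struct.unpack_from(HEADER_FORMAT, data, 0)
    if magic != MAGIC:
        raise ValueError(f"Bad magic: {magic:#010x}")
    expected = HEADER_SIZE + 8 * count
    if len(data) != expected:
        raise ValueError(f"Expected {expected} bytes, got {len(data)}")
    values = list(struct.unpack_from(f"<{count}d", data, HEADER_SIZE))
    return frame, time, values
```

Because the header is packed little-endian, the magic number appears on the wire as the bytes b"TREH"; clients in other languages should read it as a little-endian u32 rather than compare raw bytes against "HERT".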
Acceptance Criteria
- Encoder reads from shared memory
- Binary format matches specification
- Round-trip test passes (encode → decode)
- Signal order matches subscription order
- Handles empty signal list
Task 2.3: WebSocket Server
Issue ID: (create after Phase 1)
Priority: Critical (P0)
Blocked By: Task 2.2
Objective
Create the async WebSocket server with client management.
Deliverables
Features
- Accept multiple client connections
- Send schema on connect (read from shared memory registry)
- Broadcast telemetry to all clients
- Handle client disconnects gracefully
- Structured logging for connections
Implementation Sketch
import asyncio
import websockets
from websockets.server import WebSocketServerProtocol
import structlog

log = structlog.get_logger()


class HermesServer:
    def __init__(
        self,
        shm: SharedMemoryManager,
        host: str = "0.0.0.0",
        port: int = 8765,
    ) -> None:
        self._shm = shm
        self._host = host
        self._port = port
        self._clients: set[WebSocketServerProtocol] = set()
        # Set by the subscribe command (Task 2.4); shared by all clients.
        self._encoder: TelemetryEncoder | None = None

    async def start(self) -> None:
        async with websockets.serve(self._handler, self._host, self._port):
            log.info("Server started", host=self._host, port=self._port)
            await asyncio.Future()  # run until the task is cancelled

    async def _handler(self, ws: WebSocketServerProtocol) -> None:
        self._clients.add(ws)
        log.info("Client connected", remote=ws.remote_address)
        try:
            await self._send_schema(ws)
            async for message in ws:
                await self._handle_message(ws, message)
        finally:
            self._clients.discard(ws)
            log.info("Client disconnected", remote=ws.remote_address)

    async def broadcast_telemetry(self) -> None:
        """Broadcast binary telemetry to all clients."""
        if not self._clients or not self._encoder:
            return
        frame = self._encoder.encode()
        # return_exceptions=True keeps one failed send from aborting the rest.
        await asyncio.gather(
            *[c.send(frame) for c in self._clients],
            return_exceptions=True,
        )
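broadcast_telemetry passes return_exceptions=True so that one failed send does not abort the others, but it discards the results. One possible refinement, sketched below, inspects the results and drops clients whose send raised. The helper assumes only that each client exposes an awaitable send(data) method; the name broadcast and its return value are hypothetical.

```python
from __future__ import annotations

import asyncio
from typing import Any


async def broadcast(clients: set[Any], frame: bytes) -> int:
    """Send frame to every client and remove clients whose send() raised.

    Returns the number of successful sends.
    """
    # Snapshot the set: connection handlers may add or remove clients
    # while this coroutine is suspended in gather().
    targets = list(clients)
    results = await asyncio.gather(
        *(client.send(frame) for client in targets),
        return_exceptions=True,
    )
    delivered = 0
    for client, result in zip(targets, results):
        if isinstance(result, Exception):
            clients.discard(client)
        else:
            delivered += 1
    return delivered
```

The connection handler's finally block would still remove the client on disconnect; pruning here only stops repeated sends to a connection that has already failed.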
Acceptance Criteria
- Server starts and accepts connections
- Schema sent to new clients
- Multiple clients can connect
- Clean disconnect handling
- Proper async context management
Task 2.4: Command Handling
Issue ID: (create after Phase 1)
Priority: High (P1)
Blocked By: Task 2.3
Objective
Implement handlers for client control commands that interface with the scheduler.
Commands
| Action | Params | Description |
| pause | - | Pause simulation loop |
| resume | - | Start/resume simulation |
| reset | - | Reset all modules to initial conditions |
| step | count | Execute N frames (default 1) |
| set | signal, value | Set signal value in shared memory |
| subscribe | signals | Configure telemetry subscription |
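As an illustration of the table above, each command could be serialized as follows. The action and params keys match Command.from_json; the signal names dynamics.altitude and dynamics.* are hypothetical.

```python
import json

# One example payload per action. Signal names are made up for illustration.
commands = [
    {"action": "pause"},
    {"action": "resume"},
    {"action": "reset"},
    {"action": "step", "params": {"count": 10}},
    {"action": "set", "params": {"signal": "dynamics.altitude", "value": 100.0}},
    {"action": "subscribe", "params": {"signals": ["dynamics.*"]}},
]

# Text frames as a client would send them over the WebSocket.
wire_messages = [json.dumps(command) for command in commands]
```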
Implementation
# Method of HermesServer. Assumes self._scheduler holds the scheduler;
# the Task 2.3 sketch does not set it yet.
async def _handle_command(self, ws: WebSocketServerProtocol, cmd: Command) -> None:
    match cmd.action:
        case "pause":
            self._scheduler.pause()
            await self._broadcast_event("paused")
            await ws.send(make_ack("pause"))
        case "resume":
            self._scheduler.resume()
            await self._broadcast_event("running")
            await ws.send(make_ack("resume"))
        case "reset":
            self._scheduler.reset()
            await self._broadcast_event("reset")
            await ws.send(make_ack("reset"))
        case "step":
            count = cmd.params.get("count", 1)
            self._scheduler.step(count)
            await ws.send(make_ack("step", {"count": count}))
        case "set":
            # Missing "signal" or "value" raises KeyError here.
            signal = cmd.params["signal"]
            value = cmd.params["value"]
            self._shm.set_signal(signal, value)
            await ws.send(make_ack("set"))
        case "subscribe":
            signals = cmd.params["signals"]
            # Replaces the encoder shared by all connected clients.
            self._encoder = TelemetryEncoder(self._shm, signals)
            await ws.send(make_ack("subscribe", {"count": len(signals)}))
        case _:
            await ws.send(make_error(f"Unknown action: {cmd.action}"))
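The handler relies on make_ack and make_error. A minimal sketch of both follows, assuming they use the same envelope as ServerMessage.to_json (a type field merged with the payload). The field names action and message are assumptions.

```python
from __future__ import annotations

import json


def make_ack(action: str, extra: dict | None = None) -> str:
    """Build an ack message; "action" and the merged extras are assumed fields."""
    return json.dumps({"type": "ack", "action": action, **(extra or {})})


def make_error(message: str) -> str:
    """Build an error message; the "message" field name is an assumption."""
    return json.dumps({"type": "error", "message": message})
```

For example, make_ack("step", {"count": 3}) produces a JSON object with type, action, and count at the top level, which a client can match against the command it sent.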
Acceptance Criteria
- Error responses for invalid commands
- State change events broadcast to all clients
- Ack sent for successful commands
- set writes to shared memory
Task 2.5: Telemetry Streaming
Issue ID: (create after Phase 1)
Priority: High (P1)
Blocked By: Task 2.4
Objective
Stream telemetry to connected clients at a configurable rate.
Features
- Configurable rate (default 60 Hz)
- Signal subscription with wildcards
- Decimation (send at telemetry rate, not sim rate)
- Binary frame broadcast
Signal Patterns
def expand_pattern(pattern: str, registry: SignalRegistry) -> list[str]:
    """Expand signal pattern to list of qualified names."""
    if pattern == "*":
        return list(registry.all_signals().keys())
    if pattern.endswith(".*"):
        module = pattern[:-2]
        return registry.list_module(module)
    return [pattern]
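A subscribe command carries a list of patterns, while expand_pattern handles one pattern at a time. The sketch below shows one way to expand a whole list, keeping subscription order and dropping duplicates. FakeRegistry is a hypothetical in-memory stand-in for SignalRegistry; only the all_signals() and list_module() calls are taken from the function above, and their return types are assumed.

```python
from __future__ import annotations


class FakeRegistry:
    """In-memory stand-in for SignalRegistry (hypothetical, for illustration)."""

    def __init__(self, signals: dict[str, float]) -> None:
        self._signals = signals

    def all_signals(self) -> dict[str, float]:
        return dict(self._signals)

    def list_module(self, module: str) -> list[str]:
        prefix = module + "."
        return [name for name in self._signals if name.startswith(prefix)]


def expand_pattern(pattern: str, registry: FakeRegistry) -> list[str]:
    """Same logic as expand_pattern above, repeated so the sketch runs alone."""
    if pattern == "*":
        return list(registry.all_signals().keys())
    if pattern.endswith(".*"):
        return registry.list_module(pattern[:-2])
    return [pattern]


def expand_patterns(patterns: list[str], registry: FakeRegistry) -> list[str]:
    """Expand each pattern in order, keeping the first occurrence of each name."""
    seen: dict[str, None] = {}  # dict preserves insertion order
    for pattern in patterns:
        for name in expand_pattern(pattern, registry):
            seen.setdefault(name, None)
    return list(seen)
```

Deduplicating matters because the encoder emits one f64 per subscribed name: overlapping patterns such as nav.* and nav.x would otherwise send the same value twice.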
Telemetry Loop
async def telemetry_loop(self, rate_hz: float = 60.0) -> None:
    """Background task that broadcasts telemetry at a fixed rate."""
    interval = 1.0 / rate_hz
    while True:
        await asyncio.sleep(interval)
        await self.broadcast_telemetry()
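Sleeping for a full interval after every broadcast makes each period last the interval plus the broadcast time, so the effective rate falls slightly below rate_hz. If the 60 Hz exit criterion needs to hold tightly, one option is to schedule against absolute deadlines, as in the sketch below. The function name fixed_rate_loop and the max_ticks parameter are hypothetical; max_ticks exists only so the loop can terminate in tests.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


async def fixed_rate_loop(
    tick: Callable[[], Awaitable[None]],
    rate_hz: float = 60.0,
    max_ticks: int | None = None,
) -> int:
    """Call tick() at rate_hz using absolute deadlines; return the tick count."""
    loop = asyncio.get_running_loop()
    interval = 1.0 / rate_hz
    next_deadline = loop.time() + interval
    ticks = 0
    while max_ticks is None or ticks < max_ticks:
        # Sleep only for the time remaining until the next deadline.
        await asyncio.sleep(max(0.0, next_deadline - loop.time()))
        await tick()
        ticks += 1
        next_deadline += interval
        # If we fell behind, skip the missed slots instead of sending a
        # burst of catch-up frames.
        now = loop.time()
        if next_deadline < now:
            next_deadline = now + interval
    return ticks
```

In the server, tick would be self.broadcast_telemetry and max_ticks would stay None so the loop runs until the task is cancelled.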
Acceptance Criteria
- Telemetry streams at configured rate
- Subscription filters work
- Wildcards expand correctly
- Binary frames sent to all clients
- Decimation independent of sim rate
Task 2.6: Server Integration
Issue ID: (create after Phase 1)
Priority: High (P1)
Blocked By: Task 2.5
Objective
Integrate WebSocket server with CLI and scheduler.
CLI Changes
import asyncio
from pathlib import Path

import click


@cli.command()
@click.argument("config_path", type=click.Path(exists=True, path_type=Path))
@click.option("--no-server", is_flag=True, help="Run without WebSocket server")
def run(config_path: Path, no_server: bool) -> None:
    """Run simulation from configuration file."""
    config = HermesConfig.from_yaml(config_path)
    with ProcessManager(config) as pm:
        scheduler = Scheduler(pm, config.execution)

        async def main() -> None:
            tasks = []
            if not no_server and config.server.enabled:
                server = HermesServer(
                    shm=pm.shm,
                    host=config.server.host,
                    port=config.server.port,
                )
                tasks.append(asyncio.create_task(server.start()))
                tasks.append(asyncio.create_task(
                    server.telemetry_loop(config.server.telemetry_hz)
                ))
            scheduler.stage()
            await scheduler.run()
            # Simulation finished: request shutdown of the server tasks.
            for task in tasks:
                task.cancel()

        asyncio.run(main())
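task.cancel() only requests cancellation; the task finishes the next time the event loop runs it. asyncio.run also cancels leftover tasks during its own teardown, but waiting for them explicitly makes the shutdown order visible and lets the server's context manager exit before main() returns. A minimal sketch, with the hypothetical helper name cancel_and_wait:

```python
from __future__ import annotations

import asyncio


async def cancel_and_wait(tasks: list[asyncio.Task]) -> None:
    """Cancel tasks and wait until each one has actually finished."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each task's CancelledError as a
    # result instead of re-raising the first one here.
    await asyncio.gather(*tasks, return_exceptions=True)
```

In main(), the final loop over tasks could then be replaced by a single await cancel_and_wait(tasks).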
Acceptance Criteria
- hermes run config.yaml starts server if enabled
- hermes run config.yaml --no-server skips server
- Server shuts down cleanly on simulation end
- Ctrl+C terminates both server and simulation
Task 2.7: Integration Test
Issue ID: (create after Phase 1)
Priority: High (P1)
Blocked By: Task 2.6
Objective
End-to-end test with Python WebSocket client.
Test Scenario
- Start Hermes with test config
- Connect WebSocket client
- Receive schema message
- Send subscribe command
- Send resume command
- Receive binary telemetry frames
- Verify frame format and values
- Send pause command
- Verify simulation paused
- Disconnect cleanly
Deliverables
- tests/integration/test_websocket.py
- Test fixture for server lifecycle
Test Implementation
import asyncio
import json
import struct

import pytest
import websockets


@pytest.fixture
async def hermes_server():
    """Start Hermes server for testing."""
    # Body not specified yet. The test passes the fixture value to
    # websockets.connect(), so it is expected to provide the server URI.


@pytest.mark.asyncio
async def test_telemetry_stream(hermes_server):
    async with websockets.connect(hermes_server) as ws:
        schema = await ws.recv()
        assert "modules" in json.loads(schema)

        await ws.send(json.dumps({
            "action": "subscribe",
            "params": {"signals": ["*"]}
        }))
        ack = await ws.recv()
        assert json.loads(ack)["type"] == "ack"

        await ws.send(json.dumps({"action": "resume"}))

        # resume also produces JSON text messages (the "running" event and
        # the ack), so skip text until the first binary frame arrives.
        frame = await ws.recv()
        while not isinstance(frame, bytes):
            frame = await ws.recv()

        magic, = struct.unpack("<I", frame[:4])
        assert magic == 0x48455254
Acceptance Criteria
- Client connects and receives schema
- Telemetry frames decode correctly
- Commands execute properly
- Clean shutdown on test end
Beads Integration
Issues will be created after Phase 1 is complete:
# Create Phase 2 issues
bd create --title "Protocol Messages" -d "JSON message types and serialization" -p 0 -l phase2,server
bd create --title "Binary Telemetry" -d "Efficient binary encoder reading from shared memory" -p 0 -l phase2,server
bd create --title "WebSocket Server" -d "asyncio WebSocket server with client management" -p 0 -l phase2,server
bd create --title "Command Handling" -d "pause, resume, reset, step, set, subscribe commands" -p 1 -l phase2,server
bd create --title "Telemetry Streaming" -d "Decimation, subscription wildcards, broadcast" -p 1 -l phase2,server
bd create --title "Server Integration" -d "CLI integration and server lifecycle" -p 1 -l phase2,cli
bd create --title "WebSocket Integration Test" -d "Python client end-to-end test" -p 1 -l phase2,tests
# View phase 2 work
bd list --label phase2
Phase 2 Completion Checklist
- All Phase 2 issues closed
- hermes run config.yaml starts WebSocket server
- External client can connect and receive telemetry
- bd sync && git push completed
Implementation Notes
Async Architecture
Main Process:
    │
    ├── Process Manager
    │   ├── Module Process 1
    │   ├── Module Process 2
    │   └── ...
    │
    ├── Scheduler Task (asyncio)
    │   └── step() → update shared memory
    │
    └── WebSocket Server (asyncio)
        ├── Client Handler 1
        ├── Client Handler 2
        ├── Telemetry Loop (reads shared memory)
        └── ...
Telemetry Flow
Scheduler.step()
    │
    ▼
Shared Memory Updated
    │
    ▼
Telemetry Loop (60 Hz)
    │
    ├─ Read frame/time from header
    │
    ├─ Read subscribed signals from data region
    │
    ├─ Encode binary frame
    │
    └─ Broadcast to clients
Next Phase Preview
Phase 3 (Multi-Module & Wiring) will add:
- Signal wiring between modules
- Wire routing through shared memory
- Dynamic module configuration
- Cross-module telemetry
See phase3_multimodule.md for details.