MQTT Auto-Wiring in Wactorz
How agents discover, connect, and validate data flows — without hardcoded names
The Problem
In a multi-agent system where the LLM generates agent code at runtime, wiring agents together is fragile. The traditional approach — hardcoding agent names in routing logic — breaks the moment an agent is renamed, replaced, or spawned dynamically. Worse, when one LLM writes a producer and another writes a consumer, they frequently disagree on field names: one publishes {"temp": 30.5}, the other reads payload["temperature"].
Wactorz solves this with topic-based auto-wiring: agents declare what data they produce and consume via MQTT topics, and the system wires them by data compatibility — not by name.
Architecture Overview
┌─────────────────────────────────┐
│ TopicBus │
│ (singleton, init at startup) │
│ │
│ ┌───────────────────────────┐ │
│ │ TopicRegistry │ │
│ │ name → TopicContract │ │
│ │ + observed_samples │ │
│ └───────────────────────────┘ │
│ │
│ ┌───────────────────────────┐ │
│ │ SharedStateHub │ │
│ │ retained MQTT topics │ │
│ └───────────────────────────┘ │
│ │
│ ┌───────────────────────────┐ │
│ │ StreamWindow factory │ │
│ │ sliding windows over │ │
│ │ topic streams │ │
│ └───────────────────────────┘ │
└────────────┬────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌──────▼──────┐ ┌───────▼──────┐ ┌──────▼──────┐
│ Producer │ │ Planner │ │ Consumer │
│ Agent │ │ Agent │ │ Agent │
│ │ │ │ │ │
│ publish() │ │ reads │ │ subscribe() │
│ → auto- │ │ contracts + │ │ → wired by │
│ registers │ │ observed │ │ planner │
│ contract │ │ schemas │ │ using │
│ + captures │ │ before code │ │ real │
│ schema │ │ generation │ │ field │
└─────────────┘ └──────────────┘ │ names │
└─────────────┘
Step 1: TopicContract — What an Agent Declares
Every agent that publishes or subscribes to MQTT topics has a TopicContract — a dataclass that declares its data interface:
@dataclass
class TopicContract:
name: str # agent name
publishes: list[str] # topics this agent writes to
subscribes: list[str] # topics this agent reads from
triggers_when: dict # conditions that trigger action
produces_schema: dict # declared field names + types
consumes_schema: dict # expected input field names
observed_samples: dict # AUTO-CAPTURED real payloads
node: Optional[str] # remote node name (if edge)
actor_id: Optional[str] # unique actor ID
Contracts are registered in the TopicRegistry — a global in-memory index accessible from anywhere via get_topic_bus().registry.
How Contracts Get Registered
Contracts are registered through three paths:
Path 1 — Implicit via agent.publish():
The first time a DynamicAgent publishes to a topic, _AgentAPI.publish() auto-creates a minimal contract:
# Inside _AgentAPI.publish():
contract = TopicContract(
name = self.name,
publishes = list(self._published_topics | {topic}),
actor_id = self.actor_id,
)
if isinstance(data, dict):
contract.update_observed(topic, data) # capture real fields
bus.register_contract(contract)
Path 2 — Implicit via agent.subscribe():
When agent.subscribe(topic, callback) is called, the topic is added to the contract's subscribes list:
# Inside _AgentAPI.subscribe():
existing = bus.registry.get(self.name)
if existing:
if topic not in existing.subscribes:
existing.subscribes.append(topic)
Path 3 — Explicit via agent.declare_contract():
Agents can declare their full contract in setup():
async def setup(agent):
agent.declare_contract(
publishes = ['custom/detections/camera'],
subscribes = ['homeassistant/state_changes/#'],
triggers_when = {'person_detected': True},
produces_schema = {'detected': 'bool', 'confidence': 'float'},
)
declare_contract() accepts common LLM kwarg variants (schema → produces_schema, topics → publishes, etc.) and coerces bare strings to lists.
Step 2: Observed Schema Capture — The Vocabulary Solution
The Problem in Detail
When the planner asks an LLM to write a producer agent, the LLM might use any reasonable field name:
# Producer (written by LLM call #1):
await agent.publish('sensors/data', {'temp': 30.5, 'humidity': 47.7})
Later, when the planner asks the LLM to write a consumer for that same topic, a different LLM call might use different names:
# Consumer (written by LLM call #2):
async def on_message(payload):
temperature = payload['temperature'] # KeyError! Field is 'temp', not 'temperature'
The produces_schema declared in the contract doesn't help because it was also written by the LLM — it suffers the same vocabulary problem.
The Solution: observed_samples
Instead of trusting LLM-declared schemas, Wactorz captures the actual field names from real published messages:
# TopicContract.update_observed():
def update_observed(self, topic: str, payload: dict):
fields = {
k: type(v).__name__
for k, v in payload.items()
if not k.startswith("_")
}
self.observed_samples[topic] = {
"fields": fields, # {'temp': 'float', 'humidity': 'float'}
"example": {k: v for ...}, # {'temp': 30.5, 'humidity': 47.7}
}
This is called automatically by _AgentAPI.publish() on every publish — no agent code changes needed.
The Data Flow
1. Producer publishes {'temp': 30.5, 'humidity': 47.7}
│
▼
2. _AgentAPI.publish() calls contract.update_observed()
│
▼
3. TopicContract.observed_samples now contains:
{'sensors/data': {
'fields': {'temp': 'float', 'humidity': 'float'},
'example': {'temp': 30.5, 'humidity': 47.7}
}}
│
▼
4. TopicRegistry.to_planner_context() includes:
"OBSERVED on 'sensors/data': fields={'temp': 'float'} example={'temp': 30.5}"
│
▼
5. Planner LLM sees exact field names in its prompt:
"═══ LIVE TOPIC SAMPLES ═══
Topic: sensors/data (published by temp-simulator)
Fields: {'temp': 'float', 'humidity': 'float'}
Example: {'temp': 30.5, 'humidity': 47.7}
CRITICAL: Use payload['temp'] — NOT payload['temperature']"
│
▼
6. Consumer code uses correct field names:
temperature = payload['temp'] ✓
Fallback: Live Topic Sampling
If observed_samples is empty (the producer started before the schema-capture code was deployed, or the agent hasn't published yet), the planner falls back to _sample_live_topics():
async def _sample_live_topics(self, bus) -> list[str]:
# Single MQTT connection subscribes to ALL known publish topics
# Collects one real message per topic with a global timeout
# Stores results back into contracts for future calls
This method:
1. Gathers all publish topics from all registered contracts
2. Opens one MQTT connection and subscribes to all of them
3. Waits up to 15 seconds total (not per-topic) for messages to arrive
4. Parses each payload, extracts field names and types
5. Stores results back into the contracts via contract.update_observed()
6. Returns formatted lines for the LLM prompt
Stale topics (no active publisher) are silently skipped after the timeout.
Step 3: Auto-Wiring Discovery
The TopicRegistry can find wiring opportunities — pairs of agents where one publishes a topic that another subscribes to:
def find_wiring_opportunities(self) -> list[tuple]:
for producer in contracts:
for pub_topic in producer.publishes:
for consumer in contracts:
if consumer.matches_topic(pub_topic):
opportunities.append((producer, consumer, pub_topic))
MQTT wildcards (# and +) are supported:
# Producer publishes: 'sensors/kitchen/temperature'
# Consumer subscribes: 'sensors/#'
# → Match! Auto-wiring opportunity detected.
When Wiring is Logged
Every time a new contract is registered, the TopicBus checks for new wiring opportunities and logs them:
[TopicBus] Auto-wiring opportunity: temp-simulator → mean-logger via sensors/data
[TopicBus] Auto-wiring opportunity: camera-detect → lamp-actuator via custom/detections/cam
This is informational — the TopicBus doesn't force-wire agents. The actual wiring happens through: 1. The planner reading the registry and designing the pipeline 2. Or the user explicitly creating agents that subscribe to matching topics
Step 4: Planner Integration — How It All Comes Together
When the planner receives a pipeline request like "if temp > 20 turn off the lamp", it goes through this sequence:
4.1 Topic Resolution
_resolve_data_references() scans the user's task for data-related keywords and searches the TopicRegistry:
# User says "temperature" → CONCEPT_MAP matches → search keywords: ["temperature", "temp", "thermal"]
# TopicRegistry.find_by_capability("temp") → finds TopicContract for temp-simulator
# Enriches task: "if temp > 20 turn off lamp [DATA SOURCE: subscribe to 'sensors/data']"
If multiple topics match, all candidates are provided to the LLM to pick the most relevant one. If none match, the task is enriched with a note and the planner proceeds anyway (it may create a new producer).
4.2 Schema Context Injection
Before generating code, the planner builds schema context from two sources:
Source A — observed_samples on contracts:
for contract in bus.registry.all_contracts():
samples = contract.observed_samples or {}
for topic, info in samples.items():
# Add to prompt: "Topic: sensors/data Fields: {'temp': 'float'} Example: {'temp': 30.5}"
Source B — Live sampling fallback:
if not sample_lines:
sample_lines = await self._sample_live_topics(bus)
Source C — Worker manifests:
# _discover_workers() includes observed_samples in each worker description
workers.append({
"name": actor.name,
"observed_samples": manifest.get("observed_samples", {}),
...
})
# _fmt_worker() surfaces them in the prompt:
# " - temp-simulator (DynamicAgent): Publishes random temp/humidity
# topic 'sensors/data' payload fields: {'temp': 'float'} example: {'temp': 30.5}"
4.3 Code Generation with Real Field Names
The LLM prompt includes all three sources, plus an explicit instruction:
═══ LIVE TOPIC SAMPLES (use EXACTLY these field names in code!) ═══
Topic: sensors/data (published by temp-simulator)
Fields: {'temp': 'float', 'humidity': 'float'}
Example payload: {'temp': 30.5, 'humidity': 47.7}
CRITICAL: When subscribing to a topic listed above, use the EXACT field names
from the sample payload. For example if the sample shows {'temp': 30.5},
use payload['temp'] — NOT payload['temperature']. The field names in the
samples are authoritative.
The LLM then generates consumer code with the correct field names:
async def on_temp(payload):
temp = payload.get('temp', 0) # ← correct: matches observed schema
if temp > 20:
await agent.publish('custom/triggers/lamp-temp', {'triggered': True})
4.4 Post-Generation Validation
After the LLM generates the plan, _validate_pipeline_code() scans each dynamic agent's code for common mistakes:
| Check | Action |
|---|---|
await agent.subscribe(...) |
Strip the await (subscribe is sync) |
await agent.persist(...) |
Strip the await (persist is sync) |
aiomqtt.Client() |
Rewrite to agent.subscribe() pattern |
httpx.post('/api/services/...') |
Flag — should use ha_actuator instead |
Step 5: Runtime — How the Wiring Holds Up
Once agents are spawned and running, the wiring is maintained through several mechanisms:
Contract Updates on Publish
Every publish() call updates the contract's observed_samples and re-registers it in the TopicBus. This means:
- If a producer changes its payload format, the contract is updated immediately
- The next time the planner generates a consumer, it sees the new field names
- Existing consumers are NOT automatically updated (they use the field names baked into their code)
Manifest Propagation
Each agent publishes a retained MQTT manifest at agents/{id}/manifest that includes observed_samples. This means:
- MainActor's manifest listener picks it up and stores it in _agent_manifests
- The planner can query manifests even for agents it didn't spawn
- Schema data survives agent restarts (retained MQTT messages persist in the broker)
The /bus Command
Users can inspect the full wiring state at any time:
/bus
Output:
TopicBus — Reactive Pub/Sub Registry
agents with contracts : 3
published topics : 2
subscribed topics : 2
auto-wiring pairs : 2
[temp-simulator]
publishes : sensors/data
OBSERVED on 'sensors/data': fields={'temp': 'float', 'humidity': 'float'}
example={'temp': 30.5, 'humidity': 47.7}
[mean-logger]
subscribes: sensors/data
[lamp-monitor]
subscribes: sensors/data, lamp/status
Auto-wiring opportunities:
temp-simulator → mean-logger via sensors/data
temp-simulator → lamp-monitor via sensors/data
Safety Guards
String-to-List Coercion
LLMs frequently write publishes="custom/topic" instead of publishes=["custom/topic"]. Iterating a string produces individual characters — so "custom/topic" would register 12 single-character "topics" (c, u, s, t, ...).
TopicContract.__post_init__() catches this:
def __post_init__(self):
if isinstance(self.publishes, str):
self.publishes = [self.publishes]
if isinstance(self.subscribes, str):
self.subscribes = [self.subscribes]
Bogus Topic Filter
LLMs sometimes pass kwarg names as values: declare_contract(subscribes="subscribes"). The __post_init__ filter strips known bogus entries:
_BOGUS = {"publishes", "subscribes", "publish", "subscribe",
"topics", "topic", "produces_schema", "consumes_schema",
"schema", "triggers_when", "name", "description", "type"}
self.publishes = [t for t in self.publishes if t not in _BOGUS]
self.subscribes = [t for t in self.subscribes if t not in _BOGUS]
Serialization Roundtrip
observed_samples is a proper dataclass field (not a monkey-patched attribute), so it survives to_dict() → from_dict() serialization:
contract = TopicContract(name="test", publishes=["sensors/data"])
contract.update_observed("sensors/data", {"temp": 30.5})
d = contract.to_dict() # includes observed_samples
c2 = TopicContract.from_dict(d) # preserves observed_samples
assert c2.observed_samples == contract.observed_samples # ✓
End-to-End Example
User says: "spawn an agent to log the mean of the last 5 temperature values"
1. MainActor classifies intent → PIPELINE
2. PlannerAgent spawned
3. Topic resolution:
_resolve_data_references() finds "temperature" keywords
TopicRegistry.find_by_capability("temp")
→ finds TopicContract for 'temp-simulator' publishing 'sensors/data'
→ enriches task: "... [DATA SOURCE: subscribe to 'sensors/data']"
4. Schema sampling:
contract.observed_samples['sensors/data'] = {
'fields': {'temp': 'float', 'humidity': 'float'},
'example': {'temp': 30.5, 'humidity': 47.7}
}
5. LLM prompt includes:
"OBSERVED on 'sensors/data': fields={'temp': 'float', 'humidity': 'float'}"
"CRITICAL: Use payload['temp'] — NOT payload['temperature']"
6. LLM generates spawn config:
{
"name": "temp-mean-logger",
"type": "dynamic",
"code": "
async def setup(agent):
agent.state['buffer'] = []
async def on_temp(payload):
agent.state['buffer'].append(payload.get('temp', 0)) # ← correct field name
if len(agent.state['buffer']) > 5:
agent.state['buffer'] = agent.state['buffer'][-5:]
if len(agent.state['buffer']) == 5:
mean = sum(agent.state['buffer']) / 5
await agent.log(f'Mean of last 5: {mean:.2f}°C')
agent.subscribe('sensors/data', on_temp)
"
}
7. _validate_pipeline_code():
- No 'await agent.subscribe' found (already correct ✓)
- No raw aiomqtt ✓
- No direct HA API calls ✓
8. Agent spawned, TopicBus logs:
[TopicBus] Auto-wiring opportunity: temp-simulator → temp-mean-logger via sensors/data
9. Agent receives messages, computes means:
[temp-mean-logger] Temperature received: 30.5°C | Buffer: [30.5]
[temp-mean-logger] Temperature received: 22.1°C | Buffer: [30.5, 22.1]
...
[temp-mean-logger] Mean of last 5: 24.8°C
Comparison: Before and After Auto-Wiring
| Aspect | Before (name-based) | After (topic-based) |
|---|---|---|
| Agent discovery | Planner must know agent names | Planner queries TopicRegistry by data type |
| Field name matching | LLM guesses → frequent KeyError | Real field names captured from live payloads |
| Adding a new producer | Must update all consumers | New producer auto-appears in registry |
| Removing a producer | Consumers silently break | TopicBus shows which consumers are orphaned |
| Schema documentation | Manual, always outdated | Auto-captured from real messages |
| Multi-node | Requires name→node mapping | Topics are node-agnostic (MQTT handles routing) |
File Reference
| File | What It Does |
|---|---|
core/topic_bus.py |
TopicContract, TopicRegistry, SharedStateHub, StreamWindow, TopicBus |
agents/dynamic_agent.py |
_AgentAPI.publish() — auto-registers contracts and captures schemas |
agents/dynamic_agent.py |
_AgentAPI.subscribe() — auto-registers subscription in TopicBus |
agents/dynamic_agent.py |
_AgentAPI.declare_contract() — explicit contract declaration with kwarg aliases |
agents/planner_agent.py |
_resolve_data_references() — topic resolution from natural language |
agents/planner_agent.py |
_sample_live_topics() — fallback MQTT sampling for schema capture |
agents/planner_agent.py |
_decompose_pipeline() — injects schema context into LLM prompt |
agents/planner_agent.py |
_validate_pipeline_code() — post-generation code validator |
agents/main_actor.py |
_manifest_listener() — subscribes to agents/+/manifest for schema propagation |