1use anyhow::Result;
30use async_trait::async_trait;
31use std::sync::Arc;
32use tokio::sync::mpsc;
33
34use wactorz_core::{
35 Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
36};
37
/// Command-driven knowledge agent: answers `help`, `docs`, `explain`,
/// `version`, `agents` and `status` requests from text compiled into the binary.
pub struct UdxAgent {
    /// Actor settings; this file reads `id`, `name`, `protected`,
    /// `mailbox_capacity` and `heartbeat_interval_secs` from it.
    config: ActorConfig,
    /// Actor-system handle; used here only to list `system.registry`.
    system: ActorSystem,
    /// Lifecycle state: `Initializing` at construction, `Running` after
    /// `on_start`, `Stopped` once `run` leaves its loop.
    state: ActorState,
    /// Shared counters updated by `run` (received / processed / failed / heartbeat).
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox; cloned out through `Actor::mailbox`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; `Some` until `run` takes it.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; while `None`, replies and lifecycle events
    /// are simply not published.
    publisher: Option<EventPublisher>,
}
50
51impl UdxAgent {
52 pub fn new(config: ActorConfig, system: ActorSystem) -> Self {
53 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
54 Self {
55 config,
56 system,
57 state: ActorState::Initializing,
58 metrics: Arc::new(ActorMetrics::new()),
59 mailbox_tx: tx,
60 mailbox_rx: Some(rx),
61 publisher: None,
62 }
63 }
64
65 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
66 self.publisher = Some(p);
67 self
68 }
69
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of an error.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` returns `u128`; the cast keeps the low 64 bits.
    since_epoch.as_millis() as u64
}
76
77 fn reply(&self, content: &str) {
79 if let Some(pub_) = &self.publisher {
80 pub_.publish(
81 wactorz_mqtt::topics::chat(&self.config.id),
82 &serde_json::json!({
83 "from": self.config.name,
84 "to": "user",
85 "content": content,
86 "timestampMs": Self::now_ms(),
87 }),
88 );
89 }
90 }
91
92 async fn dispatch(&self, raw: &str) -> String {
94 let text = raw.trim();
95 let lower = text.to_lowercase();
96 let mut parts = lower.splitn(2, char::is_whitespace);
97 let cmd = parts.next().unwrap_or("");
98 let rest = parts.next().unwrap_or("").trim();
99
100 match cmd {
101 "help" => self.cmd_help(rest),
102 "docs" => Self::cmd_docs(rest),
103 "explain" => Self::cmd_explain(rest),
104 "version" => Self::cmd_version(),
105 "agents" => self.cmd_agents().await,
106 "status" => self.cmd_status().await,
107 _ => Self::fallback(text),
108 }
109 }
110
/// Handles `help [topic]`.
///
/// With an empty `topic`, returns the command overview, which embeds this
/// agent's id. Otherwise returns the documentation page for a known topic;
/// any other word is passed to `cmd_explain`.
fn cmd_help(&self, topic: &str) -> String {
    if topic.is_empty() {
        // Each trailing `\` joins the next source line and drops that line's
        // leading whitespace, so only the explicit `\n` escapes break lines.
        return format!(
            "**UDX — User and Developer Xpert** · `{}`\n\n\
            **Commands**\n\
            `help [topic]` → this message, or topic-specific help\n\
            `docs <topic>` → in-depth docs on a topic\n\
            `explain <thing>` → explain a concept\n\
            `agents` → list all live agents\n\
            `status` → system health snapshot\n\
            `version` → build info\n\n\
            **Topics**: `architecture` · `agents` · `chat` · `dashboard` · `api` · `mqtt` · `deploy`\n\
            **Concepts**: `actor-model` · `hlc-wid` · `rust` · `babylon` · agent names\n\n\
            For open questions try `@main-actor <your question>`.",
            self.config.id
        );
    }
    match topic {
        "chat" => Self::doc_chat(),
        "dashboard" => Self::doc_dashboard(),
        "api" => Self::doc_api(),
        "mqtt" => Self::doc_mqtt(),
        "deploy" => Self::doc_deploy(),
        "architecture" | "arch" => Self::doc_architecture(),
        "agents" => Self::doc_agents(),
        // Not a documentation topic: treat the word as a concept name.
        _ => Self::cmd_explain(topic),
    }
}
142
143 fn cmd_docs(topic: &str) -> String {
144 match topic {
145 "architecture" | "arch" => Self::doc_architecture(),
146 "agents" => Self::doc_agents(),
147 "chat" => Self::doc_chat(),
148 "dashboard" => Self::doc_dashboard(),
149 "api" => Self::doc_api(),
150 "mqtt" => Self::doc_mqtt(),
151 "deploy" => Self::doc_deploy(),
152 _ => format!(
153 "Unknown topic **{topic}**.\n\
154 Available: `architecture` · `agents` · `chat` · `dashboard` · `api` · `mqtt` · `deploy`"
155 ),
156 }
157 }
158
/// Handles `explain <concept>`.
///
/// Returns the built-in explanation for a known concept or agent name
/// (several aliases map to the same text). For anything else, returns a hint
/// that names the unknown concept and points the user at `@main-actor`.
fn cmd_explain(concept: &str) -> String {
    // In every literal below a trailing `\` joins the next source line and
    // drops its leading whitespace; line breaks come only from `\n` escapes.
    match concept {
        "actor-model" | "actor" | "actors" => "\
            **Actor Model**\n\
            Each agent is an independent *actor* with its own message inbox \
            (a bounded async channel). Actors never share mutable state — they \
            communicate exclusively by passing immutable messages. This eliminates \
            data races and makes the system trivially scalable: add more actors, \
            no locks needed.\n\
            AgentFlow actors each run on their own `tokio::spawn` task and expose \
            three lifecycle hooks: `on_start`, `handle_message`, `on_heartbeat`."
            .to_string(),

        "mqtt" => "\
            **MQTT pub/sub**\n\
            All inter-service communication in AgentFlow uses MQTT (Mosquitto broker).\n\
            Topic structure:\n\
            `agents/{id}/spawn` → agent came online\n\
            `agents/{id}/heartbeat` → liveness tick (every 10 s)\n\
            `agents/{id}/status` → state change\n\
            `agents/{id}/alert` → warning or error\n\
            `agents/{id}/chat` → message to/from an agent\n\
            `system/health` → MonitorAgent health digest\n\
            `io/chat` → frontend → IOAgent gateway\n\n\
            The WebSocket bridge re-broadcasts every MQTT message to the browser \
            so the Babylon.js frontend stays in sync without polling."
            .to_string(),

        "hlc-wid" | "wid" | "hlc" => "\
            **HLC-WID (Hybrid Logical Clock Wide ID)**\n\
            All actor IDs use HLC-WIDs — time-ordered, globally unique identifiers \
            generated by the `waldiez-wid` crate.\n\
            Format: `<hlc-timestamp>-<node-tag>` (e.g. `01JQND5X-a1b2c3d4`)\n\
            Properties:\n\
            • Monotonically increasing (even across clock skew)\n\
            • Embeds wall-clock time for human readability\n\
            • Collision-free across distributed nodes\n\
            Message IDs use plain WIDs (simpler, no node tag needed)."
            .to_string(),

        "rust" => "\
            **Rust backend**\n\
            The server is a single `wactorz` binary built with Tokio async runtime.\n\
            Workspace crates:\n\
            `wactorz-core` → Actor trait, registry, message types, publisher\n\
            `wactorz-agents` → all concrete agent implementations\n\
            `wactorz-mqtt` → MQTT client + topic helpers\n\
            `wactorz-interfaces` → REST (axum) + WebSocket bridge + CLI\n\
            `wactorz-server` → binary entry point, wires everything together\n\n\
            Build: `cargo build --release --bin wactorz`\n\
            Cross-compile for Linux: use `docker buildx` (see `scripts/build-native.sh`)."
            .to_string(),

        "babylon" | "babylonjs" => "\
            **Babylon.js frontend**\n\
            The dashboard is a Vite + TypeScript SPA using Babylon.js 7.x for 3D rendering.\n\
            Themes (switchable at runtime):\n\
            `Graph` → spring-force layout, glowing spheres, Bezier chat arcs\n\
            `Galaxy` → orbiting planets with moons for sub-agents\n\
            `Cards` → classic card grid (pure HTML, no WebGL)\n\
            `Social` → Instagram×Twitter hybrid profile cards\n\
            `Graveyard` → stopped agents shown as tombstones\n\n\
            All themes share the same MQTT event stream; switching is instantaneous."
            .to_string(),

        "nautilus" | "nautilus-agent" => "\
            **NautilusAgent** — SSH & rsync file-transfer bridge\n\
            Named after the nautilus shell (SSH = *Secure Shell*) and Jules Verne's submarine.\n\
            Commands: `ping`, `exec`, `sync`, `push`, `help`\n\
            Example:\n\
            `@nautilus-agent ping user@host`\n\
            `@nautilus-agent exec user@host df -h`\n\
            `@nautilus-agent sync user@host:/var/data /mnt/local`\n\
            `@nautilus-agent push ./dist/ user@host:/var/www/html/`\n\n\
            Arguments are never shell-interpolated — each token is a discrete \
            `Command::arg()`, preventing injection attacks.\n\
            Configure via env: `NAUTILUS_SSH_KEY`, `NAUTILUS_STRICT_HOST_KEYS`."
            .to_string(),

        "io" | "io-agent" | "ioagent" => "\
            **IOAgent** — UI gateway\n\
            Bridges the frontend chat bar to the actor system.\n\
            Listens on the fixed MQTT topic `io/chat`.\n\
            Route with `@agent-name` prefix: `@monitor-agent status`\n\
            No prefix → message goes to `main-actor`.\n\
            The agent parses only the first `@name` token; everything after \
            is the message body."
            .to_string(),

        "qa" | "qa-agent" => "\
            **QAAgent** — quality-assurance observer\n\
            Passively inspects every chat message that flows through the broker.\n\
            Flags messages containing harmful patterns (prompt injection, PII leakage, \
            profanity) and publishes a `system/alert` if a policy is violated.\n\
            Protected — cannot be stopped or deleted via the dashboard."
            .to_string(),

        "monitor" | "monitor-agent" => "\
            **MonitorAgent** — health watchdog\n\
            Polls all registered actors every 15 seconds.\n\
            Raises a `severity: error` alert if an actor's last heartbeat is \
            older than 60 seconds.\n\
            Publishes a `system/health` digest on every heartbeat tick.\n\
            Protected — cannot be stopped or deleted via the dashboard."
            .to_string(),

        "dynamic" | "dynamic-agent" => "\
            **DynamicAgent** — runtime script executor\n\
            Spawned on-demand by `MainActor` when it parses a `<spawn>` directive \
            from the LLM response. Executes Rhai scripts generated at runtime, \
            enabling the LLM to extend the system with new capabilities without \
            a server restart."
            .to_string(),

        "main" | "main-actor" => "\
            **MainActor** — LLM orchestrator\n\
            The central intelligence of AgentFlow. Receives user messages, \
            calls the configured LLM (Anthropic / OpenAI / Ollama), and parses \
            `<spawn agent-type=\"…\">…</spawn>` blocks in the response to \
            dynamically create new agents.\n\
            Protected — cannot be stopped or deleted via the dashboard."
            .to_string(),

        "udx" | "udx-agent" => "\
            **UDXAgent** — User and Developer Xpert (that's me!)\n\
            A zero-LLM, always-available knowledge agent.\n\
            I answer questions about AgentFlow instantly from a built-in knowledge \
            base — no API key needed, no network round-trip.\n\
            For questions outside my knowledge, ask `@main-actor`."
            .to_string(),

        // Unknown concept: echo it back and suggest concepts that do exist.
        _ => format!(
            "I don't have built-in docs for **{concept}**.\n\
            Try `explain actor-model`, `explain mqtt`, `explain rust`, `explain babylon`, \
            or ask `@main-actor {concept}` for an LLM-powered answer."
        ),
    }
}
297
/// Handles `version`: returns a four-line banner with the crate's declared
/// Rust version, the crate version, the backend crates, the frontend stack
/// and the default LLM.
///
/// Both version fields fall back to `unknown` rather than rendering blank.
/// Per the Cargo reference, `CARGO_PKG_RUST_VERSION` is the manifest's
/// `rust-version` (the minimum supported Rust), not the compiler in use.
fn cmd_version() -> String {
    // `env!(name, msg)` uses `msg` only as the compile-error text, never as a
    // fallback value. Cargo exports an empty string when the manifest has no
    // `rust-version`, so the fallback must be applied at runtime and must
    // treat an empty value as missing.
    let rust = option_env!("CARGO_PKG_RUST_VERSION")
        .filter(|value| !value.is_empty())
        .unwrap_or("unknown");
    // `option_env!` also keeps this function compilable outside Cargo.
    let built = option_env!("CARGO_PKG_VERSION")
        .filter(|value| !value.is_empty())
        .unwrap_or("unknown");
    format!(
        "**AgentFlow** · Rust {rust} · built {built}\n\
        Backend crates: `wactorz-core` · `wactorz-agents` · `wactorz-mqtt` · \
        `wactorz-interfaces` · `wactorz-server`\n\
        Frontend: Vite + TypeScript + Babylon.js 7.x\n\
        Default LLM: `claude-sonnet-4-6` (Anthropic)"
    )
}
309
310 async fn cmd_agents(&self) -> String {
311 let entries = self.system.registry.list().await;
312 if entries.is_empty() {
313 return "No agents currently registered.".to_string();
314 }
315 let mut lines = vec![format!("**Live agents** ({})", entries.len())];
316 for e in &entries {
317 let state = format!("{:?}", e.state).to_lowercase();
318 let prot = if e.protected { " ⭐" } else { "" };
319 lines.push(format!("• `{}` — {} [{}]{}", e.name, e.id, state, prot));
320 }
321 lines.join("\n")
322 }
323
324 async fn cmd_status(&self) -> String {
325 let entries = self.system.registry.list().await;
326 let total = entries.len();
327 let running = entries
328 .iter()
329 .filter(|e| format!("{:?}", e.state).to_lowercase() == "running")
330 .count();
331 let stopped = entries
332 .iter()
333 .filter(|e| format!("{:?}", e.state).to_lowercase() == "stopped")
334 .count();
335 let paused = entries
336 .iter()
337 .filter(|e| format!("{:?}", e.state).to_lowercase() == "paused")
338 .count();
339 let other = total - running - stopped - paused;
340 format!(
341 "**System status**\n\
342 Total agents : {total}\n\
343 Running : {running}\n\
344 Paused : {paused}\n\
345 Stopped : {stopped}\n\
346 Other : {other}\n\n\
347 Use `agents` for the full list, or ask `@monitor-agent` for health alerts."
348 )
349 }
350
/// Reply for input that matches no command: echoes `text`, lists the
/// commands worth trying, and suggests forwarding the text to `@main-actor`.
fn fallback(text: &str) -> String {
    let headline = format!("I don't recognise **{text}** as a UDX command.");
    let pointer = format!("For open-ended questions: `@main-actor {text}`");
    [
        headline.as_str(),
        "Type `help` for a full command list, or try:",
        "• `explain <concept>` — e.g. `explain mqtt`",
        "• `docs <topic>` — e.g. `docs api`",
        "• `agents` / `status` / `version`",
        // The empty element yields the blank line before the final hint.
        "",
        pointer.as_str(),
    ]
    .join("\n")
}
361
/// Returns the `architecture` documentation page: a fenced text diagram of
/// the browser → nginx/Mosquitto → `wactorz-server` layout followed by a
/// short summary.
fn doc_architecture() -> String {
    // NOTE: each trailing `\` joins the next source line and drops that
    // line's leading whitespace, so every diagram row is emitted flush-left
    // regardless of how it is indented here.
    "\
    **AgentFlow Architecture**\n\
    \n\
    ```\n\
    Browser (Babylon.js SPA)\n\
    │ WebSocket (MQTT re-broadcast)\n\
    │ REST /api/\n\
    ▼\n\
    nginx ──── /mqtt ──► Mosquitto (MQTT broker)\n\
    ▲ │\n\
    MQTT pub/sub │\n\
    │ ▼\n\
    wactorz-server (Rust binary)\n\
    ├── MainActor (LLM orchestrator)\n\
    ├── MonitorAgent (health watchdog)\n\
    ├── IOAgent (UI gateway)\n\
    ├── QAAgent (safety observer)\n\
    ├── NautilusAgent (SSH/rsync)\n\
    ├── UDXAgent (knowledge base)\n\
    └── DynamicAgent (LLM-generated scripts)\n\
    ```\n\
    \n\
    Every agent is an async actor; all communication is via MQTT topics.\n\
    Use `explain actor-model` or `explain mqtt` for deeper dives."
    .to_string()
}
391
/// Returns the `agents` documentation page: a Markdown table of the agent
/// roster (name, type, protected flag, role) plus a pointer to `explain`.
fn doc_agents() -> String {
    // Trailing `\` joins source lines; rows are separated by the `\n` escapes.
    "\
    **Agent Roster**\n\n\
    | Agent | Type | Protected | Role |\n\
    |------------------|--------------|-----------|-------------------------------|\n\
    | main-actor | orchestrator | ⭐ yes | LLM brain, spawns sub-agents |\n\
    | monitor-agent | monitor | ⭐ yes | Health watchdog, alerts |\n\
    | io-agent | gateway | no | UI↔actor message bridge |\n\
    | qa-agent | qa | ⭐ yes | Passive safety observer |\n\
    | nautilus-agent | transfer | no | SSH & rsync file bridge |\n\
    | udx-agent | expert | no | Built-in knowledge base |\n\
    | dynamic-* | dynamic | no | LLM-generated script agents |\n\n\
    Use `explain <agent-name>` for details on any agent."
    .to_string()
}
407
/// Returns the `chat` documentation page: a bullet list describing how to
/// open the chat panel, route messages with `@agent-name`, and the keyboard
/// and gesture shortcuts.
fn doc_chat() -> String {
    // Trailing `\` joins source lines; bullets are separated by `\n` escapes.
    "\
    **Chat Panel**\n\n\
    • Click any agent card / node → opens the chat panel for that agent\n\
    • Type in the IO bar (bottom) to send a message\n\
    • Prefix with `@agent-name` to route to a specific agent:\n\
    `@nautilus-agent ping user@host`\n\
    • Shift+Enter → newline (Enter alone sends)\n\
    • Arrow Up/Down → message history\n\
    • Swipe right (mobile) or press Escape → close the panel\n\
    • 3-dot indicator shows when an agent is processing your message"
    .to_string()
}
421
/// Returns the `dashboard` documentation page: the available views and the
/// per-card controls.
fn doc_dashboard() -> String {
    // Trailing `\` joins source lines; entries are separated by `\n` escapes.
    "\
    **Dashboard Views**\n\n\
    Switch with the buttons top-right:\n\
    `3D Graph` → spring-force layout; chat arcs animate between nodes\n\
    `Galaxy` → orbital view; main-actor at centre, others orbit\n\
    `Cards` → compact HTML card grid; fastest on low-end devices\n\
    `Social` → Instagram-style profile cards with stats\n\
    `Graveyard` → shows stopped/failed agents as tombstones\n\n\
    Controls per card: 💬 Chat · ⏸ Pause · ▶ Resume · ⏹ Stop · 🗑 Delete\n\
    Protected agents (⭐) cannot be stopped or deleted."
    .to_string()
}
435
/// Returns the `api` documentation page: the REST endpoints under `/api/`
/// and the WebSocket bridge address with its message envelope.
fn doc_api() -> String {
    // Plain literal (not `format!`), so the `{…}` JSON examples need no brace
    // escaping; only the double quotes are escaped.
    "\
    **REST API** (`/api/`)\n\n\
    `GET /api/actors` → list all actors\n\
    `GET /api/actors/:id` → get actor info\n\
    `POST /api/actors/:id/pause` → pause actor\n\
    `POST /api/actors/:id/resume` → resume actor\n\
    `DELETE /api/actors/:id` → stop + remove actor\n\
    `POST /api/chat` → send a message (body: `{\"to\":\"…\",\"content\":\"…\"}`)\n\n\
    WebSocket bridge: `ws://host/ws`\n\
    Subscribes once; receives every MQTT message as `{\"topic\":\"…\",\"payload\":{…}}`.".to_string()
}
448
/// Returns the `mqtt` documentation page: each topic with the payload fields
/// it carries, followed by the broker ports.
fn doc_mqtt() -> String {
    // Plain literal (not `format!`), so `{id}` and the `{…}` field lists are
    // emitted verbatim.
    "\
    **MQTT Topic Reference**\n\n\
    `agents/{id}/spawn` → `{agentId, agentName, agentType, timestampMs}`\n\
    `agents/{id}/heartbeat` → `{agentId, agentName, state, timestampMs}`\n\
    `agents/{id}/status` → `{agentId, state, timestampMs}`\n\
    `agents/{id}/alert` → `{agentId, severity, message, timestampMs}`\n\
    `agents/{id}/chat` → `{from, to, content, timestampMs}`\n\
    `system/health` → `{agentCount, staleAgents[], timestampMs}`\n\
    `system/spawn` → dynamic-agent spawn announcements\n\
    `io/chat` → frontend → IOAgent gateway\n\n\
    Broker: Mosquitto TCP 1883 (internal) / WS 9001 via nginx `/mqtt`\n\
    All payloads are camelCase JSON."
    .to_string()
}
464
/// Returns the `deploy` documentation page: the full-Docker, native-binary
/// and systemd deployment options with the commands for each.
fn doc_deploy() -> String {
    // Trailing `\` joins source lines; entries are separated by `\n` escapes.
    "\
    **Deployment Options**\n\n\
    **Full Docker** (`compose.yaml`):\n\
    `docker compose up -d` — runs everything in containers\n\n\
    **Native binary** (`compose.native.yaml`):\n\
    `bash scripts/package-native.sh` — builds `wactorz-native-*.tar.gz`\n\
    `bash deploy-native.sh` — wizard: starts Mosquitto+nginx in Docker,\n\
    runs the binary directly on the host\n\
    Benefits: SSH keys work automatically (NautilusAgent), faster startup,\n\
    smaller footprint (~12 MB binary vs 39 MB image).\n\n\
    **systemd** (persistent on reboot):\n\
    `sudo cp systemd/wactorz.service /etc/systemd/system/`\n\
    `sudo systemctl enable --now wactorz`\n\
    `journalctl -u wactorz -f`"
    .to_string()
}
482}
483
484#[async_trait]
487impl Actor for UdxAgent {
/// Returns an owned copy of the agent id stored in the configuration.
fn id(&self) -> String {
    self.config.id.clone()
}
/// Returns the agent name stored in the configuration, borrowed from `self`.
fn name(&self) -> &str {
    &self.config.name
}
/// Returns a copy of the agent's current lifecycle state.
fn state(&self) -> ActorState {
    self.state.clone()
}
/// Returns another handle to the shared metrics (reference-count bump only).
fn metrics(&self) -> Arc<ActorMetrics> {
    Arc::clone(&self.metrics)
}
/// Returns a new sender for this agent's mailbox; messages sent through it
/// are received by the loop in `run`.
fn mailbox(&self) -> mpsc::Sender<Message> {
    self.mailbox_tx.clone()
}
/// Returns the `protected` flag from the configuration.
// NOTE(review): presumably protected actors cannot be stopped or deleted from
// the dashboard — enforcement is not visible in this file; confirm at the caller.
fn is_protected(&self) -> bool {
    self.config.protected
}
506
507 async fn on_start(&mut self) -> Result<()> {
508 self.state = ActorState::Running;
509 if let Some(pub_) = &self.publisher {
510 pub_.publish(
511 wactorz_mqtt::topics::spawn(&self.config.id),
512 &serde_json::json!({
513 "agentId": self.config.id,
514 "agentName": self.config.name,
515 "agentType": "expert",
516 "timestampMs": Self::now_ms(),
517 }),
518 );
519 }
520 Ok(())
521 }
522
523 async fn handle_message(&mut self, message: Message) -> Result<()> {
524 use wactorz_core::message::MessageType;
525 let content = match &message.payload {
526 MessageType::Text { content } => content.clone(),
527 MessageType::Task { description, .. } => description.clone(),
528 _ => return Ok(()),
529 };
530 let response = self.dispatch(content.trim()).await;
531 self.reply(&response);
532 Ok(())
533 }
534
535 async fn on_heartbeat(&mut self) -> Result<()> {
536 if let Some(pub_) = &self.publisher {
537 pub_.publish(
538 wactorz_mqtt::topics::heartbeat(&self.config.id),
539 &serde_json::json!({
540 "agentId": self.config.id,
541 "agentName": self.config.name,
542 "state": self.state,
543 "timestampMs": Self::now_ms(),
544 }),
545 );
546 }
547 Ok(())
548 }
549
550 async fn run(&mut self) -> Result<()> {
551 self.on_start().await?;
552 let mut rx = self
553 .mailbox_rx
554 .take()
555 .ok_or_else(|| anyhow::anyhow!("UdxAgent already running"))?;
556 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
557 self.config.heartbeat_interval_secs,
558 ));
559 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
560 loop {
561 tokio::select! {
562 biased;
563 msg = rx.recv() => {
564 match msg {
565 None => break,
566 Some(m) => {
567 self.metrics.record_received();
568 if let wactorz_core::message::MessageType::Command {
569 command: wactorz_core::message::ActorCommand::Stop
570 } = &m.payload {
571 break;
572 }
573 match self.handle_message(m).await {
574 Ok(_) => self.metrics.record_processed(),
575 Err(e) => {
576 tracing::error!("[{}] {e}", self.config.name);
577 self.metrics.record_failed();
578 }
579 }
580 }
581 }
582 }
583 _ = hb.tick() => {
584 self.metrics.record_heartbeat();
585 if let Err(e) = self.on_heartbeat().await {
586 tracing::error!("[{}] heartbeat: {e}", self.config.name);
587 }
588 }
589 }
590 }
591 self.state = ActorState::Stopped;
592 self.on_stop().await
593 }
594}