wactorz_agents/
main_actor.rs

1//! Main orchestrator actor.
2//!
3//! [`MainActor`] is the central LLM-powered orchestrator.  It:
4//! 1. Receives user input and routes it to the appropriate agent
5//! 2. Sends the full system context to its LLM backend
6//! 3. Parses `<spawn>` blocks in the LLM's reply to dynamically create agents
7//! 4. Is **protected** — it cannot be killed by external commands
8//!
9//! Spawn block format (JSON inside XML-like tags):
10//! ```text
11//! <spawn>
12//! {
13//!   "agent_type": "DynamicAgent",
14//!   "agent_name": "data-fetcher",
15//!   "script": "...",
16//!   "description": "Fetches weather data"
17//! }
18//! </spawn>
19//! ```
20
21use anyhow::Result;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24use std::sync::Arc;
25use tokio::sync::mpsc;
26
27use wactorz_core::{
28    Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
29};
30
31use crate::llm_agent::{LlmAgent, LlmConfig};
32
/// Serde default for the `agent_type` field of a `<spawn>` block.
///
/// Used when the JSON object omits `agent_type`; yields `"DynamicAgent"`.
fn default_agent_type() -> String {
    String::from("DynamicAgent")
}
36
/// Parsed content of a `<spawn>` block.
///
/// Deserialized from the JSON object found between `<spawn>` and `</spawn>`
/// in an LLM reply (see `MainActor::parse_spawn_blocks`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnDirective {
    /// Kind of agent to create. Falls back to `"DynamicAgent"` (via
    /// `default_agent_type`) when the JSON omits the field.
    #[serde(default = "default_agent_type")]
    pub agent_type: String,
    /// Name used to build the new agent's `ActorConfig`. Has no default, so
    /// a block without it fails to deserialize.
    pub agent_name: String,
    /// Script source handed to a `DynamicAgent` when `agent_type` is
    /// `"DynamicAgent"` / `"dynamic"`; not used for any other type.
    pub script: Option<String>,
    /// Free-text description, forwarded to `DynamicAgent::with_llm`.
    pub description: Option<String>,
    /// Extra configuration. Not read anywhere in this file.
    pub config: Option<serde_json::Value>,
}
47
/// The central orchestrator.
///
/// Forwards user text to an LLM, spawns agents requested in the reply and,
/// when a publisher is attached, publishes spawn, chat and heartbeat events.
pub struct MainActor {
    /// Actor configuration; `new` stores it with `protected` set to `true`.
    config: ActorConfig,
    /// LLM agent used to complete prompts in `handle_message`; created in
    /// `new` under the name `<actor name>-llm`.
    llm: LlmAgent,
    /// LLM settings, cloned into every `DynamicAgent` this actor spawns.
    llm_config: LlmConfig,
    /// Handle to the actor system: used to spawn actors and list the registry.
    system: ActorSystem,
    /// Lifecycle state: `Initializing` after `new`, `Running` after
    /// `on_start`, `Stopped` once `run` leaves its loop.
    state: ActorState,
    /// Shared metrics counters, handed out by `metrics()`.
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox, cloned by `mailbox()`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; taken by `run`, `None` afterwards.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; when `None`, nothing is published.
    publisher: Option<EventPublisher>,
}
60
61impl MainActor {
62    pub fn new(config: ActorConfig, llm_config: LlmConfig, system: ActorSystem) -> Self {
63        let protected_config = ActorConfig {
64            protected: true,
65            ..config.clone()
66        };
67        let llm = LlmAgent::new(
68            ActorConfig::new(format!("{}-llm", config.name)),
69            llm_config.clone(),
70        );
71        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
72        Self {
73            config: protected_config,
74            llm,
75            llm_config,
76            system,
77            state: ActorState::Initializing,
78            metrics: Arc::new(ActorMetrics::new()),
79            mailbox_tx: tx,
80            mailbox_rx: Some(rx),
81            publisher: None,
82        }
83    }
84
85    /// Attach an EventPublisher for MQTT output.
86    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
87        self.publisher = Some(p);
88        self
89    }
90
91    /// Parse all `<spawn>...</spawn>` blocks from an LLM response string.
92    pub fn parse_spawn_blocks(response: &str) -> Vec<SpawnDirective> {
93        let mut out = Vec::new();
94        let mut rest = response;
95        while let Some(open) = rest.find("<spawn>") {
96            let after = &rest[open + 7..];
97            if let Some(close) = after.find("</spawn>") {
98                let json_str = after[..close].trim();
99                match serde_json::from_str::<SpawnDirective>(json_str) {
100                    Ok(d) => out.push(d),
101                    Err(e) => tracing::warn!("Bad <spawn> block: {e}"),
102                }
103                rest = &after[close + 8..];
104            } else {
105                break;
106            }
107        }
108        out
109    }
110
111    /// Execute a parsed spawn directive via the actor system.
112    async fn execute_spawn(&self, directive: SpawnDirective) -> Result<()> {
113        use crate::{DynamicAgent, MonitorAgent};
114
115        let cfg = ActorConfig::new(&directive.agent_name);
116        let agent_id = cfg.id.clone();
117        let agent_name = cfg.name.clone();
118        let agent_type = directive.agent_type.clone();
119
120        let description = directive.description.unwrap_or_default();
121        let actor: Box<dyn wactorz_core::Actor> = match directive.agent_type.as_str() {
122            "DynamicAgent" | "dynamic" => {
123                let script = directive.script.unwrap_or_default();
124                let mut a =
125                    DynamicAgent::new(cfg, script).with_llm(self.llm_config.clone(), description);
126                if let Some(pub_) = &self.publisher {
127                    a = a.with_publisher(pub_.clone());
128                }
129                Box::new(a)
130            }
131            "MonitorAgent" | "monitor" => {
132                let a = MonitorAgent::new(cfg, self.system.clone());
133                if let Some(pub_) = &self.publisher {
134                    Box::new(a.with_publisher(pub_.clone()))
135                } else {
136                    Box::new(a)
137                }
138            }
139            _ => {
140                tracing::warn!(
141                    "Unknown agent_type '{}', defaulting to DynamicAgent",
142                    directive.agent_type
143                );
144                let mut a = DynamicAgent::new(cfg, String::new())
145                    .with_llm(self.llm_config.clone(), description);
146                if let Some(pub_) = &self.publisher {
147                    a = a.with_publisher(pub_.clone());
148                }
149                Box::new(a)
150            }
151        };
152        self.system.spawn_actor(actor).await?;
153        tracing::info!("Spawned {} as {}", agent_name, agent_type);
154
155        // Announce to frontend immediately (don't wait for the first heartbeat)
156        if let Some(pub_) = &self.publisher {
157            pub_.publish(
158                wactorz_mqtt::topics::spawn(&agent_id),
159                &serde_json::json!({
160                    "agentId":   agent_id,
161                    "agentName": agent_name,
162                    "agentType": agent_type,
163                    "timestampMs": Self::now_ms(),
164                }),
165            );
166        }
167        Ok(())
168    }
169
170    fn now_ms() -> u64 {
171        std::time::SystemTime::now()
172            .duration_since(std::time::UNIX_EPOCH)
173            .unwrap_or_default()
174            .as_millis() as u64
175    }
176
177    /// Build the system prompt describing all currently running agents.
178    async fn build_system_prompt(&self) -> String {
179        let actors = self.system.registry.list().await;
180        let agent_list: Vec<String> = actors
181            .iter()
182            .map(|e| format!("- {} (id={}, state={})", e.name, e.id, e.state))
183            .collect();
184        format!(
185            "You are the main orchestrator of the AgentFlow multi-agent system.\n\
186             \n\
187             Current agents:\n\
188             {agents}\n\
189             \n\
190             To spawn a new agent, embed a JSON block using EXACTLY this format:\n\
191             <spawn>\n\
192             {{\n\
193               \"agent_type\": \"DynamicAgent\",\n\
194               \"agent_name\": \"my-agent\",\n\
195               \"description\": \"What this agent does\",\n\
196               \"script\": \"fn main(msg) {{ agent_log(\\\"got: \\\" + msg); \\\"ok\\\" }}\"\n\
197             }}\n\
198             </spawn>\n\
199             \n\
200             Rules:\n\
201             - \"agent_type\" must be exactly \"DynamicAgent\" or \"MonitorAgent\" (required)\n\
202             - \"agent_name\" must be lowercase-hyphenated, no spaces (required)\n\
203             - \"script\" MUST define a Rhai function: fn main(msg) {{ ... }}\n\
204               `msg` is the PLAIN TEXT the user typed (e.g. \"3 / 4\" or \"hello\").\n\
205               The function must return a string (the reply to send back).\n\
206               Available API calls inside the script:\n\
207                 agent_log(text)              — log a message\n\
208                 agent_alert(text)            — broadcast an alert\n\
209                 agent_state_get(key) → value — read persistent state\n\
210                 agent_state_set(key, value)  — write persistent state\n\
211             - Example math agent script:\n\
212               \"fn main(msg) {{ let expr = msg.trim(); let result = eval(expr); \\\"= \\\" + result.to_string() }}\"\n\
213             - Example echo agent: \"fn main(msg) {{ \\\"Echo: \\\" + msg }}\"\n\
214             - Example counter agent:\n\
215               \"fn main(msg) {{ let n = agent_state_get(\\\"count\\\"); let c = if n == () {{ 0 }} else {{ n }}; agent_state_set(\\\"count\\\", c + 1); \\\"Count: \\\" + (c + 1).to_string() }}\"\n\
216             - Respond conversationally; include <spawn> blocks ONLY when the user explicitly asks to create a new agent.",
217            agents = agent_list.join("\n")
218        )
219    }
220}
221
222#[async_trait]
223impl Actor for MainActor {
    /// Returns an owned copy of the id stored in this actor's config.
    fn id(&self) -> String {
        self.config.id.clone()
    }
    /// Returns the name stored in this actor's config, borrowed from `self`.
    fn name(&self) -> &str {
        &self.config.name
    }
    /// Returns a copy of the current lifecycle state.
    fn state(&self) -> ActorState {
        self.state.clone()
    }
    /// Returns another handle to the shared metrics; only the `Arc` is
    /// cloned, so the caller observes the same counters as this actor.
    fn metrics(&self) -> Arc<ActorMetrics> {
        Arc::clone(&self.metrics)
    }
    /// Returns a new sender for this actor's mailbox channel.
    fn mailbox(&self) -> mpsc::Sender<Message> {
        self.mailbox_tx.clone()
    }
    /// Reports the `protected` flag from the config; `MainActor::new` always
    /// sets that flag to `true`.
    fn is_protected(&self) -> bool {
        self.config.protected
    }
242
243    async fn on_start(&mut self) -> Result<()> {
244        self.state = ActorState::Running;
245        if let Some(pub_) = &self.publisher {
246            pub_.publish(
247                wactorz_mqtt::topics::spawn(&self.config.id),
248                &serde_json::json!({
249                    "agentId":   self.config.id,
250                    "agentName": self.config.name,
251                    "agentType": "orchestrator",
252                    "timestampMs": Self::now_ms(),
253                }),
254            );
255        }
256        Ok(())
257    }
258
259    async fn handle_message(&mut self, message: Message) -> Result<()> {
260        use wactorz_core::message::MessageType;
261        let user_text = match &message.payload {
262            MessageType::Text { content } => content.clone(),
263            MessageType::Task { description, .. } => description.clone(),
264            _ => return Ok(()),
265        };
266
267        let system_prompt = self.build_system_prompt().await;
268        let full_prompt = format!("{}\n\nUser: {}", system_prompt, user_text);
269        let response = self
270            .llm
271            .complete(&full_prompt)
272            .await
273            .unwrap_or_else(|e| format!("LLM error: {e}"));
274
275        // Parse and execute any spawn directives
276        let directives = Self::parse_spawn_blocks(&response);
277        for dir in directives {
278            if let Err(e) = self.execute_spawn(dir).await {
279                tracing::error!("Spawn failed: {e}");
280            }
281        }
282
283        // Publish response to MQTT chat topic
284        if let Some(pub_) = &self.publisher {
285            let msg_id = wid::HLCWidGen::new("msg".to_string(), 4, 0)
286                .expect("HLCWidGen init")
287                .next_hlc_wid();
288            pub_.publish(
289                wactorz_mqtt::topics::chat(&self.config.id),
290                &serde_json::json!({
291                    "id": msg_id,
292                    "from": self.config.name,
293                    "to": "user",
294                    "content": response,
295                    "timestampMs": std::time::SystemTime::now()
296                        .duration_since(std::time::UNIX_EPOCH)
297                        .unwrap_or_default().as_millis() as u64,
298                }),
299            );
300        }
301        Ok(())
302    }
303
304    async fn on_heartbeat(&mut self) -> Result<()> {
305        use std::sync::atomic::Ordering;
306        if let Some(pub_) = &self.publisher {
307            pub_.publish(
308                wactorz_mqtt::topics::heartbeat(&self.config.id),
309                &serde_json::json!({
310                    "agentId": self.config.id,
311                    "agentName": self.config.name,
312                    "state": self.state,
313                    "sequence": self.metrics.heartbeats.load(Ordering::Relaxed),
314                    "timestampMs": std::time::SystemTime::now()
315                        .duration_since(std::time::UNIX_EPOCH)
316                        .unwrap_or_default().as_millis() as u64,
317                }),
318            );
319        }
320        Ok(())
321    }
322
323    async fn run(&mut self) -> Result<()> {
324        self.on_start().await?;
325        let mut rx = self
326            .mailbox_rx
327            .take()
328            .ok_or_else(|| anyhow::anyhow!("MainActor already running"))?;
329        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
330            self.config.heartbeat_interval_secs,
331        ));
332        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
333        loop {
334            tokio::select! {
335                biased;
336                msg = rx.recv() => {
337                    match msg {
338                        None => break,
339                        Some(m) => {
340                            self.metrics.record_received();
341                            if let wactorz_core::message::MessageType::Command {
342                                command: wactorz_core::message::ActorCommand::Stop
343                            } = &m.payload {
344                                break;
345                            }
346                            match self.handle_message(m).await {
347                                Ok(_) => self.metrics.record_processed(),
348                                Err(e) => {
349                                    tracing::error!("[{}] {e}", self.config.name);
350                                    self.metrics.record_failed();
351                                }
352                            }
353                        }
354                    }
355                }
356                _ = hb.tick() => {
357                    self.metrics.record_heartbeat();
358                    if let Err(e) = self.on_heartbeat().await {
359                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
360                    }
361                }
362            }
363        }
364        self.state = ActorState::Stopped;
365        self.on_stop().await
366    }
367}