1use anyhow::Result;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24use std::sync::Arc;
25use tokio::sync::mpsc;
26
27use wactorz_core::{
28 Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
29};
30
31use crate::llm_agent::{LlmAgent, LlmConfig};
32
/// Serde fallback used for `SpawnDirective::agent_type` when the field is
/// missing from the incoming JSON.
fn default_agent_type() -> String {
    String::from("DynamicAgent")
}
36
/// A request to create a new agent, deserialized from the JSON body of a
/// `<spawn>…</spawn>` block found in an LLM response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnDirective {
    /// Kind of agent to create. Falls back to `"DynamicAgent"` (via
    /// `default_agent_type`) when the field is absent from the JSON.
    #[serde(default = "default_agent_type")]
    pub agent_type: String,
    /// Name passed to `ActorConfig::new` for the spawned agent.
    pub agent_name: String,
    /// Optional script source handed to `DynamicAgent::new`; an empty string
    /// is used when it is missing.
    pub script: Option<String>,
    /// Optional description forwarded to `DynamicAgent::with_llm`; an empty
    /// string is used when it is missing.
    pub description: Option<String>,
    /// Optional free-form JSON configuration.
    /// NOTE(review): not read by the spawn logic visible in this file —
    /// confirm the intended consumer.
    pub config: Option<serde_json::Value>,
}
47
/// Orchestrator actor: forwards user text to an LLM, spawns the agents the
/// LLM requests through `<spawn>` blocks, and publishes the reply.
pub struct MainActor {
    /// This actor's configuration; `MainActor::new` stores it with
    /// `protected` set to `true`.
    config: ActorConfig,
    /// LLM agent used to complete prompts in `handle_message`.
    llm: LlmAgent,
    /// LLM settings cloned into every spawned `DynamicAgent`.
    llm_config: LlmConfig,
    /// Handle used to list registered actors and to spawn new ones.
    system: ActorSystem,
    /// Current lifecycle state (`Initializing` → `Running` → `Stopped`).
    state: ActorState,
    /// Counters updated by the `run` loop (received/processed/failed/heartbeat).
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox, cloned out by `Actor::mailbox`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; taken (left as `None`) by `run`.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; when `None`, no events are published.
    publisher: Option<EventPublisher>,
}
60
61impl MainActor {
62 pub fn new(config: ActorConfig, llm_config: LlmConfig, system: ActorSystem) -> Self {
63 let protected_config = ActorConfig {
64 protected: true,
65 ..config.clone()
66 };
67 let llm = LlmAgent::new(
68 ActorConfig::new(format!("{}-llm", config.name)),
69 llm_config.clone(),
70 );
71 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
72 Self {
73 config: protected_config,
74 llm,
75 llm_config,
76 system,
77 state: ActorState::Initializing,
78 metrics: Arc::new(ActorMetrics::new()),
79 mailbox_tx: tx,
80 mailbox_rx: Some(rx),
81 publisher: None,
82 }
83 }
84
85 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
87 self.publisher = Some(p);
88 self
89 }
90
91 pub fn parse_spawn_blocks(response: &str) -> Vec<SpawnDirective> {
93 let mut out = Vec::new();
94 let mut rest = response;
95 while let Some(open) = rest.find("<spawn>") {
96 let after = &rest[open + 7..];
97 if let Some(close) = after.find("</spawn>") {
98 let json_str = after[..close].trim();
99 match serde_json::from_str::<SpawnDirective>(json_str) {
100 Ok(d) => out.push(d),
101 Err(e) => tracing::warn!("Bad <spawn> block: {e}"),
102 }
103 rest = &after[close + 8..];
104 } else {
105 break;
106 }
107 }
108 out
109 }
110
111 async fn execute_spawn(&self, directive: SpawnDirective) -> Result<()> {
113 use crate::{DynamicAgent, MonitorAgent};
114
115 let cfg = ActorConfig::new(&directive.agent_name);
116 let agent_id = cfg.id.clone();
117 let agent_name = cfg.name.clone();
118 let agent_type = directive.agent_type.clone();
119
120 let description = directive.description.unwrap_or_default();
121 let actor: Box<dyn wactorz_core::Actor> = match directive.agent_type.as_str() {
122 "DynamicAgent" | "dynamic" => {
123 let script = directive.script.unwrap_or_default();
124 let mut a =
125 DynamicAgent::new(cfg, script).with_llm(self.llm_config.clone(), description);
126 if let Some(pub_) = &self.publisher {
127 a = a.with_publisher(pub_.clone());
128 }
129 Box::new(a)
130 }
131 "MonitorAgent" | "monitor" => {
132 let a = MonitorAgent::new(cfg, self.system.clone());
133 if let Some(pub_) = &self.publisher {
134 Box::new(a.with_publisher(pub_.clone()))
135 } else {
136 Box::new(a)
137 }
138 }
139 _ => {
140 tracing::warn!(
141 "Unknown agent_type '{}', defaulting to DynamicAgent",
142 directive.agent_type
143 );
144 let mut a = DynamicAgent::new(cfg, String::new())
145 .with_llm(self.llm_config.clone(), description);
146 if let Some(pub_) = &self.publisher {
147 a = a.with_publisher(pub_.clone());
148 }
149 Box::new(a)
150 }
151 };
152 self.system.spawn_actor(actor).await?;
153 tracing::info!("Spawned {} as {}", agent_name, agent_type);
154
155 if let Some(pub_) = &self.publisher {
157 pub_.publish(
158 wactorz_mqtt::topics::spawn(&agent_id),
159 &serde_json::json!({
160 "agentId": agent_id,
161 "agentName": agent_name,
162 "agentType": agent_type,
163 "timestampMs": Self::now_ms(),
164 }),
165 );
166 }
167 Ok(())
168 }
169
170 fn now_ms() -> u64 {
171 std::time::SystemTime::now()
172 .duration_since(std::time::UNIX_EPOCH)
173 .unwrap_or_default()
174 .as_millis() as u64
175 }
176
177 async fn build_system_prompt(&self) -> String {
179 let actors = self.system.registry.list().await;
180 let agent_list: Vec<String> = actors
181 .iter()
182 .map(|e| format!("- {} (id={}, state={})", e.name, e.id, e.state))
183 .collect();
184 format!(
185 "You are the main orchestrator of the AgentFlow multi-agent system.\n\
186 \n\
187 Current agents:\n\
188 {agents}\n\
189 \n\
190 To spawn a new agent, embed a JSON block using EXACTLY this format:\n\
191 <spawn>\n\
192 {{\n\
193 \"agent_type\": \"DynamicAgent\",\n\
194 \"agent_name\": \"my-agent\",\n\
195 \"description\": \"What this agent does\",\n\
196 \"script\": \"fn main(msg) {{ agent_log(\\\"got: \\\" + msg); \\\"ok\\\" }}\"\n\
197 }}\n\
198 </spawn>\n\
199 \n\
200 Rules:\n\
201 - \"agent_type\" must be exactly \"DynamicAgent\" or \"MonitorAgent\" (required)\n\
202 - \"agent_name\" must be lowercase-hyphenated, no spaces (required)\n\
203 - \"script\" MUST define a Rhai function: fn main(msg) {{ ... }}\n\
204 `msg` is the PLAIN TEXT the user typed (e.g. \"3 / 4\" or \"hello\").\n\
205 The function must return a string (the reply to send back).\n\
206 Available API calls inside the script:\n\
207 agent_log(text) — log a message\n\
208 agent_alert(text) — broadcast an alert\n\
209 agent_state_get(key) → value — read persistent state\n\
210 agent_state_set(key, value) — write persistent state\n\
211 - Example math agent script:\n\
212 \"fn main(msg) {{ let expr = msg.trim(); let result = eval(expr); \\\"= \\\" + result.to_string() }}\"\n\
213 - Example echo agent: \"fn main(msg) {{ \\\"Echo: \\\" + msg }}\"\n\
214 - Example counter agent:\n\
215 \"fn main(msg) {{ let n = agent_state_get(\\\"count\\\"); let c = if n == () {{ 0 }} else {{ n }}; agent_state_set(\\\"count\\\", c + 1); \\\"Count: \\\" + (c + 1).to_string() }}\"\n\
216 - Respond conversationally; include <spawn> blocks ONLY when the user explicitly asks to create a new agent.",
217 agents = agent_list.join("\n")
218 )
219 }
220}
221
222#[async_trait]
223impl Actor for MainActor {
224 fn id(&self) -> String {
225 self.config.id.clone()
226 }
227 fn name(&self) -> &str {
228 &self.config.name
229 }
230 fn state(&self) -> ActorState {
231 self.state.clone()
232 }
233 fn metrics(&self) -> Arc<ActorMetrics> {
234 Arc::clone(&self.metrics)
235 }
236 fn mailbox(&self) -> mpsc::Sender<Message> {
237 self.mailbox_tx.clone()
238 }
239 fn is_protected(&self) -> bool {
240 self.config.protected
241 }
242
243 async fn on_start(&mut self) -> Result<()> {
244 self.state = ActorState::Running;
245 if let Some(pub_) = &self.publisher {
246 pub_.publish(
247 wactorz_mqtt::topics::spawn(&self.config.id),
248 &serde_json::json!({
249 "agentId": self.config.id,
250 "agentName": self.config.name,
251 "agentType": "orchestrator",
252 "timestampMs": Self::now_ms(),
253 }),
254 );
255 }
256 Ok(())
257 }
258
259 async fn handle_message(&mut self, message: Message) -> Result<()> {
260 use wactorz_core::message::MessageType;
261 let user_text = match &message.payload {
262 MessageType::Text { content } => content.clone(),
263 MessageType::Task { description, .. } => description.clone(),
264 _ => return Ok(()),
265 };
266
267 let system_prompt = self.build_system_prompt().await;
268 let full_prompt = format!("{}\n\nUser: {}", system_prompt, user_text);
269 let response = self
270 .llm
271 .complete(&full_prompt)
272 .await
273 .unwrap_or_else(|e| format!("LLM error: {e}"));
274
275 let directives = Self::parse_spawn_blocks(&response);
277 for dir in directives {
278 if let Err(e) = self.execute_spawn(dir).await {
279 tracing::error!("Spawn failed: {e}");
280 }
281 }
282
283 if let Some(pub_) = &self.publisher {
285 let msg_id = wid::HLCWidGen::new("msg".to_string(), 4, 0)
286 .expect("HLCWidGen init")
287 .next_hlc_wid();
288 pub_.publish(
289 wactorz_mqtt::topics::chat(&self.config.id),
290 &serde_json::json!({
291 "id": msg_id,
292 "from": self.config.name,
293 "to": "user",
294 "content": response,
295 "timestampMs": std::time::SystemTime::now()
296 .duration_since(std::time::UNIX_EPOCH)
297 .unwrap_or_default().as_millis() as u64,
298 }),
299 );
300 }
301 Ok(())
302 }
303
304 async fn on_heartbeat(&mut self) -> Result<()> {
305 use std::sync::atomic::Ordering;
306 if let Some(pub_) = &self.publisher {
307 pub_.publish(
308 wactorz_mqtt::topics::heartbeat(&self.config.id),
309 &serde_json::json!({
310 "agentId": self.config.id,
311 "agentName": self.config.name,
312 "state": self.state,
313 "sequence": self.metrics.heartbeats.load(Ordering::Relaxed),
314 "timestampMs": std::time::SystemTime::now()
315 .duration_since(std::time::UNIX_EPOCH)
316 .unwrap_or_default().as_millis() as u64,
317 }),
318 );
319 }
320 Ok(())
321 }
322
323 async fn run(&mut self) -> Result<()> {
324 self.on_start().await?;
325 let mut rx = self
326 .mailbox_rx
327 .take()
328 .ok_or_else(|| anyhow::anyhow!("MainActor already running"))?;
329 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
330 self.config.heartbeat_interval_secs,
331 ));
332 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
333 loop {
334 tokio::select! {
335 biased;
336 msg = rx.recv() => {
337 match msg {
338 None => break,
339 Some(m) => {
340 self.metrics.record_received();
341 if let wactorz_core::message::MessageType::Command {
342 command: wactorz_core::message::ActorCommand::Stop
343 } = &m.payload {
344 break;
345 }
346 match self.handle_message(m).await {
347 Ok(_) => self.metrics.record_processed(),
348 Err(e) => {
349 tracing::error!("[{}] {e}", self.config.name);
350 self.metrics.record_failed();
351 }
352 }
353 }
354 }
355 }
356 _ = hb.tick() => {
357 self.metrics.record_heartbeat();
358 if let Err(e) = self.on_heartbeat().await {
359 tracing::error!("[{}] heartbeat: {e}", self.config.name);
360 }
361 }
362 }
363 }
364 self.state = ActorState::Stopped;
365 self.on_stop().await
366 }
367}