wactorz_agents/
dynamic_agent.rs

1//! Dynamic runtime-script agent.
2//!
3//! [`DynamicAgent`] executes **Rhai** scripts generated by the LLM at runtime.
4//! Rhai is a safe, sandboxed scripting language for Rust — it replaces the
5//! Python `exec()` approach with proper isolation.
6//!
7//! The Rhai engine exposes a limited API to scripts:
8//! - `agent_log(msg)` — log a message
9//! - `agent_alert(msg)` — broadcast an alert
10//! - `agent_state_get(key)` — read from persistent state
11//! - `agent_state_set(key, value)` — write to persistent state
12//!
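//! Scripts should define a `fn main(msg)` function, which is called with the text of each incoming message; a script without a matching `main` is evaluated top-to-bottom with `msg` pre-populated in scope.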
14
15use anyhow::Result;
16use async_trait::async_trait;
17use rhai::{AST, Engine};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21
22use crate::llm_agent::{LlmAgent, LlmConfig};
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
25/// A dynamically scripted agent.
26pub struct DynamicAgent {
27    config: ActorConfig,
28    /// The compiled Rhai AST for the current script.
29    script_ast: Option<AST>,
30    /// Raw source of the current script (for re-compilation after reload).
31    script_source: String,
32    /// Human-readable description, used as LLM system-prompt context.
33    description: String,
34    /// Optional LLM backend — called when the Rhai script produces no output.
35    llm: Option<LlmAgent>,
36    /// Persistent key-value state accessible from scripts.
37    agent_state: std::sync::Arc<std::sync::Mutex<HashMap<String, serde_json::Value>>>,
38    log_queue: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
39    engine: Engine,
40    state: ActorState,
41    metrics: Arc<ActorMetrics>,
42    mailbox_tx: mpsc::Sender<Message>,
43    mailbox_rx: Option<mpsc::Receiver<Message>>,
44    publisher: Option<EventPublisher>,
45}
46
47impl DynamicAgent {
48    /// Create a new DynamicAgent with the given Rhai script source.
49    pub fn new(config: ActorConfig, script_source: impl Into<String>) -> Self {
50        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
51        let agent_state = std::sync::Arc::new(std::sync::Mutex::new(HashMap::<
52            String,
53            serde_json::Value,
54        >::new()));
55        let log_queue = std::sync::Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
56        let mut agent = Self {
57            config,
58            script_ast: None,
59            script_source: script_source.into(),
60            description: String::new(),
61            llm: None,
62            agent_state,
63            log_queue,
64            engine: Engine::new(),
65            state: ActorState::Initializing,
66            metrics: Arc::new(ActorMetrics::new()),
67            mailbox_tx: tx,
68            mailbox_rx: Some(rx),
69            publisher: None,
70        };
71        agent.register_api();
72        agent
73    }
74
75    /// Attach an EventPublisher for MQTT output.
76    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
77        self.publisher = Some(p);
78        self
79    }
80
81    /// Attach an LLM backend used as fallback when the Rhai script produces no output.
82    pub fn with_llm(mut self, llm_config: LlmConfig, description: String) -> Self {
83        let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
84        self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
85        self.description = description;
86        self
87    }
88
89    fn now_ms() -> u64 {
90        std::time::SystemTime::now()
91            .duration_since(std::time::UNIX_EPOCH)
92            .unwrap_or_default()
93            .as_millis() as u64
94    }
95
96    /// Register the agent API functions into the Rhai engine.
97    fn register_api(&mut self) {
98        let state_r = std::sync::Arc::clone(&self.agent_state);
99        let state_w = std::sync::Arc::clone(&self.agent_state);
100        let log_q = std::sync::Arc::clone(&self.log_queue);
101        let log_q2 = std::sync::Arc::clone(&self.log_queue);
102
103        self.engine
104            .register_fn("agent_state_get", move |key: &str| -> rhai::Dynamic {
105                let map = state_r.lock().unwrap();
106                match map.get(key) {
107                    Some(v) => rhai::serde::to_dynamic(v.clone()).unwrap_or(rhai::Dynamic::UNIT),
108                    None => rhai::Dynamic::UNIT,
109                }
110            });
111
112        self.engine
113            .register_fn("agent_state_set", move |key: &str, value: rhai::Dynamic| {
114                if let Ok(json) = rhai::serde::from_dynamic::<serde_json::Value>(&value) {
115                    state_w.lock().unwrap().insert(key.to_string(), json);
116                }
117            });
118
119        self.engine.register_fn("agent_log", move |msg: &str| {
120            log_q.lock().unwrap().push(msg.to_string());
121        });
122
123        self.engine.register_fn("agent_alert", move |msg: &str| {
124            log_q2.lock().unwrap().push(format!("ALERT: {msg}"));
125            tracing::warn!("DynamicAgent alert: {msg}");
126        });
127
128        // Type helpers: allow `"text" + any_value` and `str(x)` in scripts.
129        // LLM-generated scripts often write `"= " + result` without calling
130        // `.to_string()` first; these registrations make that safe.
131        self.engine
132            .register_fn("+", |a: String, b: rhai::Dynamic| -> String {
133                format!("{a}{b}")
134            });
135        self.engine
136            .register_fn("str", |v: rhai::Dynamic| -> String { v.to_string() });
137    }
138
139    /// Compile (or recompile) the stored script source.
140    pub fn compile_script(&mut self) -> Result<()> {
141        let ast = self
142            .engine
143            .compile(&self.script_source)
144            .map_err(|e| anyhow::anyhow!("Rhai compile error: {e}"))?;
145        self.script_ast = Some(ast);
146        Ok(())
147    }
148
149    /// Hot-reload a new script, recompiling it immediately.
150    pub fn reload_script(&mut self, new_source: impl Into<String>) -> Result<()> {
151        self.script_source = new_source.into();
152        self.compile_script()
153    }
154
155    /// LLM fallback when Rhai produces no output.
156    async fn llm_respond(&mut self, user_input: &str) -> String {
157        if let Some(llm) = &mut self.llm {
158            let prompt = format!(
159                "You are {}. {}\n\nUser: {}",
160                self.config.name,
161                if self.description.is_empty() {
162                    "A helpful AI agent."
163                } else {
164                    &self.description
165                },
166                user_input,
167            );
168            llm.complete(&prompt)
169                .await
170                .unwrap_or_else(|e| format!("LLM error: {e}"))
171        } else {
172            format!(
173                "I received '{}' but my script produced no response. \
174                 Ask main-actor to give me a better script.",
175                &user_input[..user_input.len().min(60)]
176            )
177        }
178    }
179
180    /// LLM fallback when Rhai errors out; includes the error for context.
181    async fn llm_respond_with_error(&mut self, user_input: &str, script_err: &str) -> String {
182        if let Some(llm) = &mut self.llm {
183            let prompt = format!(
184                "You are {}. {}\n\
185                 Your Rhai script had an error: {}\n\
186                 Respond helpfully to the user anyway.\n\nUser: {}",
187                self.config.name,
188                if self.description.is_empty() {
189                    "A helpful AI agent."
190                } else {
191                    &self.description
192                },
193                script_err,
194                user_input,
195            );
196            llm.complete(&prompt)
197                .await
198                .unwrap_or_else(|e| format!("LLM error: {e}"))
199        } else {
200            format!("script error: {script_err}")
201        }
202    }
203
204    /// Invoke the script's `main(message_json)` function.
205    ///
206    /// Fallback: if the script does not define `fn main`, evaluates the whole
207    /// script top-to-bottom with `msg` pre-populated in the scope.  This lets
208    /// the LLM omit the function wrapper and still get a response.
209    fn run_script(&self, message_json: &str) -> Result<String> {
210        let ast = self
211            .script_ast
212            .as_ref()
213            .ok_or_else(|| anyhow::anyhow!("script not compiled"))?;
214
215        let mut scope = rhai::Scope::new();
216        let call_result = self.engine.call_fn::<rhai::Dynamic>(
217            &mut scope,
218            ast,
219            "main",
220            (message_json.to_string(),),
221        );
222
223        match call_result {
224            Ok(v) => Ok(v.to_string()),
225            Err(e) if e.to_string().contains("not found") => {
226                // fn main not defined — evaluate script directly with msg in scope
227                let mut scope2 = rhai::Scope::new();
228                scope2.push("msg", message_json.to_string());
229                let v = self
230                    .engine
231                    .eval_ast_with_scope::<rhai::Dynamic>(&mut scope2, ast)
232                    .unwrap_or(rhai::Dynamic::from("(no output)"));
233                Ok(v.to_string())
234            }
235            Err(e) => Err(anyhow::anyhow!("Rhai error: {e}")),
236        }
237    }
238}
239
240#[async_trait]
241impl Actor for DynamicAgent {
242    fn id(&self) -> String {
243        self.config.id.clone()
244    }
245    fn name(&self) -> &str {
246        &self.config.name
247    }
248    fn state(&self) -> ActorState {
249        self.state.clone()
250    }
251    fn metrics(&self) -> Arc<ActorMetrics> {
252        Arc::clone(&self.metrics)
253    }
254    fn mailbox(&self) -> mpsc::Sender<Message> {
255        self.mailbox_tx.clone()
256    }
257    fn is_protected(&self) -> bool {
258        self.config.protected
259    }
260
261    async fn on_start(&mut self) -> Result<()> {
262        self.compile_script()?;
263        self.state = ActorState::Running;
264        // Announce to the dashboard so it appears immediately
265        if let Some(pub_) = &self.publisher {
266            pub_.publish(
267                wactorz_mqtt::topics::spawn(&self.config.id),
268                &serde_json::json!({
269                    "agentId":   self.config.id,
270                    "agentName": self.config.name,
271                    "agentType": "dynamic",
272                    "timestampMs": Self::now_ms(),
273                }),
274            );
275        }
276        Ok(())
277    }
278
279    async fn handle_message(&mut self, message: Message) -> Result<()> {
280        // Extract text content — pass plain text to Rhai so simple scripts work.
281        let content = match &message.payload {
282            wactorz_core::message::MessageType::Text { content } => content.clone(),
283            wactorz_core::message::MessageType::Task { description, .. } => description.clone(),
284            _ => return Ok(()),
285        };
286        tracing::info!(
287            "[{}] recv: {:?}",
288            self.config.name,
289            &content[..content.len().min(120)]
290        );
291
292        let script_result = tokio::task::block_in_place(|| self.run_script(&content));
293
294        // Drain script log queue
295        let logs: Vec<String> = self.log_queue.lock().unwrap().drain(..).collect();
296        for log in &logs {
297            tracing::info!("[{}] script log: {log}", self.config.name);
298            if let Some(pub_) = &self.publisher {
299                pub_.publish(
300                    wactorz_mqtt::topics::logs(&self.config.id),
301                    &serde_json::json!({"message": log}),
302                );
303            }
304        }
305
306        // Determine the final response:
307        //   1. Rhai returned a useful string → use it
308        //   2. Rhai returned "()" / "" OR errored → LLM fallback (if configured)
309        //   3. No LLM → publish error/empty notice so the user isn't left hanging
310        let response: String = match script_result {
311            Ok(r) if !r.is_empty() && r != "()" => {
312                tracing::info!(
313                    "[{}] script → {:?}",
314                    self.config.name,
315                    &r[..r.len().min(120)]
316                );
317                r
318            }
319            Ok(r) => {
320                tracing::info!(
321                    "[{}] script returned {:?} — trying LLM fallback",
322                    self.config.name,
323                    r
324                );
325                self.llm_respond(&content).await
326            }
327            Err(e) => {
328                tracing::warn!("[{}] script error: {e}", self.config.name);
329                self.llm_respond_with_error(&content, &e.to_string()).await
330            }
331        };
332
333        if !response.is_empty()
334            && let Some(pub_) = &self.publisher
335        {
336            pub_.publish(
337                wactorz_mqtt::topics::chat(&self.config.id),
338                &serde_json::json!({
339                    "from":        self.config.name,
340                    "to":          "user",
341                    "content":     response,
342                    "timestampMs": Self::now_ms(),
343                }),
344            );
345        }
346        Ok(())
347    }
348
349    async fn on_heartbeat(&mut self) -> Result<()> {
350        if let Some(pub_) = &self.publisher {
351            pub_.publish(
352                wactorz_mqtt::topics::heartbeat(&self.config.id),
353                &serde_json::json!({
354                    "agentId":   self.config.id,
355                    "agentName": self.config.name,
356                    "state":     self.state,
357                    "timestampMs": Self::now_ms(),
358                }),
359            );
360        }
361        Ok(())
362    }
363
364    async fn on_stop(&mut self) -> Result<()> {
365        if let Some(pub_) = &self.publisher {
366            pub_.publish(
367                wactorz_mqtt::topics::status(&self.config.id),
368                &serde_json::json!({
369                    "agentId":   self.config.id,
370                    "agentName": self.config.name,
371                    "state":     "stopped",
372                    "timestampMs": Self::now_ms(),
373                }),
374            );
375        }
376        Ok(())
377    }
378
379    async fn run(&mut self) -> Result<()> {
380        self.on_start().await?;
381        let mut rx = self
382            .mailbox_rx
383            .take()
384            .ok_or_else(|| anyhow::anyhow!("DynamicAgent already running"))?;
385        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
386            self.config.heartbeat_interval_secs,
387        ));
388        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
389        let mut paused = false;
390        loop {
391            tokio::select! {
392                biased;
393                msg = rx.recv() => {
394                    match msg {
395                        None => break,
396                        Some(m) => {
397                            self.metrics.record_received();
398                            use wactorz_core::message::{ActorCommand, MessageType};
399                            match &m.payload {
400                                MessageType::Command { command: ActorCommand::Stop } => break,
401
402                                MessageType::Command { command: ActorCommand::Pause } if !paused => {
403                                    paused = true;
404                                    self.state = ActorState::Paused;
405                                    tracing::info!("[{}] paused", self.config.name);
406                                    if let Some(pub_) = &self.publisher {
407                                        pub_.publish(
408                                            wactorz_mqtt::topics::status(&self.config.id),
409                                            &serde_json::json!({
410                                                "agentId":   self.config.id,
411                                                "agentName": self.config.name,
412                                                "state":     "paused",
413                                                "timestampMs": Self::now_ms(),
414                                            }),
415                                        );
416                                    }
417                                }
418
419                                MessageType::Command { command: ActorCommand::Resume } if paused => {
420                                    paused = false;
421                                    self.state = ActorState::Running;
422                                    tracing::info!("[{}] resumed", self.config.name);
423                                    if let Some(pub_) = &self.publisher {
424                                        pub_.publish(
425                                            wactorz_mqtt::topics::status(&self.config.id),
426                                            &serde_json::json!({
427                                                "agentId":   self.config.id,
428                                                "agentName": self.config.name,
429                                                "state":     "running",
430                                                "timestampMs": Self::now_ms(),
431                                            }),
432                                        );
433                                    }
434                                }
435
436                                _ if !paused => {
437                                    match self.handle_message(m).await {
438                                        Ok(_) => self.metrics.record_processed(),
439                                        Err(e) => {
440                                            tracing::error!("[{}] {e}", self.config.name);
441                                            self.metrics.record_failed();
442                                        }
443                                    }
444                                }
445
446                                _ => {} // paused — drop non-command messages
447                            }
448                        }
449                    }
450                }
451                _ = hb.tick() => {
452                    if !paused {
453                        self.metrics.record_heartbeat();
454                        if let Err(e) = self.on_heartbeat().await {
455                            tracing::error!("[{}] heartbeat: {e}", self.config.name);
456                        }
457                    }
458                }
459            }
460        }
461        self.state = ActorState::Stopped;
462        self.on_stop().await
463    }
464}