1use anyhow::Result;
16use async_trait::async_trait;
17use rhai::{AST, Engine};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21
22use crate::llm_agent::{LlmAgent, LlmConfig};
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
25pub struct DynamicAgent {
27 config: ActorConfig,
28 script_ast: Option<AST>,
30 script_source: String,
32 description: String,
34 llm: Option<LlmAgent>,
36 agent_state: std::sync::Arc<std::sync::Mutex<HashMap<String, serde_json::Value>>>,
38 log_queue: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
39 engine: Engine,
40 state: ActorState,
41 metrics: Arc<ActorMetrics>,
42 mailbox_tx: mpsc::Sender<Message>,
43 mailbox_rx: Option<mpsc::Receiver<Message>>,
44 publisher: Option<EventPublisher>,
45}
46
47impl DynamicAgent {
48 pub fn new(config: ActorConfig, script_source: impl Into<String>) -> Self {
50 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
51 let agent_state = std::sync::Arc::new(std::sync::Mutex::new(HashMap::<
52 String,
53 serde_json::Value,
54 >::new()));
55 let log_queue = std::sync::Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
56 let mut agent = Self {
57 config,
58 script_ast: None,
59 script_source: script_source.into(),
60 description: String::new(),
61 llm: None,
62 agent_state,
63 log_queue,
64 engine: Engine::new(),
65 state: ActorState::Initializing,
66 metrics: Arc::new(ActorMetrics::new()),
67 mailbox_tx: tx,
68 mailbox_rx: Some(rx),
69 publisher: None,
70 };
71 agent.register_api();
72 agent
73 }
74
75 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
77 self.publisher = Some(p);
78 self
79 }
80
81 pub fn with_llm(mut self, llm_config: LlmConfig, description: String) -> Self {
83 let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
84 self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
85 self.description = description;
86 self
87 }
88
89 fn now_ms() -> u64 {
90 std::time::SystemTime::now()
91 .duration_since(std::time::UNIX_EPOCH)
92 .unwrap_or_default()
93 .as_millis() as u64
94 }
95
96 fn register_api(&mut self) {
98 let state_r = std::sync::Arc::clone(&self.agent_state);
99 let state_w = std::sync::Arc::clone(&self.agent_state);
100 let log_q = std::sync::Arc::clone(&self.log_queue);
101 let log_q2 = std::sync::Arc::clone(&self.log_queue);
102
103 self.engine
104 .register_fn("agent_state_get", move |key: &str| -> rhai::Dynamic {
105 let map = state_r.lock().unwrap();
106 match map.get(key) {
107 Some(v) => rhai::serde::to_dynamic(v.clone()).unwrap_or(rhai::Dynamic::UNIT),
108 None => rhai::Dynamic::UNIT,
109 }
110 });
111
112 self.engine
113 .register_fn("agent_state_set", move |key: &str, value: rhai::Dynamic| {
114 if let Ok(json) = rhai::serde::from_dynamic::<serde_json::Value>(&value) {
115 state_w.lock().unwrap().insert(key.to_string(), json);
116 }
117 });
118
119 self.engine.register_fn("agent_log", move |msg: &str| {
120 log_q.lock().unwrap().push(msg.to_string());
121 });
122
123 self.engine.register_fn("agent_alert", move |msg: &str| {
124 log_q2.lock().unwrap().push(format!("ALERT: {msg}"));
125 tracing::warn!("DynamicAgent alert: {msg}");
126 });
127
128 self.engine
132 .register_fn("+", |a: String, b: rhai::Dynamic| -> String {
133 format!("{a}{b}")
134 });
135 self.engine
136 .register_fn("str", |v: rhai::Dynamic| -> String { v.to_string() });
137 }
138
139 pub fn compile_script(&mut self) -> Result<()> {
141 let ast = self
142 .engine
143 .compile(&self.script_source)
144 .map_err(|e| anyhow::anyhow!("Rhai compile error: {e}"))?;
145 self.script_ast = Some(ast);
146 Ok(())
147 }
148
149 pub fn reload_script(&mut self, new_source: impl Into<String>) -> Result<()> {
151 self.script_source = new_source.into();
152 self.compile_script()
153 }
154
155 async fn llm_respond(&mut self, user_input: &str) -> String {
157 if let Some(llm) = &mut self.llm {
158 let prompt = format!(
159 "You are {}. {}\n\nUser: {}",
160 self.config.name,
161 if self.description.is_empty() {
162 "A helpful AI agent."
163 } else {
164 &self.description
165 },
166 user_input,
167 );
168 llm.complete(&prompt)
169 .await
170 .unwrap_or_else(|e| format!("LLM error: {e}"))
171 } else {
172 format!(
173 "I received '{}' but my script produced no response. \
174 Ask main-actor to give me a better script.",
175 &user_input[..user_input.len().min(60)]
176 )
177 }
178 }
179
180 async fn llm_respond_with_error(&mut self, user_input: &str, script_err: &str) -> String {
182 if let Some(llm) = &mut self.llm {
183 let prompt = format!(
184 "You are {}. {}\n\
185 Your Rhai script had an error: {}\n\
186 Respond helpfully to the user anyway.\n\nUser: {}",
187 self.config.name,
188 if self.description.is_empty() {
189 "A helpful AI agent."
190 } else {
191 &self.description
192 },
193 script_err,
194 user_input,
195 );
196 llm.complete(&prompt)
197 .await
198 .unwrap_or_else(|e| format!("LLM error: {e}"))
199 } else {
200 format!("script error: {script_err}")
201 }
202 }
203
204 fn run_script(&self, message_json: &str) -> Result<String> {
210 let ast = self
211 .script_ast
212 .as_ref()
213 .ok_or_else(|| anyhow::anyhow!("script not compiled"))?;
214
215 let mut scope = rhai::Scope::new();
216 let call_result = self.engine.call_fn::<rhai::Dynamic>(
217 &mut scope,
218 ast,
219 "main",
220 (message_json.to_string(),),
221 );
222
223 match call_result {
224 Ok(v) => Ok(v.to_string()),
225 Err(e) if e.to_string().contains("not found") => {
226 let mut scope2 = rhai::Scope::new();
228 scope2.push("msg", message_json.to_string());
229 let v = self
230 .engine
231 .eval_ast_with_scope::<rhai::Dynamic>(&mut scope2, ast)
232 .unwrap_or(rhai::Dynamic::from("(no output)"));
233 Ok(v.to_string())
234 }
235 Err(e) => Err(anyhow::anyhow!("Rhai error: {e}")),
236 }
237 }
238}
239
240#[async_trait]
241impl Actor for DynamicAgent {
242 fn id(&self) -> String {
243 self.config.id.clone()
244 }
245 fn name(&self) -> &str {
246 &self.config.name
247 }
248 fn state(&self) -> ActorState {
249 self.state.clone()
250 }
251 fn metrics(&self) -> Arc<ActorMetrics> {
252 Arc::clone(&self.metrics)
253 }
254 fn mailbox(&self) -> mpsc::Sender<Message> {
255 self.mailbox_tx.clone()
256 }
257 fn is_protected(&self) -> bool {
258 self.config.protected
259 }
260
261 async fn on_start(&mut self) -> Result<()> {
262 self.compile_script()?;
263 self.state = ActorState::Running;
264 if let Some(pub_) = &self.publisher {
266 pub_.publish(
267 wactorz_mqtt::topics::spawn(&self.config.id),
268 &serde_json::json!({
269 "agentId": self.config.id,
270 "agentName": self.config.name,
271 "agentType": "dynamic",
272 "timestampMs": Self::now_ms(),
273 }),
274 );
275 }
276 Ok(())
277 }
278
279 async fn handle_message(&mut self, message: Message) -> Result<()> {
280 let content = match &message.payload {
282 wactorz_core::message::MessageType::Text { content } => content.clone(),
283 wactorz_core::message::MessageType::Task { description, .. } => description.clone(),
284 _ => return Ok(()),
285 };
286 tracing::info!(
287 "[{}] recv: {:?}",
288 self.config.name,
289 &content[..content.len().min(120)]
290 );
291
292 let script_result = tokio::task::block_in_place(|| self.run_script(&content));
293
294 let logs: Vec<String> = self.log_queue.lock().unwrap().drain(..).collect();
296 for log in &logs {
297 tracing::info!("[{}] script log: {log}", self.config.name);
298 if let Some(pub_) = &self.publisher {
299 pub_.publish(
300 wactorz_mqtt::topics::logs(&self.config.id),
301 &serde_json::json!({"message": log}),
302 );
303 }
304 }
305
306 let response: String = match script_result {
311 Ok(r) if !r.is_empty() && r != "()" => {
312 tracing::info!(
313 "[{}] script → {:?}",
314 self.config.name,
315 &r[..r.len().min(120)]
316 );
317 r
318 }
319 Ok(r) => {
320 tracing::info!(
321 "[{}] script returned {:?} — trying LLM fallback",
322 self.config.name,
323 r
324 );
325 self.llm_respond(&content).await
326 }
327 Err(e) => {
328 tracing::warn!("[{}] script error: {e}", self.config.name);
329 self.llm_respond_with_error(&content, &e.to_string()).await
330 }
331 };
332
333 if !response.is_empty()
334 && let Some(pub_) = &self.publisher
335 {
336 pub_.publish(
337 wactorz_mqtt::topics::chat(&self.config.id),
338 &serde_json::json!({
339 "from": self.config.name,
340 "to": "user",
341 "content": response,
342 "timestampMs": Self::now_ms(),
343 }),
344 );
345 }
346 Ok(())
347 }
348
349 async fn on_heartbeat(&mut self) -> Result<()> {
350 if let Some(pub_) = &self.publisher {
351 pub_.publish(
352 wactorz_mqtt::topics::heartbeat(&self.config.id),
353 &serde_json::json!({
354 "agentId": self.config.id,
355 "agentName": self.config.name,
356 "state": self.state,
357 "timestampMs": Self::now_ms(),
358 }),
359 );
360 }
361 Ok(())
362 }
363
364 async fn on_stop(&mut self) -> Result<()> {
365 if let Some(pub_) = &self.publisher {
366 pub_.publish(
367 wactorz_mqtt::topics::status(&self.config.id),
368 &serde_json::json!({
369 "agentId": self.config.id,
370 "agentName": self.config.name,
371 "state": "stopped",
372 "timestampMs": Self::now_ms(),
373 }),
374 );
375 }
376 Ok(())
377 }
378
379 async fn run(&mut self) -> Result<()> {
380 self.on_start().await?;
381 let mut rx = self
382 .mailbox_rx
383 .take()
384 .ok_or_else(|| anyhow::anyhow!("DynamicAgent already running"))?;
385 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
386 self.config.heartbeat_interval_secs,
387 ));
388 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
389 let mut paused = false;
390 loop {
391 tokio::select! {
392 biased;
393 msg = rx.recv() => {
394 match msg {
395 None => break,
396 Some(m) => {
397 self.metrics.record_received();
398 use wactorz_core::message::{ActorCommand, MessageType};
399 match &m.payload {
400 MessageType::Command { command: ActorCommand::Stop } => break,
401
402 MessageType::Command { command: ActorCommand::Pause } if !paused => {
403 paused = true;
404 self.state = ActorState::Paused;
405 tracing::info!("[{}] paused", self.config.name);
406 if let Some(pub_) = &self.publisher {
407 pub_.publish(
408 wactorz_mqtt::topics::status(&self.config.id),
409 &serde_json::json!({
410 "agentId": self.config.id,
411 "agentName": self.config.name,
412 "state": "paused",
413 "timestampMs": Self::now_ms(),
414 }),
415 );
416 }
417 }
418
419 MessageType::Command { command: ActorCommand::Resume } if paused => {
420 paused = false;
421 self.state = ActorState::Running;
422 tracing::info!("[{}] resumed", self.config.name);
423 if let Some(pub_) = &self.publisher {
424 pub_.publish(
425 wactorz_mqtt::topics::status(&self.config.id),
426 &serde_json::json!({
427 "agentId": self.config.id,
428 "agentName": self.config.name,
429 "state": "running",
430 "timestampMs": Self::now_ms(),
431 }),
432 );
433 }
434 }
435
436 _ if !paused => {
437 match self.handle_message(m).await {
438 Ok(_) => self.metrics.record_processed(),
439 Err(e) => {
440 tracing::error!("[{}] {e}", self.config.name);
441 self.metrics.record_failed();
442 }
443 }
444 }
445
446 _ => {} }
448 }
449 }
450 }
451 _ = hb.tick() => {
452 if !paused {
453 self.metrics.record_heartbeat();
454 if let Err(e) = self.on_heartbeat().await {
455 tracing::error!("[{}] heartbeat: {e}", self.config.name);
456 }
457 }
458 }
459 }
460 }
461 self.state = ActorState::Stopped;
462 self.on_stop().await
463 }
464}