1use anyhow::Result;
23use async_trait::async_trait;
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::mpsc;
27
28use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
29
/// How long, in milliseconds, a tracked user request may stay unanswered before the
/// heartbeat handler reports it as a `no-response` flag and stops tracking it.
const AGENT_RESPONSE_TIMEOUT_MS: u64 = 30_000;
32
/// Actor that inspects relayed chat messages, publishes QA flags for suspicious
/// content, and tracks user requests that have not been answered yet.
pub struct QAAgent {
    /// Actor configuration; this file reads `id`, `name`, `mailbox_capacity`,
    /// `heartbeat_interval_secs` and `protected` from it.
    config: ActorConfig,
    /// Current lifecycle state (`Initializing` -> `Running` -> `Stopped`).
    state: ActorState,
    /// Shared counters for received/processed/failed messages and heartbeats.
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox; cloned and handed out by `mailbox()`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; taken (set to `None`) when `run` starts.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; when `None`, nothing is published.
    publisher: Option<EventPublisher>,
    /// Requests awaiting a reply, keyed by the target agent's name. The value is
    /// `(request excerpt, time the request was seen in ms since the Unix epoch)`.
    pending: HashMap<String, (String, u64)>,
}
45
46impl QAAgent {
47 pub fn new(config: ActorConfig) -> Self {
48 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
49 Self {
50 config,
51 state: ActorState::Initializing,
52 metrics: Arc::new(ActorMetrics::new()),
53 mailbox_tx: tx,
54 mailbox_rx: Some(rx),
55 publisher: None,
56 pending: HashMap::new(),
57 }
58 }
59
60 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
61 self.publisher = Some(p);
62 self
63 }
64
65 fn now_ms() -> u64 {
66 std::time::SystemTime::now()
67 .duration_since(std::time::UNIX_EPOCH)
68 .unwrap_or_default()
69 .as_millis() as u64
70 }
71
72 fn check_payload_fields(from: &str, content: &str) -> Option<(String, &'static str)> {
74 if content.is_empty() {
75 return None;
76 }
77 let lower = content.to_lowercase();
78
79 if from == "user" || from.is_empty() {
81 const INJECTION: &[&str] = &[
82 "ignore previous instructions",
83 "ignore your previous",
84 "forget all previous",
85 "forget your instructions",
86 "you are now",
87 "pretend you are",
88 "act as if you are",
89 "disregard all",
90 "override your instructions",
91 "new persona",
92 "system prompt",
93 "jailbreak",
94 "dan mode",
95 ];
96 for pat in INJECTION {
97 if lower.contains(pat) {
98 return Some((format!("prompt-injection (matched: {pat})"), "warning"));
99 }
100 }
101 }
102
103 if from != "user" && !from.is_empty() {
105 const ERRORS: &[&str] = &[
106 "script error:",
107 "llm error:",
108 "rhai error:",
109 "panicked at",
110 "thread 'main' panicked",
111 "(no output)", "script not compiled", ];
114 for pat in ERRORS {
115 if lower.contains(pat) {
116 return Some((format!("agent-error-exposed ({pat})"), "error"));
117 }
118 }
119
120 let trimmed = content.trim_start();
123 if (trimmed.starts_with('{') || trimmed.starts_with('['))
124 && trimmed.len() > 20
125 && serde_json::from_str::<serde_json::Value>(trimmed).is_ok()
126 {
127 return Some(("raw-data-bleed".into(), "warning"));
128 }
129 }
130
131 for word in content.split_whitespace() {
134 if word.starts_with('@') {
135 continue;
136 }
137 if let Some(at_pos) = word.find('@') {
138 let after = &word[at_pos + 1..];
139 if after.contains('.') && after.len() >= 4 && !after.contains('/') {
141 return Some(("pii-possible-email".into(), "info"));
142 }
143 }
144 }
145
146 None
147 }
148
149 fn publish_flag(&self, category: &str, severity: &str, from: &str, excerpt: &str) {
150 if let Some(pub_) = &self.publisher {
151 let snippet = if excerpt.len() > 80 {
152 &excerpt[..80]
153 } else {
154 excerpt
155 };
156 pub_.publish(
157 "system/qa-flag",
158 &serde_json::json!({
159 "agentId": self.config.id,
160 "agentName": self.config.name,
161 "from": from,
162 "category": category,
163 "severity": severity,
164 "excerpt": snippet,
165 "message": format!("[QA/{category}] from={from}: {snippet}"),
166 "timestampMs": Self::now_ms(),
167 }),
168 );
169 }
170 }
171}
172
173#[async_trait]
174impl Actor for QAAgent {
175 fn id(&self) -> String {
176 self.config.id.clone()
177 }
178 fn name(&self) -> &str {
179 &self.config.name
180 }
181 fn state(&self) -> ActorState {
182 self.state.clone()
183 }
184 fn metrics(&self) -> Arc<ActorMetrics> {
185 Arc::clone(&self.metrics)
186 }
187 fn mailbox(&self) -> mpsc::Sender<Message> {
188 self.mailbox_tx.clone()
189 }
190 fn is_protected(&self) -> bool {
191 self.config.protected
192 }
193
194 async fn on_start(&mut self) -> Result<()> {
195 self.state = ActorState::Running;
196 if let Some(pub_) = &self.publisher {
197 pub_.publish(
198 wactorz_mqtt::topics::spawn(&self.config.id),
199 &serde_json::json!({
200 "agentId": self.config.id,
201 "agentName": self.config.name,
202 "agentType": "guardian",
203 "timestampMs": Self::now_ms(),
204 }),
205 );
206 }
207 Ok(())
208 }
209
210 async fn handle_message(&mut self, message: Message) -> Result<()> {
211 use wactorz_core::message::MessageType;
212 let payload_json = match &message.payload {
213 MessageType::Text { content } => content.clone(),
214 _ => return Ok(()),
215 };
216
217 let val: serde_json::Value = match serde_json::from_str(&payload_json) {
219 Ok(v) => v,
220 Err(_) => return Ok(()),
221 };
222 let from = val.get("from").and_then(|v| v.as_str()).unwrap_or("");
223 let to = val.get("to").and_then(|v| v.as_str()).unwrap_or("");
224 let content = val.get("content").and_then(|v| v.as_str()).unwrap_or("");
225
226 if content.is_empty() {
227 return Ok(());
228 }
229
230 if from == "user" || from.is_empty() {
232 let target = if !to.is_empty() && to != "io-agent" {
236 Some(to.to_string())
237 } else {
238 content
239 .split_whitespace()
240 .next()
241 .filter(|w| w.starts_with('@'))
242 .map(|w| w[1..].to_string())
243 };
244 if let Some(agent_name) = target {
245 let excerpt = if content.len() > 60 {
246 &content[..60]
247 } else {
248 content
249 };
250 self.pending
251 .insert(agent_name, (excerpt.to_string(), Self::now_ms()));
252 }
253 } else if to == "user" || to.is_empty() {
254 if !from.is_empty() {
256 self.pending.remove(from);
257 }
258 }
259
260 if let Some((category, severity)) = Self::check_payload_fields(from, content) {
262 tracing::warn!("[QA] flag: {category} | from={from} | {:.60}", content);
263 self.publish_flag(&category, severity, from, content);
264 }
265
266 Ok(())
267 }
268
269 async fn on_heartbeat(&mut self) -> Result<()> {
270 let now = Self::now_ms();
272 let stale: Vec<(String, String)> = self
273 .pending
274 .iter()
275 .filter(|(_, (_, sent_at))| now.saturating_sub(*sent_at) >= AGENT_RESPONSE_TIMEOUT_MS)
276 .map(|(agent, (excerpt, _))| (agent.clone(), excerpt.clone()))
277 .collect();
278
279 for (agent, excerpt) in stale {
280 tracing::warn!("[QA] no-response: agent={agent} | excerpt={:.60}", excerpt);
281 self.publish_flag("no-response", "warning", &agent, &excerpt);
282 self.pending.remove(&agent);
283 }
284
285 if let Some(pub_) = &self.publisher {
287 pub_.publish(
288 wactorz_mqtt::topics::heartbeat(&self.config.id),
289 &serde_json::json!({
290 "agentId": self.config.id,
291 "agentName": self.config.name,
292 "state": self.state,
293 "timestampMs": Self::now_ms(),
294 }),
295 );
296 }
297 Ok(())
298 }
299
300 async fn run(&mut self) -> Result<()> {
301 self.on_start().await?;
302 let mut rx = self
303 .mailbox_rx
304 .take()
305 .ok_or_else(|| anyhow::anyhow!("QAAgent already running"))?;
306 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
307 self.config.heartbeat_interval_secs,
308 ));
309 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
310 loop {
311 tokio::select! {
312 biased;
313 msg = rx.recv() => match msg {
314 None => break,
315 Some(m) => {
316 self.metrics.record_received();
317 if let wactorz_core::message::MessageType::Command {
318 command: wactorz_core::message::ActorCommand::Stop
319 } = &m.payload { break; }
320 match self.handle_message(m).await {
321 Ok(_) => self.metrics.record_processed(),
322 Err(e) => {
323 tracing::error!("[{}] {e}", self.config.name);
324 self.metrics.record_failed();
325 }
326 }
327 }
328 },
329 _ = hb.tick() => {
330 self.metrics.record_heartbeat();
331 if let Err(e) = self.on_heartbeat().await {
332 tracing::error!("[{}] heartbeat: {e}", self.config.name);
333 }
334 }
335 }
336 }
337 self.state = ActorState::Stopped;
338 self.on_stop().await
339 }
340}