wactorz_agents/
qa_agent.rs

1//! Quality-Assurance / Safety monitor agent.
2//!
3//! [`QAAgent`] is a passive observer: it receives a copy of every chat
4//! message flowing through the system and publishes a `system/qa-flag`
5//! alert when a policy violation or malfunction is detected.
6//!
7//! **Checks performed (rule-based, no LLM):**
8//!
9//! *Content checks (every message):*
10//! - Prompt-injection patterns in user messages
11//! - Agent error bleed-through (`script error:`, `rhai error:`, `(no output)`, …)
12//! - Raw JSON/data bleed (agent returned internal message structure)
13//! - Possible PII (email-like patterns) in any direction
14//!
15//! *Temporal checks (on every heartbeat tick):*
16//! - No-response tracking: if a user message is sent to an agent and no
17//!   reply arrives within `AGENT_RESPONSE_TIMEOUT_MS` (30 s), a `no-response`
18//!   flag is raised — exactly what triggered the math-agent 45 s timeout.
19//!
20//! The agent does NOT block messages; it only annotates.
21
22use anyhow::Result;
23use async_trait::async_trait;
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::mpsc;
27
28use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
29
/// Grace period, in milliseconds (30 seconds), that an agent has to answer a
/// user request before the heartbeat check raises a `no-response` flag.
const AGENT_RESPONSE_TIMEOUT_MS: u64 = 30 * 1_000;
32
33pub struct QAAgent {
34    config: ActorConfig,
35    state: ActorState,
36    metrics: Arc<ActorMetrics>,
37    mailbox_tx: mpsc::Sender<Message>,
38    mailbox_rx: Option<mpsc::Receiver<Message>>,
39    publisher: Option<EventPublisher>,
40    /// Pending user→agent requests awaiting a response.
41    /// key = agent name (from `to` field or parsed @mention)
42    /// value = (content excerpt, sent_at_ms)
43    pending: HashMap<String, (String, u64)>,
44}
45
46impl QAAgent {
47    pub fn new(config: ActorConfig) -> Self {
48        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
49        Self {
50            config,
51            state: ActorState::Initializing,
52            metrics: Arc::new(ActorMetrics::new()),
53            mailbox_tx: tx,
54            mailbox_rx: Some(rx),
55            publisher: None,
56            pending: HashMap::new(),
57        }
58    }
59
60    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
61        self.publisher = Some(p);
62        self
63    }
64
65    fn now_ms() -> u64 {
66        std::time::SystemTime::now()
67            .duration_since(std::time::UNIX_EPOCH)
68            .unwrap_or_default()
69            .as_millis() as u64
70    }
71
72    /// Inspect a chat message's fields and return `Some((category, severity))` if flagged.
73    fn check_payload_fields(from: &str, content: &str) -> Option<(String, &'static str)> {
74        if content.is_empty() {
75            return None;
76        }
77        let lower = content.to_lowercase();
78
79        // ── Prompt-injection (user → agent direction) ──────────────────────────
80        if from == "user" || from.is_empty() {
81            const INJECTION: &[&str] = &[
82                "ignore previous instructions",
83                "ignore your previous",
84                "forget all previous",
85                "forget your instructions",
86                "you are now",
87                "pretend you are",
88                "act as if you are",
89                "disregard all",
90                "override your instructions",
91                "new persona",
92                "system prompt",
93                "jailbreak",
94                "dan mode",
95            ];
96            for pat in INJECTION {
97                if lower.contains(pat) {
98                    return Some((format!("prompt-injection (matched: {pat})"), "warning"));
99                }
100            }
101        }
102
103        // ── Agent error / malfunction (agent → user direction) ────────────────
104        if from != "user" && !from.is_empty() {
105            const ERRORS: &[&str] = &[
106                "script error:",
107                "llm error:",
108                "rhai error:",
109                "panicked at",
110                "thread 'main' panicked",
111                "(no output)", // DynamicAgent fallback when script returns nothing
112                "script not compiled", // compile step was skipped or failed
113            ];
114            for pat in ERRORS {
115                if lower.contains(pat) {
116                    return Some((format!("agent-error-exposed ({pat})"), "error"));
117                }
118            }
119
120            // Raw JSON / data bleed — agent returned internal message structure.
121            // Heuristic: content is valid JSON object/array and longer than 20 chars.
122            let trimmed = content.trim_start();
123            if (trimmed.starts_with('{') || trimmed.starts_with('['))
124                && trimmed.len() > 20
125                && serde_json::from_str::<serde_json::Value>(trimmed).is_ok()
126            {
127                return Some(("raw-data-bleed".into(), "warning"));
128            }
129        }
130
131        // ── PII — email-like pattern (any direction) ───────────────────────────
132        // Avoid false-positives on @mention tokens (word starts with @)
133        for word in content.split_whitespace() {
134            if word.starts_with('@') {
135                continue;
136            }
137            if let Some(at_pos) = word.find('@') {
138                let after = &word[at_pos + 1..];
139                // email: has a dot, reasonable length, no slashes (not a URL fragment)
140                if after.contains('.') && after.len() >= 4 && !after.contains('/') {
141                    return Some(("pii-possible-email".into(), "info"));
142                }
143            }
144        }
145
146        None
147    }
148
149    fn publish_flag(&self, category: &str, severity: &str, from: &str, excerpt: &str) {
150        if let Some(pub_) = &self.publisher {
151            let snippet = if excerpt.len() > 80 {
152                &excerpt[..80]
153            } else {
154                excerpt
155            };
156            pub_.publish(
157                "system/qa-flag",
158                &serde_json::json!({
159                    "agentId":   self.config.id,
160                    "agentName": self.config.name,
161                    "from":      from,
162                    "category":  category,
163                    "severity":  severity,
164                    "excerpt":   snippet,
165                    "message":   format!("[QA/{category}] from={from}: {snippet}"),
166                    "timestampMs": Self::now_ms(),
167                }),
168            );
169        }
170    }
171}
172
173#[async_trait]
174impl Actor for QAAgent {
175    fn id(&self) -> String {
176        self.config.id.clone()
177    }
178    fn name(&self) -> &str {
179        &self.config.name
180    }
181    fn state(&self) -> ActorState {
182        self.state.clone()
183    }
184    fn metrics(&self) -> Arc<ActorMetrics> {
185        Arc::clone(&self.metrics)
186    }
187    fn mailbox(&self) -> mpsc::Sender<Message> {
188        self.mailbox_tx.clone()
189    }
190    fn is_protected(&self) -> bool {
191        self.config.protected
192    }
193
194    async fn on_start(&mut self) -> Result<()> {
195        self.state = ActorState::Running;
196        if let Some(pub_) = &self.publisher {
197            pub_.publish(
198                wactorz_mqtt::topics::spawn(&self.config.id),
199                &serde_json::json!({
200                    "agentId":   self.config.id,
201                    "agentName": self.config.name,
202                    "agentType": "guardian",
203                    "timestampMs": Self::now_ms(),
204                }),
205            );
206        }
207        Ok(())
208    }
209
210    async fn handle_message(&mut self, message: Message) -> Result<()> {
211        use wactorz_core::message::MessageType;
212        let payload_json = match &message.payload {
213            MessageType::Text { content } => content.clone(),
214            _ => return Ok(()),
215        };
216
217        // Parse the inner chat payload JSON once
218        let val: serde_json::Value = match serde_json::from_str(&payload_json) {
219            Ok(v) => v,
220            Err(_) => return Ok(()),
221        };
222        let from = val.get("from").and_then(|v| v.as_str()).unwrap_or("");
223        let to = val.get("to").and_then(|v| v.as_str()).unwrap_or("");
224        let content = val.get("content").and_then(|v| v.as_str()).unwrap_or("");
225
226        if content.is_empty() {
227            return Ok(());
228        }
229
230        // ── No-response tracking ───────────────────────────────────────────────
231        if from == "user" || from.is_empty() {
232            // Determine the target agent:
233            //   1. `to` field when it's a direct agent (not the io gateway)
234            //   2. Fallback: @mention at the start of content
235            let target = if !to.is_empty() && to != "io-agent" {
236                Some(to.to_string())
237            } else {
238                content
239                    .split_whitespace()
240                    .next()
241                    .filter(|w| w.starts_with('@'))
242                    .map(|w| w[1..].to_string())
243            };
244            if let Some(agent_name) = target {
245                let excerpt = if content.len() > 60 {
246                    &content[..60]
247                } else {
248                    content
249                };
250                self.pending
251                    .insert(agent_name, (excerpt.to_string(), Self::now_ms()));
252            }
253        } else if to == "user" || to.is_empty() {
254            // Agent replying to the user — resolve the pending entry for this agent
255            if !from.is_empty() {
256                self.pending.remove(from);
257            }
258        }
259
260        // ── Content checks ─────────────────────────────────────────────────────
261        if let Some((category, severity)) = Self::check_payload_fields(from, content) {
262            tracing::warn!("[QA] flag: {category} | from={from} | {:.60}", content);
263            self.publish_flag(&category, severity, from, content);
264        }
265
266        Ok(())
267    }
268
269    async fn on_heartbeat(&mut self) -> Result<()> {
270        // ── Check for unresponsive agents ──────────────────────────────────────
271        let now = Self::now_ms();
272        let stale: Vec<(String, String)> = self
273            .pending
274            .iter()
275            .filter(|(_, (_, sent_at))| now.saturating_sub(*sent_at) >= AGENT_RESPONSE_TIMEOUT_MS)
276            .map(|(agent, (excerpt, _))| (agent.clone(), excerpt.clone()))
277            .collect();
278
279        for (agent, excerpt) in stale {
280            tracing::warn!("[QA] no-response: agent={agent} | excerpt={:.60}", excerpt);
281            self.publish_flag("no-response", "warning", &agent, &excerpt);
282            self.pending.remove(&agent);
283        }
284
285        // ── Publish heartbeat ──────────────────────────────────────────────────
286        if let Some(pub_) = &self.publisher {
287            pub_.publish(
288                wactorz_mqtt::topics::heartbeat(&self.config.id),
289                &serde_json::json!({
290                    "agentId":   self.config.id,
291                    "agentName": self.config.name,
292                    "state":     self.state,
293                    "timestampMs": Self::now_ms(),
294                }),
295            );
296        }
297        Ok(())
298    }
299
300    async fn run(&mut self) -> Result<()> {
301        self.on_start().await?;
302        let mut rx = self
303            .mailbox_rx
304            .take()
305            .ok_or_else(|| anyhow::anyhow!("QAAgent already running"))?;
306        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
307            self.config.heartbeat_interval_secs,
308        ));
309        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
310        loop {
311            tokio::select! {
312                biased;
313                msg = rx.recv() => match msg {
314                    None => break,
315                    Some(m) => {
316                        self.metrics.record_received();
317                        if let wactorz_core::message::MessageType::Command {
318                            command: wactorz_core::message::ActorCommand::Stop
319                        } = &m.payload { break; }
320                        match self.handle_message(m).await {
321                            Ok(_) => self.metrics.record_processed(),
322                            Err(e) => {
323                                tracing::error!("[{}] {e}", self.config.name);
324                                self.metrics.record_failed();
325                            }
326                        }
327                    }
328                },
329                _ = hb.tick() => {
330                    self.metrics.record_heartbeat();
331                    if let Err(e) = self.on_heartbeat().await {
332                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
333                    }
334                }
335            }
336        }
337        self.state = ActorState::Stopped;
338        self.on_stop().await
339    }
340}