wactorz_agents/
manual_agent.rs

//! Device manual / documentation lookup agent.
//!
//! [`ManualAgent`] uses an LLM to answer questions about device manuals,
//! datasheets, and technical documentation. It can also search a local
//! document store if one is configured.
6
7use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
/// System prompt assigned to the embedded LLM's configuration.
///
/// It casts the model as a documentation / device-manual expert and then
/// lists the answering guidelines as one bullet per line. The pieces are
/// joined at compile time, so the value is a single `&'static str`.
const SYSTEM_PROMPT: &str = concat!(
    "You are a technical documentation and device manual expert. ",
    "You help users understand how to use, configure, and troubleshoot devices and software. ",
    "When answering:\n",
    "- Cite specific manual sections or page numbers when you know them\n",
    "- Provide step-by-step instructions when applicable\n",
    "- Flag safety warnings prominently\n",
    "- If you don't know the answer with confidence, say so clearly\n",
    "- Suggest searching the official manufacturer documentation if needed",
);
24
25pub struct ManualAgent {
26    config: ActorConfig,
27    llm: LlmAgent,
28    state: ActorState,
29    metrics: Arc<ActorMetrics>,
30    mailbox_tx: mpsc::Sender<Message>,
31    mailbox_rx: Option<mpsc::Receiver<Message>>,
32    publisher: Option<EventPublisher>,
33}
34
35impl ManualAgent {
36    pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
37        let mut lc = llm_config;
38        lc.system_prompt = Some(SYSTEM_PROMPT.to_string());
39        let llm_cfg = ActorConfig::new(format!("{}-llm", config.name));
40        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
41        Self {
42            config,
43            llm: LlmAgent::new(llm_cfg, lc),
44            state: ActorState::Initializing,
45            metrics: Arc::new(ActorMetrics::new()),
46            mailbox_tx: tx,
47            mailbox_rx: Some(rx),
48            publisher: None,
49        }
50    }
51
52    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
53        self.publisher = Some(p);
54        self
55    }
56
57    fn now_ms() -> u64 {
58        std::time::SystemTime::now()
59            .duration_since(std::time::UNIX_EPOCH)
60            .unwrap_or_default()
61            .as_millis() as u64
62    }
63}
64
65#[async_trait]
66impl Actor for ManualAgent {
67    fn id(&self) -> String {
68        self.config.id.clone()
69    }
70    fn name(&self) -> &str {
71        &self.config.name
72    }
73    fn state(&self) -> ActorState {
74        self.state.clone()
75    }
76    fn metrics(&self) -> Arc<ActorMetrics> {
77        Arc::clone(&self.metrics)
78    }
79    fn mailbox(&self) -> mpsc::Sender<Message> {
80        self.mailbox_tx.clone()
81    }
82
83    async fn on_start(&mut self) -> Result<()> {
84        self.state = ActorState::Running;
85        if let Some(pub_) = &self.publisher {
86            pub_.publish(
87                wactorz_mqtt::topics::spawn(&self.config.id),
88                &serde_json::json!({
89                    "agentId":   self.config.id,
90                    "agentName": self.config.name,
91                    "agentType": "manual",
92                    "timestampMs": Self::now_ms(),
93                }),
94            );
95        }
96        Ok(())
97    }
98
99    async fn handle_message(&mut self, message: Message) -> Result<()> {
100        use wactorz_core::message::MessageType;
101        let text = match &message.payload {
102            MessageType::Text { content } => content.clone(),
103            MessageType::Task { description, .. } => description.clone(),
104            _ => return Ok(()),
105        };
106
107        let response = self
108            .llm
109            .complete(&text)
110            .await
111            .unwrap_or_else(|e| format!("LLM error: {e}"));
112
113        if let Some(pub_) = &self.publisher {
114            pub_.publish(
115                wactorz_mqtt::topics::chat(&self.config.id),
116                &serde_json::json!({
117                    "from":      self.config.name,
118                    "to":        message.from.as_deref().unwrap_or("user"),
119                    "content":   response,
120                    "timestampMs": Self::now_ms(),
121                }),
122            );
123        }
124        Ok(())
125    }
126
127    async fn on_heartbeat(&mut self) -> Result<()> {
128        if let Some(pub_) = &self.publisher {
129            let snap = self.llm.metrics().snapshot();
130            pub_.publish(
131                wactorz_mqtt::topics::heartbeat(&self.config.id),
132                &serde_json::json!({
133                    "agentId":         self.config.id,
134                    "agentName":       self.config.name,
135                    "state":           self.state,
136                    "llmInputTokens":  snap.llm_input_tokens,
137                    "llmOutputTokens": snap.llm_output_tokens,
138                    "llmCostUsd":      snap.llm_cost_usd,
139                    "timestampMs":     Self::now_ms(),
140                }),
141            );
142        }
143        Ok(())
144    }
145
146    async fn run(&mut self) -> Result<()> {
147        self.on_start().await?;
148        let mut rx = self
149            .mailbox_rx
150            .take()
151            .ok_or_else(|| anyhow::anyhow!("ManualAgent already running"))?;
152        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
153            self.config.heartbeat_interval_secs,
154        ));
155        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
156        loop {
157            tokio::select! {
158                biased;
159                msg = rx.recv() => {
160                    match msg {
161                        None    => break,
162                        Some(m) => {
163                            self.metrics.record_received();
164                            if let wactorz_core::message::MessageType::Command {
165                                command: wactorz_core::message::ActorCommand::Stop
166                            } = &m.payload { break; }
167                            match self.handle_message(m).await {
168                                Ok(_)  => self.metrics.record_processed(),
169                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
170                            }
171                        }
172                    }
173                }
174                _ = hb.tick() => {
175                    self.metrics.record_heartbeat();
176                    if let Err(e) = self.on_heartbeat().await {
177                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
178                    }
179                }
180            }
181        }
182        self.state = ActorState::Stopped;
183        self.on_stop().await
184    }
185}