wactorz_agents/
manual_agent.rs1use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
/// System prompt that `ManualAgent::new` installs on the LLM configuration.
///
/// The text is a single introductory sentence pair followed by a bulleted
/// list of answering rules; the bullets are separated by `\n` and the string
/// has no trailing newline.
const SYSTEM_PROMPT: &str = concat!(
    "You are a technical documentation and device manual expert. ",
    "You help users understand how to use, configure, and troubleshoot devices and software. ",
    "When answering:\n",
    "- Cite specific manual sections or page numbers when you know them\n",
    "- Provide step-by-step instructions when applicable\n",
    "- Flag safety warnings prominently\n",
    "- If you don't know the answer with confidence, say so clearly\n",
    "- Suggest searching the official manufacturer documentation if needed",
);
24
25pub struct ManualAgent {
26 config: ActorConfig,
27 llm: LlmAgent,
28 state: ActorState,
29 metrics: Arc<ActorMetrics>,
30 mailbox_tx: mpsc::Sender<Message>,
31 mailbox_rx: Option<mpsc::Receiver<Message>>,
32 publisher: Option<EventPublisher>,
33}
34
35impl ManualAgent {
36 pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
37 let mut lc = llm_config;
38 lc.system_prompt = Some(SYSTEM_PROMPT.to_string());
39 let llm_cfg = ActorConfig::new(format!("{}-llm", config.name));
40 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
41 Self {
42 config,
43 llm: LlmAgent::new(llm_cfg, lc),
44 state: ActorState::Initializing,
45 metrics: Arc::new(ActorMetrics::new()),
46 mailbox_tx: tx,
47 mailbox_rx: Some(rx),
48 publisher: None,
49 }
50 }
51
52 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
53 self.publisher = Some(p);
54 self
55 }
56
57 fn now_ms() -> u64 {
58 std::time::SystemTime::now()
59 .duration_since(std::time::UNIX_EPOCH)
60 .unwrap_or_default()
61 .as_millis() as u64
62 }
63}
64
65#[async_trait]
66impl Actor for ManualAgent {
67 fn id(&self) -> String {
68 self.config.id.clone()
69 }
70 fn name(&self) -> &str {
71 &self.config.name
72 }
73 fn state(&self) -> ActorState {
74 self.state.clone()
75 }
76 fn metrics(&self) -> Arc<ActorMetrics> {
77 Arc::clone(&self.metrics)
78 }
79 fn mailbox(&self) -> mpsc::Sender<Message> {
80 self.mailbox_tx.clone()
81 }
82
83 async fn on_start(&mut self) -> Result<()> {
84 self.state = ActorState::Running;
85 if let Some(pub_) = &self.publisher {
86 pub_.publish(
87 wactorz_mqtt::topics::spawn(&self.config.id),
88 &serde_json::json!({
89 "agentId": self.config.id,
90 "agentName": self.config.name,
91 "agentType": "manual",
92 "timestampMs": Self::now_ms(),
93 }),
94 );
95 }
96 Ok(())
97 }
98
99 async fn handle_message(&mut self, message: Message) -> Result<()> {
100 use wactorz_core::message::MessageType;
101 let text = match &message.payload {
102 MessageType::Text { content } => content.clone(),
103 MessageType::Task { description, .. } => description.clone(),
104 _ => return Ok(()),
105 };
106
107 let response = self
108 .llm
109 .complete(&text)
110 .await
111 .unwrap_or_else(|e| format!("LLM error: {e}"));
112
113 if let Some(pub_) = &self.publisher {
114 pub_.publish(
115 wactorz_mqtt::topics::chat(&self.config.id),
116 &serde_json::json!({
117 "from": self.config.name,
118 "to": message.from.as_deref().unwrap_or("user"),
119 "content": response,
120 "timestampMs": Self::now_ms(),
121 }),
122 );
123 }
124 Ok(())
125 }
126
127 async fn on_heartbeat(&mut self) -> Result<()> {
128 if let Some(pub_) = &self.publisher {
129 let snap = self.llm.metrics().snapshot();
130 pub_.publish(
131 wactorz_mqtt::topics::heartbeat(&self.config.id),
132 &serde_json::json!({
133 "agentId": self.config.id,
134 "agentName": self.config.name,
135 "state": self.state,
136 "llmInputTokens": snap.llm_input_tokens,
137 "llmOutputTokens": snap.llm_output_tokens,
138 "llmCostUsd": snap.llm_cost_usd,
139 "timestampMs": Self::now_ms(),
140 }),
141 );
142 }
143 Ok(())
144 }
145
146 async fn run(&mut self) -> Result<()> {
147 self.on_start().await?;
148 let mut rx = self
149 .mailbox_rx
150 .take()
151 .ok_or_else(|| anyhow::anyhow!("ManualAgent already running"))?;
152 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
153 self.config.heartbeat_interval_secs,
154 ));
155 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
156 loop {
157 tokio::select! {
158 biased;
159 msg = rx.recv() => {
160 match msg {
161 None => break,
162 Some(m) => {
163 self.metrics.record_received();
164 if let wactorz_core::message::MessageType::Command {
165 command: wactorz_core::message::ActorCommand::Stop
166 } = &m.payload { break; }
167 match self.handle_message(m).await {
168 Ok(_) => self.metrics.record_processed(),
169 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
170 }
171 }
172 }
173 }
174 _ = hb.tick() => {
175 self.metrics.record_heartbeat();
176 if let Err(e) = self.on_heartbeat().await {
177 tracing::error!("[{}] heartbeat: {e}", self.config.name);
178 }
179 }
180 }
181 }
182 self.state = ActorState::Stopped;
183 self.on_stop().await
184 }
185}