wactorz_agents/
planner_agent.rs1use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
/// System prompt that `PlannerAgent::new` installs on the LLM configuration.
///
/// It asks the model to reply with nothing but a numbered list of atomic,
/// actionable steps and shows a three-step example of the expected layout.
const SYSTEM_PROMPT: &str = concat!(
    "You are a task planning expert. Given a goal, break it into clear, ordered steps.\n",
    "Output ONLY a numbered list, one step per line. Each step should be atomic and actionable.\n",
    "Example:\n",
    "1. Collect current weather data for the target city\n",
    "2. Retrieve the 5-day forecast\n",
    "3. Summarise findings in a user-friendly format\n",
    "Keep steps concise. Do not include explanation or preamble.",
);
23
24pub struct PlannerAgent {
25 config: ActorConfig,
26 llm: LlmAgent,
27 state: ActorState,
28 metrics: Arc<ActorMetrics>,
29 mailbox_tx: mpsc::Sender<Message>,
30 mailbox_rx: Option<mpsc::Receiver<Message>>,
31 publisher: Option<EventPublisher>,
32}
33
34impl PlannerAgent {
35 pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
36 let mut lc = llm_config;
37 lc.system_prompt = Some(SYSTEM_PROMPT.to_string());
38 let llm_cfg = ActorConfig::new(format!("{}-llm", config.name));
39 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
40 Self {
41 config,
42 llm: LlmAgent::new(llm_cfg, lc),
43 state: ActorState::Initializing,
44 metrics: Arc::new(ActorMetrics::new()),
45 mailbox_tx: tx,
46 mailbox_rx: Some(rx),
47 publisher: None,
48 }
49 }
50
51 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
52 self.publisher = Some(p);
53 self
54 }
55
/// Splits LLM output into individual plan steps, one per non-empty line.
///
/// Each line is trimmed. A leading ordinal marker (ASCII digits immediately
/// followed by `.` or `)`, e.g. `1.`, `2)`, `10.)`) is removed together with
/// the delimiter run and the spaces after it. Lines that are empty after this
/// processing are dropped.
///
/// Digits that are *not* followed by a list delimiter are treated as content
/// and kept, so steps such as `5-day forecast retrieval` or `3D print the
/// model` survive intact. Leading `.`/`)` without a preceding number (e.g.
/// `.env file setup`) is likewise left untouched.
fn parse_steps(text: &str) -> Vec<String> {
    text.lines()
        .filter_map(|line| {
            let line = line.trim();
            let after_digits = line.trim_start_matches(|c: char| c.is_ascii_digit());
            // A marker needs at least one digit AND a delimiter right after it;
            // otherwise the digits belong to the step text itself.
            let has_digits = after_digits.len() < line.len();
            let step = if has_digits && after_digits.starts_with(['.', ')']) {
                after_digits.trim_start_matches(['.', ')', ' ']).trim()
            } else {
                line
            };
            (!step.is_empty()).then(|| step.to_string())
        })
        .collect()
}
74
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the elapsed duration
/// falls back to zero and `0` is returned.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
81}
82
83#[async_trait]
84impl Actor for PlannerAgent {
85 fn id(&self) -> String {
86 self.config.id.clone()
87 }
88 fn name(&self) -> &str {
89 &self.config.name
90 }
91 fn state(&self) -> ActorState {
92 self.state.clone()
93 }
94 fn metrics(&self) -> Arc<ActorMetrics> {
95 Arc::clone(&self.metrics)
96 }
97 fn mailbox(&self) -> mpsc::Sender<Message> {
98 self.mailbox_tx.clone()
99 }
100
101 async fn on_start(&mut self) -> Result<()> {
102 self.state = ActorState::Running;
103 if let Some(pub_) = &self.publisher {
104 pub_.publish(
105 wactorz_mqtt::topics::spawn(&self.config.id),
106 &serde_json::json!({
107 "agentId": self.config.id,
108 "agentName": self.config.name,
109 "agentType": "planner",
110 "timestampMs": Self::now_ms(),
111 }),
112 );
113 }
114 Ok(())
115 }
116
117 async fn handle_message(&mut self, message: Message) -> Result<()> {
118 use wactorz_core::message::MessageType;
119 let goal = match &message.payload {
120 MessageType::Text { content } => content.clone(),
121 MessageType::Task { description, .. } => description.clone(),
122 _ => return Ok(()),
123 };
124
125 let plan_text = self
126 .llm
127 .complete(&goal)
128 .await
129 .unwrap_or_else(|e| format!("LLM error: {e}"));
130
131 let steps = Self::parse_steps(&plan_text);
132
133 if let Some(pub_) = &self.publisher {
134 pub_.publish(
136 wactorz_mqtt::topics::chat(&self.config.id),
137 &serde_json::json!({
138 "from": self.config.name,
139 "to": message.from.as_deref().unwrap_or("user"),
140 "content": plan_text,
141 "steps": steps,
142 "goal": goal,
143 "timestampMs": Self::now_ms(),
144 }),
145 );
146 }
147 Ok(())
148 }
149
150 async fn on_heartbeat(&mut self) -> Result<()> {
151 if let Some(pub_) = &self.publisher {
152 let snap = self.llm.metrics().snapshot();
153 pub_.publish(
154 wactorz_mqtt::topics::heartbeat(&self.config.id),
155 &serde_json::json!({
156 "agentId": self.config.id,
157 "agentName": self.config.name,
158 "state": self.state,
159 "llmInputTokens": snap.llm_input_tokens,
160 "llmOutputTokens": snap.llm_output_tokens,
161 "llmCostUsd": snap.llm_cost_usd,
162 "timestampMs": Self::now_ms(),
163 }),
164 );
165 }
166 Ok(())
167 }
168
169 async fn run(&mut self) -> Result<()> {
170 self.on_start().await?;
171 let mut rx = self
172 .mailbox_rx
173 .take()
174 .ok_or_else(|| anyhow::anyhow!("PlannerAgent already running"))?;
175 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
176 self.config.heartbeat_interval_secs,
177 ));
178 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
179 loop {
180 tokio::select! {
181 biased;
182 msg = rx.recv() => {
183 match msg {
184 None => break,
185 Some(m) => {
186 self.metrics.record_received();
187 if let wactorz_core::message::MessageType::Command {
188 command: wactorz_core::message::ActorCommand::Stop
189 } = &m.payload { break; }
190 match self.handle_message(m).await {
191 Ok(_) => self.metrics.record_processed(),
192 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
193 }
194 }
195 }
196 }
197 _ = hb.tick() => {
198 self.metrics.record_heartbeat();
199 if let Err(e) = self.on_heartbeat().await {
200 tracing::error!("[{}] heartbeat: {e}", self.config.name);
201 }
202 }
203 }
204 }
205 self.state = ActorState::Stopped;
206 self.on_stop().await
207 }
208}