wactorz_agents/
planner_agent.rs

1//! LLM-powered task planning agent.
2//!
3//! [`PlannerAgent`] decomposes multi-step goals into ordered action plans.
4//! It emits each step as a structured MQTT message so other agents can
5//! execute them sequentially or in parallel.
6
7use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
15const SYSTEM_PROMPT: &str = "\
16You are a task planning expert. Given a goal, break it into clear, ordered steps.\n\
17Output ONLY a numbered list, one step per line. Each step should be atomic and actionable.\n\
18Example:\n\
191. Collect current weather data for the target city\n\
202. Retrieve the 5-day forecast\n\
213. Summarise findings in a user-friendly format\n\
22Keep steps concise. Do not include explanation or preamble.";
23
24pub struct PlannerAgent {
25    config: ActorConfig,
26    llm: LlmAgent,
27    state: ActorState,
28    metrics: Arc<ActorMetrics>,
29    mailbox_tx: mpsc::Sender<Message>,
30    mailbox_rx: Option<mpsc::Receiver<Message>>,
31    publisher: Option<EventPublisher>,
32}
33
34impl PlannerAgent {
35    pub fn new(config: ActorConfig, llm_config: LlmConfig) -> Self {
36        let mut lc = llm_config;
37        lc.system_prompt = Some(SYSTEM_PROMPT.to_string());
38        let llm_cfg = ActorConfig::new(format!("{}-llm", config.name));
39        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
40        Self {
41            config,
42            llm: LlmAgent::new(llm_cfg, lc),
43            state: ActorState::Initializing,
44            metrics: Arc::new(ActorMetrics::new()),
45            mailbox_tx: tx,
46            mailbox_rx: Some(rx),
47            publisher: None,
48        }
49    }
50
51    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
52        self.publisher = Some(p);
53        self
54    }
55
56    /// Parse numbered-list plan response into a `Vec<String>`.
57    fn parse_steps(text: &str) -> Vec<String> {
58        text.lines()
59            .filter_map(|line| {
60                let l = line.trim();
61                // Strip leading "1." / "1)" / "- " etc.
62                let stripped = l
63                    .trim_start_matches(|c: char| c.is_ascii_digit())
64                    .trim_start_matches(['.', ')', ' '])
65                    .trim();
66                if stripped.is_empty() {
67                    None
68                } else {
69                    Some(stripped.to_string())
70                }
71            })
72            .collect()
73    }
74
75    fn now_ms() -> u64 {
76        std::time::SystemTime::now()
77            .duration_since(std::time::UNIX_EPOCH)
78            .unwrap_or_default()
79            .as_millis() as u64
80    }
81}
82
83#[async_trait]
84impl Actor for PlannerAgent {
85    fn id(&self) -> String {
86        self.config.id.clone()
87    }
88    fn name(&self) -> &str {
89        &self.config.name
90    }
91    fn state(&self) -> ActorState {
92        self.state.clone()
93    }
94    fn metrics(&self) -> Arc<ActorMetrics> {
95        Arc::clone(&self.metrics)
96    }
97    fn mailbox(&self) -> mpsc::Sender<Message> {
98        self.mailbox_tx.clone()
99    }
100
101    async fn on_start(&mut self) -> Result<()> {
102        self.state = ActorState::Running;
103        if let Some(pub_) = &self.publisher {
104            pub_.publish(
105                wactorz_mqtt::topics::spawn(&self.config.id),
106                &serde_json::json!({
107                    "agentId":   self.config.id,
108                    "agentName": self.config.name,
109                    "agentType": "planner",
110                    "timestampMs": Self::now_ms(),
111                }),
112            );
113        }
114        Ok(())
115    }
116
117    async fn handle_message(&mut self, message: Message) -> Result<()> {
118        use wactorz_core::message::MessageType;
119        let goal = match &message.payload {
120            MessageType::Text { content } => content.clone(),
121            MessageType::Task { description, .. } => description.clone(),
122            _ => return Ok(()),
123        };
124
125        let plan_text = self
126            .llm
127            .complete(&goal)
128            .await
129            .unwrap_or_else(|e| format!("LLM error: {e}"));
130
131        let steps = Self::parse_steps(&plan_text);
132
133        if let Some(pub_) = &self.publisher {
134            // Publish the full plan as a chat message
135            pub_.publish(
136                wactorz_mqtt::topics::chat(&self.config.id),
137                &serde_json::json!({
138                    "from":        self.config.name,
139                    "to":          message.from.as_deref().unwrap_or("user"),
140                    "content":     plan_text,
141                    "steps":       steps,
142                    "goal":        goal,
143                    "timestampMs": Self::now_ms(),
144                }),
145            );
146        }
147        Ok(())
148    }
149
150    async fn on_heartbeat(&mut self) -> Result<()> {
151        if let Some(pub_) = &self.publisher {
152            let snap = self.llm.metrics().snapshot();
153            pub_.publish(
154                wactorz_mqtt::topics::heartbeat(&self.config.id),
155                &serde_json::json!({
156                    "agentId":         self.config.id,
157                    "agentName":       self.config.name,
158                    "state":           self.state,
159                    "llmInputTokens":  snap.llm_input_tokens,
160                    "llmOutputTokens": snap.llm_output_tokens,
161                    "llmCostUsd":      snap.llm_cost_usd,
162                    "timestampMs":     Self::now_ms(),
163                }),
164            );
165        }
166        Ok(())
167    }
168
169    async fn run(&mut self) -> Result<()> {
170        self.on_start().await?;
171        let mut rx = self
172            .mailbox_rx
173            .take()
174            .ok_or_else(|| anyhow::anyhow!("PlannerAgent already running"))?;
175        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
176            self.config.heartbeat_interval_secs,
177        ));
178        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
179        loop {
180            tokio::select! {
181                biased;
182                msg = rx.recv() => {
183                    match msg {
184                        None    => break,
185                        Some(m) => {
186                            self.metrics.record_received();
187                            if let wactorz_core::message::MessageType::Command {
188                                command: wactorz_core::message::ActorCommand::Stop
189                            } = &m.payload { break; }
190                            match self.handle_message(m).await {
191                                Ok(_)  => self.metrics.record_processed(),
192                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
193                            }
194                        }
195                    }
196                }
197                _ = hb.tick() => {
198                    self.metrics.record_heartbeat();
199                    if let Err(e) = self.on_heartbeat().await {
200                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
201                    }
202                }
203            }
204        }
205        self.state = ActorState::Stopped;
206        self.on_stop().await
207    }
208}