wactorz_agents/
weather_agent.rs

1//! Weather information agent.
2//!
3//! [`WeatherAgent`] fetches current weather conditions on demand using the
4//! free [wttr.in](https://wttr.in) service — **no API key required**.
5//!
6//! ## Usage (via IO bar)
7//!
8//! ```text
9//! @weather-agent                  → weather for default location (WEATHER_DEFAULT_LOCATION or "London")
10//! @weather-agent Tokyo            → weather for Tokyo
11//! @weather-agent New York         → weather for New York
12//! @weather-agent help             → show usage
13//! ```
14//!
15//! The agent does **not** poll; it only fetches when it receives a message.
16//! It is stoppable and pausable — consumes no resources when idle.
17
18use anyhow::Result;
19use async_trait::async_trait;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
/// Name of the environment variable that overrides the location used when the user sends `@weather-agent` with no argument.
26const DEFAULT_LOCATION_ENV: &str = "WEATHER_DEFAULT_LOCATION";
27const DEFAULT_LOCATION_FALLBACK: &str = "London";
28
/// Total per-request timeout, in seconds, applied to the reqwest client.
30const HTTP_TIMEOUT_SECS: u64 = 10;
31
32pub struct WeatherAgent {
33    config: ActorConfig,
34    state: ActorState,
35    metrics: Arc<ActorMetrics>,
36    mailbox_tx: mpsc::Sender<Message>,
37    mailbox_rx: Option<mpsc::Receiver<Message>>,
38    publisher: Option<EventPublisher>,
39    http: reqwest::Client,
40}
41
42impl WeatherAgent {
43    pub fn new(config: ActorConfig) -> Self {
44        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
45        let http = reqwest::Client::builder()
46            .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
47            .user_agent("AgentFlow-WeatherAgent/1.0")
48            .build()
49            .unwrap_or_default();
50        Self {
51            config,
52            state: ActorState::Initializing,
53            metrics: Arc::new(ActorMetrics::new()),
54            mailbox_tx: tx,
55            mailbox_rx: Some(rx),
56            publisher: None,
57            http,
58        }
59    }
60
61    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
62        self.publisher = Some(p);
63        self
64    }
65
66    fn now_ms() -> u64 {
67        std::time::SystemTime::now()
68            .duration_since(std::time::UNIX_EPOCH)
69            .unwrap_or_default()
70            .as_millis() as u64
71    }
72
73    fn default_location() -> String {
74        std::env::var(DEFAULT_LOCATION_ENV)
75            .unwrap_or_else(|_| DEFAULT_LOCATION_FALLBACK.to_string())
76    }
77
78    fn reply(&self, content: &str) {
79        if let Some(pub_) = &self.publisher {
80            pub_.publish(
81                wactorz_mqtt::topics::chat(&self.config.id),
82                &serde_json::json!({
83                    "from":        self.config.name,
84                    "to":          "user",
85                    "content":     content,
86                    "timestampMs": Self::now_ms(),
87                }),
88            );
89        }
90    }
91
92    async fn fetch_weather(&self, location: &str) -> Result<String> {
93        // wttr.in format=j1 returns JSON with current conditions.
94        let url = format!("https://wttr.in/{}?format=j1", urlencoding(location));
95
96        let resp = self.http.get(&url).send().await?;
97        if !resp.status().is_success() {
98            // Fallback to one-line format on error
99            let url2 = format!("https://wttr.in/{}?format=3", urlencoding(location));
100            let r2 = self.http.get(&url2).send().await?;
101            return Ok(r2.text().await?.trim().to_string());
102        }
103
104        let json: serde_json::Value = resp.json().await?;
105
106        // Parse the JSON response
107        let current = json
108            .get("current_condition")
109            .and_then(|a| a.as_array())
110            .and_then(|a| a.first())
111            .cloned()
112            .unwrap_or_default();
113
114        let desc = current
115            .get("weatherDesc")
116            .and_then(|a| a.as_array())
117            .and_then(|a| a.first())
118            .and_then(|v| v.get("value"))
119            .and_then(|v| v.as_str())
120            .unwrap_or("Unknown");
121
122        let temp_c = current
123            .get("temp_C")
124            .and_then(|v| v.as_str())
125            .unwrap_or("?");
126        let temp_f = current
127            .get("temp_F")
128            .and_then(|v| v.as_str())
129            .unwrap_or("?");
130        let feels_c = current
131            .get("FeelsLikeC")
132            .and_then(|v| v.as_str())
133            .unwrap_or("?");
134        let humidity = current
135            .get("humidity")
136            .and_then(|v| v.as_str())
137            .unwrap_or("?");
138        let wind_kmph = current
139            .get("windspeedKmph")
140            .and_then(|v| v.as_str())
141            .unwrap_or("?");
142        let wind_dir = current
143            .get("winddir16Point")
144            .and_then(|v| v.as_str())
145            .unwrap_or("?");
146        let uv = current
147            .get("uvIndex")
148            .and_then(|v| v.as_str())
149            .unwrap_or("?");
150        let visibility = current
151            .get("visibility")
152            .and_then(|v| v.as_str())
153            .unwrap_or("?");
154
155        // Nearest area name
156        let area = json
157            .get("nearest_area")
158            .and_then(|a| a.as_array())
159            .and_then(|a| a.first())
160            .and_then(|v| v.get("areaName"))
161            .and_then(|a| a.as_array())
162            .and_then(|a| a.first())
163            .and_then(|v| v.get("value"))
164            .and_then(|v| v.as_str())
165            .unwrap_or(location);
166
167        Ok(format!(
168            "**Weather in {area}**\n\n\
169             🌡 **{temp_c}°C / {temp_f}°F** (feels like {feels_c}°C)\n\
170             ☁ {desc}\n\
171             💧 Humidity: {humidity}%\n\
172             💨 Wind: {wind_kmph} km/h {wind_dir}\n\
173             👁 Visibility: {visibility} km\n\
174             ☀ UV index: {uv}\n\n\
175             *Data: [wttr.in](https://wttr.in/{loc})*",
176            loc = urlencoding(location)
177        ))
178    }
179}
180
/// Minimal URL percent-encoding for location names.
///
/// Spaces become `+`; ASCII letters, digits and `-`, `_`, `.`, `,` pass
/// through unchanged. Every other byte of the UTF-8 encoding is emitted as
/// `%XX` (uppercase hex), so multi-byte characters produce one escape per
/// byte rather than a single escape built from the code point.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for &byte in s.as_bytes() {
        match byte {
            b' ' => encoded.push('+'),
            b'-' | b'_' | b'.' | b',' => encoded.push(char::from(byte)),
            _ if byte.is_ascii_alphanumeric() => encoded.push(char::from(byte)),
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
191
192#[async_trait]
193impl Actor for WeatherAgent {
194    fn id(&self) -> String {
195        self.config.id.clone()
196    }
197    fn name(&self) -> &str {
198        &self.config.name
199    }
200    fn state(&self) -> ActorState {
201        self.state.clone()
202    }
203    fn metrics(&self) -> Arc<ActorMetrics> {
204        Arc::clone(&self.metrics)
205    }
206    fn mailbox(&self) -> mpsc::Sender<Message> {
207        self.mailbox_tx.clone()
208    }
209    fn is_protected(&self) -> bool {
210        self.config.protected
211    }
212
213    async fn on_start(&mut self) -> Result<()> {
214        self.state = ActorState::Running;
215        if let Some(pub_) = &self.publisher {
216            pub_.publish(
217                wactorz_mqtt::topics::spawn(&self.config.id),
218                &serde_json::json!({
219                    "agentId":   self.config.id,
220                    "agentName": self.config.name,
221                    "agentType": "data",
222                    "timestampMs": Self::now_ms(),
223                }),
224            );
225        }
226        Ok(())
227    }
228
229    async fn handle_message(&mut self, message: Message) -> Result<()> {
230        use wactorz_core::message::MessageType;
231
232        let content = match &message.payload {
233            MessageType::Text { content } => content.trim().to_string(),
234            MessageType::Task { description, .. } => description.trim().to_string(),
235            _ => return Ok(()),
236        };
237
238        // Strip @weather-agent prefix if present
239        let arg = content
240            .strip_prefix("@weather-agent")
241            .unwrap_or(&content)
242            .trim()
243            .to_string();
244
245        match arg.to_lowercase().as_str() {
246            "" => {
247                let loc = Self::default_location();
248                let typing = format!("🌦 Fetching weather for **{loc}**…");
249                self.reply(&typing);
250                match self.fetch_weather(&loc).await {
251                    Ok(report) => self.reply(&report),
252                    Err(e) => self.reply(&format!("⚠ Could not fetch weather: {e}")),
253                }
254            }
255            "help" => {
256                let default = Self::default_location();
257                self.reply(&format!(
258                    "**WeatherAgent** — current conditions via wttr.in (no API key needed)\n\n\
259                     ```\n\
260                     @weather-agent              # {default} (default)\n\
261                     @weather-agent Tokyo\n\
262                     @weather-agent New York\n\
263                     @weather-agent 48.8566,2.3522  # coordinates\n\
264                     ```\n\
265                     Set `WEATHER_DEFAULT_LOCATION` in `.env` to change the default."
266                ));
267            }
268            location => {
269                let typing = format!("🌦 Fetching weather for **{location}**…");
270                self.reply(&typing);
271                match self.fetch_weather(location).await {
272                    Ok(report) => self.reply(&report),
273                    Err(e) => {
274                        self.reply(&format!("⚠ Could not fetch weather for '{location}': {e}"))
275                    }
276                }
277            }
278        }
279
280        Ok(())
281    }
282
283    async fn on_heartbeat(&mut self) -> Result<()> {
284        if let Some(pub_) = &self.publisher {
285            pub_.publish(
286                wactorz_mqtt::topics::heartbeat(&self.config.id),
287                &serde_json::json!({
288                    "agentId":   self.config.id,
289                    "agentName": self.config.name,
290                    "state":     self.state,
291                    "timestampMs": Self::now_ms(),
292                }),
293            );
294        }
295        Ok(())
296    }
297
298    async fn run(&mut self) -> Result<()> {
299        self.on_start().await?;
300        let mut rx = self
301            .mailbox_rx
302            .take()
303            .ok_or_else(|| anyhow::anyhow!("WeatherAgent already running"))?;
304        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
305            self.config.heartbeat_interval_secs,
306        ));
307        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
308        loop {
309            tokio::select! {
310                biased;
311                msg = rx.recv() => match msg {
312                    None => break,
313                    Some(m) => {
314                        self.metrics.record_received();
315                        if let wactorz_core::message::MessageType::Command {
316                            command: wactorz_core::message::ActorCommand::Stop
317                        } = &m.payload { break; }
318                        match self.handle_message(m).await {
319                            Ok(_)  => self.metrics.record_processed(),
320                            Err(e) => {
321                                tracing::error!("[{}] {e}", self.config.name);
322                                self.metrics.record_failed();
323                            }
324                        }
325                    }
326                },
327                _ = hb.tick() => {
328                    self.metrics.record_heartbeat();
329                    if let Err(e) = self.on_heartbeat().await {
330                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
331                    }
332                }
333            }
334        }
335        self.state = ActorState::Stopped;
336        self.on_stop().await
337    }
338}