wactorz_agents/
weather_agent.rs1use anyhow::Result;
19use async_trait::async_trait;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
24
/// Name of the environment variable read to obtain the default location.
const DEFAULT_LOCATION_ENV: &str = "WEATHER_DEFAULT_LOCATION";
/// Location used when the environment variable above cannot be read.
const DEFAULT_LOCATION_FALLBACK: &str = "London";

/// Timeout, in seconds, configured on the agent's HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 10;
31
/// Actor that answers chat messages with current weather conditions
/// fetched over HTTP from wttr.in.
pub struct WeatherAgent {
    /// Actor configuration; this file reads its id, name, mailbox capacity,
    /// heartbeat interval and protection flag.
    config: ActorConfig,
    /// Current lifecycle state; set to `Initializing` by `new`.
    state: ActorState,
    /// Shared counters for received/processed/failed messages and heartbeats.
    metrics: Arc<ActorMetrics>,
    /// Sending half of the mailbox, cloned out to callers via `mailbox()`.
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiving half of the mailbox; taken (left as `None`) when `run` starts.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// Optional event publisher; when `None`, nothing is published.
    publisher: Option<EventPublisher>,
    /// HTTP client used for the wttr.in requests.
    http: reqwest::Client,
}
41
42impl WeatherAgent {
43 pub fn new(config: ActorConfig) -> Self {
44 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
45 let http = reqwest::Client::builder()
46 .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
47 .user_agent("AgentFlow-WeatherAgent/1.0")
48 .build()
49 .unwrap_or_default();
50 Self {
51 config,
52 state: ActorState::Initializing,
53 metrics: Arc::new(ActorMetrics::new()),
54 mailbox_tx: tx,
55 mailbox_rx: Some(rx),
56 publisher: None,
57 http,
58 }
59 }
60
61 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
62 self.publisher = Some(p);
63 self
64 }
65
66 fn now_ms() -> u64 {
67 std::time::SystemTime::now()
68 .duration_since(std::time::UNIX_EPOCH)
69 .unwrap_or_default()
70 .as_millis() as u64
71 }
72
73 fn default_location() -> String {
74 std::env::var(DEFAULT_LOCATION_ENV)
75 .unwrap_or_else(|_| DEFAULT_LOCATION_FALLBACK.to_string())
76 }
77
78 fn reply(&self, content: &str) {
79 if let Some(pub_) = &self.publisher {
80 pub_.publish(
81 wactorz_mqtt::topics::chat(&self.config.id),
82 &serde_json::json!({
83 "from": self.config.name,
84 "to": "user",
85 "content": content,
86 "timestampMs": Self::now_ms(),
87 }),
88 );
89 }
90 }
91
92 async fn fetch_weather(&self, location: &str) -> Result<String> {
93 let url = format!("https://wttr.in/{}?format=j1", urlencoding(location));
95
96 let resp = self.http.get(&url).send().await?;
97 if !resp.status().is_success() {
98 let url2 = format!("https://wttr.in/{}?format=3", urlencoding(location));
100 let r2 = self.http.get(&url2).send().await?;
101 return Ok(r2.text().await?.trim().to_string());
102 }
103
104 let json: serde_json::Value = resp.json().await?;
105
106 let current = json
108 .get("current_condition")
109 .and_then(|a| a.as_array())
110 .and_then(|a| a.first())
111 .cloned()
112 .unwrap_or_default();
113
114 let desc = current
115 .get("weatherDesc")
116 .and_then(|a| a.as_array())
117 .and_then(|a| a.first())
118 .and_then(|v| v.get("value"))
119 .and_then(|v| v.as_str())
120 .unwrap_or("Unknown");
121
122 let temp_c = current
123 .get("temp_C")
124 .and_then(|v| v.as_str())
125 .unwrap_or("?");
126 let temp_f = current
127 .get("temp_F")
128 .and_then(|v| v.as_str())
129 .unwrap_or("?");
130 let feels_c = current
131 .get("FeelsLikeC")
132 .and_then(|v| v.as_str())
133 .unwrap_or("?");
134 let humidity = current
135 .get("humidity")
136 .and_then(|v| v.as_str())
137 .unwrap_or("?");
138 let wind_kmph = current
139 .get("windspeedKmph")
140 .and_then(|v| v.as_str())
141 .unwrap_or("?");
142 let wind_dir = current
143 .get("winddir16Point")
144 .and_then(|v| v.as_str())
145 .unwrap_or("?");
146 let uv = current
147 .get("uvIndex")
148 .and_then(|v| v.as_str())
149 .unwrap_or("?");
150 let visibility = current
151 .get("visibility")
152 .and_then(|v| v.as_str())
153 .unwrap_or("?");
154
155 let area = json
157 .get("nearest_area")
158 .and_then(|a| a.as_array())
159 .and_then(|a| a.first())
160 .and_then(|v| v.get("areaName"))
161 .and_then(|a| a.as_array())
162 .and_then(|a| a.first())
163 .and_then(|v| v.get("value"))
164 .and_then(|v| v.as_str())
165 .unwrap_or(location);
166
167 Ok(format!(
168 "**Weather in {area}**\n\n\
169 🌡 **{temp_c}°C / {temp_f}°F** (feels like {feels_c}°C)\n\
170 ☁ {desc}\n\
171 💧 Humidity: {humidity}%\n\
172 💨 Wind: {wind_kmph} km/h {wind_dir}\n\
173 👁 Visibility: {visibility} km\n\
174 ☀ UV index: {uv}\n\n\
175 *Data: [wttr.in](https://wttr.in/{loc})*",
176 loc = urlencoding(location)
177 ))
178 }
179}
180
/// Percent-encodes `s` for use in a URL.
///
/// Spaces become `+`; ASCII letters, digits and `-`, `_`, `.`, `,` are kept
/// verbatim; every other byte of the UTF-8 encoding is written as `%XX`
/// (upper-case hex). Encoding per byte rather than per `char` keeps
/// non-ASCII input valid: `€` becomes `%E2%82%AC`, not `%20AC`.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len());
    for &b in s.as_bytes() {
        match b {
            b' ' => out.push('+'),
            b if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b',') => {
                out.push(b as char)
            }
            b => {
                out.push('%');
                out.push(HEX[usize::from(b >> 4)] as char);
                out.push(HEX[usize::from(b & 0x0F)] as char);
            }
        }
    }
    out
}
191
192#[async_trait]
193impl Actor for WeatherAgent {
194 fn id(&self) -> String {
195 self.config.id.clone()
196 }
197 fn name(&self) -> &str {
198 &self.config.name
199 }
200 fn state(&self) -> ActorState {
201 self.state.clone()
202 }
203 fn metrics(&self) -> Arc<ActorMetrics> {
204 Arc::clone(&self.metrics)
205 }
206 fn mailbox(&self) -> mpsc::Sender<Message> {
207 self.mailbox_tx.clone()
208 }
209 fn is_protected(&self) -> bool {
210 self.config.protected
211 }
212
213 async fn on_start(&mut self) -> Result<()> {
214 self.state = ActorState::Running;
215 if let Some(pub_) = &self.publisher {
216 pub_.publish(
217 wactorz_mqtt::topics::spawn(&self.config.id),
218 &serde_json::json!({
219 "agentId": self.config.id,
220 "agentName": self.config.name,
221 "agentType": "data",
222 "timestampMs": Self::now_ms(),
223 }),
224 );
225 }
226 Ok(())
227 }
228
229 async fn handle_message(&mut self, message: Message) -> Result<()> {
230 use wactorz_core::message::MessageType;
231
232 let content = match &message.payload {
233 MessageType::Text { content } => content.trim().to_string(),
234 MessageType::Task { description, .. } => description.trim().to_string(),
235 _ => return Ok(()),
236 };
237
238 let arg = content
240 .strip_prefix("@weather-agent")
241 .unwrap_or(&content)
242 .trim()
243 .to_string();
244
245 match arg.to_lowercase().as_str() {
246 "" => {
247 let loc = Self::default_location();
248 let typing = format!("🌦 Fetching weather for **{loc}**…");
249 self.reply(&typing);
250 match self.fetch_weather(&loc).await {
251 Ok(report) => self.reply(&report),
252 Err(e) => self.reply(&format!("⚠ Could not fetch weather: {e}")),
253 }
254 }
255 "help" => {
256 let default = Self::default_location();
257 self.reply(&format!(
258 "**WeatherAgent** — current conditions via wttr.in (no API key needed)\n\n\
259 ```\n\
260 @weather-agent # {default} (default)\n\
261 @weather-agent Tokyo\n\
262 @weather-agent New York\n\
263 @weather-agent 48.8566,2.3522 # coordinates\n\
264 ```\n\
265 Set `WEATHER_DEFAULT_LOCATION` in `.env` to change the default."
266 ));
267 }
268 location => {
269 let typing = format!("🌦 Fetching weather for **{location}**…");
270 self.reply(&typing);
271 match self.fetch_weather(location).await {
272 Ok(report) => self.reply(&report),
273 Err(e) => {
274 self.reply(&format!("⚠ Could not fetch weather for '{location}': {e}"))
275 }
276 }
277 }
278 }
279
280 Ok(())
281 }
282
283 async fn on_heartbeat(&mut self) -> Result<()> {
284 if let Some(pub_) = &self.publisher {
285 pub_.publish(
286 wactorz_mqtt::topics::heartbeat(&self.config.id),
287 &serde_json::json!({
288 "agentId": self.config.id,
289 "agentName": self.config.name,
290 "state": self.state,
291 "timestampMs": Self::now_ms(),
292 }),
293 );
294 }
295 Ok(())
296 }
297
298 async fn run(&mut self) -> Result<()> {
299 self.on_start().await?;
300 let mut rx = self
301 .mailbox_rx
302 .take()
303 .ok_or_else(|| anyhow::anyhow!("WeatherAgent already running"))?;
304 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
305 self.config.heartbeat_interval_secs,
306 ));
307 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
308 loop {
309 tokio::select! {
310 biased;
311 msg = rx.recv() => match msg {
312 None => break,
313 Some(m) => {
314 self.metrics.record_received();
315 if let wactorz_core::message::MessageType::Command {
316 command: wactorz_core::message::ActorCommand::Stop
317 } = &m.payload { break; }
318 match self.handle_message(m).await {
319 Ok(_) => self.metrics.record_processed(),
320 Err(e) => {
321 tracing::error!("[{}] {e}", self.config.name);
322 self.metrics.record_failed();
323 }
324 }
325 }
326 },
327 _ = hb.tick() => {
328 self.metrics.record_heartbeat();
329 if let Err(e) = self.on_heartbeat().await {
330 tracing::error!("[{}] heartbeat: {e}", self.config.name);
331 }
332 }
333 }
334 }
335 self.state = ActorState::Stopped;
336 self.on_stop().await
337 }
338}