wactorz_agents/
smart_cities_agent.rs1use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
15pub struct SmartCitiesAgent {
16 config: ActorConfig,
17 city: String,
18 http: reqwest::Client,
19 llm: Option<LlmAgent>,
20 state: ActorState,
21 metrics: Arc<ActorMetrics>,
22 mailbox_tx: mpsc::Sender<Message>,
23 mailbox_rx: Option<mpsc::Receiver<Message>>,
24 publisher: Option<EventPublisher>,
25}
26
27impl SmartCitiesAgent {
28 pub fn new(config: ActorConfig, city: impl Into<String>) -> Self {
29 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
30 Self {
31 config,
32 city: city.into(),
33 http: reqwest::Client::new(),
34 llm: None,
35 state: ActorState::Initializing,
36 metrics: Arc::new(ActorMetrics::new()),
37 mailbox_tx: tx,
38 mailbox_rx: Some(rx),
39 publisher: None,
40 }
41 }
42
43 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
44 self.publisher = Some(p);
45 self
46 }
47
48 pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
49 let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
50 self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
51 self
52 }
53
54 async fn fetch_air_quality(&self) -> Result<serde_json::Value> {
56 let geo_url = format!(
58 "https://geocoding-api.open-meteo.com/v1/search?name={}&count=1&format=json",
59 urlencoding::encode(&self.city)
60 );
61 let geo: serde_json::Value = self.http.get(&geo_url).send().await?.json().await?;
62 let lat = geo["results"][0]["latitude"].as_f64().unwrap_or(51.5);
63 let lon = geo["results"][0]["longitude"].as_f64().unwrap_or(-0.1);
64
65 let aq_url = format!(
66 "https://air-quality-api.open-meteo.com/v1/air-quality?\
67 latitude={lat}&longitude={lon}\
68 &hourly=pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,ozone\
69 &forecast_days=1"
70 );
71 Ok(self.http.get(&aq_url).send().await?.json().await?)
72 }
73
74 async fn process(&mut self, query: &str) -> String {
75 let lower = query.to_lowercase();
76
77 if lower.contains("air") || lower.contains("quality") || lower.contains("pollution") {
78 match self.fetch_air_quality().await {
79 Ok(v) => {
80 let pm25 = v["hourly"]["pm2_5"][0].as_f64().unwrap_or(0.0);
81 let pm10 = v["hourly"]["pm10"][0].as_f64().unwrap_or(0.0);
82 let no2 = v["hourly"]["nitrogen_dioxide"][0].as_f64().unwrap_or(0.0);
83 format!(
84 "Air quality for {}:\n PM2.5: {:.1} μg/m³\n PM10: {:.1} μg/m³\n NO₂: {:.1} μg/m³",
85 self.city, pm25, pm10, no2
86 )
87 }
88 Err(e) => format!("Air quality fetch error: {e}"),
89 }
90 } else if let Some(llm) = &mut self.llm {
91 let prompt = format!(
92 "You are a smart cities data analyst for {}. \
93 The user asks: \"{query}\"\n\
94 Answer with available urban data insights.",
95 self.city
96 );
97 llm.complete(&prompt)
98 .await
99 .unwrap_or_else(|e| format!("LLM error: {e}"))
100 } else {
101 format!(
102 "Smart cities agent for {}. Ask about air quality, traffic, or energy.",
103 self.city
104 )
105 }
106 }
107
108 fn now_ms() -> u64 {
109 std::time::SystemTime::now()
110 .duration_since(std::time::UNIX_EPOCH)
111 .unwrap_or_default()
112 .as_millis() as u64
113 }
114}
115
116#[async_trait]
117impl Actor for SmartCitiesAgent {
118 fn id(&self) -> String {
119 self.config.id.clone()
120 }
121 fn name(&self) -> &str {
122 &self.config.name
123 }
124 fn state(&self) -> ActorState {
125 self.state.clone()
126 }
127 fn metrics(&self) -> Arc<ActorMetrics> {
128 Arc::clone(&self.metrics)
129 }
130 fn mailbox(&self) -> mpsc::Sender<Message> {
131 self.mailbox_tx.clone()
132 }
133
134 async fn on_start(&mut self) -> Result<()> {
135 self.state = ActorState::Running;
136 tracing::info!(
137 "[{}] Smart cities agent for '{}'",
138 self.config.name,
139 self.city
140 );
141 if let Some(pub_) = &self.publisher {
142 pub_.publish(
143 wactorz_mqtt::topics::spawn(&self.config.id),
144 &serde_json::json!({
145 "agentId": self.config.id,
146 "agentName": self.config.name,
147 "agentType": "smart_cities",
148 "city": self.city,
149 "timestampMs": Self::now_ms(),
150 }),
151 );
152 }
153 Ok(())
154 }
155
156 async fn handle_message(&mut self, message: Message) -> Result<()> {
157 use wactorz_core::message::MessageType;
158 let text = match &message.payload {
159 MessageType::Text { content } => content.clone(),
160 MessageType::Task { description, .. } => description.clone(),
161 _ => return Ok(()),
162 };
163 let response = self.process(&text).await;
164 if let Some(pub_) = &self.publisher {
165 pub_.publish(
166 wactorz_mqtt::topics::chat(&self.config.id),
167 &serde_json::json!({
168 "from": self.config.name,
169 "to": message.from.as_deref().unwrap_or("user"),
170 "content": response,
171 "timestampMs": Self::now_ms(),
172 }),
173 );
174 }
175 Ok(())
176 }
177
178 async fn on_heartbeat(&mut self) -> Result<()> {
179 if let Some(pub_) = &self.publisher {
180 pub_.publish(
181 wactorz_mqtt::topics::heartbeat(&self.config.id),
182 &serde_json::json!({
183 "agentId": self.config.id,
184 "agentName": self.config.name,
185 "state": self.state,
186 "city": self.city,
187 "timestampMs": Self::now_ms(),
188 }),
189 );
190 }
191 Ok(())
192 }
193
194 async fn run(&mut self) -> Result<()> {
195 self.on_start().await?;
196 let mut rx = self
197 .mailbox_rx
198 .take()
199 .ok_or_else(|| anyhow::anyhow!("SmartCitiesAgent already running"))?;
200 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
201 self.config.heartbeat_interval_secs,
202 ));
203 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204 loop {
205 tokio::select! {
206 biased;
207 msg = rx.recv() => {
208 match msg {
209 None => break,
210 Some(m) => {
211 self.metrics.record_received();
212 if let wactorz_core::message::MessageType::Command {
213 command: wactorz_core::message::ActorCommand::Stop
214 } = &m.payload { break; }
215 match self.handle_message(m).await {
216 Ok(_) => self.metrics.record_processed(),
217 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
218 }
219 }
220 }
221 }
222 _ = hb.tick() => {
223 self.metrics.record_heartbeat();
224 if let Err(e) = self.on_heartbeat().await {
225 tracing::error!("[{}] heartbeat: {e}", self.config.name);
226 }
227 }
228 }
229 }
230 self.state = ActorState::Stopped;
231 self.on_stop().await
232 }
233}