wactorz_agents/
smart_cities_agent.rs

1//! Smart cities data integration agent.
2//!
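//! [`SmartCitiesAgent`] is designed to aggregate urban data from multiple open APIs:
//! traffic, air quality, public transport, and energy consumption.
//! It synthesises data into city health summaries and publishes them to MQTT.
//!
//! Current scope: only air quality is fetched live (Open-Meteo geocoding and
//! air-quality endpoints). Every other query is forwarded to the optional LLM,
//! or answered with a short usage hint when no LLM is configured.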
6
7use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::llm_agent::{LlmAgent, LlmConfig};
13use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
14
15pub struct SmartCitiesAgent {
16    config: ActorConfig,
17    city: String,
18    http: reqwest::Client,
19    llm: Option<LlmAgent>,
20    state: ActorState,
21    metrics: Arc<ActorMetrics>,
22    mailbox_tx: mpsc::Sender<Message>,
23    mailbox_rx: Option<mpsc::Receiver<Message>>,
24    publisher: Option<EventPublisher>,
25}
26
27impl SmartCitiesAgent {
28    pub fn new(config: ActorConfig, city: impl Into<String>) -> Self {
29        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
30        Self {
31            config,
32            city: city.into(),
33            http: reqwest::Client::new(),
34            llm: None,
35            state: ActorState::Initializing,
36            metrics: Arc::new(ActorMetrics::new()),
37            mailbox_tx: tx,
38            mailbox_rx: Some(rx),
39            publisher: None,
40        }
41    }
42
43    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
44        self.publisher = Some(p);
45        self
46    }
47
48    pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
49        let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
50        self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
51        self
52    }
53
54    /// Fetch air quality index from open-meteo (no API key needed).
55    async fn fetch_air_quality(&self) -> Result<serde_json::Value> {
56        // Use geocoding first, then air quality
57        let geo_url = format!(
58            "https://geocoding-api.open-meteo.com/v1/search?name={}&count=1&format=json",
59            urlencoding::encode(&self.city)
60        );
61        let geo: serde_json::Value = self.http.get(&geo_url).send().await?.json().await?;
62        let lat = geo["results"][0]["latitude"].as_f64().unwrap_or(51.5);
63        let lon = geo["results"][0]["longitude"].as_f64().unwrap_or(-0.1);
64
65        let aq_url = format!(
66            "https://air-quality-api.open-meteo.com/v1/air-quality?\
67             latitude={lat}&longitude={lon}\
68             &hourly=pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,ozone\
69             &forecast_days=1"
70        );
71        Ok(self.http.get(&aq_url).send().await?.json().await?)
72    }
73
74    async fn process(&mut self, query: &str) -> String {
75        let lower = query.to_lowercase();
76
77        if lower.contains("air") || lower.contains("quality") || lower.contains("pollution") {
78            match self.fetch_air_quality().await {
79                Ok(v) => {
80                    let pm25 = v["hourly"]["pm2_5"][0].as_f64().unwrap_or(0.0);
81                    let pm10 = v["hourly"]["pm10"][0].as_f64().unwrap_or(0.0);
82                    let no2 = v["hourly"]["nitrogen_dioxide"][0].as_f64().unwrap_or(0.0);
83                    format!(
84                        "Air quality for {}:\n  PM2.5: {:.1} μg/m³\n  PM10:  {:.1} μg/m³\n  NO₂:   {:.1} μg/m³",
85                        self.city, pm25, pm10, no2
86                    )
87                }
88                Err(e) => format!("Air quality fetch error: {e}"),
89            }
90        } else if let Some(llm) = &mut self.llm {
91            let prompt = format!(
92                "You are a smart cities data analyst for {}. \
93                 The user asks: \"{query}\"\n\
94                 Answer with available urban data insights.",
95                self.city
96            );
97            llm.complete(&prompt)
98                .await
99                .unwrap_or_else(|e| format!("LLM error: {e}"))
100        } else {
101            format!(
102                "Smart cities agent for {}. Ask about air quality, traffic, or energy.",
103                self.city
104            )
105        }
106    }
107
108    fn now_ms() -> u64 {
109        std::time::SystemTime::now()
110            .duration_since(std::time::UNIX_EPOCH)
111            .unwrap_or_default()
112            .as_millis() as u64
113    }
114}
115
116#[async_trait]
117impl Actor for SmartCitiesAgent {
118    fn id(&self) -> String {
119        self.config.id.clone()
120    }
121    fn name(&self) -> &str {
122        &self.config.name
123    }
124    fn state(&self) -> ActorState {
125        self.state.clone()
126    }
127    fn metrics(&self) -> Arc<ActorMetrics> {
128        Arc::clone(&self.metrics)
129    }
130    fn mailbox(&self) -> mpsc::Sender<Message> {
131        self.mailbox_tx.clone()
132    }
133
134    async fn on_start(&mut self) -> Result<()> {
135        self.state = ActorState::Running;
136        tracing::info!(
137            "[{}] Smart cities agent for '{}'",
138            self.config.name,
139            self.city
140        );
141        if let Some(pub_) = &self.publisher {
142            pub_.publish(
143                wactorz_mqtt::topics::spawn(&self.config.id),
144                &serde_json::json!({
145                    "agentId":   self.config.id,
146                    "agentName": self.config.name,
147                    "agentType": "smart_cities",
148                    "city":      self.city,
149                    "timestampMs": Self::now_ms(),
150                }),
151            );
152        }
153        Ok(())
154    }
155
156    async fn handle_message(&mut self, message: Message) -> Result<()> {
157        use wactorz_core::message::MessageType;
158        let text = match &message.payload {
159            MessageType::Text { content } => content.clone(),
160            MessageType::Task { description, .. } => description.clone(),
161            _ => return Ok(()),
162        };
163        let response = self.process(&text).await;
164        if let Some(pub_) = &self.publisher {
165            pub_.publish(
166                wactorz_mqtt::topics::chat(&self.config.id),
167                &serde_json::json!({
168                    "from":      self.config.name,
169                    "to":        message.from.as_deref().unwrap_or("user"),
170                    "content":   response,
171                    "timestampMs": Self::now_ms(),
172                }),
173            );
174        }
175        Ok(())
176    }
177
178    async fn on_heartbeat(&mut self) -> Result<()> {
179        if let Some(pub_) = &self.publisher {
180            pub_.publish(
181                wactorz_mqtt::topics::heartbeat(&self.config.id),
182                &serde_json::json!({
183                    "agentId":   self.config.id,
184                    "agentName": self.config.name,
185                    "state":     self.state,
186                    "city":      self.city,
187                    "timestampMs": Self::now_ms(),
188                }),
189            );
190        }
191        Ok(())
192    }
193
194    async fn run(&mut self) -> Result<()> {
195        self.on_start().await?;
196        let mut rx = self
197            .mailbox_rx
198            .take()
199            .ok_or_else(|| anyhow::anyhow!("SmartCitiesAgent already running"))?;
200        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
201            self.config.heartbeat_interval_secs,
202        ));
203        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204        loop {
205            tokio::select! {
206                biased;
207                msg = rx.recv() => {
208                    match msg {
209                        None    => break,
210                        Some(m) => {
211                            self.metrics.record_received();
212                            if let wactorz_core::message::MessageType::Command {
213                                command: wactorz_core::message::ActorCommand::Stop
214                            } = &m.payload { break; }
215                            match self.handle_message(m).await {
216                                Ok(_)  => self.metrics.record_processed(),
217                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
218                            }
219                        }
220                    }
221                }
222                _ = hb.tick() => {
223                    self.metrics.record_heartbeat();
224                    if let Err(e) = self.on_heartbeat().await {
225                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
226                    }
227                }
228            }
229        }
230        self.state = ActorState::Stopped;
231        self.on_stop().await
232    }
233}