// wactorz_agents/monitor_agent.rs

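//! System health monitor agent.
//!
//! [`MonitorAgent`] sweeps all registered actors on every heartbeat tick
//! (`ActorConfig::heartbeat_interval_secs`); [`POLL_INTERVAL_SECS`] documents the
//! intended sweep cadence. An actor whose last recorded message is more than
//! [`TIMEOUT_SECS`] seconds old is reported as stale and gets a per-actor MQTT alert.
//! A summary is published to the MQTT `system/health` topic on every sweep, whether
//! or not any actor is stale.
//!
//! Like [`crate::main_actor::MainActor`], the monitor is **protected** from external kill commands.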
8
9use anyhow::Result;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use wactorz_core::{
15    Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
16};
17
/// Intended interval between health sweeps, in seconds.
///
/// NOTE(review): the monitor's run loop ticks on `ActorConfig::heartbeat_interval_secs`;
/// confirm the monitor's config is populated from this value.
pub const POLL_INTERVAL_SECS: u64 = 15;

/// Staleness threshold, in seconds.
///
/// An actor whose last recorded message is strictly older than this is reported
/// as stale by the monitor.
pub const TIMEOUT_SECS: u64 = 60;
23
24/// Health status of a single actor at a given point in time.
25#[derive(Debug, Clone)]
26pub struct ActorHealthReport {
27    pub actor_id: String,
28    pub actor_name: String,
29    pub state: ActorState,
30    pub last_heartbeat_secs_ago: u64,
31    pub is_stale: bool,
32}
33
34/// The health monitor actor.
35pub struct MonitorAgent {
36    config: ActorConfig,
37    system: ActorSystem,
38    state: ActorState,
39    metrics: Arc<ActorMetrics>,
40    mailbox_tx: mpsc::Sender<Message>,
41    mailbox_rx: Option<mpsc::Receiver<Message>>,
42    publisher: Option<EventPublisher>,
43}
44
45impl MonitorAgent {
46    pub fn new(config: ActorConfig, system: ActorSystem) -> Self {
47        let protected_config = ActorConfig {
48            protected: true,
49            ..config.clone()
50        };
51        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
52        Self {
53            config: protected_config,
54            system,
55            state: ActorState::Initializing,
56            metrics: Arc::new(ActorMetrics::new()),
57            mailbox_tx: tx,
58            mailbox_rx: Some(rx),
59            publisher: None,
60        }
61    }
62
63    /// Attach an EventPublisher for MQTT output.
64    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
65        self.publisher = Some(p);
66        self
67    }
68
69    /// Sweep all registered actors and return health reports.
70    pub async fn poll_health(&self) -> Vec<ActorHealthReport> {
71        use std::sync::atomic::Ordering;
72        let now = std::time::SystemTime::now()
73            .duration_since(std::time::UNIX_EPOCH)
74            .unwrap_or_default()
75            .as_secs();
76        self.system
77            .registry
78            .list()
79            .await
80            .into_iter()
81            .filter(|e| e.name != self.config.name)
82            .map(|e| {
83                let last = e.metrics.last_message_at.load(Ordering::Relaxed);
84                let age = if last == 0 {
85                    0
86                } else {
87                    now.saturating_sub(last)
88                };
89                ActorHealthReport {
90                    actor_id: e.id.clone(),
91                    actor_name: e.name,
92                    state: e.state,
93                    last_heartbeat_secs_ago: age,
94                    is_stale: age > TIMEOUT_SECS && last != 0,
95                }
96            })
97            .collect()
98    }
99
100    /// Broadcast an alert for stale actors via MQTT.
101    /// Publish per-actor alerts and the system/health summary.
102    ///
103    /// Called on every heartbeat poll so HA sensors always get a fresh value,
104    /// regardless of whether any actors are stale.
105    async fn publish_health(&self, reports: &[ActorHealthReport]) {
106        let Some(pub_) = &self.publisher else { return };
107
108        let stale: Vec<_> = reports.iter().filter(|r| r.is_stale).collect();
109
110        // Per-actor alert for each stale actor
111        for report in &stale {
112            pub_.publish(
113                wactorz_mqtt::topics::alert(&report.actor_id),
114                &serde_json::json!({
115                    "agentId": report.actor_id,
116                    "agentName": report.actor_name,
117                    "severity": "warning",
118                    "message": format!("No heartbeat for {}s", report.last_heartbeat_secs_ago),
119                    "timestampMs": std::time::SystemTime::now()
120                        .duration_since(std::time::UNIX_EPOCH)
121                        .unwrap_or_default().as_millis() as u64,
122                }),
123            );
124        }
125
126        // Always publish system/health so HA sensors stay current
127        pub_.publish(
128            wactorz_mqtt::topics::SYSTEM_HEALTH,
129            &serde_json::json!({
130                "active_agents": reports.len(),
131                "stale_count": stale.len(),
132                "timestampMs": std::time::SystemTime::now()
133                    .duration_since(std::time::UNIX_EPOCH)
134                    .unwrap_or_default().as_millis() as u64,
135            }),
136        );
137    }
138}
139
140#[async_trait]
141impl Actor for MonitorAgent {
142    fn id(&self) -> String {
143        self.config.id.clone()
144    }
145    fn name(&self) -> &str {
146        &self.config.name
147    }
148    fn state(&self) -> ActorState {
149        self.state.clone()
150    }
151    fn metrics(&self) -> Arc<ActorMetrics> {
152        Arc::clone(&self.metrics)
153    }
154    fn mailbox(&self) -> mpsc::Sender<Message> {
155        self.mailbox_tx.clone()
156    }
157    fn is_protected(&self) -> bool {
158        self.config.protected
159    }
160
161    async fn on_start(&mut self) -> Result<()> {
162        self.state = ActorState::Running;
163        if let Some(pub_) = &self.publisher {
164            let now_ms = std::time::SystemTime::now()
165                .duration_since(std::time::UNIX_EPOCH)
166                .unwrap_or_default()
167                .as_millis() as u64;
168            pub_.publish(
169                wactorz_mqtt::topics::spawn(&self.config.id),
170                &serde_json::json!({
171                    "agentId":   self.config.id,
172                    "agentName": self.config.name,
173                    "agentType": "monitor",
174                    "timestampMs": now_ms,
175                }),
176            );
177        }
178        Ok(())
179    }
180
181    async fn handle_message(&mut self, message: Message) -> Result<()> {
182        use wactorz_core::message::MessageType;
183        if let MessageType::Command {
184            command: wactorz_core::message::ActorCommand::Status,
185        } = &message.payload
186        {
187            let reports = self.poll_health().await;
188            tracing::info!(
189                "[monitor] {} actors, {} stale",
190                reports.len(),
191                reports.iter().filter(|r| r.is_stale).count()
192            );
193        }
194        Ok(())
195    }
196
197    async fn on_heartbeat(&mut self) -> Result<()> {
198        let reports = self.poll_health().await;
199        self.publish_health(&reports).await;
200
201        // Publish own heartbeat so the dashboard shows our beat count
202        if let Some(pub_) = &self.publisher {
203            let now_ms = std::time::SystemTime::now()
204                .duration_since(std::time::UNIX_EPOCH)
205                .unwrap_or_default()
206                .as_millis() as u64;
207            pub_.publish(
208                wactorz_mqtt::topics::heartbeat(&self.config.id),
209                &serde_json::json!({
210                    "agentId":   self.config.id,
211                    "agentName": self.config.name,
212                    "state":     self.state,
213                    "timestampMs": now_ms,
214                }),
215            );
216        }
217        Ok(())
218    }
219
220    async fn run(&mut self) -> Result<()> {
221        self.on_start().await?;
222        let mut rx = self
223            .mailbox_rx
224            .take()
225            .ok_or_else(|| anyhow::anyhow!("MonitorAgent already running"))?;
226        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
227            self.config.heartbeat_interval_secs,
228        ));
229        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
230        loop {
231            tokio::select! {
232                biased;
233                msg = rx.recv() => {
234                    match msg {
235                        None => break,
236                        Some(m) => {
237                            self.metrics.record_received();
238                            if let wactorz_core::message::MessageType::Command {
239                                command: wactorz_core::message::ActorCommand::Stop
240                            } = &m.payload {
241                                break;
242                            }
243                            match self.handle_message(m).await {
244                                Ok(_) => self.metrics.record_processed(),
245                                Err(e) => {
246                                    tracing::error!("[{}] {e}", self.config.name);
247                                    self.metrics.record_failed();
248                                }
249                            }
250                        }
251                    }
252                }
253                _ = hb.tick() => {
254                    self.metrics.record_heartbeat();
255                    if let Err(e) = self.on_heartbeat().await {
256                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
257                    }
258                }
259            }
260        }
261        self.state = ActorState::Stopped;
262        self.on_stop().await
263    }
264}