wactorz_agents/
monitor_agent.rs1use anyhow::Result;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use wactorz_core::{
15 Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
16};
17
/// Polling interval for the monitor, in seconds.
///
/// NOTE(review): this constant is not referenced by the code visible in this
/// file (the run loop ticks on `config.heartbeat_interval_secs`); presumably
/// callers use it when building the monitor's `ActorConfig` — confirm.
pub const POLL_INTERVAL_SECS: u64 = 15;
20
/// Staleness threshold, in seconds.
///
/// `MonitorAgent::poll_health` flags an actor as stale once its last recorded
/// message timestamp is strictly more than this many seconds in the past.
pub const TIMEOUT_SECS: u64 = 60;
23
24#[derive(Debug, Clone)]
26pub struct ActorHealthReport {
27 pub actor_id: String,
28 pub actor_name: String,
29 pub state: ActorState,
30 pub last_heartbeat_secs_ago: u64,
31 pub is_stale: bool,
32}
33
34pub struct MonitorAgent {
36 config: ActorConfig,
37 system: ActorSystem,
38 state: ActorState,
39 metrics: Arc<ActorMetrics>,
40 mailbox_tx: mpsc::Sender<Message>,
41 mailbox_rx: Option<mpsc::Receiver<Message>>,
42 publisher: Option<EventPublisher>,
43}
44
45impl MonitorAgent {
46 pub fn new(config: ActorConfig, system: ActorSystem) -> Self {
47 let protected_config = ActorConfig {
48 protected: true,
49 ..config.clone()
50 };
51 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
52 Self {
53 config: protected_config,
54 system,
55 state: ActorState::Initializing,
56 metrics: Arc::new(ActorMetrics::new()),
57 mailbox_tx: tx,
58 mailbox_rx: Some(rx),
59 publisher: None,
60 }
61 }
62
63 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
65 self.publisher = Some(p);
66 self
67 }
68
69 pub async fn poll_health(&self) -> Vec<ActorHealthReport> {
71 use std::sync::atomic::Ordering;
72 let now = std::time::SystemTime::now()
73 .duration_since(std::time::UNIX_EPOCH)
74 .unwrap_or_default()
75 .as_secs();
76 self.system
77 .registry
78 .list()
79 .await
80 .into_iter()
81 .filter(|e| e.name != self.config.name)
82 .map(|e| {
83 let last = e.metrics.last_message_at.load(Ordering::Relaxed);
84 let age = if last == 0 {
85 0
86 } else {
87 now.saturating_sub(last)
88 };
89 ActorHealthReport {
90 actor_id: e.id.clone(),
91 actor_name: e.name,
92 state: e.state,
93 last_heartbeat_secs_ago: age,
94 is_stale: age > TIMEOUT_SECS && last != 0,
95 }
96 })
97 .collect()
98 }
99
100 async fn publish_health(&self, reports: &[ActorHealthReport]) {
106 let Some(pub_) = &self.publisher else { return };
107
108 let stale: Vec<_> = reports.iter().filter(|r| r.is_stale).collect();
109
110 for report in &stale {
112 pub_.publish(
113 wactorz_mqtt::topics::alert(&report.actor_id),
114 &serde_json::json!({
115 "agentId": report.actor_id,
116 "agentName": report.actor_name,
117 "severity": "warning",
118 "message": format!("No heartbeat for {}s", report.last_heartbeat_secs_ago),
119 "timestampMs": std::time::SystemTime::now()
120 .duration_since(std::time::UNIX_EPOCH)
121 .unwrap_or_default().as_millis() as u64,
122 }),
123 );
124 }
125
126 pub_.publish(
128 wactorz_mqtt::topics::SYSTEM_HEALTH,
129 &serde_json::json!({
130 "active_agents": reports.len(),
131 "stale_count": stale.len(),
132 "timestampMs": std::time::SystemTime::now()
133 .duration_since(std::time::UNIX_EPOCH)
134 .unwrap_or_default().as_millis() as u64,
135 }),
136 );
137 }
138}
139
140#[async_trait]
141impl Actor for MonitorAgent {
142 fn id(&self) -> String {
143 self.config.id.clone()
144 }
145 fn name(&self) -> &str {
146 &self.config.name
147 }
148 fn state(&self) -> ActorState {
149 self.state.clone()
150 }
151 fn metrics(&self) -> Arc<ActorMetrics> {
152 Arc::clone(&self.metrics)
153 }
154 fn mailbox(&self) -> mpsc::Sender<Message> {
155 self.mailbox_tx.clone()
156 }
157 fn is_protected(&self) -> bool {
158 self.config.protected
159 }
160
161 async fn on_start(&mut self) -> Result<()> {
162 self.state = ActorState::Running;
163 if let Some(pub_) = &self.publisher {
164 let now_ms = std::time::SystemTime::now()
165 .duration_since(std::time::UNIX_EPOCH)
166 .unwrap_or_default()
167 .as_millis() as u64;
168 pub_.publish(
169 wactorz_mqtt::topics::spawn(&self.config.id),
170 &serde_json::json!({
171 "agentId": self.config.id,
172 "agentName": self.config.name,
173 "agentType": "monitor",
174 "timestampMs": now_ms,
175 }),
176 );
177 }
178 Ok(())
179 }
180
181 async fn handle_message(&mut self, message: Message) -> Result<()> {
182 use wactorz_core::message::MessageType;
183 if let MessageType::Command {
184 command: wactorz_core::message::ActorCommand::Status,
185 } = &message.payload
186 {
187 let reports = self.poll_health().await;
188 tracing::info!(
189 "[monitor] {} actors, {} stale",
190 reports.len(),
191 reports.iter().filter(|r| r.is_stale).count()
192 );
193 }
194 Ok(())
195 }
196
197 async fn on_heartbeat(&mut self) -> Result<()> {
198 let reports = self.poll_health().await;
199 self.publish_health(&reports).await;
200
201 if let Some(pub_) = &self.publisher {
203 let now_ms = std::time::SystemTime::now()
204 .duration_since(std::time::UNIX_EPOCH)
205 .unwrap_or_default()
206 .as_millis() as u64;
207 pub_.publish(
208 wactorz_mqtt::topics::heartbeat(&self.config.id),
209 &serde_json::json!({
210 "agentId": self.config.id,
211 "agentName": self.config.name,
212 "state": self.state,
213 "timestampMs": now_ms,
214 }),
215 );
216 }
217 Ok(())
218 }
219
220 async fn run(&mut self) -> Result<()> {
221 self.on_start().await?;
222 let mut rx = self
223 .mailbox_rx
224 .take()
225 .ok_or_else(|| anyhow::anyhow!("MonitorAgent already running"))?;
226 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
227 self.config.heartbeat_interval_secs,
228 ));
229 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
230 loop {
231 tokio::select! {
232 biased;
233 msg = rx.recv() => {
234 match msg {
235 None => break,
236 Some(m) => {
237 self.metrics.record_received();
238 if let wactorz_core::message::MessageType::Command {
239 command: wactorz_core::message::ActorCommand::Stop
240 } = &m.payload {
241 break;
242 }
243 match self.handle_message(m).await {
244 Ok(_) => self.metrics.record_processed(),
245 Err(e) => {
246 tracing::error!("[{}] {e}", self.config.name);
247 self.metrics.record_failed();
248 }
249 }
250 }
251 }
252 }
253 _ = hb.tick() => {
254 self.metrics.record_heartbeat();
255 if let Err(e) = self.on_heartbeat().await {
256 tracing::error!("[{}] heartbeat: {e}", self.config.name);
257 }
258 }
259 }
260 }
261 self.state = ActorState::Stopped;
262 self.on_stop().await
263 }
264}