wactorz_agents/
tick_agent.rs

1//! Scheduled / periodic ticker agent.
2//!
3//! [`TickAgent`] fires a configurable callback (Rhai script or MQTT publish)
4//! on a fixed interval.  Useful for polling, scheduled reports, or periodic
5//! data collection.
6//!
7//! The tick interval is configurable via the `interval_secs` field (default 60s).
8//! On each tick it publishes to `agents/{id}/tick`.
9
10use anyhow::Result;
11use async_trait::async_trait;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15
16use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
17
18pub struct TickAgent {
19    config: ActorConfig,
20    interval_secs: u64,
21    /// Optional Rhai script executed on each tick (receives `tick_count` variable).
22    script: Option<String>,
23    tick_count: u64,
24    state: ActorState,
25    metrics: Arc<ActorMetrics>,
26    mailbox_tx: mpsc::Sender<Message>,
27    mailbox_rx: Option<mpsc::Receiver<Message>>,
28    publisher: Option<EventPublisher>,
29}
30
31impl TickAgent {
32    pub fn new(config: ActorConfig, interval_secs: u64) -> Self {
33        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
34        Self {
35            config,
36            interval_secs,
37            script: None,
38            tick_count: 0,
39            state: ActorState::Initializing,
40            metrics: Arc::new(ActorMetrics::new()),
41            mailbox_tx: tx,
42            mailbox_rx: Some(rx),
43            publisher: None,
44        }
45    }
46
47    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
48        self.publisher = Some(p);
49        self
50    }
51
52    pub fn with_script(mut self, script: impl Into<String>) -> Self {
53        self.script = Some(script.into());
54        self
55    }
56
57    fn now_ms() -> u64 {
58        std::time::SystemTime::now()
59            .duration_since(std::time::UNIX_EPOCH)
60            .unwrap_or_default()
61            .as_millis() as u64
62    }
63
64    fn fire_tick(&mut self) {
65        self.tick_count += 1;
66        let n = self.tick_count;
67
68        if let Some(pub_) = &self.publisher {
69            pub_.publish(
70                format!("agents/{}/tick", self.config.id),
71                &serde_json::json!({
72                    "agentId":    self.config.id,
73                    "agentName":  self.config.name,
74                    "tickCount":  n,
75                    "intervalSecs": self.interval_secs,
76                    "timestampMs":  Self::now_ms(),
77                }),
78            );
79        }
80
81        tracing::debug!("[{}] tick #{n}", self.config.name);
82    }
83}
84
85#[async_trait]
86impl Actor for TickAgent {
87    fn id(&self) -> String {
88        self.config.id.clone()
89    }
90    fn name(&self) -> &str {
91        &self.config.name
92    }
93    fn state(&self) -> ActorState {
94        self.state.clone()
95    }
96    fn metrics(&self) -> Arc<ActorMetrics> {
97        Arc::clone(&self.metrics)
98    }
99    fn mailbox(&self) -> mpsc::Sender<Message> {
100        self.mailbox_tx.clone()
101    }
102
103    async fn on_start(&mut self) -> Result<()> {
104        self.state = ActorState::Running;
105        tracing::info!(
106            "[{}] Tick agent started (interval={}s)",
107            self.config.name,
108            self.interval_secs
109        );
110        if let Some(pub_) = &self.publisher {
111            pub_.publish(
112                wactorz_mqtt::topics::spawn(&self.config.id),
113                &serde_json::json!({
114                    "agentId":      self.config.id,
115                    "agentName":    self.config.name,
116                    "agentType":    "tick",
117                    "intervalSecs": self.interval_secs,
118                    "timestampMs":  Self::now_ms(),
119                }),
120            );
121        }
122        Ok(())
123    }
124
125    async fn handle_message(&mut self, message: Message) -> Result<()> {
126        use wactorz_core::message::MessageType;
127        // Accept interval change via Task payload {"interval_secs": N}
128        if let MessageType::Task { payload, .. } = &message.payload
129            && let Some(n) = payload.get("interval_secs").and_then(|v| v.as_u64())
130        {
131            self.interval_secs = n;
132            tracing::info!("[{}] interval changed to {n}s", self.config.name);
133        }
134        Ok(())
135    }
136
137    async fn on_heartbeat(&mut self) -> Result<()> {
138        if let Some(pub_) = &self.publisher {
139            pub_.publish(
140                wactorz_mqtt::topics::heartbeat(&self.config.id),
141                &serde_json::json!({
142                    "agentId":      self.config.id,
143                    "agentName":    self.config.name,
144                    "state":        self.state,
145                    "tickCount":    self.tick_count,
146                    "intervalSecs": self.interval_secs,
147                    "timestampMs":  Self::now_ms(),
148                }),
149            );
150        }
151        Ok(())
152    }
153
154    async fn run(&mut self) -> Result<()> {
155        self.on_start().await?;
156        let mut rx = self
157            .mailbox_rx
158            .take()
159            .ok_or_else(|| anyhow::anyhow!("TickAgent already running"))?;
160        let mut hb =
161            tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
162        let mut tick_timer = tokio::time::interval(Duration::from_secs(self.interval_secs));
163        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
164        tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
165        // Skip the first immediate tick
166        tick_timer.tick().await;
167
168        loop {
169            tokio::select! {
170                biased;
171                msg = rx.recv() => {
172                    match msg {
173                        None    => break,
174                        Some(m) => {
175                            self.metrics.record_received();
176                            if let wactorz_core::message::MessageType::Command {
177                                command: wactorz_core::message::ActorCommand::Stop
178                            } = &m.payload { break; }
179                            match self.handle_message(m).await {
180                                Ok(_)  => self.metrics.record_processed(),
181                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
182                            }
183                        }
184                    }
185                }
186                _ = tick_timer.tick() => {
187                    self.fire_tick();
188                }
189                _ = hb.tick() => {
190                    self.metrics.record_heartbeat();
191                    if let Err(e) = self.on_heartbeat().await {
192                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
193                    }
194                }
195            }
196        }
197        self.state = ActorState::Stopped;
198        self.on_stop().await
199    }
200}