wactorz_agents/
tick_agent.rs1use anyhow::Result;
11use async_trait::async_trait;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::mpsc;
15
16use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
17
18pub struct TickAgent {
19 config: ActorConfig,
20 interval_secs: u64,
21 script: Option<String>,
23 tick_count: u64,
24 state: ActorState,
25 metrics: Arc<ActorMetrics>,
26 mailbox_tx: mpsc::Sender<Message>,
27 mailbox_rx: Option<mpsc::Receiver<Message>>,
28 publisher: Option<EventPublisher>,
29}
30
31impl TickAgent {
32 pub fn new(config: ActorConfig, interval_secs: u64) -> Self {
33 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
34 Self {
35 config,
36 interval_secs,
37 script: None,
38 tick_count: 0,
39 state: ActorState::Initializing,
40 metrics: Arc::new(ActorMetrics::new()),
41 mailbox_tx: tx,
42 mailbox_rx: Some(rx),
43 publisher: None,
44 }
45 }
46
47 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
48 self.publisher = Some(p);
49 self
50 }
51
52 pub fn with_script(mut self, script: impl Into<String>) -> Self {
53 self.script = Some(script.into());
54 self
55 }
56
57 fn now_ms() -> u64 {
58 std::time::SystemTime::now()
59 .duration_since(std::time::UNIX_EPOCH)
60 .unwrap_or_default()
61 .as_millis() as u64
62 }
63
64 fn fire_tick(&mut self) {
65 self.tick_count += 1;
66 let n = self.tick_count;
67
68 if let Some(pub_) = &self.publisher {
69 pub_.publish(
70 format!("agents/{}/tick", self.config.id),
71 &serde_json::json!({
72 "agentId": self.config.id,
73 "agentName": self.config.name,
74 "tickCount": n,
75 "intervalSecs": self.interval_secs,
76 "timestampMs": Self::now_ms(),
77 }),
78 );
79 }
80
81 tracing::debug!("[{}] tick #{n}", self.config.name);
82 }
83}
84
85#[async_trait]
86impl Actor for TickAgent {
87 fn id(&self) -> String {
88 self.config.id.clone()
89 }
90 fn name(&self) -> &str {
91 &self.config.name
92 }
93 fn state(&self) -> ActorState {
94 self.state.clone()
95 }
96 fn metrics(&self) -> Arc<ActorMetrics> {
97 Arc::clone(&self.metrics)
98 }
99 fn mailbox(&self) -> mpsc::Sender<Message> {
100 self.mailbox_tx.clone()
101 }
102
103 async fn on_start(&mut self) -> Result<()> {
104 self.state = ActorState::Running;
105 tracing::info!(
106 "[{}] Tick agent started (interval={}s)",
107 self.config.name,
108 self.interval_secs
109 );
110 if let Some(pub_) = &self.publisher {
111 pub_.publish(
112 wactorz_mqtt::topics::spawn(&self.config.id),
113 &serde_json::json!({
114 "agentId": self.config.id,
115 "agentName": self.config.name,
116 "agentType": "tick",
117 "intervalSecs": self.interval_secs,
118 "timestampMs": Self::now_ms(),
119 }),
120 );
121 }
122 Ok(())
123 }
124
125 async fn handle_message(&mut self, message: Message) -> Result<()> {
126 use wactorz_core::message::MessageType;
127 if let MessageType::Task { payload, .. } = &message.payload
129 && let Some(n) = payload.get("interval_secs").and_then(|v| v.as_u64())
130 {
131 self.interval_secs = n;
132 tracing::info!("[{}] interval changed to {n}s", self.config.name);
133 }
134 Ok(())
135 }
136
137 async fn on_heartbeat(&mut self) -> Result<()> {
138 if let Some(pub_) = &self.publisher {
139 pub_.publish(
140 wactorz_mqtt::topics::heartbeat(&self.config.id),
141 &serde_json::json!({
142 "agentId": self.config.id,
143 "agentName": self.config.name,
144 "state": self.state,
145 "tickCount": self.tick_count,
146 "intervalSecs": self.interval_secs,
147 "timestampMs": Self::now_ms(),
148 }),
149 );
150 }
151 Ok(())
152 }
153
154 async fn run(&mut self) -> Result<()> {
155 self.on_start().await?;
156 let mut rx = self
157 .mailbox_rx
158 .take()
159 .ok_or_else(|| anyhow::anyhow!("TickAgent already running"))?;
160 let mut hb =
161 tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
162 let mut tick_timer = tokio::time::interval(Duration::from_secs(self.interval_secs));
163 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
164 tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
165 tick_timer.tick().await;
167
168 loop {
169 tokio::select! {
170 biased;
171 msg = rx.recv() => {
172 match msg {
173 None => break,
174 Some(m) => {
175 self.metrics.record_received();
176 if let wactorz_core::message::MessageType::Command {
177 command: wactorz_core::message::ActorCommand::Stop
178 } = &m.payload { break; }
179 match self.handle_message(m).await {
180 Ok(_) => self.metrics.record_processed(),
181 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
182 }
183 }
184 }
185 }
186 _ = tick_timer.tick() => {
187 self.fire_tick();
188 }
189 _ = hb.tick() => {
190 self.metrics.record_heartbeat();
191 if let Err(e) = self.on_heartbeat().await {
192 tracing::error!("[{}] heartbeat: {e}", self.config.name);
193 }
194 }
195 }
196 }
197 self.state = ActorState::Stopped;
198 self.on_stop().await
199 }
200}