wactorz_core/actor.rs
1//! Actor trait and lifecycle state machine.
2//!
3//! Every agent in AgentFlow implements [`Actor`]. The trait mirrors the Python
4//! base `Actor` class: actors receive [`Message`]s via an async mailbox, emit
5//! heartbeats, and transition through a well-defined [`ActorState`] lifecycle.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use tokio::sync::mpsc;
12
13use crate::message::Message;
14use crate::metrics::ActorMetrics;
15
16/// Lifecycle states of an actor.
17///
18/// Transitions: `Initializing` → `Running` → `Paused` ⇄ `Running` → `Stopped`
19/// Errors can force the actor into `Failed` from any running state.
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum ActorState {
23 /// Actor is being initialised (resources not yet ready).
24 Initializing,
25 /// Actor is processing messages normally.
26 Running,
27 /// Actor is temporarily suspended; mailbox still buffering.
28 Paused,
29 /// Actor has been cleanly shut down.
30 Stopped,
31 /// Actor encountered an unrecoverable error.
32 Failed(String),
33}
34
35impl std::fmt::Display for ActorState {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 ActorState::Initializing => write!(f, "initializing"),
39 ActorState::Running => write!(f, "running"),
40 ActorState::Paused => write!(f, "paused"),
41 ActorState::Stopped => write!(f, "stopped"),
42 ActorState::Failed(e) => write!(f, "failed({e})"),
43 }
44 }
45}
46
47/// Static configuration supplied when creating an actor.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ActorConfig {
50 /// Human-readable name.
51 pub name: String,
52 /// HLC-WID for this actor (time-ordered, node-scoped, collision-resistant).
53 pub id: String,
54 /// Maximum number of messages buffered in the mailbox.
55 pub mailbox_capacity: usize,
56 /// Heartbeat interval in seconds.
57 pub heartbeat_interval_secs: u64,
58 /// Whether this actor is protected from external termination.
59 pub protected: bool,
60}
61
62/// Sanitise an actor name into a valid HLC-WID node segment (`[A-Za-z0-9_]+`).
63fn sanitize_node_name(name: &str) -> String {
64 let s: String = name
65 .chars()
66 .map(|c| {
67 if c.is_alphanumeric() || c == '_' {
68 c
69 } else {
70 '_'
71 }
72 })
73 .take(20)
74 .collect();
75 if s.is_empty() { "actor".to_string() } else { s }
76}
77
78impl ActorConfig {
79 /// Create a config with a fresh HLC-WID derived from `name`.
80 pub fn new(name: impl Into<String>) -> Self {
81 let name = name.into();
82 let node = sanitize_node_name(&name);
83 let id = wid::HLCWidGen::new(node, 4, 0)
84 .unwrap_or_else(|_| wid::HLCWidGen::new("actor".to_string(), 4, 0).unwrap())
85 .next_hlc_wid();
86 Self {
87 name,
88 id,
89 mailbox_capacity: 1000,
90 heartbeat_interval_secs: 30,
91 protected: false,
92 }
93 }
94
95 /// Create a config with an explicit HLC-WID node segment (e.g. a NATO alphabet name).
96 ///
97 /// The `node` value is sanitised the same way as in [`Self::new`], then used
98 /// verbatim as the node tag in the generated HLC-WID. This makes actor IDs
99 /// human-readable and stable across renames.
100 ///
101 /// # Example
102 /// ```ignore
103 /// ActorConfig::new_with_node("wif-agent", "india")
104 /// // → id: "20260303T120000.0001-india"
105 /// ```
106 pub fn new_with_node(name: impl Into<String>, node: impl Into<String>) -> Self {
107 let name = name.into();
108 let node = sanitize_node_name(&node.into());
109 let id = wid::HLCWidGen::new(node, 4, 0)
110 .unwrap_or_else(|_| wid::HLCWidGen::new("actor".to_string(), 4, 0).unwrap())
111 .next_hlc_wid();
112 Self {
113 name,
114 id,
115 mailbox_capacity: 1000,
116 heartbeat_interval_secs: 30,
117 protected: false,
118 }
119 }
120
121 /// Mark this actor as protected (cannot be killed externally).
122 pub fn protected(mut self) -> Self {
123 self.protected = true;
124 self
125 }
126}
127
128/// The core Actor trait.
129///
130/// Implementors must be `Send + Sync` so they can be driven by Tokio tasks.
131/// The actor loop is started by calling [`Actor::run`], which typically:
132/// 1. Calls [`Actor::on_start`]
133/// 2. Polls the mailbox and calls [`Actor::handle_message`] for each message
134/// 3. Emits heartbeats on a timer
135/// 4. Calls [`Actor::on_stop`] on shutdown
136#[async_trait]
137pub trait Actor: Send + Sync + 'static {
138 /// Return this actor's unique WID identifier.
139 fn id(&self) -> String;
140
141 /// Return this actor's human-readable name.
142 fn name(&self) -> &str;
143
144 /// Return the current lifecycle state.
145 fn state(&self) -> ActorState;
146
147 /// Return a reference to this actor's metrics.
148 fn metrics(&self) -> Arc<ActorMetrics>;
149
150 /// Return a sender handle to this actor's mailbox.
151 fn mailbox(&self) -> mpsc::Sender<Message>;
152
153 /// Return whether this actor is protected from external kill commands.
154 fn is_protected(&self) -> bool {
155 false
156 }
157
158 /// Called once after the actor is created, before the message loop starts.
159 async fn on_start(&mut self) -> anyhow::Result<()> {
160 Ok(())
161 }
162
163 /// Called with each incoming message from the mailbox.
164 async fn handle_message(&mut self, message: Message) -> anyhow::Result<()>;
165
166 /// Called on heartbeat tick; default implementation is a no-op.
167 async fn on_heartbeat(&mut self) -> anyhow::Result<()> {
168 Ok(())
169 }
170
171 /// Called once just before the actor loop exits.
172 async fn on_stop(&mut self) -> anyhow::Result<()> {
173 Ok(())
174 }
175
176 /// Drive the actor's main loop.
177 ///
178 /// Default implementation: each concrete actor MUST override this method.
179 /// The default returns an error to indicate it must be overridden.
180 ///
181 /// Pattern for concrete actors:
182 /// ```ignore
183 /// async fn run(&mut self) -> Result<()> {
184 /// self.on_start().await?;
185 /// let mut rx = self.mailbox_rx.take()
186 /// .ok_or_else(|| anyhow::anyhow!("already running"))?;
187 /// let mut hb = tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
188 /// hb.set_missed_tick_behavior(MissedTickBehavior::Skip);
189 /// loop {
190 /// tokio::select! {
191 /// biased;
192 /// msg = rx.recv() => {
193 /// match msg {
194 /// None => break,
195 /// Some(m) => {
196 /// self.metrics.record_received();
197 /// if let MessageType::Command { command: ActorCommand::Stop } = &m.payload { break; }
198 /// match self.handle_message(m).await {
199 /// Ok(_) => self.metrics.record_processed(),
200 /// Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
201 /// }
202 /// }
203 /// }
204 /// }
205 /// _ = hb.tick() => {
206 /// self.metrics.record_heartbeat();
207 /// if let Err(e) = self.on_heartbeat().await { tracing::error!("[{}] hb: {e}", self.config.name); }
208 /// }
209 /// }
210 /// }
211 /// self.on_stop().await
212 /// }
213 /// ```
214 async fn run(&mut self) -> anyhow::Result<()> {
215 anyhow::bail!("Actor::run() must be overridden by each concrete actor")
216 }
217}