wactorz_core/
actor.rs

1//! Actor trait and lifecycle state machine.
2//!
3//! Every agent in AgentFlow implements [`Actor`]. The trait mirrors the Python
4//! base `Actor` class: actors receive [`Message`]s via an async mailbox, emit
5//! heartbeats, and transition through a well-defined [`ActorState`] lifecycle.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use tokio::sync::mpsc;
12
13use crate::message::Message;
14use crate::metrics::ActorMetrics;
15
/// Lifecycle states of an actor.
///
/// Intended transitions: `Initializing` → `Running` → `Paused` ⇄ `Running` → `Stopped`.
/// An error can force the actor into `Failed` from any running state.
///
/// This enum only names the states; nothing in this definition enforces the
/// transition order. With serde, variant names are rendered in `snake_case`
/// (see the `rename_all` attribute below).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ActorState {
    /// Actor is being initialised (resources not yet ready).
    Initializing,
    /// Actor is processing messages normally.
    Running,
    /// Actor is temporarily suspended; mailbox still buffering.
    Paused,
    /// Actor has been cleanly shut down.
    Stopped,
    /// Actor encountered an unrecoverable error.
    ///
    /// The `String` payload is the message that `Display` renders as
    /// `failed(<message>)`.
    Failed(String),
}
34
35impl std::fmt::Display for ActorState {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            ActorState::Initializing => write!(f, "initializing"),
39            ActorState::Running => write!(f, "running"),
40            ActorState::Paused => write!(f, "paused"),
41            ActorState::Stopped => write!(f, "stopped"),
42            ActorState::Failed(e) => write!(f, "failed({e})"),
43        }
44    }
45}
46
/// Static configuration supplied when creating an actor.
///
/// All fields are public and the struct is (de)serialisable with serde. The
/// constructors `ActorConfig::new` and `ActorConfig::new_with_node` fill in
/// the defaults noted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorConfig {
    /// Human-readable name.
    pub name: String,
    /// HLC-WID for this actor (time-ordered, node-scoped, collision-resistant).
    pub id: String,
    /// Maximum number of messages buffered in the mailbox.
    ///
    /// The constructors in this file default it to `1000`.
    pub mailbox_capacity: usize,
    /// Heartbeat interval in seconds.
    ///
    /// The constructors in this file default it to `30`.
    pub heartbeat_interval_secs: u64,
    /// Whether this actor is protected from external termination.
    ///
    /// Defaults to `false`; `ActorConfig::protected` sets it to `true`.
    pub protected: bool,
}
61
/// Sanitise an actor name into a valid HLC-WID node segment (`[A-Za-z0-9_]+`).
///
/// * Only the first 20 characters of `name` are considered.
/// * Every character outside ASCII `[A-Za-z0-9_]` is replaced by `_`. This
///   includes non-ASCII letters and digits, which a Unicode-aware
///   alphanumeric check would let through and thereby violate the pattern.
/// * An empty input yields the fallback segment `"actor"`.
///
/// The result is therefore always non-empty, pure ASCII, and at most 20 bytes long.
fn sanitize_node_name(name: &str) -> String {
    /// Upper bound on the length of the node segment, in characters.
    const MAX_NODE_LEN: usize = 20;
    /// Segment used when `name` contributes no characters at all.
    const FALLBACK_NODE: &str = "actor";

    let node: String = name
        .chars()
        .take(MAX_NODE_LEN)
        // ASCII-only check: `char::is_alphanumeric` would also accept e.g. 'é' or '数'.
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect();

    if node.is_empty() {
        FALLBACK_NODE.to_owned()
    } else {
        node
    }
}
77
78impl ActorConfig {
79    /// Create a config with a fresh HLC-WID derived from `name`.
80    pub fn new(name: impl Into<String>) -> Self {
81        let name = name.into();
82        let node = sanitize_node_name(&name);
83        let id = wid::HLCWidGen::new(node, 4, 0)
84            .unwrap_or_else(|_| wid::HLCWidGen::new("actor".to_string(), 4, 0).unwrap())
85            .next_hlc_wid();
86        Self {
87            name,
88            id,
89            mailbox_capacity: 1000,
90            heartbeat_interval_secs: 30,
91            protected: false,
92        }
93    }
94
95    /// Create a config with an explicit HLC-WID node segment (e.g. a NATO alphabet name).
96    ///
97    /// The `node` value is sanitised the same way as in [`Self::new`], then used
98    /// verbatim as the node tag in the generated HLC-WID. This makes actor IDs
99    /// human-readable and stable across renames.
100    ///
101    /// # Example
102    /// ```ignore
103    /// ActorConfig::new_with_node("wif-agent", "india")
104    /// // → id: "20260303T120000.0001-india"
105    /// ```
106    pub fn new_with_node(name: impl Into<String>, node: impl Into<String>) -> Self {
107        let name = name.into();
108        let node = sanitize_node_name(&node.into());
109        let id = wid::HLCWidGen::new(node, 4, 0)
110            .unwrap_or_else(|_| wid::HLCWidGen::new("actor".to_string(), 4, 0).unwrap())
111            .next_hlc_wid();
112        Self {
113            name,
114            id,
115            mailbox_capacity: 1000,
116            heartbeat_interval_secs: 30,
117            protected: false,
118        }
119    }
120
121    /// Mark this actor as protected (cannot be killed externally).
122    pub fn protected(mut self) -> Self {
123        self.protected = true;
124        self
125    }
126}
127
128/// The core Actor trait.
129///
130/// Implementors must be `Send + Sync` so they can be driven by Tokio tasks.
131/// The actor loop is started by calling [`Actor::run`], which typically:
132/// 1. Calls [`Actor::on_start`]
133/// 2. Polls the mailbox and calls [`Actor::handle_message`] for each message
134/// 3. Emits heartbeats on a timer
135/// 4. Calls [`Actor::on_stop`] on shutdown
136#[async_trait]
137pub trait Actor: Send + Sync + 'static {
138    /// Return this actor's unique WID identifier.
139    fn id(&self) -> String;
140
141    /// Return this actor's human-readable name.
142    fn name(&self) -> &str;
143
144    /// Return the current lifecycle state.
145    fn state(&self) -> ActorState;
146
147    /// Return a reference to this actor's metrics.
148    fn metrics(&self) -> Arc<ActorMetrics>;
149
150    /// Return a sender handle to this actor's mailbox.
151    fn mailbox(&self) -> mpsc::Sender<Message>;
152
153    /// Return whether this actor is protected from external kill commands.
154    fn is_protected(&self) -> bool {
155        false
156    }
157
158    /// Called once after the actor is created, before the message loop starts.
159    async fn on_start(&mut self) -> anyhow::Result<()> {
160        Ok(())
161    }
162
163    /// Called with each incoming message from the mailbox.
164    async fn handle_message(&mut self, message: Message) -> anyhow::Result<()>;
165
166    /// Called on heartbeat tick; default implementation is a no-op.
167    async fn on_heartbeat(&mut self) -> anyhow::Result<()> {
168        Ok(())
169    }
170
171    /// Called once just before the actor loop exits.
172    async fn on_stop(&mut self) -> anyhow::Result<()> {
173        Ok(())
174    }
175
176    /// Drive the actor's main loop.
177    ///
178    /// Default implementation: each concrete actor MUST override this method.
179    /// The default returns an error to indicate it must be overridden.
180    ///
181    /// Pattern for concrete actors:
182    /// ```ignore
183    /// async fn run(&mut self) -> Result<()> {
184    ///     self.on_start().await?;
185    ///     let mut rx = self.mailbox_rx.take()
186    ///         .ok_or_else(|| anyhow::anyhow!("already running"))?;
187    ///     let mut hb = tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
188    ///     hb.set_missed_tick_behavior(MissedTickBehavior::Skip);
189    ///     loop {
190    ///         tokio::select! {
191    ///             biased;
192    ///             msg = rx.recv() => {
193    ///                 match msg {
194    ///                     None => break,
195    ///                     Some(m) => {
196    ///                         self.metrics.record_received();
197    ///                         if let MessageType::Command { command: ActorCommand::Stop } = &m.payload { break; }
198    ///                         match self.handle_message(m).await {
199    ///                             Ok(_) => self.metrics.record_processed(),
200    ///                             Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
201    ///                         }
202    ///                     }
203    ///                 }
204    ///             }
205    ///             _ = hb.tick() => {
206    ///                 self.metrics.record_heartbeat();
207    ///                 if let Err(e) = self.on_heartbeat().await { tracing::error!("[{}] hb: {e}", self.config.name); }
208    ///             }
209    ///         }
210    ///     }
211    ///     self.on_stop().await
212    /// }
213    /// ```
214    async fn run(&mut self) -> anyhow::Result<()> {
215        anyhow::bail!("Actor::run() must be overridden by each concrete actor")
216    }
217}