pub trait Actor:
Send
+ Sync
+ 'static {
// Required methods
fn id(&self) -> String;
fn name(&self) -> &str;
fn state(&self) -> ActorState;
fn metrics(&self) -> Arc<ActorMetrics>;
fn mailbox(&self) -> Sender<Message>;
fn handle_message<'life0, 'async_trait>(
&'life0 mut self,
message: Message,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
// Provided methods
fn is_protected(&self) -> bool { ... }
fn on_start<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn on_heartbeat<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn on_stop<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn run<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}Expand description
The core Actor trait.
Implementors must be Send + Sync so they can be driven by Tokio tasks.
The actor loop is started by calling Actor::run, which typically:
- Calls
Actor::on_start - Polls the mailbox and calls
Actor::handle_messagefor each message - Emits heartbeats on a timer
- Calls
Actor::on_stopon shutdown
Required Methods§
Sourcefn state(&self) -> ActorState
fn state(&self) -> ActorState
Return the current lifecycle state.
Sourcefn metrics(&self) -> Arc<ActorMetrics>
fn metrics(&self) -> Arc<ActorMetrics>
Return a reference to this actor’s metrics.
Provided Methods§
Sourcefn is_protected(&self) -> bool
fn is_protected(&self) -> bool
Return whether this actor is protected from external kill commands.
Sourcefn on_start<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn on_start<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Called once after the actor is created, before the message loop starts.
Sourcefn on_heartbeat<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn on_heartbeat<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Called on heartbeat tick; default implementation is a no-op.
Sourcefn on_stop<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn on_stop<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Called once just before the actor loop exits.
Sourcefn run<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn run<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Drive the actor’s main loop.
Default implementation: each concrete actor MUST override this method. The default returns an error to indicate it must be overridden.
Pattern for concrete actors:
ⓘ
async fn run(&mut self) -> Result<()> {
self.on_start().await?;
let mut rx = self.mailbox_rx.take()
.ok_or_else(|| anyhow::anyhow!("already running"))?;
let mut hb = tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
hb.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
msg = rx.recv() => {
match msg {
None => break,
Some(m) => {
self.metrics.record_received();
if let MessageType::Command { command: ActorCommand::Stop } = &m.payload { break; }
match self.handle_message(m).await {
Ok(_) => self.metrics.record_processed(),
Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
}
}
}
}
_ = hb.tick() => {
self.metrics.record_heartbeat();
if let Err(e) = self.on_heartbeat().await { tracing::error!("[{}] hb: {e}", self.config.name); }
}
}
}
self.on_stop().await
}