Actor

Trait Actor 

Source
pub trait Actor:
    Send
    + Sync
    + 'static {
    // Required methods
    fn id(&self) -> String;
    fn name(&self) -> &str;
    fn state(&self) -> ActorState;
    fn metrics(&self) -> Arc<ActorMetrics>;
    fn mailbox(&self) -> Sender<Message>;
    fn handle_message<'life0, 'async_trait>(
        &'life0 mut self,
        message: Message,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn is_protected(&self) -> bool { ... }
    fn on_start<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn on_heartbeat<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn on_stop<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn run<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
}
Expand description

The core Actor trait.

Implementors must be Send + Sync so they can be driven by Tokio tasks. The actor loop is started by calling Actor::run, which typically:

  1. Calls Actor::on_start
  2. Polls the mailbox and calls Actor::handle_message for each message
  3. Emits heartbeats on a timer
  4. Calls Actor::on_stop on shutdown

Required Methods§

Source

fn id(&self) -> String

Return this actor’s unique WID identifier.

Source

fn name(&self) -> &str

Return this actor’s human-readable name.

Source

fn state(&self) -> ActorState

Return the current lifecycle state.

Source

fn metrics(&self) -> Arc<ActorMetrics>

Return a reference to this actor’s metrics.

Source

fn mailbox(&self) -> Sender<Message>

Return a sender handle to this actor’s mailbox.

Source

fn handle_message<'life0, 'async_trait>( &'life0 mut self, message: Message, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called with each incoming message from the mailbox.

Provided Methods§

Source

fn is_protected(&self) -> bool

Return whether this actor is protected from external kill commands.

Source

fn on_start<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called once after the actor is created, before the message loop starts.

Source

fn on_heartbeat<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called on heartbeat tick; default implementation is a no-op.

Source

fn on_stop<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called once just before the actor loop exits.

Source

fn run<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Drive the actor’s main loop.

Default implementation: each concrete actor MUST override this method. The default returns an error to indicate it must be overridden.

Pattern for concrete actors:

async fn run(&mut self) -> Result<()> {
    self.on_start().await?;
    let mut rx = self.mailbox_rx.take()
        .ok_or_else(|| anyhow::anyhow!("already running"))?;
    let mut hb = tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_secs));
    hb.set_missed_tick_behavior(MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            biased;
            msg = rx.recv() => {
                match msg {
                    None => break,
                    Some(m) => {
                        self.metrics.record_received();
                        if let MessageType::Command { command: ActorCommand::Stop } = &m.payload { break; }
                        match self.handle_message(m).await {
                            Ok(_) => self.metrics.record_processed(),
                            Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
                        }
                    }
                }
            }
            _ = hb.tick() => {
                self.metrics.record_heartbeat();
                if let Err(e) = self.on_heartbeat().await { tracing::error!("[{}] hb: {e}", self.config.name); }
            }
        }
    }
    self.on_stop().await
}

Implementors§