wactorz_agents/
io_agent.rs

1//! UI gateway agent.
2//!
3//! [`IOAgent`] is the bridge between the frontend and the actor system.
4//! It listens on the fixed MQTT topic `io/chat` and routes messages to
5//! the appropriate actor by parsing an optional `@agent-name` prefix.
6//!
7//! If no `@` prefix is given, the message is forwarded to `main-actor`.
8
9use anyhow::Result;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use wactorz_core::{
15    Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
16};
17
18/// The UI gateway actor.
19pub struct IOAgent {
20    config: ActorConfig,
21    system: ActorSystem,
22    state: ActorState,
23    metrics: Arc<ActorMetrics>,
24    mailbox_tx: mpsc::Sender<Message>,
25    mailbox_rx: Option<mpsc::Receiver<Message>>,
26    publisher: Option<EventPublisher>,
27}
28
29impl IOAgent {
30    pub fn new(config: ActorConfig, system: ActorSystem) -> Self {
31        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
32        Self {
33            config,
34            system,
35            state: ActorState::Initializing,
36            metrics: Arc::new(ActorMetrics::new()),
37            mailbox_tx: tx,
38            mailbox_rx: Some(rx),
39            publisher: None,
40        }
41    }
42
43    /// Attach an EventPublisher for MQTT output.
44    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
45        self.publisher = Some(p);
46        self
47    }
48
49    fn now_ms() -> u64 {
50        std::time::SystemTime::now()
51            .duration_since(std::time::UNIX_EPOCH)
52            .unwrap_or_default()
53            .as_millis() as u64
54    }
55
56    /// Parse `@name rest` prefix. Returns `(target_name, content)`.
57    fn parse_mention(text: &str) -> (&str, &str) {
58        if let Some(stripped) = text.strip_prefix('@') {
59            if let Some(sp) = stripped.find(' ') {
60                return (&stripped[..sp], stripped[sp + 1..].trim());
61            }
62            // whole text is @name with no body
63            return (stripped, "");
64        }
65        ("main-actor", text)
66    }
67
68    /// Send an error response back to the frontend via our own chat topic.
69    fn publish_error(&self, error_msg: &str) {
70        if let Some(pub_) = &self.publisher {
71            pub_.publish(
72                wactorz_mqtt::topics::chat(&self.config.id),
73                &serde_json::json!({
74                    "from": self.config.name,
75                    "to": "user",
76                    "content": error_msg,
77                    "timestampMs": Self::now_ms(),
78                }),
79            );
80        }
81    }
82}
83
84#[async_trait]
85impl Actor for IOAgent {
86    fn id(&self) -> String {
87        self.config.id.clone()
88    }
89    fn name(&self) -> &str {
90        &self.config.name
91    }
92    fn state(&self) -> ActorState {
93        self.state.clone()
94    }
95    fn metrics(&self) -> Arc<ActorMetrics> {
96        Arc::clone(&self.metrics)
97    }
98    fn mailbox(&self) -> mpsc::Sender<Message> {
99        self.mailbox_tx.clone()
100    }
101    fn is_protected(&self) -> bool {
102        self.config.protected
103    }
104
105    async fn on_start(&mut self) -> Result<()> {
106        self.state = ActorState::Running;
107        if let Some(pub_) = &self.publisher {
108            pub_.publish(
109                wactorz_mqtt::topics::spawn(&self.config.id),
110                &serde_json::json!({
111                    "agentId":   self.config.id,
112                    "agentName": self.config.name,
113                    "agentType": "gateway",
114                    "timestampMs": Self::now_ms(),
115                }),
116            );
117        }
118        Ok(())
119    }
120
121    async fn handle_message(&mut self, message: Message) -> Result<()> {
122        use wactorz_core::message::MessageType;
123        let content = match &message.payload {
124            MessageType::Text { content } => content.clone(),
125            MessageType::Task { description, .. } => description.clone(),
126            _ => return Ok(()),
127        };
128
129        let (target_name, body) = Self::parse_mention(&content);
130        if body.is_empty() {
131            self.publish_error("Empty message — nothing to forward.");
132            return Ok(());
133        }
134
135        match self.system.registry.get_by_name(target_name).await {
136            Some(entry) => {
137                let msg = Message::text(
138                    Some(self.config.name.clone()),
139                    Some(entry.id.clone()),
140                    body.to_string(),
141                );
142                if let Err(e) = self.system.registry.send(&entry.id, msg).await {
143                    let err = format!("Failed to deliver to @{target_name}: {e}");
144                    tracing::warn!("{err}");
145                    self.publish_error(&err);
146                }
147            }
148            None => {
149                let err = format!("Agent @{target_name} not found.");
150                tracing::warn!("{err}");
151                self.publish_error(&err);
152            }
153        }
154        Ok(())
155    }
156
157    async fn on_heartbeat(&mut self) -> Result<()> {
158        if let Some(pub_) = &self.publisher {
159            pub_.publish(
160                wactorz_mqtt::topics::heartbeat(&self.config.id),
161                &serde_json::json!({
162                    "agentId": self.config.id,
163                    "agentName": self.config.name,
164                    "state": self.state,
165                    "timestampMs": Self::now_ms(),
166                }),
167            );
168        }
169        Ok(())
170    }
171
172    async fn run(&mut self) -> Result<()> {
173        self.on_start().await?;
174        let mut rx = self
175            .mailbox_rx
176            .take()
177            .ok_or_else(|| anyhow::anyhow!("IOAgent already running"))?;
178        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
179            self.config.heartbeat_interval_secs,
180        ));
181        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
182        loop {
183            tokio::select! {
184                biased;
185                msg = rx.recv() => {
186                    match msg {
187                        None => break,
188                        Some(m) => {
189                            self.metrics.record_received();
190                            if let wactorz_core::message::MessageType::Command {
191                                command: wactorz_core::message::ActorCommand::Stop
192                            } = &m.payload {
193                                break;
194                            }
195                            match self.handle_message(m).await {
196                                Ok(_) => self.metrics.record_processed(),
197                                Err(e) => {
198                                    tracing::error!("[{}] {e}", self.config.name);
199                                    self.metrics.record_failed();
200                                }
201                            }
202                        }
203                    }
204                }
205                _ = hb.tick() => {
206                    self.metrics.record_heartbeat();
207                    if let Err(e) = self.on_heartbeat().await {
208                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
209                    }
210                }
211            }
212        }
213        self.state = ActorState::Stopped;
214        self.on_stop().await
215    }
216}