wactorz_agents/
io_agent.rs1use anyhow::Result;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13
14use wactorz_core::{
15 Actor, ActorConfig, ActorMetrics, ActorState, ActorSystem, EventPublisher, Message,
16};
17
18pub struct IOAgent {
20 config: ActorConfig,
21 system: ActorSystem,
22 state: ActorState,
23 metrics: Arc<ActorMetrics>,
24 mailbox_tx: mpsc::Sender<Message>,
25 mailbox_rx: Option<mpsc::Receiver<Message>>,
26 publisher: Option<EventPublisher>,
27}
28
29impl IOAgent {
30 pub fn new(config: ActorConfig, system: ActorSystem) -> Self {
31 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
32 Self {
33 config,
34 system,
35 state: ActorState::Initializing,
36 metrics: Arc::new(ActorMetrics::new()),
37 mailbox_tx: tx,
38 mailbox_rx: Some(rx),
39 publisher: None,
40 }
41 }
42
43 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
45 self.publisher = Some(p);
46 self
47 }
48
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` instead of wrapping if the value does not fit.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a `u128`; a bare `as u64` would silently truncate,
    // so convert with an explicit, saturating fallback instead.
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
55
/// Splits user input into `(target_actor_name, body)`.
///
/// * `"@name rest of text"` yields `("name", "rest of text")`. The name ends
///   at the first whitespace character of any kind (space, tab, newline),
///   and the body is trimmed.
/// * `"@name"` on its own yields `("name", "")`.
/// * Anything not starting with `@` is addressed to `"main-actor"`, with the
///   trimmed input as the body.
///
/// Surrounding whitespace of `text` is ignored, so `"  @name hi"` is still
/// recognised as a mention and whitespace-only input produces an empty body.
fn parse_mention(text: &str) -> (&str, &str) {
    let text = text.trim();
    match text.strip_prefix('@') {
        // No mention: route to the default actor.
        None => ("main-actor", text),
        Some(mention) => match mention.split_once(char::is_whitespace) {
            Some((target, rest)) => (target, rest.trim()),
            // A bare "@name" carries no body.
            None => (mention, ""),
        },
    }
}
67
68 fn publish_error(&self, error_msg: &str) {
70 if let Some(pub_) = &self.publisher {
71 pub_.publish(
72 wactorz_mqtt::topics::chat(&self.config.id),
73 &serde_json::json!({
74 "from": self.config.name,
75 "to": "user",
76 "content": error_msg,
77 "timestampMs": Self::now_ms(),
78 }),
79 );
80 }
81 }
82}
83
84#[async_trait]
85impl Actor for IOAgent {
86 fn id(&self) -> String {
87 self.config.id.clone()
88 }
89 fn name(&self) -> &str {
90 &self.config.name
91 }
92 fn state(&self) -> ActorState {
93 self.state.clone()
94 }
95 fn metrics(&self) -> Arc<ActorMetrics> {
96 Arc::clone(&self.metrics)
97 }
98 fn mailbox(&self) -> mpsc::Sender<Message> {
99 self.mailbox_tx.clone()
100 }
101 fn is_protected(&self) -> bool {
102 self.config.protected
103 }
104
105 async fn on_start(&mut self) -> Result<()> {
106 self.state = ActorState::Running;
107 if let Some(pub_) = &self.publisher {
108 pub_.publish(
109 wactorz_mqtt::topics::spawn(&self.config.id),
110 &serde_json::json!({
111 "agentId": self.config.id,
112 "agentName": self.config.name,
113 "agentType": "gateway",
114 "timestampMs": Self::now_ms(),
115 }),
116 );
117 }
118 Ok(())
119 }
120
121 async fn handle_message(&mut self, message: Message) -> Result<()> {
122 use wactorz_core::message::MessageType;
123 let content = match &message.payload {
124 MessageType::Text { content } => content.clone(),
125 MessageType::Task { description, .. } => description.clone(),
126 _ => return Ok(()),
127 };
128
129 let (target_name, body) = Self::parse_mention(&content);
130 if body.is_empty() {
131 self.publish_error("Empty message — nothing to forward.");
132 return Ok(());
133 }
134
135 match self.system.registry.get_by_name(target_name).await {
136 Some(entry) => {
137 let msg = Message::text(
138 Some(self.config.name.clone()),
139 Some(entry.id.clone()),
140 body.to_string(),
141 );
142 if let Err(e) = self.system.registry.send(&entry.id, msg).await {
143 let err = format!("Failed to deliver to @{target_name}: {e}");
144 tracing::warn!("{err}");
145 self.publish_error(&err);
146 }
147 }
148 None => {
149 let err = format!("Agent @{target_name} not found.");
150 tracing::warn!("{err}");
151 self.publish_error(&err);
152 }
153 }
154 Ok(())
155 }
156
157 async fn on_heartbeat(&mut self) -> Result<()> {
158 if let Some(pub_) = &self.publisher {
159 pub_.publish(
160 wactorz_mqtt::topics::heartbeat(&self.config.id),
161 &serde_json::json!({
162 "agentId": self.config.id,
163 "agentName": self.config.name,
164 "state": self.state,
165 "timestampMs": Self::now_ms(),
166 }),
167 );
168 }
169 Ok(())
170 }
171
172 async fn run(&mut self) -> Result<()> {
173 self.on_start().await?;
174 let mut rx = self
175 .mailbox_rx
176 .take()
177 .ok_or_else(|| anyhow::anyhow!("IOAgent already running"))?;
178 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
179 self.config.heartbeat_interval_secs,
180 ));
181 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
182 loop {
183 tokio::select! {
184 biased;
185 msg = rx.recv() => {
186 match msg {
187 None => break,
188 Some(m) => {
189 self.metrics.record_received();
190 if let wactorz_core::message::MessageType::Command {
191 command: wactorz_core::message::ActorCommand::Stop
192 } = &m.payload {
193 break;
194 }
195 match self.handle_message(m).await {
196 Ok(_) => self.metrics.record_processed(),
197 Err(e) => {
198 tracing::error!("[{}] {e}", self.config.name);
199 self.metrics.record_failed();
200 }
201 }
202 }
203 }
204 }
205 _ = hb.tick() => {
206 self.metrics.record_heartbeat();
207 if let Err(e) = self.on_heartbeat().await {
208 tracing::error!("[{}] heartbeat: {e}", self.config.name);
209 }
210 }
211 }
212 }
213 self.state = ActorState::Stopped;
214 self.on_stop().await
215 }
216}