wactorz_agents/
installer_agent.rs1use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
13
14pub struct InstallerAgent {
15 config: ActorConfig,
16 state: ActorState,
17 metrics: Arc<ActorMetrics>,
18 mailbox_tx: mpsc::Sender<Message>,
19 mailbox_rx: Option<mpsc::Receiver<Message>>,
20 publisher: Option<EventPublisher>,
21}
22
23impl InstallerAgent {
24 pub fn new(config: ActorConfig) -> Self {
25 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
26 Self {
27 config,
28 state: ActorState::Initializing,
29 metrics: Arc::new(ActorMetrics::new()),
30 mailbox_tx: tx,
31 mailbox_rx: Some(rx),
32 publisher: None,
33 }
34 }
35
36 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
37 self.publisher = Some(p);
38 self
39 }
40
41 async fn pip_install(&self, packages: &[String]) -> (bool, String) {
43 if packages.is_empty() {
44 return (true, "No packages to install.".into());
45 }
46
47 let pip = if which_pip("pip3") { "pip3" } else { "pip" };
49 let mut args = vec!["install", "--quiet"];
50 let pkg_refs: Vec<&str> = packages.iter().map(|s| s.as_str()).collect();
51 args.extend_from_slice(&pkg_refs);
52
53 tracing::info!(
54 "[{}] Running: {} install {}",
55 self.config.name,
56 pip,
57 packages.join(" ")
58 );
59
60 match tokio::process::Command::new(pip).args(&args).output().await {
61 Ok(out) => {
62 let _stdout = String::from_utf8_lossy(&out.stdout).to_string();
63 let stderr = String::from_utf8_lossy(&out.stderr).to_string();
64 let success = out.status.success();
65 let output = if success {
66 format!("Installed: {}", packages.join(", "))
67 } else {
68 format!("pip error:\n{stderr}")
69 };
70 (success, output)
71 }
72 Err(e) => (false, format!("Failed to run pip: {e}")),
73 }
74 }
75
76 fn now_ms() -> u64 {
77 std::time::SystemTime::now()
78 .duration_since(std::time::UNIX_EPOCH)
79 .unwrap_or_default()
80 .as_millis() as u64
81 }
82}
83
84fn which_pip(cmd: &str) -> bool {
85 std::process::Command::new("which")
86 .arg(cmd)
87 .output()
88 .map(|o| o.status.success())
89 .unwrap_or(false)
90}
91
92#[async_trait]
93impl Actor for InstallerAgent {
94 fn id(&self) -> String {
95 self.config.id.clone()
96 }
97 fn name(&self) -> &str {
98 &self.config.name
99 }
100 fn state(&self) -> ActorState {
101 self.state.clone()
102 }
103 fn metrics(&self) -> Arc<ActorMetrics> {
104 Arc::clone(&self.metrics)
105 }
106 fn mailbox(&self) -> mpsc::Sender<Message> {
107 self.mailbox_tx.clone()
108 }
109
110 async fn on_start(&mut self) -> Result<()> {
111 self.state = ActorState::Running;
112 if let Some(pub_) = &self.publisher {
113 pub_.publish(
114 wactorz_mqtt::topics::spawn(&self.config.id),
115 &serde_json::json!({
116 "agentId": self.config.id,
117 "agentName": self.config.name,
118 "agentType": "installer",
119 "timestampMs": Self::now_ms(),
120 }),
121 );
122 }
123 Ok(())
124 }
125
126 async fn handle_message(&mut self, message: Message) -> Result<()> {
127 use wactorz_core::message::MessageType;
128
129 let packages: Vec<String> = match &message.payload {
130 MessageType::Text { content } => {
131 content
133 .split([' ', ','])
134 .map(|s| s.trim().to_string())
135 .filter(|s| !s.is_empty())
136 .collect()
137 }
138 MessageType::Task { payload, .. } => {
139 payload
141 .get("packages")
142 .and_then(|v| v.as_array())
143 .map(|arr| {
144 arr.iter()
145 .filter_map(|v| v.as_str().map(|s| s.to_string()))
146 .collect()
147 })
148 .unwrap_or_default()
149 }
150 _ => return Ok(()),
151 };
152
153 let (success, output) = self.pip_install(&packages).await;
154
155 if let Some(pub_) = &self.publisher {
156 pub_.publish(
157 wactorz_mqtt::topics::chat(&self.config.id),
158 &serde_json::json!({
159 "from": self.config.name,
160 "to": message.from.as_deref().unwrap_or("main"),
161 "content": output,
162 "success": success,
163 "packages": packages,
164 "timestampMs": Self::now_ms(),
165 }),
166 );
167 }
168
169 if success {
170 self.metrics.record_processed();
171 } else {
172 self.metrics.record_failed();
173 }
174 Ok(())
175 }
176
177 async fn on_heartbeat(&mut self) -> Result<()> {
178 if let Some(pub_) = &self.publisher {
179 pub_.publish(
180 wactorz_mqtt::topics::heartbeat(&self.config.id),
181 &serde_json::json!({
182 "agentId": self.config.id,
183 "agentName": self.config.name,
184 "state": self.state,
185 "timestampMs": Self::now_ms(),
186 }),
187 );
188 }
189 Ok(())
190 }
191
192 async fn run(&mut self) -> Result<()> {
193 self.on_start().await?;
194 let mut rx = self
195 .mailbox_rx
196 .take()
197 .ok_or_else(|| anyhow::anyhow!("InstallerAgent already running"))?;
198 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
199 self.config.heartbeat_interval_secs,
200 ));
201 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
202 loop {
203 tokio::select! {
204 biased;
205 msg = rx.recv() => {
206 match msg {
207 None => break,
208 Some(m) => {
209 self.metrics.record_received();
210 if let wactorz_core::message::MessageType::Command {
211 command: wactorz_core::message::ActorCommand::Stop
212 } = &m.payload { break; }
213 if let Err(e) = self.handle_message(m).await {
214 tracing::error!("[{}] {e}", self.config.name);
215 self.metrics.record_failed();
216 }
217 }
218 }
219 }
220 _ = hb.tick() => {
221 self.metrics.record_heartbeat();
222 if let Err(e) = self.on_heartbeat().await {
223 tracing::error!("[{}] heartbeat: {e}", self.config.name);
224 }
225 }
226 }
227 }
228 self.state = ActorState::Stopped;
229 self.on_stop().await
230 }
231}