wactorz_agents/
installer_agent.rs

//! Package installer agent.
//!
//! [`InstallerAgent`] runs `pip3 install` (falling back to `pip install`) as a
//! subprocess and publishes the outcome of each request to MQTT.  Used by
//! MainActor to install Python dependencies before spawning new dynamic agents.
6
7use anyhow::Result;
8use async_trait::async_trait;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
13
14pub struct InstallerAgent {
15    config: ActorConfig,
16    state: ActorState,
17    metrics: Arc<ActorMetrics>,
18    mailbox_tx: mpsc::Sender<Message>,
19    mailbox_rx: Option<mpsc::Receiver<Message>>,
20    publisher: Option<EventPublisher>,
21}
22
23impl InstallerAgent {
24    pub fn new(config: ActorConfig) -> Self {
25        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
26        Self {
27            config,
28            state: ActorState::Initializing,
29            metrics: Arc::new(ActorMetrics::new()),
30            mailbox_tx: tx,
31            mailbox_rx: Some(rx),
32            publisher: None,
33        }
34    }
35
36    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
37        self.publisher = Some(p);
38        self
39    }
40
41    /// Run `pip install <packages>` and return (success, output).
42    async fn pip_install(&self, packages: &[String]) -> (bool, String) {
43        if packages.is_empty() {
44            return (true, "No packages to install.".into());
45        }
46
47        // Try pip3 first, fall back to pip
48        let pip = if which_pip("pip3") { "pip3" } else { "pip" };
49        let mut args = vec!["install", "--quiet"];
50        let pkg_refs: Vec<&str> = packages.iter().map(|s| s.as_str()).collect();
51        args.extend_from_slice(&pkg_refs);
52
53        tracing::info!(
54            "[{}] Running: {} install {}",
55            self.config.name,
56            pip,
57            packages.join(" ")
58        );
59
60        match tokio::process::Command::new(pip).args(&args).output().await {
61            Ok(out) => {
62                let _stdout = String::from_utf8_lossy(&out.stdout).to_string();
63                let stderr = String::from_utf8_lossy(&out.stderr).to_string();
64                let success = out.status.success();
65                let output = if success {
66                    format!("Installed: {}", packages.join(", "))
67                } else {
68                    format!("pip error:\n{stderr}")
69                };
70                (success, output)
71            }
72            Err(e) => (false, format!("Failed to run pip: {e}")),
73        }
74    }
75
76    fn now_ms() -> u64 {
77        std::time::SystemTime::now()
78            .duration_since(std::time::UNIX_EPOCH)
79            .unwrap_or_default()
80            .as_millis() as u64
81    }
82}
83
/// Report whether an executable named `cmd` can be found on the current `PATH`.
///
/// The lookup walks the `PATH` entries directly instead of spawning the
/// external `which` utility, so it also works where `which` is not installed
/// and never blocks on a child process.  Returns `false` when `PATH` is unset
/// or no entry contains a matching executable.
fn which_pip(cmd: &str) -> bool {
    /// On Unix a candidate counts when it is a regular file (symlinks are
    /// followed) with at least one execute permission bit set.
    #[cfg(unix)]
    fn is_runnable(candidate: &std::path::Path) -> bool {
        use std::os::unix::fs::PermissionsExt;
        std::fs::metadata(candidate)
            .map(|meta| meta.is_file() && (meta.permissions().mode() & 0o111) != 0)
            .unwrap_or(false)
    }

    /// Elsewhere there is no execute bit to inspect; accept the bare name or
    /// the name with an `.exe` suffix appended.
    #[cfg(not(unix))]
    fn is_runnable(candidate: &std::path::Path) -> bool {
        if candidate.is_file() {
            return true;
        }
        let mut with_exe = candidate.as_os_str().to_owned();
        with_exe.push(".exe");
        std::path::Path::new(&with_exe).is_file()
    }

    std::env::var_os("PATH")
        .map(|paths| std::env::split_paths(&paths).any(|dir| is_runnable(&dir.join(cmd))))
        .unwrap_or(false)
}
91
92#[async_trait]
93impl Actor for InstallerAgent {
94    fn id(&self) -> String {
95        self.config.id.clone()
96    }
97    fn name(&self) -> &str {
98        &self.config.name
99    }
100    fn state(&self) -> ActorState {
101        self.state.clone()
102    }
103    fn metrics(&self) -> Arc<ActorMetrics> {
104        Arc::clone(&self.metrics)
105    }
106    fn mailbox(&self) -> mpsc::Sender<Message> {
107        self.mailbox_tx.clone()
108    }
109
110    async fn on_start(&mut self) -> Result<()> {
111        self.state = ActorState::Running;
112        if let Some(pub_) = &self.publisher {
113            pub_.publish(
114                wactorz_mqtt::topics::spawn(&self.config.id),
115                &serde_json::json!({
116                    "agentId":   self.config.id,
117                    "agentName": self.config.name,
118                    "agentType": "installer",
119                    "timestampMs": Self::now_ms(),
120                }),
121            );
122        }
123        Ok(())
124    }
125
126    async fn handle_message(&mut self, message: Message) -> Result<()> {
127        use wactorz_core::message::MessageType;
128
129        let packages: Vec<String> = match &message.payload {
130            MessageType::Text { content } => {
131                // Parse space- or comma-separated package names
132                content
133                    .split([' ', ','])
134                    .map(|s| s.trim().to_string())
135                    .filter(|s| !s.is_empty())
136                    .collect()
137            }
138            MessageType::Task { payload, .. } => {
139                // Expect {"packages": ["pkg1", "pkg2"]}
140                payload
141                    .get("packages")
142                    .and_then(|v| v.as_array())
143                    .map(|arr| {
144                        arr.iter()
145                            .filter_map(|v| v.as_str().map(|s| s.to_string()))
146                            .collect()
147                    })
148                    .unwrap_or_default()
149            }
150            _ => return Ok(()),
151        };
152
153        let (success, output) = self.pip_install(&packages).await;
154
155        if let Some(pub_) = &self.publisher {
156            pub_.publish(
157                wactorz_mqtt::topics::chat(&self.config.id),
158                &serde_json::json!({
159                    "from":      self.config.name,
160                    "to":        message.from.as_deref().unwrap_or("main"),
161                    "content":   output,
162                    "success":   success,
163                    "packages":  packages,
164                    "timestampMs": Self::now_ms(),
165                }),
166            );
167        }
168
169        if success {
170            self.metrics.record_processed();
171        } else {
172            self.metrics.record_failed();
173        }
174        Ok(())
175    }
176
177    async fn on_heartbeat(&mut self) -> Result<()> {
178        if let Some(pub_) = &self.publisher {
179            pub_.publish(
180                wactorz_mqtt::topics::heartbeat(&self.config.id),
181                &serde_json::json!({
182                    "agentId":   self.config.id,
183                    "agentName": self.config.name,
184                    "state":     self.state,
185                    "timestampMs": Self::now_ms(),
186                }),
187            );
188        }
189        Ok(())
190    }
191
192    async fn run(&mut self) -> Result<()> {
193        self.on_start().await?;
194        let mut rx = self
195            .mailbox_rx
196            .take()
197            .ok_or_else(|| anyhow::anyhow!("InstallerAgent already running"))?;
198        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
199            self.config.heartbeat_interval_secs,
200        ));
201        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
202        loop {
203            tokio::select! {
204                biased;
205                msg = rx.recv() => {
206                    match msg {
207                        None => break,
208                        Some(m) => {
209                            self.metrics.record_received();
210                            if let wactorz_core::message::MessageType::Command {
211                                command: wactorz_core::message::ActorCommand::Stop
212                            } = &m.payload { break; }
213                            if let Err(e) = self.handle_message(m).await {
214                                tracing::error!("[{}] {e}", self.config.name);
215                                self.metrics.record_failed();
216                            }
217                        }
218                    }
219                }
220                _ = hb.tick() => {
221                    self.metrics.record_heartbeat();
222                    if let Err(e) = self.on_heartbeat().await {
223                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
224                    }
225                }
226            }
227        }
228        self.state = ActorState::Stopped;
229        self.on_stop().await
230    }
231}