// wactorz_agents/nautilus_agent.rs

//! NautilusAgent — SSH & rsync file-transfer bridge.
//!
//! Named after the *nautilus*: a deep-sea creature with a spiral protective
//! **shell** — mirroring SSH (**Secure Shell**) — and the Jules Verne submarine
//! that autonomously traverses unreachable depths, just as this agent bridges
//! distant filesystems without human intervention.
//!
//! ## Commands (sent as plain text to the agent's MQTT mailbox)
//!
//! | Command                          | Description                            |
//! |----------------------------------|----------------------------------------|
//! | `ping <user@host>`               | Test SSH connectivity (runs `exit`)    |
//! | `exec <user@host> <cmd [args…]>` | Run a command over SSH                 |
//! | `sync <[user@host:]src> <dst>`   | rsync pull from remote                 |
//! | `push <src> <[user@host:]dst>`   | rsync push to remote                   |
//! | `help`                           | Print available commands               |
//!
//! Results are published back to `agents/{id}/chat` so they appear in the
//! frontend chat panel.
//!
//! ## Security notes
//!
//! Arguments are **never** passed through a *local* shell: each token is a
//! discrete [`std::process::Command`] argument, which prevents local shell
//! injection. On the remote side, however, `ssh` joins the `exec` tokens with
//! spaces and the remote login shell interprets the result. `exec` therefore
//! runs arbitrary remote shell command lines (pipes, globs, `;`) by design.
//!
//! Host-key verification defaults to `accept-new`, so first connections work
//! automatically in container environments. Set `NAUTILUS_STRICT_HOST_KEYS=1`
//! (or `true`) to stop new host keys from being auto-accepted.
//! `NAUTILUS_SSH_KEY` optionally selects the private key passed to `ssh -i`.

use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use async_trait::async_trait;
use tokio::process::Command;
use tokio::sync::mpsc;

use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};

// ── Configuration ─────────────────────────────────────────────────────────────

/// Runtime configuration for [`NautilusAgent`].
///
/// [`Default`] takes the SSH key and host-key policy from the environment
/// (`NAUTILUS_SSH_KEY`, `NAUTILUS_STRICT_HOST_KEYS`). Construct the struct
/// directly to bypass the environment entirely.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NautilusConfig {
    /// Path to the SSH private key passed to `ssh -i` (e.g. `~/.ssh/id_rsa`).
    /// If `None`, SSH uses its default key search order.
    pub ssh_key: Option<String>,

    /// SSH connect timeout in seconds (passed as `-o ConnectTimeout=N`).
    pub connect_timeout_secs: u64,

    /// Wall-clock timeout, in seconds, for a single `exec`, `sync` or `push` run.
    pub exec_timeout_secs: u64,

    /// Extra `rsync` flags applied to every sync/push (e.g. `["--delete"]`).
    pub rsync_extra_flags: Vec<String>,

    /// When `true`, unknown SSH host keys are not auto-accepted.
    /// When `false`, new host keys are accepted on first connect
    /// (`StrictHostKeyChecking=accept-new`).
    pub strict_host_keys: bool,
}

60impl Default for NautilusConfig {
61    fn default() -> Self {
62        // Read strict-key preference from env at construction time.
63        let strict = std::env::var("NAUTILUS_STRICT_HOST_KEYS")
64            .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
65            .unwrap_or(false);
66        Self {
67            ssh_key: std::env::var("NAUTILUS_SSH_KEY").ok(),
68            connect_timeout_secs: 10,
69            exec_timeout_secs: 120,
70            rsync_extra_flags: Vec::new(),
71            strict_host_keys: strict,
72        }
73    }
74}
75
// ── Agent struct ──────────────────────────────────────────────────────────────

/// SSH & rsync file-transfer bridge agent.
///
/// Text commands arrive on the mailbox. Results are sent back as chat messages
/// through the optional [`EventPublisher`].
pub struct NautilusAgent {
    /// Generic actor settings (id, name, mailbox capacity, heartbeat interval, protection flag).
    config: ActorConfig,
    /// SSH/rsync-specific settings (key, timeouts, rsync flags, host-key policy).
    nautilus: NautilusConfig,
    /// Lifecycle state reported by [`Actor::state`] and included in heartbeats.
    state: ActorState,
    /// Metrics recorded by the run loop (received / processed / failed / heartbeat).
    metrics: Arc<ActorMetrics>,
    /// Sender half of the mailbox, cloned out by [`Actor::mailbox`].
    mailbox_tx: mpsc::Sender<Message>,
    /// Receiver half of the mailbox; taken (left `None`) by `run`.
    mailbox_rx: Option<mpsc::Receiver<Message>>,
    /// MQTT publisher for chat, spawn and heartbeat events; when `None`, nothing is published.
    publisher: Option<EventPublisher>,
}

89impl NautilusAgent {
90    /// Create a new NautilusAgent with the given actor config.
91    pub fn new(config: ActorConfig) -> Self {
92        Self::with_nautilus_config(config, NautilusConfig::default())
93    }
94
95    /// Create a new NautilusAgent with custom SSH/rsync configuration.
96    pub fn with_nautilus_config(config: ActorConfig, nautilus: NautilusConfig) -> Self {
97        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
98        Self {
99            config,
100            nautilus,
101            state: ActorState::Initializing,
102            metrics: Arc::new(ActorMetrics::new()),
103            mailbox_tx: tx,
104            mailbox_rx: Some(rx),
105            publisher: None,
106        }
107    }
108
109    /// Attach an [`EventPublisher`] for MQTT output.
110    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
111        self.publisher = Some(p);
112        self
113    }
114
115    // ── Helpers ───────────────────────────────────────────────────────────────
116
117    fn now_ms() -> u64 {
118        std::time::SystemTime::now()
119            .duration_since(std::time::UNIX_EPOCH)
120            .unwrap_or_default()
121            .as_millis() as u64
122    }
123
124    /// Send a chat reply back to the frontend.
125    fn reply(&self, content: &str) {
126        if let Some(pub_) = &self.publisher {
127            pub_.publish(
128                wactorz_mqtt::topics::chat(&self.config.id),
129                &serde_json::json!({
130                    "from":        self.config.name,
131                    "to":          "user",
132                    "content":     content,
133                    "timestampMs": Self::now_ms(),
134                }),
135            );
136        }
137    }
138
139    /// Build the SSH option flags shared by all SSH/rsync invocations.
140    fn ssh_opts(&self) -> Vec<String> {
141        let mut opts = vec![format!(
142            "ConnectTimeout={}",
143            self.nautilus.connect_timeout_secs
144        )];
145        if !self.nautilus.strict_host_keys {
146            opts.push("StrictHostKeyChecking=accept-new".to_string());
147        }
148        opts
149    }
150
151    /// Build a base `ssh` [`Command`] ready for additional arguments.
152    fn ssh_cmd(&self) -> Command {
153        let mut cmd = Command::new("ssh");
154        for opt in self.ssh_opts() {
155            cmd.args(["-o", &opt]);
156        }
157        if let Some(key) = &self.nautilus.ssh_key {
158            cmd.args(["-i", key]);
159        }
160        cmd
161    }
162
163    // ── Command handlers ──────────────────────────────────────────────────────
164
165    /// `ping <user@host>` — test SSH connectivity.
166    async fn cmd_ping(&self, host: &str) {
167        if host.is_empty() {
168            self.reply("Usage: `ping <user@host>`");
169            return;
170        }
171        self.reply(&format!("Pinging `{host}`…"));
172
173        let result = tokio::time::timeout(
174            std::time::Duration::from_secs(self.nautilus.connect_timeout_secs + 2),
175            self.ssh_cmd().args([host, "exit"]).output(),
176        )
177        .await;
178
179        match result {
180            Ok(Ok(out)) if out.status.success() => {
181                self.reply(&format!("✓ `{host}` is reachable via SSH."));
182            }
183            Ok(Ok(out)) => {
184                let stderr = String::from_utf8_lossy(&out.stderr);
185                self.reply(&format!(
186                    "✗ SSH to `{host}` failed (exit {}):\n```\n{stderr}\n```",
187                    out.status.code().unwrap_or(-1)
188                ));
189            }
190            Ok(Err(e)) => {
191                self.reply(&format!("✗ Could not launch ssh: {e}"));
192            }
193            Err(_) => {
194                self.reply(&format!("✗ Connection to `{host}` timed out."));
195            }
196        }
197    }
198
199    /// `exec <user@host> <command [args…]>` — run a remote command.
200    async fn cmd_exec(&self, host: &str, remote_args: &[&str]) {
201        if host.is_empty() || remote_args.is_empty() {
202            self.reply("Usage: `exec <user@host> <command [args…]>`");
203            return;
204        }
205
206        let display_cmd = remote_args.join(" ");
207        self.reply(&format!("Running `{display_cmd}` on `{host}`…"));
208
209        // Each remote token is a discrete argument — no shell interpolation.
210        let result = tokio::time::timeout(
211            std::time::Duration::from_secs(self.nautilus.exec_timeout_secs),
212            self.ssh_cmd().arg(host).args(remote_args).output(),
213        )
214        .await;
215
216        match result {
217            Ok(Ok(out)) => {
218                let stdout = String::from_utf8_lossy(&out.stdout);
219                let stderr = String::from_utf8_lossy(&out.stderr);
220                let code = out.status.code().unwrap_or(-1);
221                let status_icon = if out.status.success() { "✓" } else { "✗" };
222                let mut response =
223                    format!("{status_icon} `{display_cmd}` on `{host}` (exit {code})");
224                if !stdout.trim().is_empty() {
225                    response.push_str(&format!("\n```\n{}\n```", stdout.trim()));
226                }
227                if !stderr.trim().is_empty() {
228                    response.push_str(&format!("\nstderr:\n```\n{}\n```", stderr.trim()));
229                }
230                self.reply(&response);
231            }
232            Ok(Err(e)) => {
233                self.reply(&format!("✗ Could not launch ssh: {e}"));
234            }
235            Err(_) => {
236                self.reply(&format!(
237                    "✗ Command timed out after {}s.",
238                    self.nautilus.exec_timeout_secs
239                ));
240            }
241        }
242    }
243
244    /// `sync <[user@host:]src> <dst>` — rsync pull from remote to local.
245    async fn cmd_sync(&self, src: &str, dst: &str) {
246        if src.is_empty() || dst.is_empty() {
247            self.reply("Usage: `sync <[user@host:]src-path> <local-dst-path>`");
248            return;
249        }
250        self.rsync(src, dst, "sync").await;
251    }
252
253    /// `push <src> <[user@host:]dst>` — rsync push from local to remote.
254    async fn cmd_push(&self, src: &str, dst: &str) {
255        if src.is_empty() || dst.is_empty() {
256            self.reply("Usage: `push <local-src-path> <[user@host:]dst-path>`");
257            return;
258        }
259        self.rsync(src, dst, "push").await;
260    }
261
262    /// Shared rsync executor used by both `sync` and `push`.
263    async fn rsync(&self, src: &str, dst: &str, direction: &str) {
264        self.reply(&format!("Starting rsync {direction}: `{src}` → `{dst}`…"));
265
266        // Build SSH options string for rsync's -e flag
267        let mut ssh_parts = vec!["ssh".to_string()];
268        for opt in self.ssh_opts() {
269            ssh_parts.push("-o".to_string());
270            ssh_parts.push(opt);
271        }
272        if let Some(key) = &self.nautilus.ssh_key {
273            ssh_parts.push("-i".to_string());
274            ssh_parts.push(key.clone());
275        }
276        let ssh_e = ssh_parts.join(" ");
277
278        let mut cmd = Command::new("rsync");
279        cmd.args(["-avz", "--progress", "-e", &ssh_e]);
280        for flag in &self.nautilus.rsync_extra_flags {
281            cmd.arg(flag);
282        }
283        cmd.args([src, dst]);
284
285        let result = tokio::time::timeout(
286            std::time::Duration::from_secs(self.nautilus.exec_timeout_secs),
287            cmd.output(),
288        )
289        .await;
290
291        match result {
292            Ok(Ok(out)) => {
293                let stdout = String::from_utf8_lossy(&out.stdout);
294                let stderr = String::from_utf8_lossy(&out.stderr);
295                let code = out.status.code().unwrap_or(-1);
296                let icon = if out.status.success() { "✓" } else { "✗" };
297                let mut response =
298                    format!("{icon} rsync {direction} `{src}` → `{dst}` (exit {code})");
299                if !stdout.trim().is_empty() {
300                    // Keep last 20 lines of rsync output to avoid flooding chat
301                    let lines: Vec<&str> = stdout.trim().lines().collect();
302                    let tail = if lines.len() > 20 {
303                        let skip = lines.len() - 20;
304                        format!("… ({} lines omitted) …\n{}", skip, lines[skip..].join("\n"))
305                    } else {
306                        lines.join("\n")
307                    };
308                    response.push_str(&format!("\n```\n{tail}\n```"));
309                }
310                if !stderr.trim().is_empty() {
311                    response.push_str(&format!("\nstderr:\n```\n{}\n```", stderr.trim()));
312                }
313                self.reply(&response);
314            }
315            Ok(Err(e)) => {
316                self.reply(&format!(
317                    "✗ Could not launch rsync: {e}\n(Is rsync installed in the container?)"
318                ));
319            }
320            Err(_) => {
321                self.reply(&format!(
322                    "✗ rsync timed out after {}s.",
323                    self.nautilus.exec_timeout_secs
324                ));
325            }
326        }
327    }
328
329    /// Dispatch a parsed text command to the appropriate handler.
330    async fn dispatch(&self, text: &str) {
331        let tokens: Vec<&str> = text.split_whitespace().collect();
332        match tokens.as_slice() {
333            [] => {
334                self.reply("Empty command. Type `help` for usage.");
335            }
336            ["help" | "?"] => {
337                self.reply(
338                    "**NautilusAgent** — SSH & rsync bridge\n\n\
339                     | Command | Description |\n\
340                     |---------|-------------|\n\
341                     | `ping <user@host>` | Test SSH connectivity |\n\
342                     | `exec <user@host> <cmd [args…]>` | Run remote command |\n\
343                     | `sync <[user@host:]src> <dst>` | rsync pull |\n\
344                     | `push <src> <[user@host:]dst>` | rsync push |\n\
345                     | `help` | Show this message |",
346                );
347            }
348            ["ping", host] => {
349                self.cmd_ping(host).await;
350            }
351            ["exec", host, rest @ ..] => {
352                self.cmd_exec(host, rest).await;
353            }
354            ["sync", src, dst] => {
355                self.cmd_sync(src, dst).await;
356            }
357            ["push", src, dst] => {
358                self.cmd_push(src, dst).await;
359            }
360            [cmd, ..] => {
361                self.reply(&format!("Unknown command: `{cmd}`. Type `help` for usage."));
362            }
363        }
364    }
365}
366
// ── Actor impl ────────────────────────────────────────────────────────────────

369#[async_trait]
370impl Actor for NautilusAgent {
371    fn id(&self) -> String {
372        self.config.id.clone()
373    }
374    fn name(&self) -> &str {
375        &self.config.name
376    }
377    fn state(&self) -> ActorState {
378        self.state.clone()
379    }
380    fn metrics(&self) -> Arc<ActorMetrics> {
381        Arc::clone(&self.metrics)
382    }
383    fn mailbox(&self) -> mpsc::Sender<Message> {
384        self.mailbox_tx.clone()
385    }
386    fn is_protected(&self) -> bool {
387        self.config.protected
388    }
389
390    async fn on_start(&mut self) -> Result<()> {
391        self.state = ActorState::Running;
392        if let Some(pub_) = &self.publisher {
393            pub_.publish(
394                wactorz_mqtt::topics::spawn(&self.config.id),
395                &serde_json::json!({
396                    "agentId":   self.config.id,
397                    "agentName": self.config.name,
398                    "agentType": "transfer",
399                    "timestampMs": Self::now_ms(),
400                }),
401            );
402        }
403        tracing::info!(
404            "[nautilus] started — SSH key: {:?}, strict keys: {}",
405            self.nautilus.ssh_key,
406            self.nautilus.strict_host_keys,
407        );
408        Ok(())
409    }
410
411    async fn handle_message(&mut self, message: Message) -> Result<()> {
412        use wactorz_core::message::MessageType;
413        let text = match &message.payload {
414            MessageType::Text { content } => content.trim().to_string(),
415            MessageType::Task { description, .. } => description.trim().to_string(),
416            _ => return Ok(()),
417        };
418        if text.is_empty() {
419            return Ok(());
420        }
421        tracing::debug!("[nautilus] command: {text}");
422        self.dispatch(&text).await;
423        Ok(())
424    }
425
426    async fn on_heartbeat(&mut self) -> Result<()> {
427        if let Some(pub_) = &self.publisher {
428            pub_.publish(
429                wactorz_mqtt::topics::heartbeat(&self.config.id),
430                &serde_json::json!({
431                    "agentId":   self.config.id,
432                    "agentName": self.config.name,
433                    "state":     self.state,
434                    "timestampMs": Self::now_ms(),
435                }),
436            );
437        }
438        Ok(())
439    }
440
441    async fn run(&mut self) -> Result<()> {
442        self.on_start().await?;
443        let mut rx = self
444            .mailbox_rx
445            .take()
446            .ok_or_else(|| anyhow::anyhow!("NautilusAgent already running"))?;
447        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
448            self.config.heartbeat_interval_secs,
449        ));
450        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451        loop {
452            tokio::select! {
453                biased;
454                msg = rx.recv() => {
455                    match msg {
456                        None => break,
457                        Some(m) => {
458                            self.metrics.record_received();
459                            if let wactorz_core::message::MessageType::Command {
460                                command: wactorz_core::message::ActorCommand::Stop,
461                            } = &m.payload
462                            {
463                                break;
464                            }
465                            match self.handle_message(m).await {
466                                Ok(_) => self.metrics.record_processed(),
467                                Err(e) => {
468                                    tracing::error!("[nautilus] {e}");
469                                    self.metrics.record_failed();
470                                }
471                            }
472                        }
473                    }
474                }
475                _ = hb.tick() => {
476                    self.metrics.record_heartbeat();
477                    if let Err(e) = self.on_heartbeat().await {
478                        tracing::error!("[nautilus] heartbeat: {e}");
479                    }
480                }
481            }
482        }
483        self.state = ActorState::Stopped;
484        self.on_stop().await
485    }
486}