1use anyhow::Result;
30use async_trait::async_trait;
31use std::sync::Arc;
32use tokio::process::Command;
33use tokio::sync::mpsc;
34
35use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
36
37#[derive(Debug, Clone)]
41pub struct NautilusConfig {
42 pub ssh_key: Option<String>,
45
46 pub connect_timeout_secs: u64,
48
49 pub exec_timeout_secs: u64,
51
52 pub rsync_extra_flags: Vec<String>,
54
55 pub strict_host_keys: bool,
58}
59
60impl Default for NautilusConfig {
61 fn default() -> Self {
62 let strict = std::env::var("NAUTILUS_STRICT_HOST_KEYS")
64 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
65 .unwrap_or(false);
66 Self {
67 ssh_key: std::env::var("NAUTILUS_SSH_KEY").ok(),
68 connect_timeout_secs: 10,
69 exec_timeout_secs: 120,
70 rsync_extra_flags: Vec::new(),
71 strict_host_keys: strict,
72 }
73 }
74}
75
76pub struct NautilusAgent {
80 config: ActorConfig,
81 nautilus: NautilusConfig,
82 state: ActorState,
83 metrics: Arc<ActorMetrics>,
84 mailbox_tx: mpsc::Sender<Message>,
85 mailbox_rx: Option<mpsc::Receiver<Message>>,
86 publisher: Option<EventPublisher>,
87}
88
89impl NautilusAgent {
90 pub fn new(config: ActorConfig) -> Self {
92 Self::with_nautilus_config(config, NautilusConfig::default())
93 }
94
95 pub fn with_nautilus_config(config: ActorConfig, nautilus: NautilusConfig) -> Self {
97 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
98 Self {
99 config,
100 nautilus,
101 state: ActorState::Initializing,
102 metrics: Arc::new(ActorMetrics::new()),
103 mailbox_tx: tx,
104 mailbox_rx: Some(rx),
105 publisher: None,
106 }
107 }
108
109 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
111 self.publisher = Some(p);
112 self
113 }
114
115 fn now_ms() -> u64 {
118 std::time::SystemTime::now()
119 .duration_since(std::time::UNIX_EPOCH)
120 .unwrap_or_default()
121 .as_millis() as u64
122 }
123
124 fn reply(&self, content: &str) {
126 if let Some(pub_) = &self.publisher {
127 pub_.publish(
128 wactorz_mqtt::topics::chat(&self.config.id),
129 &serde_json::json!({
130 "from": self.config.name,
131 "to": "user",
132 "content": content,
133 "timestampMs": Self::now_ms(),
134 }),
135 );
136 }
137 }
138
139 fn ssh_opts(&self) -> Vec<String> {
141 let mut opts = vec![format!(
142 "ConnectTimeout={}",
143 self.nautilus.connect_timeout_secs
144 )];
145 if !self.nautilus.strict_host_keys {
146 opts.push("StrictHostKeyChecking=accept-new".to_string());
147 }
148 opts
149 }
150
151 fn ssh_cmd(&self) -> Command {
153 let mut cmd = Command::new("ssh");
154 for opt in self.ssh_opts() {
155 cmd.args(["-o", &opt]);
156 }
157 if let Some(key) = &self.nautilus.ssh_key {
158 cmd.args(["-i", key]);
159 }
160 cmd
161 }
162
163 async fn cmd_ping(&self, host: &str) {
167 if host.is_empty() {
168 self.reply("Usage: `ping <user@host>`");
169 return;
170 }
171 self.reply(&format!("Pinging `{host}`…"));
172
173 let result = tokio::time::timeout(
174 std::time::Duration::from_secs(self.nautilus.connect_timeout_secs + 2),
175 self.ssh_cmd().args([host, "exit"]).output(),
176 )
177 .await;
178
179 match result {
180 Ok(Ok(out)) if out.status.success() => {
181 self.reply(&format!("✓ `{host}` is reachable via SSH."));
182 }
183 Ok(Ok(out)) => {
184 let stderr = String::from_utf8_lossy(&out.stderr);
185 self.reply(&format!(
186 "✗ SSH to `{host}` failed (exit {}):\n```\n{stderr}\n```",
187 out.status.code().unwrap_or(-1)
188 ));
189 }
190 Ok(Err(e)) => {
191 self.reply(&format!("✗ Could not launch ssh: {e}"));
192 }
193 Err(_) => {
194 self.reply(&format!("✗ Connection to `{host}` timed out."));
195 }
196 }
197 }
198
199 async fn cmd_exec(&self, host: &str, remote_args: &[&str]) {
201 if host.is_empty() || remote_args.is_empty() {
202 self.reply("Usage: `exec <user@host> <command [args…]>`");
203 return;
204 }
205
206 let display_cmd = remote_args.join(" ");
207 self.reply(&format!("Running `{display_cmd}` on `{host}`…"));
208
209 let result = tokio::time::timeout(
211 std::time::Duration::from_secs(self.nautilus.exec_timeout_secs),
212 self.ssh_cmd().arg(host).args(remote_args).output(),
213 )
214 .await;
215
216 match result {
217 Ok(Ok(out)) => {
218 let stdout = String::from_utf8_lossy(&out.stdout);
219 let stderr = String::from_utf8_lossy(&out.stderr);
220 let code = out.status.code().unwrap_or(-1);
221 let status_icon = if out.status.success() { "✓" } else { "✗" };
222 let mut response =
223 format!("{status_icon} `{display_cmd}` on `{host}` (exit {code})");
224 if !stdout.trim().is_empty() {
225 response.push_str(&format!("\n```\n{}\n```", stdout.trim()));
226 }
227 if !stderr.trim().is_empty() {
228 response.push_str(&format!("\nstderr:\n```\n{}\n```", stderr.trim()));
229 }
230 self.reply(&response);
231 }
232 Ok(Err(e)) => {
233 self.reply(&format!("✗ Could not launch ssh: {e}"));
234 }
235 Err(_) => {
236 self.reply(&format!(
237 "✗ Command timed out after {}s.",
238 self.nautilus.exec_timeout_secs
239 ));
240 }
241 }
242 }
243
244 async fn cmd_sync(&self, src: &str, dst: &str) {
246 if src.is_empty() || dst.is_empty() {
247 self.reply("Usage: `sync <[user@host:]src-path> <local-dst-path>`");
248 return;
249 }
250 self.rsync(src, dst, "sync").await;
251 }
252
253 async fn cmd_push(&self, src: &str, dst: &str) {
255 if src.is_empty() || dst.is_empty() {
256 self.reply("Usage: `push <local-src-path> <[user@host:]dst-path>`");
257 return;
258 }
259 self.rsync(src, dst, "push").await;
260 }
261
262 async fn rsync(&self, src: &str, dst: &str, direction: &str) {
264 self.reply(&format!("Starting rsync {direction}: `{src}` → `{dst}`…"));
265
266 let mut ssh_parts = vec!["ssh".to_string()];
268 for opt in self.ssh_opts() {
269 ssh_parts.push("-o".to_string());
270 ssh_parts.push(opt);
271 }
272 if let Some(key) = &self.nautilus.ssh_key {
273 ssh_parts.push("-i".to_string());
274 ssh_parts.push(key.clone());
275 }
276 let ssh_e = ssh_parts.join(" ");
277
278 let mut cmd = Command::new("rsync");
279 cmd.args(["-avz", "--progress", "-e", &ssh_e]);
280 for flag in &self.nautilus.rsync_extra_flags {
281 cmd.arg(flag);
282 }
283 cmd.args([src, dst]);
284
285 let result = tokio::time::timeout(
286 std::time::Duration::from_secs(self.nautilus.exec_timeout_secs),
287 cmd.output(),
288 )
289 .await;
290
291 match result {
292 Ok(Ok(out)) => {
293 let stdout = String::from_utf8_lossy(&out.stdout);
294 let stderr = String::from_utf8_lossy(&out.stderr);
295 let code = out.status.code().unwrap_or(-1);
296 let icon = if out.status.success() { "✓" } else { "✗" };
297 let mut response =
298 format!("{icon} rsync {direction} `{src}` → `{dst}` (exit {code})");
299 if !stdout.trim().is_empty() {
300 let lines: Vec<&str> = stdout.trim().lines().collect();
302 let tail = if lines.len() > 20 {
303 let skip = lines.len() - 20;
304 format!("… ({} lines omitted) …\n{}", skip, lines[skip..].join("\n"))
305 } else {
306 lines.join("\n")
307 };
308 response.push_str(&format!("\n```\n{tail}\n```"));
309 }
310 if !stderr.trim().is_empty() {
311 response.push_str(&format!("\nstderr:\n```\n{}\n```", stderr.trim()));
312 }
313 self.reply(&response);
314 }
315 Ok(Err(e)) => {
316 self.reply(&format!(
317 "✗ Could not launch rsync: {e}\n(Is rsync installed in the container?)"
318 ));
319 }
320 Err(_) => {
321 self.reply(&format!(
322 "✗ rsync timed out after {}s.",
323 self.nautilus.exec_timeout_secs
324 ));
325 }
326 }
327 }
328
329 async fn dispatch(&self, text: &str) {
331 let tokens: Vec<&str> = text.split_whitespace().collect();
332 match tokens.as_slice() {
333 [] => {
334 self.reply("Empty command. Type `help` for usage.");
335 }
336 ["help" | "?"] => {
337 self.reply(
338 "**NautilusAgent** — SSH & rsync bridge\n\n\
339 | Command | Description |\n\
340 |---------|-------------|\n\
341 | `ping <user@host>` | Test SSH connectivity |\n\
342 | `exec <user@host> <cmd [args…]>` | Run remote command |\n\
343 | `sync <[user@host:]src> <dst>` | rsync pull |\n\
344 | `push <src> <[user@host:]dst>` | rsync push |\n\
345 | `help` | Show this message |",
346 );
347 }
348 ["ping", host] => {
349 self.cmd_ping(host).await;
350 }
351 ["exec", host, rest @ ..] => {
352 self.cmd_exec(host, rest).await;
353 }
354 ["sync", src, dst] => {
355 self.cmd_sync(src, dst).await;
356 }
357 ["push", src, dst] => {
358 self.cmd_push(src, dst).await;
359 }
360 [cmd, ..] => {
361 self.reply(&format!("Unknown command: `{cmd}`. Type `help` for usage."));
362 }
363 }
364 }
365}
366
367#[async_trait]
370impl Actor for NautilusAgent {
371 fn id(&self) -> String {
372 self.config.id.clone()
373 }
374 fn name(&self) -> &str {
375 &self.config.name
376 }
377 fn state(&self) -> ActorState {
378 self.state.clone()
379 }
380 fn metrics(&self) -> Arc<ActorMetrics> {
381 Arc::clone(&self.metrics)
382 }
383 fn mailbox(&self) -> mpsc::Sender<Message> {
384 self.mailbox_tx.clone()
385 }
386 fn is_protected(&self) -> bool {
387 self.config.protected
388 }
389
390 async fn on_start(&mut self) -> Result<()> {
391 self.state = ActorState::Running;
392 if let Some(pub_) = &self.publisher {
393 pub_.publish(
394 wactorz_mqtt::topics::spawn(&self.config.id),
395 &serde_json::json!({
396 "agentId": self.config.id,
397 "agentName": self.config.name,
398 "agentType": "transfer",
399 "timestampMs": Self::now_ms(),
400 }),
401 );
402 }
403 tracing::info!(
404 "[nautilus] started — SSH key: {:?}, strict keys: {}",
405 self.nautilus.ssh_key,
406 self.nautilus.strict_host_keys,
407 );
408 Ok(())
409 }
410
411 async fn handle_message(&mut self, message: Message) -> Result<()> {
412 use wactorz_core::message::MessageType;
413 let text = match &message.payload {
414 MessageType::Text { content } => content.trim().to_string(),
415 MessageType::Task { description, .. } => description.trim().to_string(),
416 _ => return Ok(()),
417 };
418 if text.is_empty() {
419 return Ok(());
420 }
421 tracing::debug!("[nautilus] command: {text}");
422 self.dispatch(&text).await;
423 Ok(())
424 }
425
426 async fn on_heartbeat(&mut self) -> Result<()> {
427 if let Some(pub_) = &self.publisher {
428 pub_.publish(
429 wactorz_mqtt::topics::heartbeat(&self.config.id),
430 &serde_json::json!({
431 "agentId": self.config.id,
432 "agentName": self.config.name,
433 "state": self.state,
434 "timestampMs": Self::now_ms(),
435 }),
436 );
437 }
438 Ok(())
439 }
440
441 async fn run(&mut self) -> Result<()> {
442 self.on_start().await?;
443 let mut rx = self
444 .mailbox_rx
445 .take()
446 .ok_or_else(|| anyhow::anyhow!("NautilusAgent already running"))?;
447 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
448 self.config.heartbeat_interval_secs,
449 ));
450 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451 loop {
452 tokio::select! {
453 biased;
454 msg = rx.recv() => {
455 match msg {
456 None => break,
457 Some(m) => {
458 self.metrics.record_received();
459 if let wactorz_core::message::MessageType::Command {
460 command: wactorz_core::message::ActorCommand::Stop,
461 } = &m.payload
462 {
463 break;
464 }
465 match self.handle_message(m).await {
466 Ok(_) => self.metrics.record_processed(),
467 Err(e) => {
468 tracing::error!("[nautilus] {e}");
469 self.metrics.record_failed();
470 }
471 }
472 }
473 }
474 }
475 _ = hb.tick() => {
476 self.metrics.record_heartbeat();
477 if let Err(e) = self.on_heartbeat().await {
478 tracing::error!("[nautilus] heartbeat: {e}");
479 }
480 }
481 }
482 }
483 self.state = ActorState::Stopped;
484 self.on_stop().await
485 }
486}