1use anyhow::Result;
30use async_trait::async_trait;
31use std::{
32 collections::HashMap,
33 sync::{Arc, Mutex},
34 time::{SystemTime, UNIX_EPOCH},
35};
36use tokio::sync::mpsc;
37
38use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
39
40#[derive(Clone)]
43struct MediaEntry {
44 title: String,
45 media_type: String, rating: Option<f64>,
47 genre: Option<String>,
48 #[expect(dead_code)]
49 progress: Option<String>, #[expect(dead_code)]
51 ts_ms: u64,
52}
53
54#[derive(Clone)]
55struct QueueItem {
56 title: String,
57 media_type: String,
58 #[expect(dead_code)]
59 added_ms: u64,
60}
61
62pub struct WmeAgent {
65 config: ActorConfig,
66 state: ActorState,
67 metrics: Arc<ActorMetrics>,
68 mailbox_tx: mpsc::Sender<Message>,
69 mailbox_rx: Option<mpsc::Receiver<Message>>,
70 publisher: Option<EventPublisher>,
71 log: Arc<Mutex<Vec<MediaEntry>>>,
72 queue: Arc<Mutex<Vec<QueueItem>>>,
73}
74
75impl WmeAgent {
76 pub fn new(config: ActorConfig) -> Self {
77 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
78 Self {
79 config,
80 state: ActorState::Initializing,
81 metrics: Arc::new(ActorMetrics::new()),
82 mailbox_tx: tx,
83 mailbox_rx: Some(rx),
84 publisher: None,
85 log: Arc::new(Mutex::new(Vec::new())),
86 queue: Arc::new(Mutex::new(Vec::new())),
87 }
88 }
89
90 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
91 self.publisher = Some(p);
92 self
93 }
94
95 fn now_ms() -> u64 {
96 SystemTime::now()
97 .duration_since(UNIX_EPOCH)
98 .unwrap_or_default()
99 .as_millis() as u64
100 }
101
102 fn reply(&self, content: &str) {
103 if let Some(pub_) = &self.publisher {
104 pub_.publish(
105 wactorz_mqtt::topics::chat(&self.config.id),
106 &serde_json::json!({
107 "from": self.config.name,
108 "to": "user",
109 "content": content,
110 "timestampMs": Self::now_ms(),
111 }),
112 );
113 }
114 }
115
116 fn cmd_add(&self, parts: &[&str]) -> String {
119 if parts.len() < 2 {
121 return "Usage: `add <movie|show|book|podcast> \"<title>\" [rating] [progress] [genre]`\n\n\
122 Examples:\n\
123 • `add movie \"Inception\" 9 scifi`\n\
124 • `add show \"Breaking Bad\" s1e3 8.5 drama`\n\
125 • `add book \"Dune\" 150 scifi`\n\
126 • `add podcast \"Lex Fridman #420\" 95 tech`".to_string();
127 }
128
129 let media_type = parts[0].to_lowercase();
130 if !["movie", "show", "book", "podcast"].contains(&media_type.as_str()) {
131 return format!(
132 "Unknown type `{media_type}`. Use: `movie`, `show`, `book`, or `podcast`."
133 );
134 }
135
136 let rest = parts[1..].join(" ");
138 let (title, remainder) = if let Some(stripped) = rest.strip_prefix('"') {
139 if let Some(end) = stripped.find('"') {
140 (
141 stripped[..end].to_string(),
142 stripped[end + 1..].trim().to_string(),
143 )
144 } else {
145 (stripped.trim_matches('"').to_string(), String::new())
146 }
147 } else {
148 let mut words = parts[1..].iter().peekable();
149 let title_word = words.next().unwrap_or(&"").to_string();
150 let rem: Vec<&str> = words.copied().collect();
151 (title_word, rem.join(" "))
152 };
153
154 let rem_parts: Vec<&str> = remainder.split_whitespace().collect();
155
156 let (progress, raw_rating, raw_genre): (Option<String>, Option<&str>, Option<&str>) =
158 match media_type.as_str() {
159 "show" => {
160 let prog: Option<&str> = rem_parts
161 .first()
162 .copied()
163 .filter(|s| s.to_lowercase().starts_with('s') && s.contains('e'));
164 let offset = if prog.is_some() { 1 } else { 0 };
165 let rat = rem_parts.get(offset).copied();
166 let gn = rem_parts.get(offset + 1).copied();
167 (prog.map(|s| s.to_string()), rat, gn)
168 }
169 "book" | "podcast" => {
170 let prog: Option<String> = rem_parts.first().copied().and_then(|s| {
171 s.parse::<u64>().ok().map(|n| match media_type.as_str() {
172 "book" => format!("{n} pages"),
173 _ => format!("{n} min"),
174 })
175 });
176 let gn = rem_parts.get(1).copied();
177 (prog, None, gn)
178 }
179 _ => {
180 let rat = rem_parts.first().copied();
182 let gn = rem_parts.get(1).copied();
183 (None, rat, gn)
184 }
185 };
186
187 let rating: Option<f64> = raw_rating.and_then(|s: &str| {
188 s.trim_end_matches('/')
189 .parse::<f64>()
190 .ok()
191 .filter(|&v| (0.0..=10.0).contains(&v))
192 });
193
194 let entry = MediaEntry {
195 title: title.clone(),
196 media_type: media_type.clone(),
197 rating,
198 genre: raw_genre.map(|s| s.to_string()),
199 progress: progress.clone(),
200 ts_ms: Self::now_ms(),
201 };
202 self.log.lock().unwrap().push(entry);
203
204 let mut queue = self.queue.lock().unwrap();
206 let before = queue.len();
207 queue.retain(|q| q.title.to_lowercase() != title.to_lowercase());
208 let dequeued = before != queue.len();
209
210 let rating_str = rating.map(|r| format!(" ⭐ {r:.1}/10")).unwrap_or_default();
211 let progress_str = progress.map(|p| format!(" _{p}_")).unwrap_or_default();
212 let genre_str2 = raw_genre.map(|g| format!(" `{g}`")).unwrap_or_default();
213 let dequeued_note = if dequeued {
214 "\n✅ Removed from queue."
215 } else {
216 ""
217 };
218
219 let icon = match media_type.as_str() {
220 "movie" => "🎬",
221 "show" => "📺",
222 "book" => "📚",
223 "podcast" => "🎙",
224 _ => "🎭",
225 };
226 format!("{icon} Logged **{title}**{rating_str}{progress_str}{genre_str2}{dequeued_note}")
227 }
228
229 fn cmd_queue(&self, parts: &[&str]) -> String {
230 match parts.first().copied().unwrap_or("list") {
231 "add" => {
232 if parts.len() < 3 {
233 return "Usage: `queue add <movie|show|book|podcast> \"<title>\"`".to_string();
234 }
235 let media_type = parts[1].to_lowercase();
236 let title = parts[2..].join(" ").trim_matches('"').to_string();
237 let mut queue = self.queue.lock().unwrap();
238 if queue
239 .iter()
240 .any(|q| q.title.to_lowercase() == title.to_lowercase())
241 {
242 return format!("📋 **{title}** is already in the queue.");
243 }
244 queue.push(QueueItem {
245 title: title.clone(),
246 media_type: media_type.clone(),
247 added_ms: Self::now_ms(),
248 });
249 let icon = match media_type.as_str() {
250 "movie" => "🎬",
251 "show" => "📺",
252 "book" => "📚",
253 "podcast" => "🎙",
254 _ => "🎭",
255 };
256 format!(
257 "{icon} Added **{title}** to queue ({} items total)",
258 queue.len()
259 )
260 }
261
262 "list" | "ls" => {
263 let queue = self.queue.lock().unwrap();
264 if queue.is_empty() {
265 return "📭 Queue is empty.\n\nTry: `queue add movie \"Oppenheimer\"`"
266 .to_string();
267 }
268 let mut by_type: HashMap<&str, Vec<&str>> = HashMap::new();
269 for q in queue.iter() {
270 by_type
271 .entry(q.media_type.as_str())
272 .or_default()
273 .push(q.title.as_str());
274 }
275 let mut sections = Vec::new();
276 for (t, titles) in &by_type {
277 let icon = match *t {
278 "movie" => "🎬",
279 "show" => "📺",
280 "book" => "📚",
281 "podcast" => "🎙",
282 _ => "🎭",
283 };
284 let rows: Vec<String> = titles.iter().map(|t| format!(" • {t}")).collect();
285 sections.push(format!(
286 "{icon} **{}**\n{}",
287 Self::capitalize(t),
288 rows.join("\n")
289 ));
290 }
291 sections.sort();
292 format!(
293 "**📋 Queue ({} items)**\n\n{}",
294 queue.len(),
295 sections.join("\n\n")
296 )
297 }
298
299 "done" | "watched" | "read" | "finished" => {
300 if parts.len() < 2 {
301 return "Usage: `queue done \"<title>\" [rating]`".to_string();
302 }
303 let title_parts = if parts.len() > 2 {
304 parts[1..parts.len() - 1].join(" ")
305 } else {
306 parts[1..].join(" ")
307 };
308 let title = title_parts.trim_matches('"').to_string();
309 let rating: Option<f64> = parts
310 .last()
311 .and_then(|s| s.parse::<f64>().ok())
312 .filter(|&v| (0.0..=10.0).contains(&v));
313
314 let mut queue = self.queue.lock().unwrap();
315 let before = queue.len();
316 let removed_type = queue
317 .iter()
318 .find(|q| q.title.to_lowercase() == title.to_lowercase())
319 .map(|q| q.media_type.clone());
320 queue.retain(|q| q.title.to_lowercase() != title.to_lowercase());
321
322 if queue.len() == before {
323 return format!("❓ **{title}** not found in queue.");
324 }
325 drop(queue);
326
327 let media_type = removed_type.unwrap_or_else(|| "movie".to_string());
328 let entry = MediaEntry {
329 title: title.clone(),
330 media_type: media_type.clone(),
331 rating,
332 genre: None,
333 progress: None,
334 ts_ms: Self::now_ms(),
335 };
336 self.log.lock().unwrap().push(entry);
337
338 let rating_str = rating.map(|r| format!(" ⭐ {r:.1}/10")).unwrap_or_default();
339 let icon = match media_type.as_str() {
340 "movie" => "🎬",
341 "show" => "📺",
342 "book" => "📚",
343 "podcast" => "🎙",
344 _ => "🎭",
345 };
346 format!("{icon} Marked **{title}** as done{rating_str} and removed from queue.")
347 }
348
349 "drop" | "remove" | "rm" => {
350 if parts.len() < 2 {
351 return "Usage: `queue drop \"<title>\"`".to_string();
352 }
353 let title = parts[1..].join(" ").trim_matches('"').to_string();
354 let mut queue = self.queue.lock().unwrap();
355 let before = queue.len();
356 queue.retain(|q| q.title.to_lowercase() != title.to_lowercase());
357 if queue.len() == before {
358 format!("❓ **{title}** not found in queue.")
359 } else {
360 format!("🗑 Dropped **{title}** from queue.")
361 }
362 }
363
364 sub => {
365 format!("Unknown queue sub-command `{sub}`. Use: `add`, `list`, `done`, `drop`.")
366 }
367 }
368 }
369
370 fn cmd_stats(&self, filter: Option<&str>) -> String {
371 let log = self.log.lock().unwrap();
372 if log.is_empty() {
373 return "📭 Nothing logged yet.\n\nTry: `add movie \"Inception\" 9 scifi`".to_string();
374 }
375
376 let entries: Vec<&MediaEntry> = match filter {
377 Some(t) => log.iter().filter(|e| e.media_type == t).collect(),
378 None => log.iter().collect(),
379 };
380
381 if entries.is_empty() {
382 return format!("📭 No `{}` entries yet.", filter.unwrap_or(""));
383 }
384
385 let mut by_type: HashMap<&str, (usize, f64, u32)> = HashMap::new(); for e in &entries {
387 let rec = by_type.entry(e.media_type.as_str()).or_insert((0, 0.0, 0));
388 rec.0 += 1;
389 if let Some(r) = e.rating {
390 rec.1 += r;
391 rec.2 += 1;
392 }
393 }
394
395 let total = entries.len();
396 let total_rated: u32 = by_type.values().map(|r| r.2).sum();
397 let total_rating: f64 = by_type.values().map(|r| r.1).sum();
398 let avg_overall = if total_rated > 0 {
399 total_rating / total_rated as f64
400 } else {
401 0.0
402 };
403
404 let mut rows = Vec::new();
405 let mut type_order = vec!["movie", "show", "book", "podcast"];
406 type_order.retain(|t| by_type.contains_key(*t));
407 for t in type_order {
408 if let Some(&(count, rs, rc)) = by_type.get(t) {
409 let avg = if rc > 0 {
410 format!("avg ⭐ {:.1}", rs / rc as f64)
411 } else {
412 "unrated".to_string()
413 };
414 let icon = match t {
415 "movie" => "🎬",
416 "show" => "📺",
417 "book" => "📚",
418 "podcast" => "🎙",
419 _ => "🎭",
420 };
421 rows.push(format!(
422 " {icon} **{}**: {count} entries — {avg}",
423 Self::capitalize(t)
424 ));
425 }
426 }
427
428 let header = match filter {
429 Some(t) => format!("**📊 Media Stats — {}**", Self::capitalize(t)),
430 None => "**📊 Media Stats — All Time**".to_string(),
431 };
432 let avg_line = if total_rated > 0 {
433 format!("\n\n**Overall avg rating**: ⭐ {avg_overall:.1}/10 ({total_rated} rated)")
434 } else {
435 String::new()
436 };
437
438 format!(
439 "{header}\n\n{}\n\n**Total entries**: {total}{avg_line}",
440 rows.join("\n")
441 )
442 }
443
444 fn cmd_top(&self, parts: &[&str]) -> String {
445 let n: usize = parts
446 .first()
447 .and_then(|s| s.parse().ok())
448 .unwrap_or(5)
449 .min(20);
450 let filter = parts
451 .get(
452 if parts
453 .first()
454 .and_then(|s| s.parse::<usize>().ok())
455 .is_some()
456 {
457 1
458 } else {
459 0
460 },
461 )
462 .copied();
463
464 let log = self.log.lock().unwrap();
465 let mut rated: Vec<&MediaEntry> = log
466 .iter()
467 .filter(|e| e.rating.is_some())
468 .filter(|e| filter.map(|t| e.media_type == t).unwrap_or(true))
469 .collect();
470
471 if rated.is_empty() {
472 return "📭 No rated entries yet.\n\nTry: `add movie \"Inception\" 9 scifi`"
473 .to_string();
474 }
475
476 rated.sort_by(|a, b| {
477 b.rating
478 .unwrap_or(0.0)
479 .partial_cmp(&a.rating.unwrap_or(0.0))
480 .unwrap_or(std::cmp::Ordering::Equal)
481 });
482
483 let type_label = filter
484 .map(|t| format!(" — {}", Self::capitalize(t)))
485 .unwrap_or_default();
486 let rows: Vec<String> = rated
487 .iter()
488 .take(n)
489 .enumerate()
490 .map(|(i, e)| {
491 let icon = match e.media_type.as_str() {
492 "movie" => "🎬",
493 "show" => "📺",
494 "book" => "📚",
495 "podcast" => "🎙",
496 _ => "🎭",
497 };
498 let genre = e
499 .genre
500 .as_deref()
501 .map(|g| format!(" `{g}`"))
502 .unwrap_or_default();
503 format!(
504 " {}. {icon} **{}** ⭐ {:.1}{genre}",
505 i + 1,
506 e.title,
507 e.rating.unwrap_or(0.0)
508 )
509 })
510 .collect();
511
512 format!("**🏆 Top {n}{type_label}**\n\n{}", rows.join("\n"))
513 }
514
515 fn cmd_calc(&self, parts: &[&str]) -> String {
516 match parts.first().copied().unwrap_or("") {
517 "binge" => {
518 if parts.len() < 3 {
522 return "Usage: `calc binge <episodes> <mins_per_ep>`\n\
523 OR: `calc binge <seasons> <eps_per_season> <mins_per_ep>`\n\n\
524 Examples:\n\
525 • `calc binge 62 47` → 62 episodes × 47 min\n\
526 • `calc binge 5 13 45` → 5 seasons × 13 eps × 45 min"
527 .to_string();
528 }
529
530 let (total_eps, mins): (u64, u64) = if parts.len() >= 4 {
531 let seasons: u64 = parts[1].parse().unwrap_or(0);
532 let eps: u64 = parts[2].parse().unwrap_or(0);
533 let mins: u64 = parts[3].parse().unwrap_or(0);
534 (seasons * eps, mins)
535 } else {
536 let eps: u64 = parts[1].parse().unwrap_or(0);
537 let mins: u64 = parts[2].parse().unwrap_or(0);
538 (eps, mins)
539 };
540
541 if total_eps == 0 || mins == 0 {
542 return "Episodes and minutes per episode must be positive.".to_string();
543 }
544
545 let total_mins = total_eps * mins;
546 let hours = total_mins / 60;
547 let rem_mins = total_mins % 60;
548 let days_8h = (total_mins as f64 / 480.0).ceil() as u64; let days_4h = (total_mins as f64 / 240.0).ceil() as u64; format!(
552 "**📺 Binge Calculator**\n\n\
553 Episodes : {total_eps} × {mins} min\n\
554 Total time : **{hours}h {rem_mins}m** ({total_mins} min)\n\n\
555 At 4 h/day → **{days_4h} days**\n\
556 At 8 h/day → **{days_8h} days**\n\n\
557 _Popcorn budget not included._"
558 )
559 }
560
561 "read" => {
562 if parts.len() < 3 {
564 return "Usage: `calc read <pages> <pages_per_hour>`\n\n\
565 Example: `calc read 400 30` → 400 pages @ 30 pp/h"
566 .to_string();
567 }
568 let pages: u64 = parts[1].parse().unwrap_or(0);
569 let speed: u64 = parts[2].parse().unwrap_or(0);
570 if pages == 0 || speed == 0 {
571 return "Pages and speed must be positive.".to_string();
572 }
573
574 let total_mins = pages * 60 / speed;
575 let hours = total_mins / 60;
576 let rem_mins = total_mins % 60;
577 let sessions_30 = (total_mins as f64 / 30.0).ceil() as u64;
578 let sessions_60 = (total_mins as f64 / 60.0).ceil() as u64;
579
580 format!(
581 "**📚 Reading Calculator**\n\n\
582 Pages : {pages} @ {speed} pp/h\n\
583 Time : **{hours}h {rem_mins}m** ({total_mins} min)\n\n\
584 In 30-min sessions → **{sessions_30} sessions**\n\
585 In 60-min sessions → **{sessions_60} sessions**"
586 )
587 }
588
589 "listen" => {
590 if parts.len() < 3 {
592 return "Usage: `calc listen <episodes> <mins_per_episode>`\n\n\
593 Example: `calc listen 10 45` → 10 episodes × 45 min"
594 .to_string();
595 }
596 let eps: u64 = parts[1].parse().unwrap_or(0);
597 let mins: u64 = parts[2].parse().unwrap_or(0);
598 if eps == 0 || mins == 0 {
599 return "Episodes and duration must be positive.".to_string();
600 }
601
602 let total = eps * mins;
603 let hours = total / 60;
604 let rem = total % 60;
605 let at_1_5x = total * 2 / 3;
606 let h15 = at_1_5x / 60;
607 let r15 = at_1_5x % 60;
608
609 format!(
610 "**🎙 Podcast Queue Calculator**\n\n\
611 {eps} episodes × {mins} min\n\
612 Normal speed : **{hours}h {rem}m**\n\
613 At 1.5× : **{h15}h {r15}m**"
614 )
615 }
616
617 _ => "**calc** subcommands:\n\n\
618 ```\n\
619 calc binge <eps> <mins/ep> binge time\n\
620 calc binge <seasons> <eps/season> <mins/ep> binge time\n\
621 calc read <pages> <pages/hr> reading time\n\
622 calc listen <episodes> <mins/ep> podcast queue time\n\
623 ```"
624 .to_string(),
625 }
626 }
627
628 fn cmd_tips(&self, topic: &str) -> String {
629 match topic {
630 "streaming" | "stream" => {
631 "**📺 Streaming Tips**\n\n\
632 1. **Watch before cancelling** — use free trials strategically (one at a time)\n\
633 2. **Download for offline** — save mobile data on commutes\n\
634 3. **Avoid spoilers** — use `queue add` to park titles; watch in order\n\
635 4. **Skip intros/recaps** — reclaim ~5 min per episode on long binges\n\
636 5. **1.25× speed** — comfortable for most content, saves ~20% of time\n\n\
637 _Use `calc binge` to plan how many days a series will take._".to_string()
638 }
639 "reading" | "read" | "books" => {
640 "**📚 Reading Tips**\n\n\
641 1. **Read daily** — even 20 pages/day = ~15 books a year\n\
642 2. **Two-book rule** — keep a light read alongside a dense one\n\
643 3. **No phone in bed** — replace doom-scrolling with 30 min of reading\n\
644 4. **Take notes** — highlight + jot key ideas; retention increases 3×\n\
645 5. **DNF guilt-free** — abandon books you're not enjoying after 50 pages\n\n\
646 _Use `calc read` to see how long your next book will take._".to_string()
647 }
648 "podcasts" | "podcast" => {
649 "**🎙 Podcast Tips**\n\n\
650 1. **Queue by theme** — batch similar episodes for deeper absorption\n\
651 2. **Speed up gradually** — 1.0× → 1.25× → 1.5×; brain adapts in a week\n\
652 3. **Active listening** — note one insight per episode before moving on\n\
653 4. **Commute pairing** — assign podcasts to routine tasks (gym, dishes)\n\
654 5. **Trim the backlog** — unsubscribe from shows you skip for 3+ episodes\n\n\
655 _Use `calc listen` to size your podcast backlog._".to_string()
656 }
657 "focus" | "productivity" => {
658 "**🎯 Focus & Deep Work Media Tips**\n\n\
659 1. **Instrumental only** — lyrics activate language centres and fragment focus\n\
660 2. **Lo-fi / ambient** — consistent low-stimulation audio masks distractions\n\
661 3. **Volume at 50–60%** — loud audio elevates cortisol over long sessions\n\
662 4. **No queue anxiety** — log content in `queue` and forget it; reclaim mental RAM\n\
663 5. **Media fasts** — 1 day/week screen-free improves creative output measurably\n\n\
664 _Try: `queue add podcast \"Deep Work recap\"` to park ideas for later._".to_string()
665 }
666 "movies" | "film" | "cinema" => {
667 "**🎬 Film Tips**\n\n\
668 1. **Rate right after** — memory of emotional impact fades within hours\n\
669 2. **Director filmographies** — watch all films by one director back-to-back\n\
670 3. **Criterion / A24** — reliable quality signals for arthouse discovery\n\
671 4. **First 10 minutes rule** — if it doesn't hook you, it rarely improves\n\
672 5. **Discuss afterwards** — verbalising a film doubles long-term retention\n\n\
673 _Use `top 10 movie` to review your highest-rated films._".to_string()
674 }
675 _ => {
676 "**WME Media Tips** — pick a topic:\n\n\
677 ```\n\
678 tips streaming — streaming platform strategy\n\
679 tips reading — books and reading habits\n\
680 tips podcasts — podcast consumption\n\
681 tips focus — media for deep work\n\
682 tips movies — film watching habits\n\
683 ```".to_string()
684 }
685 }
686 }
687
688 fn capitalize(s: &str) -> String {
689 let mut c = s.chars();
690 match c.next() {
691 None => String::new(),
692 Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
693 }
694 }
695
696 fn dispatch(&self, text: &str) -> String {
697 let arg = text.strip_prefix("@wme-agent").unwrap_or(text).trim();
698
699 let parts: Vec<&str> = arg.split_whitespace().collect();
700 let cmd = parts.first().copied().unwrap_or("help");
701
702 match cmd {
703 "add" => self.cmd_add(&parts[1..]),
704 "queue" => self.cmd_queue(&parts[1..]),
705 "stats" => self.cmd_stats(parts.get(1).copied()),
706 "top" => self.cmd_top(&parts[1..]),
707 "calc" => self.cmd_calc(&parts[1..]),
708 "tips" => self.cmd_tips(parts.get(1).copied().unwrap_or("")),
709 "help" | "" => "**WME — Media Expert** 🎬\n\
710 _Waldiez Media Expert_\n\n\
711 ```\n\
712 add movie|show|book|podcast \"<title>\" … log media\n\
713 queue add <type> \"<title>\" add to watchlist\n\
714 queue list show queue\n\
715 queue done \"<title>\" [rating] mark finished\n\
716 queue drop \"<title>\" remove from queue\n\
717 stats [movie|show|book|podcast] consumption stats\n\
718 top [n] [movie|show|book|podcast] highest rated\n\
719 calc binge <eps> <mins/ep> binge time\n\
720 calc read <pages> <pp/hr> reading time\n\
721 calc listen <eps> <mins/ep> podcast queue time\n\
722 tips [streaming|reading|podcasts|focus] media advice\n\
723 help this message\n\
724 ```"
725 .to_string(),
726 _ => format!("Unknown command: `{cmd}`. Type `help` for the full command list."),
727 }
728 }
729}
730
731#[async_trait]
734impl Actor for WmeAgent {
735 fn id(&self) -> String {
736 self.config.id.clone()
737 }
738 fn name(&self) -> &str {
739 &self.config.name
740 }
741 fn state(&self) -> ActorState {
742 self.state.clone()
743 }
744 fn metrics(&self) -> Arc<ActorMetrics> {
745 Arc::clone(&self.metrics)
746 }
747 fn mailbox(&self) -> mpsc::Sender<Message> {
748 self.mailbox_tx.clone()
749 }
750 fn is_protected(&self) -> bool {
751 self.config.protected
752 }
753
754 async fn on_start(&mut self) -> Result<()> {
755 self.state = ActorState::Running;
756 if let Some(pub_) = &self.publisher {
757 pub_.publish(
758 wactorz_mqtt::topics::spawn(&self.config.id),
759 &serde_json::json!({
760 "agentId": self.config.id,
761 "agentName": self.config.name,
762 "agentType": "media",
763 "timestampMs": Self::now_ms(),
764 }),
765 );
766 }
767 Ok(())
768 }
769
770 async fn handle_message(&mut self, message: Message) -> Result<()> {
771 use wactorz_core::message::MessageType;
772
773 let content = match &message.payload {
774 MessageType::Text { content } => content.trim().to_string(),
775 MessageType::Task { description, .. } => description.trim().to_string(),
776 _ => return Ok(()),
777 };
778
779 let reply = self.dispatch(&content);
780 self.reply(&reply);
781 Ok(())
782 }
783
784 async fn on_heartbeat(&mut self) -> Result<()> {
785 if let Some(pub_) = &self.publisher {
786 pub_.publish(
787 wactorz_mqtt::topics::heartbeat(&self.config.id),
788 &serde_json::json!({
789 "agentId": self.config.id,
790 "agentName": self.config.name,
791 "state": self.state,
792 "timestampMs": Self::now_ms(),
793 }),
794 );
795 }
796 Ok(())
797 }
798
799 async fn run(&mut self) -> Result<()> {
800 self.on_start().await?;
801
802 let mut rx = self
803 .mailbox_rx
804 .take()
805 .ok_or_else(|| anyhow::anyhow!("WmeAgent already running"))?;
806
807 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
808 self.config.heartbeat_interval_secs,
809 ));
810 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
811
812 loop {
813 tokio::select! {
814 biased;
815 msg = rx.recv() => match msg {
816 None => break,
817 Some(m) => {
818 self.metrics.record_received();
819 if let wactorz_core::message::MessageType::Command {
820 command: wactorz_core::message::ActorCommand::Stop,
821 } = &m.payload { break; }
822 match self.handle_message(m).await {
823 Ok(_) => self.metrics.record_processed(),
824 Err(e) => {
825 tracing::error!("[{}] {e}", self.config.name);
826 self.metrics.record_failed();
827 }
828 }
829 }
830 },
831 _ = hb.tick() => {
832 self.metrics.record_heartbeat();
833 if let Err(e) = self.on_heartbeat().await {
834 tracing::error!("[{}] heartbeat: {e}", self.config.name);
835 }
836 }
837 }
838 }
839
840 self.state = ActorState::Stopped;
841 self.on_stop().await
842 }
843}