wactorz_agents/
news_agent.rs

//! News headlines agent.
//!
//! [`NewsAgent`] fetches current headlines on demand.
//!
//! **Sources (no API key needed by default):**
//!
//! - [Hacker News](https://hacker-news.firebaseio.com) — tech/startup news (default)
//! - Any RSS/Atom feed URL via `NEWS_RSS_URL` env var
//!   (NOTE(review): nothing in this file reads `NEWS_RSS_URL`; confirm where RSS
//!   support is implemented before relying on it)
//!
//! ## Usage (via IO bar)
//!
//! ```text
//! @news-agent                 → top 5 Hacker News stories
//! @news-agent 10              → top 10 stories
//! @news-agent top             → top 5 stories (same as no argument)
//! @news-agent best            → HN "best" stories
//! @news-agent ask             → HN "Ask HN" stories
//! @news-agent show            → HN "Show HN" stories
//! @news-agent new             → newest HN stories
//! @news-agent jobs            → HN job postings
//! @news-agent help            → show usage
//! ```
//!
//! The agent does **not** poll; it only fetches when it receives a message.
//! It is stoppable and pausable — consumes no resources when idle.
25
26use anyhow::Result;
27use async_trait::async_trait;
28use std::sync::Arc;
29use tokio::sync::mpsc;
30
31use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
32
/// Hacker News Firebase API base URL; feed and item paths are appended to it.
const HN_API: &str = "https://hacker-news.firebaseio.com/v0";

/// Timeout, in seconds, configured on the agent's HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 12;

/// Number of stories returned when a request does not specify a count.
const DEFAULT_STORY_COUNT: usize = 5;

/// Upper bound on the number of stories fetched for a single request.
const MAX_STORY_COUNT: usize = 20;
42
43pub struct NewsAgent {
44    config: ActorConfig,
45    state: ActorState,
46    metrics: Arc<ActorMetrics>,
47    mailbox_tx: mpsc::Sender<Message>,
48    mailbox_rx: Option<mpsc::Receiver<Message>>,
49    publisher: Option<EventPublisher>,
50    http: reqwest::Client,
51}
52
53impl NewsAgent {
54    pub fn new(config: ActorConfig) -> Self {
55        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
56        let http = reqwest::Client::builder()
57            .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
58            .user_agent("AgentFlow-NewsAgent/1.0")
59            .build()
60            .unwrap_or_default();
61        Self {
62            config,
63            state: ActorState::Initializing,
64            metrics: Arc::new(ActorMetrics::new()),
65            mailbox_tx: tx,
66            mailbox_rx: Some(rx),
67            publisher: None,
68            http,
69        }
70    }
71
72    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
73        self.publisher = Some(p);
74        self
75    }
76
77    fn now_ms() -> u64 {
78        std::time::SystemTime::now()
79            .duration_since(std::time::UNIX_EPOCH)
80            .unwrap_or_default()
81            .as_millis() as u64
82    }
83
84    fn reply(&self, content: &str) {
85        if let Some(pub_) = &self.publisher {
86            pub_.publish(
87                wactorz_mqtt::topics::chat(&self.config.id),
88                &serde_json::json!({
89                    "from":        self.config.name,
90                    "to":          "user",
91                    "content":     content,
92                    "timestampMs": Self::now_ms(),
93                }),
94            );
95        }
96    }
97
98    /// Fetch story IDs from a HN endpoint (top/new/best/ask/show/job).
99    async fn hn_story_ids(&self, feed: &str) -> Result<Vec<u64>> {
100        let url = format!("{HN_API}/{feed}stories.json");
101        let ids: Vec<u64> = self.http.get(&url).send().await?.json().await?;
102        Ok(ids)
103    }
104
105    /// Fetch and format HN stories.
106    async fn fetch_hn(&self, feed: &str, count: usize) -> Result<String> {
107        let ids = self.hn_story_ids(feed).await?;
108        let take = count.min(ids.len()).min(MAX_STORY_COUNT);
109
110        // Fetch stories concurrently
111        let mut handles = Vec::with_capacity(take);
112        for &id in ids.iter().take(take) {
113            let client = self.http.clone();
114            handles.push(tokio::spawn(async move {
115                let url = format!("{HN_API}/item/{id}.json");
116                client
117                    .get(&url)
118                    .send()
119                    .await
120                    .ok()?
121                    .json::<serde_json::Value>()
122                    .await
123                    .ok()
124            }));
125        }
126
127        let mut lines = Vec::with_capacity(take);
128        for (i, handle) in handles.into_iter().enumerate() {
129            if let Ok(Some(item)) = handle.await {
130                let title = item
131                    .get("title")
132                    .and_then(|v| v.as_str())
133                    .unwrap_or("(no title)");
134                let url = item.get("url").and_then(|v| v.as_str()).unwrap_or("");
135                let score = item.get("score").and_then(|v| v.as_i64()).unwrap_or(0);
136                let hn_url = format!("https://news.ycombinator.com/item?id={}", ids[i]);
137
138                let link = if url.is_empty() {
139                    hn_url.clone()
140                } else {
141                    url.to_string()
142                };
143                lines.push(format!(
144                    "{}. **[{title}]({link})** — ⬆ {score} · [HN]({hn_url})",
145                    i + 1
146                ));
147            }
148        }
149
150        let feed_label = match feed {
151            "top" => "Top",
152            "new" => "Newest",
153            "best" => "Best",
154            "ask" => "Ask HN",
155            "show" => "Show HN",
156            "job" => "Jobs",
157            other => other,
158        };
159
160        if lines.is_empty() {
161            return Ok(format!("No {feed_label} stories found right now."));
162        }
163
164        Ok(format!(
165            "**Hacker News — {feed_label} Stories** (top {take})\n\n{}\n\n*Source: [news.ycombinator.com](https://news.ycombinator.com)*",
166            lines.join("\n")
167        ))
168    }
169}
170
171#[async_trait]
172impl Actor for NewsAgent {
173    fn id(&self) -> String {
174        self.config.id.clone()
175    }
176    fn name(&self) -> &str {
177        &self.config.name
178    }
179    fn state(&self) -> ActorState {
180        self.state.clone()
181    }
182    fn metrics(&self) -> Arc<ActorMetrics> {
183        Arc::clone(&self.metrics)
184    }
185    fn mailbox(&self) -> mpsc::Sender<Message> {
186        self.mailbox_tx.clone()
187    }
188    fn is_protected(&self) -> bool {
189        self.config.protected
190    }
191
192    async fn on_start(&mut self) -> Result<()> {
193        self.state = ActorState::Running;
194        if let Some(pub_) = &self.publisher {
195            pub_.publish(
196                wactorz_mqtt::topics::spawn(&self.config.id),
197                &serde_json::json!({
198                    "agentId":   self.config.id,
199                    "agentName": self.config.name,
200                    "agentType": "data",
201                    "timestampMs": Self::now_ms(),
202                }),
203            );
204        }
205        Ok(())
206    }
207
208    async fn handle_message(&mut self, message: Message) -> Result<()> {
209        use wactorz_core::message::MessageType;
210
211        let content = match &message.payload {
212            MessageType::Text { content } => content.trim().to_string(),
213            MessageType::Task { description, .. } => description.trim().to_string(),
214            _ => return Ok(()),
215        };
216
217        // Strip @news-agent prefix if present
218        let arg = content
219            .strip_prefix("@news-agent")
220            .unwrap_or(&content)
221            .trim()
222            .to_lowercase();
223
224        let parts: Vec<&str> = arg.split_whitespace().collect();
225        let first = parts.first().copied().unwrap_or("");
226
227        // Determine feed and count
228        let (feed, count) = match first {
229            "" | "top" => (
230                "top",
231                parts
232                    .get(1)
233                    .and_then(|s| s.parse().ok())
234                    .unwrap_or(DEFAULT_STORY_COUNT),
235            ),
236            "new" => (
237                "new",
238                parts
239                    .get(1)
240                    .and_then(|s| s.parse().ok())
241                    .unwrap_or(DEFAULT_STORY_COUNT),
242            ),
243            "best" => (
244                "best",
245                parts
246                    .get(1)
247                    .and_then(|s| s.parse().ok())
248                    .unwrap_or(DEFAULT_STORY_COUNT),
249            ),
250            "ask" => (
251                "ask",
252                parts
253                    .get(1)
254                    .and_then(|s| s.parse().ok())
255                    .unwrap_or(DEFAULT_STORY_COUNT),
256            ),
257            "show" => (
258                "show",
259                parts
260                    .get(1)
261                    .and_then(|s| s.parse().ok())
262                    .unwrap_or(DEFAULT_STORY_COUNT),
263            ),
264            "jobs" | "job" => ("job", DEFAULT_STORY_COUNT),
265            n if n.parse::<usize>().is_ok() => ("top", n.parse().unwrap_or(DEFAULT_STORY_COUNT)),
266            "help" => {
267                self.reply(
268                    "**NewsAgent** — headlines via Hacker News (no API key needed)\n\n\
269                     ```\n\
270                     @news-agent              # top 5 stories\n\
271                     @news-agent 10           # top 10 stories\n\
272                     @news-agent new          # newest\n\
273                     @news-agent best         # all-time best\n\
274                     @news-agent ask          # Ask HN\n\
275                     @news-agent show         # Show HN\n\
276                     @news-agent jobs         # job postings\n\
277                     @news-agent help         # this message\n\
278                     ```",
279                );
280                return Ok(());
281            }
282            _ => ("top", DEFAULT_STORY_COUNT),
283        };
284
285        let count = count.min(MAX_STORY_COUNT);
286        self.reply(&format!(
287            "📰 Fetching top {count} {feed} stories from Hacker News…"
288        ));
289
290        match self.fetch_hn(feed, count).await {
291            Ok(report) => self.reply(&report),
292            Err(e) => self.reply(&format!("⚠ Could not fetch news: {e}")),
293        }
294
295        Ok(())
296    }
297
298    async fn on_heartbeat(&mut self) -> Result<()> {
299        if let Some(pub_) = &self.publisher {
300            pub_.publish(
301                wactorz_mqtt::topics::heartbeat(&self.config.id),
302                &serde_json::json!({
303                    "agentId":   self.config.id,
304                    "agentName": self.config.name,
305                    "state":     self.state,
306                    "timestampMs": Self::now_ms(),
307                }),
308            );
309        }
310        Ok(())
311    }
312
313    async fn run(&mut self) -> Result<()> {
314        self.on_start().await?;
315        let mut rx = self
316            .mailbox_rx
317            .take()
318            .ok_or_else(|| anyhow::anyhow!("NewsAgent already running"))?;
319        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
320            self.config.heartbeat_interval_secs,
321        ));
322        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
323        loop {
324            tokio::select! {
325                biased;
326                msg = rx.recv() => match msg {
327                    None => break,
328                    Some(m) => {
329                        self.metrics.record_received();
330                        if let wactorz_core::message::MessageType::Command {
331                            command: wactorz_core::message::ActorCommand::Stop
332                        } = &m.payload { break; }
333                        match self.handle_message(m).await {
334                            Ok(_)  => self.metrics.record_processed(),
335                            Err(e) => {
336                                tracing::error!("[{}] {e}", self.config.name);
337                                self.metrics.record_failed();
338                            }
339                        }
340                    }
341                },
342                _ = hb.tick() => {
343                    self.metrics.record_heartbeat();
344                    if let Err(e) = self.on_heartbeat().await {
345                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
346                    }
347                }
348            }
349        }
350        self.state = ActorState::Stopped;
351        self.on_stop().await
352    }
353}