1use anyhow::Result;
27use async_trait::async_trait;
28use std::sync::Arc;
29use tokio::sync::mpsc;
30
31use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
32
33const DEFAULT_STORY_COUNT: usize = 5;
35const MAX_STORY_COUNT: usize = 20;
37
38const HTTP_TIMEOUT_SECS: u64 = 12;
39
40const HN_API: &str = "https://hacker-news.firebaseio.com/v0";
42
43pub struct NewsAgent {
44 config: ActorConfig,
45 state: ActorState,
46 metrics: Arc<ActorMetrics>,
47 mailbox_tx: mpsc::Sender<Message>,
48 mailbox_rx: Option<mpsc::Receiver<Message>>,
49 publisher: Option<EventPublisher>,
50 http: reqwest::Client,
51}
52
53impl NewsAgent {
54 pub fn new(config: ActorConfig) -> Self {
55 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
56 let http = reqwest::Client::builder()
57 .timeout(std::time::Duration::from_secs(HTTP_TIMEOUT_SECS))
58 .user_agent("AgentFlow-NewsAgent/1.0")
59 .build()
60 .unwrap_or_default();
61 Self {
62 config,
63 state: ActorState::Initializing,
64 metrics: Arc::new(ActorMetrics::new()),
65 mailbox_tx: tx,
66 mailbox_rx: Some(rx),
67 publisher: None,
68 http,
69 }
70 }
71
72 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
73 self.publisher = Some(p);
74 self
75 }
76
77 fn now_ms() -> u64 {
78 std::time::SystemTime::now()
79 .duration_since(std::time::UNIX_EPOCH)
80 .unwrap_or_default()
81 .as_millis() as u64
82 }
83
84 fn reply(&self, content: &str) {
85 if let Some(pub_) = &self.publisher {
86 pub_.publish(
87 wactorz_mqtt::topics::chat(&self.config.id),
88 &serde_json::json!({
89 "from": self.config.name,
90 "to": "user",
91 "content": content,
92 "timestampMs": Self::now_ms(),
93 }),
94 );
95 }
96 }
97
98 async fn hn_story_ids(&self, feed: &str) -> Result<Vec<u64>> {
100 let url = format!("{HN_API}/{feed}stories.json");
101 let ids: Vec<u64> = self.http.get(&url).send().await?.json().await?;
102 Ok(ids)
103 }
104
105 async fn fetch_hn(&self, feed: &str, count: usize) -> Result<String> {
107 let ids = self.hn_story_ids(feed).await?;
108 let take = count.min(ids.len()).min(MAX_STORY_COUNT);
109
110 let mut handles = Vec::with_capacity(take);
112 for &id in ids.iter().take(take) {
113 let client = self.http.clone();
114 handles.push(tokio::spawn(async move {
115 let url = format!("{HN_API}/item/{id}.json");
116 client
117 .get(&url)
118 .send()
119 .await
120 .ok()?
121 .json::<serde_json::Value>()
122 .await
123 .ok()
124 }));
125 }
126
127 let mut lines = Vec::with_capacity(take);
128 for (i, handle) in handles.into_iter().enumerate() {
129 if let Ok(Some(item)) = handle.await {
130 let title = item
131 .get("title")
132 .and_then(|v| v.as_str())
133 .unwrap_or("(no title)");
134 let url = item.get("url").and_then(|v| v.as_str()).unwrap_or("");
135 let score = item.get("score").and_then(|v| v.as_i64()).unwrap_or(0);
136 let hn_url = format!("https://news.ycombinator.com/item?id={}", ids[i]);
137
138 let link = if url.is_empty() {
139 hn_url.clone()
140 } else {
141 url.to_string()
142 };
143 lines.push(format!(
144 "{}. **[{title}]({link})** — ⬆ {score} · [HN]({hn_url})",
145 i + 1
146 ));
147 }
148 }
149
150 let feed_label = match feed {
151 "top" => "Top",
152 "new" => "Newest",
153 "best" => "Best",
154 "ask" => "Ask HN",
155 "show" => "Show HN",
156 "job" => "Jobs",
157 other => other,
158 };
159
160 if lines.is_empty() {
161 return Ok(format!("No {feed_label} stories found right now."));
162 }
163
164 Ok(format!(
165 "**Hacker News — {feed_label} Stories** (top {take})\n\n{}\n\n*Source: [news.ycombinator.com](https://news.ycombinator.com)*",
166 lines.join("\n")
167 ))
168 }
169}
170
171#[async_trait]
172impl Actor for NewsAgent {
173 fn id(&self) -> String {
174 self.config.id.clone()
175 }
176 fn name(&self) -> &str {
177 &self.config.name
178 }
179 fn state(&self) -> ActorState {
180 self.state.clone()
181 }
182 fn metrics(&self) -> Arc<ActorMetrics> {
183 Arc::clone(&self.metrics)
184 }
185 fn mailbox(&self) -> mpsc::Sender<Message> {
186 self.mailbox_tx.clone()
187 }
188 fn is_protected(&self) -> bool {
189 self.config.protected
190 }
191
192 async fn on_start(&mut self) -> Result<()> {
193 self.state = ActorState::Running;
194 if let Some(pub_) = &self.publisher {
195 pub_.publish(
196 wactorz_mqtt::topics::spawn(&self.config.id),
197 &serde_json::json!({
198 "agentId": self.config.id,
199 "agentName": self.config.name,
200 "agentType": "data",
201 "timestampMs": Self::now_ms(),
202 }),
203 );
204 }
205 Ok(())
206 }
207
208 async fn handle_message(&mut self, message: Message) -> Result<()> {
209 use wactorz_core::message::MessageType;
210
211 let content = match &message.payload {
212 MessageType::Text { content } => content.trim().to_string(),
213 MessageType::Task { description, .. } => description.trim().to_string(),
214 _ => return Ok(()),
215 };
216
217 let arg = content
219 .strip_prefix("@news-agent")
220 .unwrap_or(&content)
221 .trim()
222 .to_lowercase();
223
224 let parts: Vec<&str> = arg.split_whitespace().collect();
225 let first = parts.first().copied().unwrap_or("");
226
227 let (feed, count) = match first {
229 "" | "top" => (
230 "top",
231 parts
232 .get(1)
233 .and_then(|s| s.parse().ok())
234 .unwrap_or(DEFAULT_STORY_COUNT),
235 ),
236 "new" => (
237 "new",
238 parts
239 .get(1)
240 .and_then(|s| s.parse().ok())
241 .unwrap_or(DEFAULT_STORY_COUNT),
242 ),
243 "best" => (
244 "best",
245 parts
246 .get(1)
247 .and_then(|s| s.parse().ok())
248 .unwrap_or(DEFAULT_STORY_COUNT),
249 ),
250 "ask" => (
251 "ask",
252 parts
253 .get(1)
254 .and_then(|s| s.parse().ok())
255 .unwrap_or(DEFAULT_STORY_COUNT),
256 ),
257 "show" => (
258 "show",
259 parts
260 .get(1)
261 .and_then(|s| s.parse().ok())
262 .unwrap_or(DEFAULT_STORY_COUNT),
263 ),
264 "jobs" | "job" => ("job", DEFAULT_STORY_COUNT),
265 n if n.parse::<usize>().is_ok() => ("top", n.parse().unwrap_or(DEFAULT_STORY_COUNT)),
266 "help" => {
267 self.reply(
268 "**NewsAgent** — headlines via Hacker News (no API key needed)\n\n\
269 ```\n\
270 @news-agent # top 5 stories\n\
271 @news-agent 10 # top 10 stories\n\
272 @news-agent new # newest\n\
273 @news-agent best # all-time best\n\
274 @news-agent ask # Ask HN\n\
275 @news-agent show # Show HN\n\
276 @news-agent jobs # job postings\n\
277 @news-agent help # this message\n\
278 ```",
279 );
280 return Ok(());
281 }
282 _ => ("top", DEFAULT_STORY_COUNT),
283 };
284
285 let count = count.min(MAX_STORY_COUNT);
286 self.reply(&format!(
287 "📰 Fetching top {count} {feed} stories from Hacker News…"
288 ));
289
290 match self.fetch_hn(feed, count).await {
291 Ok(report) => self.reply(&report),
292 Err(e) => self.reply(&format!("⚠ Could not fetch news: {e}")),
293 }
294
295 Ok(())
296 }
297
298 async fn on_heartbeat(&mut self) -> Result<()> {
299 if let Some(pub_) = &self.publisher {
300 pub_.publish(
301 wactorz_mqtt::topics::heartbeat(&self.config.id),
302 &serde_json::json!({
303 "agentId": self.config.id,
304 "agentName": self.config.name,
305 "state": self.state,
306 "timestampMs": Self::now_ms(),
307 }),
308 );
309 }
310 Ok(())
311 }
312
313 async fn run(&mut self) -> Result<()> {
314 self.on_start().await?;
315 let mut rx = self
316 .mailbox_rx
317 .take()
318 .ok_or_else(|| anyhow::anyhow!("NewsAgent already running"))?;
319 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
320 self.config.heartbeat_interval_secs,
321 ));
322 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
323 loop {
324 tokio::select! {
325 biased;
326 msg = rx.recv() => match msg {
327 None => break,
328 Some(m) => {
329 self.metrics.record_received();
330 if let wactorz_core::message::MessageType::Command {
331 command: wactorz_core::message::ActorCommand::Stop
332 } = &m.payload { break; }
333 match self.handle_message(m).await {
334 Ok(_) => self.metrics.record_processed(),
335 Err(e) => {
336 tracing::error!("[{}] {e}", self.config.name);
337 self.metrics.record_failed();
338 }
339 }
340 }
341 },
342 _ = hb.tick() => {
343 self.metrics.record_heartbeat();
344 if let Err(e) = self.on_heartbeat().await {
345 tracing::error!("[{}] heartbeat: {e}", self.config.name);
346 }
347 }
348 }
349 }
350 self.state = ActorState::Stopped;
351 self.on_stop().await
352 }
353}