wactorz_agents/
fuseki_agent.rs1use anyhow::Result;
11use async_trait::async_trait;
12use std::sync::Arc;
13use tokio::sync::mpsc;
14
15use crate::llm_agent::{LlmAgent, LlmConfig};
16use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};
17
18pub struct FusekiAgent {
19 config: ActorConfig,
20 fuseki_url: String,
21 dataset: String,
22 http: reqwest::Client,
23 llm: Option<LlmAgent>,
24 state: ActorState,
25 metrics: Arc<ActorMetrics>,
26 mailbox_tx: mpsc::Sender<Message>,
27 mailbox_rx: Option<mpsc::Receiver<Message>>,
28 publisher: Option<EventPublisher>,
29}
30
31impl FusekiAgent {
32 pub fn new(config: ActorConfig) -> Self {
33 let fuseki_url =
34 std::env::var("FUSEKI_URL").unwrap_or_else(|_| "http://fuseki:3030".into());
35 let dataset = std::env::var("FUSEKI_DATASET").unwrap_or_else(|_| "/ds".into());
36 let (tx, rx) = mpsc::channel(config.mailbox_capacity);
37 Self {
38 config,
39 fuseki_url,
40 dataset,
41 http: reqwest::Client::new(),
42 llm: None,
43 state: ActorState::Initializing,
44 metrics: Arc::new(ActorMetrics::new()),
45 mailbox_tx: tx,
46 mailbox_rx: Some(rx),
47 publisher: None,
48 }
49 }
50
51 pub fn with_publisher(mut self, p: EventPublisher) -> Self {
52 self.publisher = Some(p);
53 self
54 }
55
56 pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
57 let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
58 self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
59 self
60 }
61
62 async fn sparql_query(&self, query: &str) -> Result<serde_json::Value> {
64 let url = format!("{}{}/query", self.fuseki_url, self.dataset);
65 let resp = self
66 .http
67 .post(&url)
68 .header("Content-Type", "application/sparql-query")
69 .header("Accept", "application/sparql-results+json")
70 .body(query.to_string())
71 .send()
72 .await?;
73 if !resp.status().is_success() {
74 let s = resp.status();
75 let t = resp.text().await.unwrap_or_default();
76 anyhow::bail!("Fuseki {s}: {t}");
77 }
78 Ok(resp.json().await?)
79 }
80
81 async fn sparql_update(&self, update: &str) -> Result<()> {
83 let url = format!("{}{}/update", self.fuseki_url, self.dataset);
84 let resp = self
85 .http
86 .post(&url)
87 .header("Content-Type", "application/sparql-update")
88 .body(update.to_string())
89 .send()
90 .await?;
91 if !resp.status().is_success() {
92 let s = resp.status();
93 let t = resp.text().await.unwrap_or_default();
94 anyhow::bail!("Fuseki update {s}: {t}");
95 }
96 Ok(())
97 }
98
99 async fn process(&mut self, text: &str) -> String {
100 let trimmed = text.trim().to_uppercase();
102 if trimmed.starts_with("SELECT")
103 || trimmed.starts_with("ASK")
104 || trimmed.starts_with("CONSTRUCT")
105 {
106 match self.sparql_query(text).await {
107 Ok(v) => format!(
108 "SPARQL results:\n{}",
109 serde_json::to_string_pretty(&v).unwrap_or_else(|_| v.to_string())
110 ),
111 Err(e) => format!("Fuseki error: {e}"),
112 }
113 } else if trimmed.starts_with("INSERT")
114 || trimmed.starts_with("DELETE")
115 || trimmed.starts_with("WITH")
116 {
117 match self.sparql_update(text).await {
118 Ok(()) => "Update executed successfully.".into(),
119 Err(e) => format!("Fuseki update error: {e}"),
120 }
121 } else if let Some(llm) = &mut self.llm {
122 let prompt = format!(
123 "You are a SPARQL/RDF expert connected to a Fuseki endpoint at {}{}\n\
124 The user asked: \"{text}\"\n\
125 Generate a SPARQL query or respond helpfully. \
126 If you generate a query, wrap it in ```sparql ... ``` fences.",
127 self.fuseki_url, self.dataset
128 );
129 llm.complete(&prompt)
130 .await
131 .unwrap_or_else(|e| format!("LLM error: {e}"))
132 } else {
133 format!(
134 "Provide a SPARQL query (SELECT/ASK/INSERT/DELETE) to execute against {}{}",
135 self.fuseki_url, self.dataset
136 )
137 }
138 }
139
140 fn now_ms() -> u64 {
141 std::time::SystemTime::now()
142 .duration_since(std::time::UNIX_EPOCH)
143 .unwrap_or_default()
144 .as_millis() as u64
145 }
146}
147
148#[async_trait]
149impl Actor for FusekiAgent {
150 fn id(&self) -> String {
151 self.config.id.clone()
152 }
153 fn name(&self) -> &str {
154 &self.config.name
155 }
156 fn state(&self) -> ActorState {
157 self.state.clone()
158 }
159 fn metrics(&self) -> Arc<ActorMetrics> {
160 Arc::clone(&self.metrics)
161 }
162 fn mailbox(&self) -> mpsc::Sender<Message> {
163 self.mailbox_tx.clone()
164 }
165
166 async fn on_start(&mut self) -> Result<()> {
167 self.state = ActorState::Running;
168 tracing::info!(
169 "[{}] Fuseki agent → {}{}",
170 self.config.name,
171 self.fuseki_url,
172 self.dataset
173 );
174 if let Some(pub_) = &self.publisher {
175 pub_.publish(
176 wactorz_mqtt::topics::spawn(&self.config.id),
177 &serde_json::json!({
178 "agentId": self.config.id,
179 "agentName": self.config.name,
180 "agentType": "fuseki",
181 "fusekiUrl": self.fuseki_url,
182 "dataset": self.dataset,
183 "timestampMs": Self::now_ms(),
184 }),
185 );
186 }
187 Ok(())
188 }
189
190 async fn handle_message(&mut self, message: Message) -> Result<()> {
191 use wactorz_core::message::MessageType;
192 let text = match &message.payload {
193 MessageType::Text { content } => content.clone(),
194 MessageType::Task { description, .. } => description.clone(),
195 _ => return Ok(()),
196 };
197 let response = self.process(&text).await;
198 if let Some(pub_) = &self.publisher {
199 pub_.publish(
200 wactorz_mqtt::topics::chat(&self.config.id),
201 &serde_json::json!({
202 "from": self.config.name,
203 "to": message.from.as_deref().unwrap_or("user"),
204 "content": response,
205 "timestampMs": Self::now_ms(),
206 }),
207 );
208 }
209 Ok(())
210 }
211
212 async fn on_heartbeat(&mut self) -> Result<()> {
213 if let Some(pub_) = &self.publisher {
214 pub_.publish(
215 wactorz_mqtt::topics::heartbeat(&self.config.id),
216 &serde_json::json!({
217 "agentId": self.config.id,
218 "agentName": self.config.name,
219 "state": self.state,
220 "fusekiUrl": self.fuseki_url,
221 "timestampMs": Self::now_ms(),
222 }),
223 );
224 }
225 Ok(())
226 }
227
228 async fn run(&mut self) -> Result<()> {
229 self.on_start().await?;
230 let mut rx = self
231 .mailbox_rx
232 .take()
233 .ok_or_else(|| anyhow::anyhow!("FusekiAgent already running"))?;
234 let mut hb = tokio::time::interval(std::time::Duration::from_secs(
235 self.config.heartbeat_interval_secs,
236 ));
237 hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
238 loop {
239 tokio::select! {
240 biased;
241 msg = rx.recv() => {
242 match msg {
243 None => break,
244 Some(m) => {
245 self.metrics.record_received();
246 if let wactorz_core::message::MessageType::Command {
247 command: wactorz_core::message::ActorCommand::Stop
248 } = &m.payload { break; }
249 match self.handle_message(m).await {
250 Ok(_) => self.metrics.record_processed(),
251 Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
252 }
253 }
254 }
255 }
256 _ = hb.tick() => {
257 self.metrics.record_heartbeat();
258 if let Err(e) = self.on_heartbeat().await {
259 tracing::error!("[{}] heartbeat: {e}", self.config.name);
260 }
261 }
262 }
263 }
264 self.state = ActorState::Stopped;
265 self.on_stop().await
266 }
267}