// wactorz_agents/fuseki_agent.rs

//! Apache Jena Fuseki SPARQL agent.
//!
//! [`FusekiAgent`] executes SPARQL queries and updates against an Apache
//! Jena Fuseki endpoint.  When an LLM is configured, it can also draft
//! SPARQL for requests phrased in natural language.
//!
//! Configuration (env vars):
//! - `FUSEKI_URL`     — Fuseki base URL (default: `http://fuseki:3030`)
//! - `FUSEKI_DATASET` — Dataset path   (default: `/ds`)

use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;

use crate::llm_agent::{LlmAgent, LlmConfig};
use wactorz_core::{Actor, ActorConfig, ActorMetrics, ActorState, EventPublisher, Message};

18pub struct FusekiAgent {
19    config: ActorConfig,
20    fuseki_url: String,
21    dataset: String,
22    http: reqwest::Client,
23    llm: Option<LlmAgent>,
24    state: ActorState,
25    metrics: Arc<ActorMetrics>,
26    mailbox_tx: mpsc::Sender<Message>,
27    mailbox_rx: Option<mpsc::Receiver<Message>>,
28    publisher: Option<EventPublisher>,
29}
30
31impl FusekiAgent {
32    pub fn new(config: ActorConfig) -> Self {
33        let fuseki_url =
34            std::env::var("FUSEKI_URL").unwrap_or_else(|_| "http://fuseki:3030".into());
35        let dataset = std::env::var("FUSEKI_DATASET").unwrap_or_else(|_| "/ds".into());
36        let (tx, rx) = mpsc::channel(config.mailbox_capacity);
37        Self {
38            config,
39            fuseki_url,
40            dataset,
41            http: reqwest::Client::new(),
42            llm: None,
43            state: ActorState::Initializing,
44            metrics: Arc::new(ActorMetrics::new()),
45            mailbox_tx: tx,
46            mailbox_rx: Some(rx),
47            publisher: None,
48        }
49    }
50
51    pub fn with_publisher(mut self, p: EventPublisher) -> Self {
52        self.publisher = Some(p);
53        self
54    }
55
56    pub fn with_llm(mut self, llm_config: LlmConfig) -> Self {
57        let llm_cfg = ActorConfig::new(format!("{}-llm", self.config.name));
58        self.llm = Some(LlmAgent::new(llm_cfg, llm_config));
59        self
60    }
61
62    /// Execute a SPARQL SELECT query; returns JSON results.
63    async fn sparql_query(&self, query: &str) -> Result<serde_json::Value> {
64        let url = format!("{}{}/query", self.fuseki_url, self.dataset);
65        let resp = self
66            .http
67            .post(&url)
68            .header("Content-Type", "application/sparql-query")
69            .header("Accept", "application/sparql-results+json")
70            .body(query.to_string())
71            .send()
72            .await?;
73        if !resp.status().is_success() {
74            let s = resp.status();
75            let t = resp.text().await.unwrap_or_default();
76            anyhow::bail!("Fuseki {s}: {t}");
77        }
78        Ok(resp.json().await?)
79    }
80
81    /// Execute a SPARQL UPDATE statement.
82    async fn sparql_update(&self, update: &str) -> Result<()> {
83        let url = format!("{}{}/update", self.fuseki_url, self.dataset);
84        let resp = self
85            .http
86            .post(&url)
87            .header("Content-Type", "application/sparql-update")
88            .body(update.to_string())
89            .send()
90            .await?;
91        if !resp.status().is_success() {
92            let s = resp.status();
93            let t = resp.text().await.unwrap_or_default();
94            anyhow::bail!("Fuseki update {s}: {t}");
95        }
96        Ok(())
97    }
98
99    async fn process(&mut self, text: &str) -> String {
100        // If input looks like SPARQL, run it directly
101        let trimmed = text.trim().to_uppercase();
102        if trimmed.starts_with("SELECT")
103            || trimmed.starts_with("ASK")
104            || trimmed.starts_with("CONSTRUCT")
105        {
106            match self.sparql_query(text).await {
107                Ok(v) => format!(
108                    "SPARQL results:\n{}",
109                    serde_json::to_string_pretty(&v).unwrap_or_else(|_| v.to_string())
110                ),
111                Err(e) => format!("Fuseki error: {e}"),
112            }
113        } else if trimmed.starts_with("INSERT")
114            || trimmed.starts_with("DELETE")
115            || trimmed.starts_with("WITH")
116        {
117            match self.sparql_update(text).await {
118                Ok(()) => "Update executed successfully.".into(),
119                Err(e) => format!("Fuseki update error: {e}"),
120            }
121        } else if let Some(llm) = &mut self.llm {
122            let prompt = format!(
123                "You are a SPARQL/RDF expert connected to a Fuseki endpoint at {}{}\n\
124                 The user asked: \"{text}\"\n\
125                 Generate a SPARQL query or respond helpfully. \
126                 If you generate a query, wrap it in ```sparql ... ``` fences.",
127                self.fuseki_url, self.dataset
128            );
129            llm.complete(&prompt)
130                .await
131                .unwrap_or_else(|e| format!("LLM error: {e}"))
132        } else {
133            format!(
134                "Provide a SPARQL query (SELECT/ASK/INSERT/DELETE) to execute against {}{}",
135                self.fuseki_url, self.dataset
136            )
137        }
138    }
139
140    fn now_ms() -> u64 {
141        std::time::SystemTime::now()
142            .duration_since(std::time::UNIX_EPOCH)
143            .unwrap_or_default()
144            .as_millis() as u64
145    }
146}
147
148#[async_trait]
149impl Actor for FusekiAgent {
150    fn id(&self) -> String {
151        self.config.id.clone()
152    }
153    fn name(&self) -> &str {
154        &self.config.name
155    }
156    fn state(&self) -> ActorState {
157        self.state.clone()
158    }
159    fn metrics(&self) -> Arc<ActorMetrics> {
160        Arc::clone(&self.metrics)
161    }
162    fn mailbox(&self) -> mpsc::Sender<Message> {
163        self.mailbox_tx.clone()
164    }
165
166    async fn on_start(&mut self) -> Result<()> {
167        self.state = ActorState::Running;
168        tracing::info!(
169            "[{}] Fuseki agent → {}{}",
170            self.config.name,
171            self.fuseki_url,
172            self.dataset
173        );
174        if let Some(pub_) = &self.publisher {
175            pub_.publish(
176                wactorz_mqtt::topics::spawn(&self.config.id),
177                &serde_json::json!({
178                    "agentId":   self.config.id,
179                    "agentName": self.config.name,
180                    "agentType": "fuseki",
181                    "fusekiUrl": self.fuseki_url,
182                    "dataset":   self.dataset,
183                    "timestampMs": Self::now_ms(),
184                }),
185            );
186        }
187        Ok(())
188    }
189
190    async fn handle_message(&mut self, message: Message) -> Result<()> {
191        use wactorz_core::message::MessageType;
192        let text = match &message.payload {
193            MessageType::Text { content } => content.clone(),
194            MessageType::Task { description, .. } => description.clone(),
195            _ => return Ok(()),
196        };
197        let response = self.process(&text).await;
198        if let Some(pub_) = &self.publisher {
199            pub_.publish(
200                wactorz_mqtt::topics::chat(&self.config.id),
201                &serde_json::json!({
202                    "from":      self.config.name,
203                    "to":        message.from.as_deref().unwrap_or("user"),
204                    "content":   response,
205                    "timestampMs": Self::now_ms(),
206                }),
207            );
208        }
209        Ok(())
210    }
211
212    async fn on_heartbeat(&mut self) -> Result<()> {
213        if let Some(pub_) = &self.publisher {
214            pub_.publish(
215                wactorz_mqtt::topics::heartbeat(&self.config.id),
216                &serde_json::json!({
217                    "agentId":   self.config.id,
218                    "agentName": self.config.name,
219                    "state":     self.state,
220                    "fusekiUrl": self.fuseki_url,
221                    "timestampMs": Self::now_ms(),
222                }),
223            );
224        }
225        Ok(())
226    }
227
228    async fn run(&mut self) -> Result<()> {
229        self.on_start().await?;
230        let mut rx = self
231            .mailbox_rx
232            .take()
233            .ok_or_else(|| anyhow::anyhow!("FusekiAgent already running"))?;
234        let mut hb = tokio::time::interval(std::time::Duration::from_secs(
235            self.config.heartbeat_interval_secs,
236        ));
237        hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
238        loop {
239            tokio::select! {
240                biased;
241                msg = rx.recv() => {
242                    match msg {
243                        None    => break,
244                        Some(m) => {
245                            self.metrics.record_received();
246                            if let wactorz_core::message::MessageType::Command {
247                                command: wactorz_core::message::ActorCommand::Stop
248                            } = &m.payload { break; }
249                            match self.handle_message(m).await {
250                                Ok(_)  => self.metrics.record_processed(),
251                                Err(e) => { tracing::error!("[{}] {e}", self.config.name); self.metrics.record_failed(); }
252                            }
253                        }
254                    }
255                }
256                _ = hb.tick() => {
257                    self.metrics.record_heartbeat();
258                    if let Err(e) = self.on_heartbeat().await {
259                        tracing::error!("[{}] heartbeat: {e}", self.config.name);
260                    }
261                }
262            }
263        }
264        self.state = ActorState::Stopped;
265        self.on_stop().await
266    }
267}