wactorz_mqtt/
client.rs

1//! Async MQTT client wrapper.
2//!
3//! [`MqttClient`] wraps `rumqttc::AsyncClient` and its event loop, exposing
4//! ergonomic `publish` / `subscribe` helpers that work directly with
5//! [`wactorz_core::Message`] values (serialised as JSON).
6
7use anyhow::Result;
8use rumqttc::{AsyncClient, EventLoop, QoS};
9use serde::{Deserialize, Serialize};
10use tracing::debug;
11
12use wactorz_core::Message;
13
14/// Connection parameters for the MQTT broker.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct MqttConfig {
17    /// Broker hostname or IP address.
18    pub host: String,
19    /// Standard MQTT port (default 1883) or TLS port (8883).
20    pub port: u16,
21    /// Client identifier sent to the broker.
22    pub client_id: String,
23    /// Optional username for broker authentication.
24    pub username: Option<String>,
25    /// Optional password for broker authentication.
26    pub password: Option<String>,
27    /// Keep-alive interval in seconds.
28    pub keep_alive_secs: u64,
29    /// WebSocket port (for browser clients, default 9001).
30    pub ws_port: u16,
31}
32
33impl Default for MqttConfig {
34    fn default() -> Self {
35        Self {
36            host: "localhost".into(),
37            port: 1883,
38            client_id: "wactorz-server".into(),
39            username: None,
40            password: None,
41            keep_alive_secs: 30,
42            ws_port: 9001,
43        }
44    }
45}
46
/// Typed events surfaced by the MQTT event loop.
///
/// These are the values handed to the handler passed to
/// [`MqttClient::run_event_loop`]; all other packet types are dropped there.
#[derive(Debug)]
pub enum MqttEvent {
    /// A message arrived on a subscribed topic.
    ///
    /// `topic` is the topic the message was published on and `payload` is a
    /// copy of the raw message bytes (not decoded or validated).
    Incoming { topic: String, payload: Vec<u8> },
    /// The client successfully connected (or reconnected) to the broker.
    ///
    /// Emitted whenever a `ConnAck` packet is received.
    Connected,
    /// The client was cleanly disconnected.
    ///
    /// Emitted when an incoming `Disconnect` packet is received; poll errors
    /// do not produce this event.
    Disconnected,
}
57
/// Async MQTT client.
///
/// Holds the `rumqttc::AsyncClient` handle used to issue publish, subscribe
/// and unsubscribe requests. The matching `rumqttc::EventLoop` is *not*
/// stored in this struct: [`MqttClient::new`] returns it alongside the
/// client, and it must be polled continuously (for example through
/// [`MqttClient::run_event_loop`]) to keep the connection alive.
pub struct MqttClient {
    /// Request handle; every publish/subscribe/unsubscribe call goes through it.
    inner: AsyncClient,
}
66
67impl MqttClient {
68    /// Create a new client and connect to the broker described by `config`.
69    ///
70    /// The returned `EventLoop` must be driven by calling [`MqttClient::run_event_loop`]
71    /// or by polling it manually in a dedicated task.
72    pub fn new(config: MqttConfig) -> Result<(Self, EventLoop)> {
73        let mut opts = rumqttc::MqttOptions::new(&config.client_id, &config.host, config.port);
74        opts.set_keep_alive(std::time::Duration::from_secs(config.keep_alive_secs));
75        if let (Some(user), Some(pass)) = (&config.username, &config.password) {
76            opts.set_credentials(user, pass);
77        }
78        opts.set_max_packet_size(256 * 1024, 256 * 1024);
79        let (inner, event_loop) = rumqttc::AsyncClient::new(opts, 64);
80        Ok((Self { inner }, event_loop))
81    }
82
83    /// Publish a serialised [`Message`] to the given topic.
84    pub async fn publish_message(&self, topic: &str, message: &Message) -> Result<()> {
85        let payload = serde_json::to_vec(message)?;
86        self.inner
87            .publish(topic, QoS::AtLeastOnce, false, payload)
88            .await?;
89        Ok(())
90    }
91
92    /// Publish a raw JSON payload to the given topic.
93    pub async fn publish_json(&self, topic: &str, payload: &impl Serialize) -> Result<()> {
94        let bytes = serde_json::to_vec(payload)?;
95        self.inner
96            .publish(topic, QoS::AtLeastOnce, false, bytes)
97            .await?;
98        Ok(())
99    }
100
101    /// Publish raw bytes to the given topic.
102    pub async fn publish_raw(&self, topic: &str, payload: Vec<u8>) -> Result<()> {
103        self.inner
104            .publish(topic, QoS::AtLeastOnce, false, payload)
105            .await?;
106        Ok(())
107    }
108
109    /// Subscribe to a topic pattern (MQTT wildcards `+` and `#` supported).
110    pub async fn subscribe(&self, topic: &str) -> Result<()> {
111        self.inner.subscribe(topic, QoS::AtLeastOnce).await?;
112        debug!(topic, "subscribed");
113        Ok(())
114    }
115
116    /// Unsubscribe from a topic.
117    pub async fn unsubscribe(&self, topic: &str) -> Result<()> {
118        self.inner.unsubscribe(topic).await?;
119        Ok(())
120    }
121
122    /// Drive the event loop, mapping raw `rumqttc` events to [`MqttEvent`]s
123    /// and forwarding them to `handler`.
124    ///
125    /// This method loops forever; call it in a dedicated `tokio::spawn` task.
126    pub async fn run_event_loop(
127        event_loop: &mut EventLoop,
128        mut handler: impl FnMut(MqttEvent) + Send + 'static,
129    ) {
130        use rumqttc::{Event, Packet};
131        loop {
132            match event_loop.poll().await {
133                Ok(Event::Incoming(Packet::Publish(p))) => {
134                    handler(MqttEvent::Incoming {
135                        topic: p.topic,
136                        payload: p.payload.to_vec(),
137                    });
138                }
139                Ok(Event::Incoming(Packet::ConnAck(_))) => {
140                    handler(MqttEvent::Connected);
141                }
142                Ok(Event::Incoming(Packet::Disconnect)) => {
143                    handler(MqttEvent::Disconnected);
144                    break;
145                }
146                Ok(_) => {} // PingReq, PubAck, SubAck etc — ignore
147                Err(e) => {
148                    tracing::error!("MQTT event loop error: {e}");
149                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
150                }
151            }
152        }
153    }
154}