1use anyhow::Result;
8use rumqttc::{AsyncClient, EventLoop, QoS};
9use serde::{Deserialize, Serialize};
10use tracing::debug;
11
12use wactorz_core::Message;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct MqttConfig {
17 pub host: String,
19 pub port: u16,
21 pub client_id: String,
23 pub username: Option<String>,
25 pub password: Option<String>,
27 pub keep_alive_secs: u64,
29 pub ws_port: u16,
31}
32
33impl Default for MqttConfig {
34 fn default() -> Self {
35 Self {
36 host: "localhost".into(),
37 port: 1883,
38 client_id: "wactorz-server".into(),
39 username: None,
40 password: None,
41 keep_alive_secs: 30,
42 ws_port: 9001,
43 }
44 }
45}
46
/// Events handed to the callback given to `MqttClient::run_event_loop`.
#[derive(Debug)]
pub enum MqttEvent {
    /// A publish packet was received; carries its topic and raw payload bytes.
    Incoming {
        topic: String,
        payload: Vec<u8>,
    },
    /// A connection acknowledgement was received.
    Connected,
    /// A disconnect packet was received.
    Disconnected,
}
57
58pub struct MqttClient {
64 inner: AsyncClient,
65}
66
67impl MqttClient {
68 pub fn new(config: MqttConfig) -> Result<(Self, EventLoop)> {
73 let mut opts = rumqttc::MqttOptions::new(&config.client_id, &config.host, config.port);
74 opts.set_keep_alive(std::time::Duration::from_secs(config.keep_alive_secs));
75 if let (Some(user), Some(pass)) = (&config.username, &config.password) {
76 opts.set_credentials(user, pass);
77 }
78 opts.set_max_packet_size(256 * 1024, 256 * 1024);
79 let (inner, event_loop) = rumqttc::AsyncClient::new(opts, 64);
80 Ok((Self { inner }, event_loop))
81 }
82
83 pub async fn publish_message(&self, topic: &str, message: &Message) -> Result<()> {
85 let payload = serde_json::to_vec(message)?;
86 self.inner
87 .publish(topic, QoS::AtLeastOnce, false, payload)
88 .await?;
89 Ok(())
90 }
91
92 pub async fn publish_json(&self, topic: &str, payload: &impl Serialize) -> Result<()> {
94 let bytes = serde_json::to_vec(payload)?;
95 self.inner
96 .publish(topic, QoS::AtLeastOnce, false, bytes)
97 .await?;
98 Ok(())
99 }
100
101 pub async fn publish_raw(&self, topic: &str, payload: Vec<u8>) -> Result<()> {
103 self.inner
104 .publish(topic, QoS::AtLeastOnce, false, payload)
105 .await?;
106 Ok(())
107 }
108
109 pub async fn subscribe(&self, topic: &str) -> Result<()> {
111 self.inner.subscribe(topic, QoS::AtLeastOnce).await?;
112 debug!(topic, "subscribed");
113 Ok(())
114 }
115
116 pub async fn unsubscribe(&self, topic: &str) -> Result<()> {
118 self.inner.unsubscribe(topic).await?;
119 Ok(())
120 }
121
122 pub async fn run_event_loop(
127 event_loop: &mut EventLoop,
128 mut handler: impl FnMut(MqttEvent) + Send + 'static,
129 ) {
130 use rumqttc::{Event, Packet};
131 loop {
132 match event_loop.poll().await {
133 Ok(Event::Incoming(Packet::Publish(p))) => {
134 handler(MqttEvent::Incoming {
135 topic: p.topic,
136 payload: p.payload.to_vec(),
137 });
138 }
139 Ok(Event::Incoming(Packet::ConnAck(_))) => {
140 handler(MqttEvent::Connected);
141 }
142 Ok(Event::Incoming(Packet::Disconnect)) => {
143 handler(MqttEvent::Disconnected);
144 break;
145 }
146 Ok(_) => {} Err(e) => {
148 tracing::error!("MQTT event loop error: {e}");
149 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
150 }
151 }
152 }
153 }
154}