backend_parity/
backend_parity.rs

1//! Rust counterpart to `tests/backend_parity_harness.py`.
2//!
3//! Runs the same supervisor scenarios defined in
4//! `tests/parity_fixtures/backend_supervisor_parity.json` and emits a JSON
5//! result that must match the Python harness output byte-for-byte (after
6//! parsing).
7//!
8//! Usage:
9//!   cargo run -q -p wactorz-core --bin backend_parity -- \
10//!       --fixture `<path>` \[--assert-expected\]
11
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
18use anyhow::Result;
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use wactorz_core::actor::{Actor, ActorConfig, ActorState};
23use wactorz_core::message::{ActorCommand, Message, MessageType};
24use wactorz_core::metrics::ActorMetrics;
25use wactorz_core::registry::{ActorFactory, ActorSystem, Supervisor, SupervisorStrategy};
26
27// ── ProbeActor ────────────────────────────────────────────────────────────────
28
29struct ProbeActor {
30    config: ActorConfig,
31    mailbox_tx: mpsc::Sender<Message>,
32    mailbox_rx: Option<mpsc::Receiver<Message>>,
33    metrics: Arc<ActorMetrics>,
34    starts: Arc<AtomicU64>,
35    crash_remaining: Arc<AtomicU64>,
36}
37
38impl ProbeActor {
39    fn new(name: &str, starts: Arc<AtomicU64>, crash_remaining: Arc<AtomicU64>) -> Self {
40        let config = ActorConfig::new(name);
41        let (tx, rx) = mpsc::channel(16);
42        Self {
43            config,
44            mailbox_tx: tx,
45            mailbox_rx: Some(rx),
46            metrics: Arc::new(ActorMetrics::new()),
47            starts,
48            crash_remaining,
49        }
50    }
51}
52
53#[async_trait::async_trait]
54impl Actor for ProbeActor {
55    fn id(&self) -> String {
56        self.config.id.clone()
57    }
58    fn name(&self) -> &str {
59        &self.config.name
60    }
61    fn state(&self) -> ActorState {
62        ActorState::Running
63    }
64    fn metrics(&self) -> Arc<ActorMetrics> {
65        self.metrics.clone()
66    }
67    fn mailbox(&self) -> mpsc::Sender<Message> {
68        self.mailbox_tx.clone()
69    }
70
71    async fn handle_message(&mut self, _msg: Message) -> Result<()> {
72        Ok(())
73    }
74
75    async fn run(&mut self) -> Result<()> {
76        self.starts.fetch_add(1, Ordering::Relaxed);
77        if self.crash_remaining.load(Ordering::Relaxed) > 0 {
78            self.crash_remaining.fetch_sub(1, Ordering::Relaxed);
79            anyhow::bail!("simulated crash");
80        }
81        let mut rx = self
82            .mailbox_rx
83            .take()
84            .ok_or_else(|| anyhow::anyhow!("mailbox already consumed"))?;
85        loop {
86            match rx.recv().await {
87                None => break,
88                Some(m) => {
89                    if let MessageType::Command {
90                        command: ActorCommand::Stop,
91                    } = &m.payload
92                    {
93                        break;
94                    }
95                }
96            }
97        }
98        Ok(())
99    }
100}
101
102// ── Fixture / output types ────────────────────────────────────────────────────
103
104#[derive(Deserialize)]
105struct ActorCfg {
106    name: String,
107    #[serde(default)]
108    crash_count: u64,
109}
110
111#[derive(Deserialize)]
112struct Expected {
113    start_counts: HashMap<String, u64>,
114    restart_counts: HashMap<String, u64>,
115    final_states: HashMap<String, String>,
116}
117
118#[derive(Deserialize)]
119struct Scenario {
120    name: String,
121    strategy: SupervisorStrategy,
122    actors: Vec<ActorCfg>,
123    expected: Expected,
124}
125
126#[derive(Deserialize)]
127struct Fixture {
128    contract: String,
129    scenarios: Vec<Scenario>,
130}
131
132#[derive(Serialize)]
133struct ActorResult {
134    final_state: String,
135    restart_count: u64,
136    starts: u64,
137}
138
139#[derive(Serialize)]
140struct ScenarioResult {
141    actors: HashMap<String, ActorResult>,
142    scenario: String,
143}
144
145#[derive(Serialize)]
146struct Output {
147    contract: String,
148    results: Vec<ScenarioResult>,
149}
150
151// ── Scenario runner ───────────────────────────────────────────────────────────
152
153async fn run_scenario(scenario: &Scenario) -> Result<ScenarioResult> {
154    let system = ActorSystem::new();
155    let mut sup = Supervisor::with_poll_interval(system.clone(), Duration::from_millis(50));
156
157    let mut starts_map: HashMap<String, Arc<AtomicU64>> = HashMap::new();
158
159    for cfg in &scenario.actors {
160        let starts = Arc::new(AtomicU64::new(0));
161        let crash_rem = Arc::new(AtomicU64::new(cfg.crash_count));
162        starts_map.insert(cfg.name.clone(), starts.clone());
163
164        let name = cfg.name.clone();
165        let starts_c = starts.clone();
166        let crash_c = crash_rem.clone();
167        let factory: ActorFactory =
168            Arc::new(move || Box::new(ProbeActor::new(&name, starts_c.clone(), crash_c.clone())));
169
170        sup.supervise(&cfg.name, factory, scenario.strategy.clone(), 3, 60.0, 0.0);
171    }
172
173    sup.start().await?;
174    tokio::time::sleep(Duration::from_millis(350)).await;
175
176    // Collect status before stopping
177    let status = sup.status();
178    let status_map: HashMap<String, u64> = status
179        .iter()
180        .filter_map(|v| {
181            let name = v.get("name")?.as_str()?.to_string();
182            let restarts = v.get("restarts_used")?.as_u64()?;
183            Some((name, restarts))
184        })
185        .collect();
186
187    let registry_list = system.registry.list().await;
188    let registry_map: HashMap<String, ActorState> = registry_list
189        .into_iter()
190        .map(|e| (e.name, e.state))
191        .collect();
192
193    let mut actors: HashMap<String, ActorResult> = HashMap::new();
194    for cfg in &scenario.actors {
195        let starts = starts_map[&cfg.name].load(Ordering::Relaxed);
196        let restart_count = status_map.get(&cfg.name).copied().unwrap_or(0);
197        let final_state = registry_map
198            .get(&cfg.name)
199            .map(|s| match s {
200                ActorState::Running => "running",
201                ActorState::Stopped => "stopped",
202                ActorState::Paused => "paused",
203                ActorState::Initializing => "initializing",
204                ActorState::Failed(_) => "failed",
205            })
206            .unwrap_or("running") // deregistered after stop command → treat as running
207            .to_string();
208
209        actors.insert(
210            cfg.name.clone(),
211            ActorResult {
212                starts,
213                restart_count,
214                final_state,
215            },
216        );
217    }
218
219    sup.stop().await;
220
221    Ok(ScenarioResult {
222        scenario: scenario.name.clone(),
223        actors,
224    })
225}
226
227// ── CLI ───────────────────────────────────────────────────────────────────────
228
/// Parses the process command line into `(fixture_path, assert_expected)`.
///
/// Recognised arguments:
///
/// * `--fixture <path>` or `--fixture=<path>`: fixture file to load. The
///   default is `tests/parity_fixtures/backend_supervisor_parity.json` under
///   the current directory.
/// * `--assert-expected`: compare the run against the fixture's expected
///   values.
///
/// Parsing never aborts. A `--fixture` with no following value, or any
/// unrecognised argument, leaves the current settings untouched and writes a
/// warning to stderr, so a mistyped flag cannot silently skip the assertion.
fn parse_args() -> (PathBuf, bool) {
    let mut fixture = std::env::current_dir()
        .unwrap_or_default()
        .join("tests/parity_fixtures/backend_supervisor_parity.json");
    let mut assert_expected = false;

    // Skip argv[0]. `while let` rather than `for` because the `--fixture` arm
    // pulls its value from the same iterator.
    let mut args = std::env::args().skip(1);
    while let Some(arg) = args.next() {
        match arg.as_str() {
            "--fixture" => match args.next() {
                Some(path) => fixture = PathBuf::from(path),
                None => eprintln!(
                    "warning: --fixture requires a path; using {}",
                    fixture.display()
                ),
            },
            "--assert-expected" => assert_expected = true,
            other => {
                if let Some(path) = other.strip_prefix("--fixture=") {
                    fixture = PathBuf::from(path);
                } else {
                    eprintln!("warning: ignoring unrecognised argument `{other}`");
                }
            }
        }
    }
    (fixture, assert_expected)
}
254
255fn build_expected(fixture: &Fixture) -> Output {
256    let results = fixture
257        .scenarios
258        .iter()
259        .map(|s| {
260            let actors = s
261                .expected
262                .start_counts
263                .iter()
264                .map(|(name, &starts)| {
265                    let restart_count = s.expected.restart_counts.get(name).copied().unwrap_or(0);
266                    let final_state = s
267                        .expected
268                        .final_states
269                        .get(name)
270                        .cloned()
271                        .unwrap_or_default();
272                    (
273                        name.clone(),
274                        ActorResult {
275                            starts,
276                            restart_count,
277                            final_state,
278                        },
279                    )
280                })
281                .collect();
282            ScenarioResult {
283                scenario: s.name.clone(),
284                actors,
285            }
286        })
287        .collect();
288    Output {
289        contract: fixture.contract.clone(),
290        results,
291    }
292}
293
294// ── main ──────────────────────────────────────────────────────────────────────
295
296#[tokio::main]
297async fn main() -> Result<()> {
298    let (fixture_path, assert_expected) = parse_args();
299
300    let content = std::fs::read_to_string(&fixture_path)
301        .map_err(|e| anyhow::anyhow!("cannot read fixture {}: {e}", fixture_path.display()))?;
302    let fixture: Fixture = serde_json::from_str(&content)?;
303
304    let mut results = Vec::new();
305    for scenario in &fixture.scenarios {
306        results.push(run_scenario(scenario).await?);
307    }
308    let actual = Output {
309        contract: fixture.contract.clone(),
310        results,
311    };
312
313    if assert_expected {
314        let expected = build_expected(&fixture);
315        let actual_val = serde_json::to_value(&actual)?;
316        let expected_val = serde_json::to_value(&expected)?;
317        if actual_val != expected_val {
318            let out = serde_json::json!({
319                "actual":   actual_val,
320                "expected": expected_val,
321            });
322            eprintln!("{}", serde_json::to_string_pretty(&out)?);
323            std::process::exit(1);
324        }
325    }
326
327    println!("{}", serde_json::to_string_pretty(&actual)?);
328    Ok(())
329}