1use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
18use anyhow::Result;
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use wactorz_core::actor::{Actor, ActorConfig, ActorState};
23use wactorz_core::message::{ActorCommand, Message, MessageType};
24use wactorz_core::metrics::ActorMetrics;
25use wactorz_core::registry::{ActorFactory, ActorSystem, Supervisor, SupervisorStrategy};
26
27struct ProbeActor {
30 config: ActorConfig,
31 mailbox_tx: mpsc::Sender<Message>,
32 mailbox_rx: Option<mpsc::Receiver<Message>>,
33 metrics: Arc<ActorMetrics>,
34 starts: Arc<AtomicU64>,
35 crash_remaining: Arc<AtomicU64>,
36}
37
38impl ProbeActor {
39 fn new(name: &str, starts: Arc<AtomicU64>, crash_remaining: Arc<AtomicU64>) -> Self {
40 let config = ActorConfig::new(name);
41 let (tx, rx) = mpsc::channel(16);
42 Self {
43 config,
44 mailbox_tx: tx,
45 mailbox_rx: Some(rx),
46 metrics: Arc::new(ActorMetrics::new()),
47 starts,
48 crash_remaining,
49 }
50 }
51}
52
53#[async_trait::async_trait]
54impl Actor for ProbeActor {
55 fn id(&self) -> String {
56 self.config.id.clone()
57 }
58 fn name(&self) -> &str {
59 &self.config.name
60 }
61 fn state(&self) -> ActorState {
62 ActorState::Running
63 }
64 fn metrics(&self) -> Arc<ActorMetrics> {
65 self.metrics.clone()
66 }
67 fn mailbox(&self) -> mpsc::Sender<Message> {
68 self.mailbox_tx.clone()
69 }
70
71 async fn handle_message(&mut self, _msg: Message) -> Result<()> {
72 Ok(())
73 }
74
75 async fn run(&mut self) -> Result<()> {
76 self.starts.fetch_add(1, Ordering::Relaxed);
77 if self.crash_remaining.load(Ordering::Relaxed) > 0 {
78 self.crash_remaining.fetch_sub(1, Ordering::Relaxed);
79 anyhow::bail!("simulated crash");
80 }
81 let mut rx = self
82 .mailbox_rx
83 .take()
84 .ok_or_else(|| anyhow::anyhow!("mailbox already consumed"))?;
85 loop {
86 match rx.recv().await {
87 None => break,
88 Some(m) => {
89 if let MessageType::Command {
90 command: ActorCommand::Stop,
91 } = &m.payload
92 {
93 break;
94 }
95 }
96 }
97 }
98 Ok(())
99 }
100}
101
102#[derive(Deserialize)]
105struct ActorCfg {
106 name: String,
107 #[serde(default)]
108 crash_count: u64,
109}
110
111#[derive(Deserialize)]
112struct Expected {
113 start_counts: HashMap<String, u64>,
114 restart_counts: HashMap<String, u64>,
115 final_states: HashMap<String, String>,
116}
117
118#[derive(Deserialize)]
119struct Scenario {
120 name: String,
121 strategy: SupervisorStrategy,
122 actors: Vec<ActorCfg>,
123 expected: Expected,
124}
125
126#[derive(Deserialize)]
127struct Fixture {
128 contract: String,
129 scenarios: Vec<Scenario>,
130}
131
132#[derive(Serialize)]
133struct ActorResult {
134 final_state: String,
135 restart_count: u64,
136 starts: u64,
137}
138
139#[derive(Serialize)]
140struct ScenarioResult {
141 actors: HashMap<String, ActorResult>,
142 scenario: String,
143}
144
145#[derive(Serialize)]
146struct Output {
147 contract: String,
148 results: Vec<ScenarioResult>,
149}
150
151async fn run_scenario(scenario: &Scenario) -> Result<ScenarioResult> {
154 let system = ActorSystem::new();
155 let mut sup = Supervisor::with_poll_interval(system.clone(), Duration::from_millis(50));
156
157 let mut starts_map: HashMap<String, Arc<AtomicU64>> = HashMap::new();
158
159 for cfg in &scenario.actors {
160 let starts = Arc::new(AtomicU64::new(0));
161 let crash_rem = Arc::new(AtomicU64::new(cfg.crash_count));
162 starts_map.insert(cfg.name.clone(), starts.clone());
163
164 let name = cfg.name.clone();
165 let starts_c = starts.clone();
166 let crash_c = crash_rem.clone();
167 let factory: ActorFactory =
168 Arc::new(move || Box::new(ProbeActor::new(&name, starts_c.clone(), crash_c.clone())));
169
170 sup.supervise(&cfg.name, factory, scenario.strategy.clone(), 3, 60.0, 0.0);
171 }
172
173 sup.start().await?;
174 tokio::time::sleep(Duration::from_millis(350)).await;
175
176 let status = sup.status();
178 let status_map: HashMap<String, u64> = status
179 .iter()
180 .filter_map(|v| {
181 let name = v.get("name")?.as_str()?.to_string();
182 let restarts = v.get("restarts_used")?.as_u64()?;
183 Some((name, restarts))
184 })
185 .collect();
186
187 let registry_list = system.registry.list().await;
188 let registry_map: HashMap<String, ActorState> = registry_list
189 .into_iter()
190 .map(|e| (e.name, e.state))
191 .collect();
192
193 let mut actors: HashMap<String, ActorResult> = HashMap::new();
194 for cfg in &scenario.actors {
195 let starts = starts_map[&cfg.name].load(Ordering::Relaxed);
196 let restart_count = status_map.get(&cfg.name).copied().unwrap_or(0);
197 let final_state = registry_map
198 .get(&cfg.name)
199 .map(|s| match s {
200 ActorState::Running => "running",
201 ActorState::Stopped => "stopped",
202 ActorState::Paused => "paused",
203 ActorState::Initializing => "initializing",
204 ActorState::Failed(_) => "failed",
205 })
206 .unwrap_or("running") .to_string();
208
209 actors.insert(
210 cfg.name.clone(),
211 ActorResult {
212 starts,
213 restart_count,
214 final_state,
215 },
216 );
217 }
218
219 sup.stop().await;
220
221 Ok(ScenarioResult {
222 scenario: scenario.name.clone(),
223 actors,
224 })
225}
226
227fn parse_args() -> (PathBuf, bool) {
230 let args: Vec<String> = std::env::args().collect();
231 let mut fixture = std::env::current_dir()
232 .unwrap_or_default()
233 .join("tests/parity_fixtures/backend_supervisor_parity.json");
234 let mut assert_expected = false;
235
236 let mut i = 1;
237 while i < args.len() {
238 match args[i].as_str() {
239 "--fixture" => {
240 i += 1;
241 if i < args.len() {
242 fixture = PathBuf::from(&args[i]);
243 }
244 }
245 "--assert-expected" => {
246 assert_expected = true;
247 }
248 _ => {}
249 }
250 i += 1;
251 }
252 (fixture, assert_expected)
253}
254
255fn build_expected(fixture: &Fixture) -> Output {
256 let results = fixture
257 .scenarios
258 .iter()
259 .map(|s| {
260 let actors = s
261 .expected
262 .start_counts
263 .iter()
264 .map(|(name, &starts)| {
265 let restart_count = s.expected.restart_counts.get(name).copied().unwrap_or(0);
266 let final_state = s
267 .expected
268 .final_states
269 .get(name)
270 .cloned()
271 .unwrap_or_default();
272 (
273 name.clone(),
274 ActorResult {
275 starts,
276 restart_count,
277 final_state,
278 },
279 )
280 })
281 .collect();
282 ScenarioResult {
283 scenario: s.name.clone(),
284 actors,
285 }
286 })
287 .collect();
288 Output {
289 contract: fixture.contract.clone(),
290 results,
291 }
292}
293
294#[tokio::main]
297async fn main() -> Result<()> {
298 let (fixture_path, assert_expected) = parse_args();
299
300 let content = std::fs::read_to_string(&fixture_path)
301 .map_err(|e| anyhow::anyhow!("cannot read fixture {}: {e}", fixture_path.display()))?;
302 let fixture: Fixture = serde_json::from_str(&content)?;
303
304 let mut results = Vec::new();
305 for scenario in &fixture.scenarios {
306 results.push(run_scenario(scenario).await?);
307 }
308 let actual = Output {
309 contract: fixture.contract.clone(),
310 results,
311 };
312
313 if assert_expected {
314 let expected = build_expected(&fixture);
315 let actual_val = serde_json::to_value(&actual)?;
316 let expected_val = serde_json::to_value(&expected)?;
317 if actual_val != expected_val {
318 let out = serde_json::json!({
319 "actual": actual_val,
320 "expected": expected_val,
321 });
322 eprintln!("{}", serde_json::to_string_pretty(&out)?);
323 std::process::exit(1);
324 }
325 }
326
327 println!("{}", serde_json::to_string_pretty(&actual)?);
328 Ok(())
329}