vanguards_rs/
control.rs

1//! Control module for Tor controller interaction and main application loop.
2//!
3//! This module provides the core functionality for connecting to Tor's control port,
4//! authenticating, handling events, and managing the vanguard protection lifecycle.
5//!
6//! # Overview
7//!
8//! The control module handles:
9//!
10//! - **Connection Management**: Connect via TCP or Unix socket with auto-detection
11//! - **Authentication**: Password, cookie, and interactive authentication
12//! - **Consensus Processing**: Parse consensus weights and update vanguard state
13//! - **Event Handling**: Register and dispatch events to protection components
14//! - **Circuit Management**: Close circuits when attacks are detected
15//! - **Signal Handling**: Handle SIGHUP for configuration reload
16//!
17//! # Architecture
18//!
19//! The control module orchestrates all protection components through a central event loop:
20//!
21//! ```text
22//! ┌──────────────────────────────────────────────────────────────────────────┐
23//! │                         Main Event Loop                                  │
24//! │                                                                          │
25//! │  ┌──────────────┐     ┌──────────────┐     ┌──────────────────────────┐  │
26//! │  │  Tor Control │───▶│   Event      │───▶│   Protection Components  │  │
27//! │  │    Socket    │    │  Dispatcher  │      │                          │  │
28//! │  └──────────────┘    └──────────────┘      │  ┌────────────────────┐  │  │
29//! │         │                   │              │  │ VanguardState      │  │  │
30//! │         │                   │              │  │ (guard management) │  │  │
31//! │         ▼                   │              │  └────────────────────┘  │  │
32//! │  ┌──────────────┐           │              │  ┌────────────────────┐  │  │
33//! │  │ Authenticate │           │              │  │ BandwidthStats     │  │  │
34//! │  │ (password/   │           │              │  │ (attack detection) │  │  │
35//! │  │  cookie)     │           │              │  └────────────────────┘  │  │
36//! │  └──────────────┘           │              │  ┌────────────────────┐  │  │
37//! │                             │              │  │ RendGuard          │  │  │
38//! │                             │              │  │ (RP monitoring)    │  │  │
39//! │                             │              │  └────────────────────┘  │  │
40//! │                             │              │  ┌────────────────────┐  │  │
41//! │                             │              │  │ PathVerify         │  │  │
42//! │                             │              │  │ (path validation)  │  │  │
43//! │                             │              │  └────────────────────┘  │  │
44//! │                             │              └──────────────────────────┘  │
45//! │                             │                                            │
46//! │                             ▼                                            │
47//! │                    ┌──────────────────┐                                  │
48//! │                    │ Circuit Actions  │                                  │
49//! │                    │ (close on attack)│                                  │
50//! │                    └──────────────────┘                                  │
//! └──────────────────────────────────────────────────────────────────────────┘
52//! ```
53//!
54//! # Main Loop Flow
55//!
56//! The main application loop follows this sequence:
57//!
58//! ```text
59//!                    ┌─────────────────┐
60//!                    │     Start       │
61//!                    └────────┬────────┘
62//!                             │
63//!                             ▼
64//!                    ┌─────────────────┐
65//!                    │ Connect to Tor  │◀──────────────────┐
66//!                    │ (socket/port)   │                   │
67//!                    └────────┬────────┘                   │
68//!                             │                            │
69//!                             ▼                            │
70//!                    ┌─────────────────┐                   │
71//!                    │  Authenticate   │                   │
72//!                    │ (auto-detect)   │                   │
73//!                    └────────┬────────┘                   │
74//!                             │                            │
75//!                             ▼                            │
76//!                    ┌─────────────────┐                   │
77//!                    │ Load/Create     │                   │
78//!                    │ Vanguard State  │                   │
79//!                    └────────┬────────┘                   │
80//!                             │                            │
81//!                             ▼                            │
82//!                    ┌─────────────────┐                   │
83//!                    │ Subscribe to    │                   │
84//!                    │ Tor Events      │                   │
85//!                    └────────┬────────┘                   │
86//!                             │                            │
87//!                             ▼                            │
88//!              ┌──────────────────────────────┐            │
89//!              │      Event Processing Loop   │            │
90//!              │  ┌────────────────────────┐  │            │
91//!              │  │ Receive Event          │  │            │
92//!              │  └───────────┬────────────┘  │            │
93//!              │              │               │            │
94//!              │              ▼               │            │
95//!              │  ┌────────────────────────┐  │            │
96//!              │  │ Dispatch to Handlers   │  │            │
97//!              │  │ (CIRC, CIRC_BW, etc.)  │  │            │
98//!              │  └───────────┬────────────┘  │            │
99//!              │              │               │            │
100//!              │              ▼               │            │
101//!              │  ┌────────────────────────┐  │            │
102//!              │  │ Check Circuit Limits   │  │            │
103//!              │  │ (close if attack)      │  │            │
104//!              │  └───────────┬────────────┘  │            │
105//!              │              │               │            │
106//!              │              ▼               │            │
107//!              │         [Continue]           │            │
108//!              └──────────────┬───────────────┘            │
109//!                             │                            │
110//!                    [Connection Lost]                     │
111//!                             │                            │
112//!                             ▼                            │
113//!                    ┌─────────────────┐                   │
114//!                    │ Reconnect?      │───────────────────┘
115//!                    │ (retry limit)   │
116//!                    └────────┬────────┘
117//!                             │ [Limit Reached]
118//!                             ▼
119//!                    ┌─────────────────┐
120//!                    │      Exit       │
121//!                    └─────────────────┘
122//! ```
123//!
124//! # Event Handling State Diagram
125//!
126//! Events are dispatched to different handlers based on type:
127//!
128//! ```text
129//!                         ┌─────────────┐
130//!                         │ Tor Event   │
131//!                         └──────┬──────┘
132//!                                │
133//!          ┌─────────────────────┼─────────────────────┐
134//!          │                     │                     │
135//!          ▼                     ▼                     ▼
136//!    ┌───────────┐        ┌───────────┐        ┌───────────┐
137//!    │   CIRC    │        │  CIRC_BW  │        │ NEWCONSENS│
138//!    └─────┬─────┘        └─────┬─────┘        └─────┬─────┘
139//!          │                    │                    │
140//!          ▼                    ▼                    ▼
141//!    ┌───────────┐        ┌───────────┐        ┌───────────┐
142//!    │RendGuard  │        │BandGuards │        │ Update    │
143//!    │BandGuards │        │(bandwidth │        │ Vanguard  │
144//!    │CBTVerify  │        │ tracking) │        │ State     │
145//!    │PathVerify │        └───────────┘        └───────────┘
146//!    │LogGuard   │
147//!    └───────────┘
148//!
149//!          ┌─────────────────────┼─────────────────────┐
150//!          │                     │                     │
151//!          ▼                     ▼                     ▼
152//!    ┌───────────┐        ┌───────────┐        ┌───────────┐
153//!    │  ORCONN   │        │   GUARD   │        │  SIGNAL   │
154//!    └─────┬─────┘        └─────┬─────┘        └─────┬─────┘
155//!          │                    │                    │
156//!          ▼                    ▼                    ▼
157//!    ┌───────────┐        ┌───────────┐        ┌───────────┐
158//!    │BandGuards │        │PathVerify │        │ Reapply   │
159//!    │PathVerify │        │(guard     │        │ Vanguards │
160//!    │(conn      │        │ tracking) │        │ (SIGHUP)  │
161//!    │ tracking) │        └───────────┘        └───────────┘
162//!    └───────────┘
163//! ```
164//!
165//! # What This Module Does NOT Do
166//!
167//! - **Direct relay communication**: Use stem-rs client module for ORPort connections
168//! - **Descriptor parsing**: Consensus parsing is limited to bandwidth weights
169//! - **Guard selection**: Use [`crate::node_selection`] for bandwidth-weighted selection
170//! - **State persistence**: Use [`crate::vanguards::VanguardState`] for state file I/O
171//!
172//! # Example
173//!
174//! Running the main application loop:
175//!
176//! ```rust,no_run
177//! use vanguards_rs::config::Config;
178//! use vanguards_rs::control::run_main;
179//!
180//! # async fn example() -> Result<(), vanguards_rs::error::Error> {
181//! // Load configuration
182//! let config = Config::default();
183//!
184//! // Run the main loop (blocks until shutdown)
185//! run_main(config).await?;
186//! # Ok(())
187//! # }
188//! ```
189//!
190//! # Security Considerations
191//!
192//! - Passwords are prompted interactively if not provided (never logged)
193//! - Circuit closure decisions are logged for audit purposes
194//! - State files contain guard fingerprints (protect accordingly)
195//! - Reconnection attempts are rate-limited to prevent DoS
196//!
197//! # See Also
198//!
199//! - [`crate::config`] - Configuration management
200//! - [`crate::vanguards`] - Vanguard state management
201//! - [`crate::bandguards`] - Bandwidth-based attack detection
202//! - [`crate::rendguard`] - Rendezvous point monitoring
203//! - [`crate::pathverify`] - Circuit path verification
204//! - [Python vanguards control](https://github.com/mikeperry-tor/vanguards) - Original implementation
205//! - [Tor Control Protocol](https://spec.torproject.org/control-spec) - Protocol specification
206
207use std::collections::HashMap;
208use std::io::{BufRead, BufReader};
209use std::path::Path;
210use std::sync::atomic::{AtomicBool, Ordering};
211use std::sync::Arc;
212use std::time::Duration;
213
214use stem_rs::controller::{CircuitId, Controller};
215use stem_rs::descriptor::router_status::RouterStatusEntry;
216use stem_rs::events::ParsedEvent;
217use stem_rs::version::Version;
218use stem_rs::EventType;
219
220use crate::bandguards::BandwidthStats;
221use crate::cbtverify::TimeoutStats;
222use crate::config::{Config, LogLevel};
223use crate::error::{Error, Result};
224use crate::logger::plog;
225use crate::logguard::LogGuard;
226use crate::node_selection::{BwWeightedGenerator, FlagsRestriction, NodeRestrictionList, Position};
227use crate::pathverify::PathVerify;
228use crate::vanguards::{ExcludeNodes, VanguardState};
229
/// Version string of this crate.
///
/// Captured at compile time from Cargo's `CARGO_PKG_VERSION` environment
/// variable. It is included in the connection notice logged by
/// [`authenticate_any`].
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Minimum Tor version required for CIRC_BW and CIRC_MINOR events.
///
/// NOTE(review): marked `#[allow(dead_code)]`, so this is presumably not yet
/// wired into a version check — confirm whether it should gate event
/// subscription or be removed.
#[allow(dead_code)]
const MIN_TOR_VERSION_FOR_BW: &str = "0.3.4.10";

/// Minimum Tor version required for HSLayer2Nodes support.
///
/// NOTE(review): marked `#[allow(dead_code)]`, so this is presumably not yet
/// wired into a version check — confirm whether it should gate
/// [`configure_tor`] or be removed.
#[allow(dead_code)]
const MIN_TOR_VERSION_FOR_VANGUARDS: &str = "0.3.3.0";
240
/// Memory ordering shared by every access to [`CLOSE_CIRCUITS`].
const CLOSE_CIRCUITS_ORDERING: Ordering = Ordering::SeqCst;

/// Process-wide switch deciding whether detected attacks close circuits.
///
/// Starts out as `true`. While `true`, [`try_close_circuit`] sends the close
/// command to Tor; while `false`, circuits are left open (monitoring mode).
static CLOSE_CIRCUITS: AtomicBool = AtomicBool::new(true);

/// Updates the process-wide close-circuits switch.
///
/// Pass `false` to run in monitoring-only mode, where circuits are never
/// force-closed, and `true` to restore circuit closure.
///
/// # Arguments
///
/// * `value` - New state of the switch
///
/// # Thread Safety
///
/// The switch is an [`AtomicBool`] written with sequentially consistent
/// ordering, so this may be called from any thread.
///
/// # Example
///
/// ```rust
/// use vanguards_rs::control::{set_close_circuits, get_close_circuits};
///
/// // Switch to monitoring-only mode
/// set_close_circuits(false);
/// assert!(!get_close_circuits());
///
/// // Switch circuit closure back on
/// set_close_circuits(true);
/// assert!(get_close_circuits());
/// ```
pub fn set_close_circuits(value: bool) {
    CLOSE_CIRCUITS.store(value, CLOSE_CIRCUITS_ORDERING);
}

/// Reads the process-wide close-circuits switch.
///
/// # Returns
///
/// `true` when circuit closure is enabled, `false` in monitoring-only mode.
///
/// # Thread Safety
///
/// The switch is an [`AtomicBool`] read with sequentially consistent
/// ordering, so this may be called from any thread.
///
/// # Example
///
/// ```rust
/// use vanguards_rs::control::get_close_circuits;
///
/// if get_close_circuits() {
///     println!("Circuit closure is enabled");
/// } else {
///     println!("Monitoring-only mode");
/// }
/// ```
pub fn get_close_circuits() -> bool {
    CLOSE_CIRCUITS.load(CLOSE_CIRCUITS_ORDERING)
}
304
305/// Authenticates with Tor using any available method.
306///
307/// Attempts authentication in this order:
308/// 1. No authentication (if control port is open)
309/// 2. Password authentication (if provided)
310/// 3. Cookie authentication
311///
312/// If password authentication fails and no password was provided,
313/// prompts the user interactively for a password.
314///
315/// # Arguments
316///
317/// * `controller` - The Tor controller to authenticate
318/// * `password` - Optional password for authentication
319///
320/// # Errors
321///
322/// Returns [`Error::Control`] if authentication fails.
323pub async fn authenticate_any(controller: &mut Controller, password: Option<&str>) -> Result<()> {
324    let result = controller.authenticate(password).await;
325
326    match result {
327        Ok(()) => {
328            let version = controller.get_version().await?;
329            plog(
330                LogLevel::Notice,
331                &format!(
332                    "Vanguards {} connected to Tor {} using stem-rs",
333                    VERSION, version
334                ),
335            );
336            Ok(())
337        }
338        Err(stem_rs::Error::Authentication(stem_rs::AuthError::MissingPassword)) => {
339            // Prompt for password interactively
340            let passwd = prompt_password()?;
341            controller.authenticate(Some(&passwd)).await?;
342            let version = controller.get_version().await?;
343            plog(
344                LogLevel::Notice,
345                &format!(
346                    "Vanguards {} connected to Tor {} using stem-rs",
347                    VERSION, version
348                ),
349            );
350            Ok(())
351        }
352        Err(e) => Err(Error::Control(e)),
353    }
354}
355
356/// Prompts the user for a password interactively.
357fn prompt_password() -> Result<String> {
358    eprint!("Controller password: ");
359    let mut password = String::new();
360    std::io::stdin()
361        .read_line(&mut password)
362        .map_err(Error::Io)?;
363    Ok(password.trim().to_string())
364}
365
366/// Parses consensus bandwidth weights from a cached-microdesc-consensus file.
367///
368/// Bandwidth weights are used by Tor clients to select relays proportionally
369/// to their contribution to the network. These weights are essential for
370/// the bandwidth-weighted guard selection algorithm.
371///
372/// # Weight Keys
373///
374/// Common weight keys include:
375///
376/// | Key | Description |
377/// |-----|-------------|
378/// | `Wgg` | Weight for Guard-flagged nodes in guard position |
379/// | `Wgm` | Weight for Guard-flagged nodes in middle position |
380/// | `Wmm` | Weight for non-flagged nodes in middle position |
381/// | `Wme` | Weight for Exit-flagged nodes in middle position |
382/// | `Wee` | Weight for Exit-flagged nodes in exit position |
383///
384/// # Arguments
385///
386/// * `consensus_filename` - Path to the cached-microdesc-consensus file
387///
388/// # Returns
389///
390/// A HashMap mapping weight keys (e.g., "Wmm") to their integer values
391/// (typically in range 0-10000, representing parts per 10000).
392///
393/// # Errors
394///
395/// Returns [`Error::Consensus`] if:
396/// - The file cannot be opened or read
397/// - No `bandwidth-weights` line is found in the file
398///
399/// # File Format
400///
401/// The function looks for a line starting with `bandwidth-weights ` followed
402/// by space-separated key=value pairs:
403///
404/// ```text
405/// bandwidth-weights Wbd=0 Wbe=0 Wbg=4194 Wbm=10000 ...
406/// ```
407///
408/// # Example
409///
410/// ```rust,no_run
411/// use std::path::Path;
412/// use vanguards_rs::control::get_consensus_weights;
413///
414/// # fn example() -> Result<(), vanguards_rs::error::Error> {
415/// let weights = get_consensus_weights(Path::new("/var/lib/tor/cached-microdesc-consensus"))?;
416///
417/// if let Some(wmm) = weights.get("Wmm") {
418///     println!("Middle position weight: {}", wmm);
419/// }
420/// # Ok(())
421/// # }
422/// ```
423///
424/// # See Also
425///
426/// - [`BwWeightedGenerator`] - Uses these weights
427/// - [dir-spec.txt](https://spec.torproject.org/dir-spec) - Consensus format specification
428pub fn get_consensus_weights(consensus_filename: &Path) -> Result<HashMap<String, i64>> {
429    let file = std::fs::File::open(consensus_filename).map_err(|e| {
430        Error::Consensus(format!(
431            "cannot read {}: {}",
432            consensus_filename.display(),
433            e
434        ))
435    })?;
436    let reader = BufReader::new(file);
437
438    let mut weights = HashMap::new();
439
440    for line in reader.lines() {
441        let line = line.map_err(|e| Error::Consensus(format!("read error: {}", e)))?;
442        if line.starts_with("bandwidth-weights ") {
443            // Parse bandwidth-weights line
444            // Format: bandwidth-weights Wbd=0 Wbe=0 Wbg=4194 Wbm=10000 ...
445            for part in line.split_whitespace().skip(1) {
446                if let Some((key, value)) = part.split_once('=') {
447                    if let Ok(v) = value.parse::<i64>() {
448                        weights.insert(key.to_string(), v);
449                    }
450                }
451            }
452            break;
453        }
454    }
455
456    if weights.is_empty() {
457        return Err(Error::Consensus(
458            "no bandwidth-weights found in consensus".to_string(),
459        ));
460    }
461
462    Ok(weights)
463}
464
465/// Attempts to close a circuit, optionally dumping logs first.
466///
467/// This function is called when an attack is detected and a circuit needs
468/// to be closed. If logguard is enabled, it dumps the log queue for the
469/// circuit before closing to aid in post-incident analysis.
470///
471/// # Arguments
472///
473/// * `controller` - The Tor controller
474/// * `circ_id` - The circuit ID to close
475/// * `logguard` - Optional log guard for pre-close log dumping
476///
477/// # Behavior
478///
479/// 1. If logguard is provided, dumps buffered logs for the circuit
480/// 2. If `close_circuits` global flag is true, sends CLOSECIRCUIT command
481/// 3. Logs success or failure of the close operation
482///
483/// # Global Flag
484///
485/// The `close_circuits` flag (set via [`set_close_circuits`]) controls whether
486/// circuits are actually closed. When false, the function logs but doesn't close.
487/// This is useful for testing or monitoring-only mode.
488///
489/// # Example
490///
491/// ```rust,no_run
492/// use stem_rs::controller::Controller;
493/// use vanguards_rs::control::try_close_circuit;
494///
495/// # async fn example() -> Result<(), vanguards_rs::error::Error> {
496/// let mut controller = Controller::from_port("127.0.0.1:9051".parse().unwrap()).await?;
497/// controller.authenticate(None).await?;
498///
499/// // Close circuit without log dumping
500/// try_close_circuit(&mut controller, "42", None).await;
501/// # Ok(())
502/// # }
503/// ```
504///
505/// # See Also
506///
507/// - [`set_close_circuits`] - Control whether circuits are actually closed
508/// - [`LogGuard::dump_log_queue`] - Log dumping implementation
509pub async fn try_close_circuit(
510    controller: &mut Controller,
511    circ_id: &str,
512    logguard: Option<&mut LogGuard>,
513) {
514    // Dump logs before closing
515    if let Some(lg) = logguard {
516        lg.dump_log_queue(circ_id, "Pre");
517    }
518
519    if get_close_circuits() {
520        let circuit_id = CircuitId::new(circ_id);
521        match controller.close_circuit(&circuit_id).await {
522            Ok(()) => {
523                plog(
524                    LogLevel::Info,
525                    &format!("We force-closed circuit {}", circ_id),
526                );
527            }
528            Err(e) => {
529                plog(
530                    LogLevel::Info,
531                    &format!("Failed to close circuit {}: {}", circ_id, e),
532                );
533            }
534        }
535    }
536}
537
538/// Configures Tor with the current vanguard settings.
539///
540/// Sets Tor configuration options to enforce the vanguard guard layers.
541/// This function is called after vanguard state is updated to apply
542/// the new guard sets to Tor.
543///
544/// # Configuration Options Set
545///
546/// | Option | Description | Condition |
547/// |--------|-------------|-----------|
548/// | `NumEntryGuards` | Number of layer 1 guards | If > 0 |
549/// | `NumDirectoryGuards` | Number of directory guards | If > 0 |
550/// | `GuardLifetime` | Layer 1 guard lifetime | If > 0 days |
551/// | `HSLayer2Nodes` | Layer 2 guard fingerprints | Always |
552/// | `HSLayer3Nodes` | Layer 3 guard fingerprints | If num_layer3 > 0 |
553///
554/// # Arguments
555///
556/// * `controller` - The Tor controller
557/// * `state` - The current vanguard state containing guard sets
558/// * `config` - The vanguards configuration
559///
560/// # Returns
561///
562/// Returns `Ok(())` on successful configuration.
563///
564/// # Errors
565///
566/// Returns [`Error::Control`] if Tor configuration fails. This typically
567/// indicates an incompatible Tor version (requires 0.3.3.x or newer).
568///
569/// # Tor Version Requirements
570///
571/// - `HSLayer2Nodes`: Requires Tor 0.3.3.0+
572/// - `HSLayer3Nodes`: Requires Tor 0.3.3.0+
573///
574/// # Example
575///
576/// ```rust,no_run
577/// use stem_rs::controller::Controller;
578/// use vanguards_rs::vanguards::VanguardState;
579/// use vanguards_rs::config::Config;
580/// use vanguards_rs::control::configure_tor;
581///
582/// # async fn example() -> Result<(), vanguards_rs::error::Error> {
583/// let mut controller = Controller::from_port("127.0.0.1:9051".parse().unwrap()).await?;
584/// controller.authenticate(None).await?;
585///
586/// let state = VanguardState::new("/tmp/vanguards.state");
587/// let config = Config::default();
588///
589/// configure_tor(&mut controller, &state, &config).await?;
590/// # Ok(())
591/// # }
592/// ```
593///
594/// # See Also
595///
596/// - [`VanguardState::layer2_guardset`] - Get layer 2 fingerprint string
597/// - [`VanguardState::layer3_guardset`] - Get layer 3 fingerprint string
598pub async fn configure_tor(
599    controller: &mut Controller,
600    state: &VanguardState,
601    config: &Config,
602) -> Result<()> {
603    let vg_config = &config.vanguards;
604
605    // Set NumEntryGuards and NumDirectoryGuards if configured
606    if vg_config.num_layer1_guards > 0 {
607        controller
608            .set_conf("NumEntryGuards", &vg_config.num_layer1_guards.to_string())
609            .await?;
610        controller
611            .set_conf(
612                "NumDirectoryGuards",
613                &vg_config.num_layer1_guards.to_string(),
614            )
615            .await?;
616    }
617
618    // Set GuardLifetime if configured
619    if vg_config.layer1_lifetime_days > 0 {
620        controller
621            .set_conf(
622                "GuardLifetime",
623                &format!("{} days", vg_config.layer1_lifetime_days),
624            )
625            .await?;
626    }
627
628    // Set HSLayer2Nodes
629    let layer2_guardset = state.layer2_guardset();
630    controller
631        .set_conf("HSLayer2Nodes", &layer2_guardset)
632        .await
633        .inspect_err(|_e| {
634            plog(
635                LogLevel::Error,
636                "Vanguards requires Tor 0.3.3.x (and ideally 0.3.4.x or newer).",
637            );
638        })?;
639
640    // Set HSLayer3Nodes if configured
641    if vg_config.num_layer3_guards > 0 {
642        let layer3_guardset = state.layer3_guardset();
643        controller
644            .set_conf("HSLayer3Nodes", &layer3_guardset)
645            .await?;
646    }
647
648    plog(
649        LogLevel::Info,
650        &format!("Layer2 guards: {}", layer2_guardset),
651    );
652    if vg_config.num_layer3_guards > 0 {
653        plog(
654            LogLevel::Info,
655            &format!("Layer3 guards: {}", state.layer3_guardset()),
656        );
657    }
658
659    Ok(())
660}
661
662/// Handles a new consensus event by updating vanguard state.
663///
664/// This function is called when a new consensus is received from Tor. It performs
665/// a complete update cycle for all protection components.
666///
667/// # Processing Steps
668///
669/// ```text
670/// ┌─────────────────────────────────────────────────────────────┐
671/// │                  new_consensus_event()                       │
672/// │                                                              │
673/// │  1. Get router list from Tor (GETINFO ns/all)               │
674/// │  2. Get ExcludeNodes configuration                          │
675/// │  3. Parse consensus weights from cached-microdesc-consensus │
676/// │  4. Update vanguard state:                                  │
677/// │     • Remove guards no longer in consensus                  │
678/// │     • Remove expired guards                                 │
679/// │     • Remove excluded guards                                │
680/// │     • Replenish guard layers                                │
681/// │  5. Update rendguard use counts                             │
682/// │  6. Configure Tor with new HSLayer2/3Nodes                  │
683/// │  7. Write state to file                                     │
684/// └─────────────────────────────────────────────────────────────┘
685/// ```
686///
687/// # Arguments
688///
689/// * `controller` - The Tor controller for querying and configuration
690/// * `state` - The vanguard state to update
691/// * `config` - The vanguards configuration
692///
693/// # Returns
694///
695/// Returns `Ok(())` on successful update.
696///
697/// # Errors
698///
699/// - [`Error::DescriptorUnavailable`] - Tor doesn't have descriptors yet (retry later)
700/// - [`Error::Consensus`] - Failed to parse consensus file
701/// - [`Error::Config`] - DataDirectory not configured in Tor
702/// - [`Error::Control`] - Failed to configure Tor
703///
704/// # Example
705///
706/// ```rust,no_run
707/// use stem_rs::controller::Controller;
708/// use vanguards_rs::vanguards::VanguardState;
709/// use vanguards_rs::config::Config;
710/// use vanguards_rs::control::new_consensus_event;
711///
712/// # async fn example() -> Result<(), vanguards_rs::error::Error> {
713/// let mut controller = Controller::from_port("127.0.0.1:9051".parse().unwrap()).await?;
714/// controller.authenticate(None).await?;
715///
716/// let mut state = VanguardState::new("/tmp/vanguards.state");
717/// let config = Config::default();
718///
719/// new_consensus_event(&mut controller, &mut state, &config).await?;
720/// # Ok(())
721/// # }
722/// ```
723///
724/// # See Also
725///
726/// - [`get_consensus_weights`] - Consensus weight parsing
727/// - [`configure_tor`] - Tor configuration
728/// - [`VanguardState::replenish_layers`] - Guard replenishment
729pub async fn new_consensus_event(
730    controller: &mut Controller,
731    state: &mut VanguardState,
732    config: &Config,
733) -> Result<()> {
734    // Get routers from Tor
735    let routers = get_network_statuses(controller).await?;
736
737    // Get ExcludeNodes configuration
738    let exclude_nodes_conf = controller
739        .get_conf("ExcludeNodes")
740        .await
741        .ok()
742        .and_then(|v| v.first().cloned())
743        .unwrap_or_default();
744    let geoip_exclude = controller
745        .get_conf("GeoIPExcludeUnknown")
746        .await
747        .ok()
748        .and_then(|v| v.first().cloned());
749    let exclude = ExcludeNodes::parse(&exclude_nodes_conf, geoip_exclude.as_deref());
750
751    // Get DataDirectory for consensus file
752    let data_dir = controller
753        .get_conf("DataDirectory")
754        .await?
755        .first()
756        .cloned()
757        .ok_or_else(|| {
758            Error::Config("You must set a DataDirectory location option in your torrc.".to_string())
759        })?;
760
761    let consensus_file = Path::new(&data_dir).join("cached-microdesc-consensus");
762    let weights = get_consensus_weights(&consensus_file)?;
763
764    // Update vanguard state
765    consensus_update(state, &routers, &weights, &exclude, config)?;
766
767    // Configure Tor if vanguards enabled
768    if config.enable_vanguards {
769        configure_tor(controller, state, config).await?;
770    }
771
772    // Write state to file
773    let state_path = Path::new(&state.state_file);
774    state.write_to_file(state_path).map_err(|e| {
775        plog(
776            LogLevel::Error,
777            &format!("Cannot write state to {}: {}", state.state_file, e),
778        );
779        e
780    })?;
781
782    Ok(())
783}
784
785/// Updates vanguard state based on new consensus.
786fn consensus_update(
787    state: &mut VanguardState,
788    routers: &[RouterStatusEntry],
789    weights: &HashMap<String, i64>,
790    exclude: &ExcludeNodes,
791    config: &Config,
792) -> Result<()> {
793    // Sort routers by measured bandwidth
794    let mut sorted_routers: Vec<RouterStatusEntry> = routers.to_vec();
795    sorted_routers.sort_by(|a, b| {
796        let bw_a = a.measured.or(a.bandwidth).unwrap_or(0);
797        let bw_b = b.measured.or(b.bandwidth).unwrap_or(0);
798        bw_b.cmp(&bw_a)
799    });
800
801    // Create router map for lookups
802    let router_map: HashMap<String, &RouterStatusEntry> = sorted_routers
803        .iter()
804        .map(|r| (r.fingerprint.clone(), r))
805        .collect();
806
807    // Create consensus fingerprint set
808    let consensus_fps: std::collections::HashSet<String> = sorted_routers
809        .iter()
810        .map(|r| r.fingerprint.clone())
811        .collect();
812
813    // Create generator for vanguard selection
814    let restriction = FlagsRestriction::new(
815        vec![
816            "Fast".to_string(),
817            "Stable".to_string(),
818            "Valid".to_string(),
819        ],
820        vec!["Authority".to_string()],
821    );
822    let restrictions = NodeRestrictionList::new(vec![Box::new(restriction)]);
823    let generator = BwWeightedGenerator::new(
824        sorted_routers.clone(),
825        restrictions,
826        weights.clone(),
827        Position::Middle,
828    )?;
829
830    if state.enable_vanguards {
831        // Remove guards that are no longer in consensus
832        VanguardState::remove_down_from_layer(&mut state.layer2, &consensus_fps);
833        VanguardState::remove_down_from_layer(&mut state.layer3, &consensus_fps);
834
835        // Remove expired guards
836        VanguardState::remove_expired_from_layer(&mut state.layer2);
837        VanguardState::remove_expired_from_layer(&mut state.layer3);
838
839        // Remove excluded guards
840        VanguardState::remove_excluded_from_layer(&mut state.layer2, &router_map, exclude);
841        VanguardState::remove_excluded_from_layer(&mut state.layer3, &router_map, exclude);
842
843        // Replenish guard layers
844        state.replenish_layers(&generator, exclude, &config.vanguards)?;
845    }
846
847    // Create generator for rendguard (with Exit flag allowed)
848    let rend_restriction = FlagsRestriction::new(
849        vec!["Fast".to_string(), "Valid".to_string()],
850        vec!["Authority".to_string()],
851    );
852    let rend_restrictions = NodeRestrictionList::new(vec![Box::new(rend_restriction)]);
853    let mut rend_generator = BwWeightedGenerator::new(
854        sorted_routers,
855        rend_restrictions,
856        weights.clone(),
857        Position::Middle,
858    )?;
859
860    // Repair exit weights for RP selection
861    rend_generator.repair_exits();
862
863    // Update rendguard use counts
864    state
865        .rendguard
866        .xfer_use_counts(&rend_generator, &config.rendguard);
867
868    Ok(())
869}
870
871/// Gets network statuses from Tor.
872async fn get_network_statuses(controller: &mut Controller) -> Result<Vec<RouterStatusEntry>> {
873    let response = controller
874        .get_info("ns/all")
875        .await
876        .map_err(|e| Error::DescriptorUnavailable(format!("Cannot get network statuses: {}", e)))?;
877
878    parse_network_statuses(&response)
879}
880
881/// Parses network status entries from GETINFO ns/all response.
882fn parse_network_statuses(response: &str) -> Result<Vec<RouterStatusEntry>> {
883    use chrono::Utc;
884    use stem_rs::descriptor::router_status::RouterStatusEntryType;
885
886    let mut routers = Vec::new();
887    let mut current_router: Option<RouterStatusEntry> = None;
888
889    for line in response.lines() {
890        if line.starts_with("r ") {
891            // Save previous router if any
892            if let Some(router) = current_router.take() {
893                routers.push(router);
894            }
895
896            // Parse r line: r nickname identity digest published IP ORPort DirPort
897            let parts: Vec<&str> = line.split_whitespace().collect();
898            if parts.len() >= 8 {
899                let nickname = parts[1].to_string();
900                let fingerprint = decode_base64_fingerprint(parts[2]);
901                let address = parts[5]
902                    .parse()
903                    .unwrap_or_else(|_| "0.0.0.0".parse().unwrap());
904                let or_port = parts[6].parse().unwrap_or(9001);
905
906                current_router = Some(RouterStatusEntry::new(
907                    RouterStatusEntryType::V3,
908                    nickname,
909                    fingerprint,
910                    Utc::now(),
911                    address,
912                    or_port,
913                ));
914            }
915        } else if let Some(stripped) = line.strip_prefix("s ") {
916            // Parse s line: s Flag1 Flag2 ...
917            if let Some(ref mut router) = current_router {
918                router.flags = stripped.split_whitespace().map(|s| s.to_string()).collect();
919            }
920        } else if let Some(stripped) = line.strip_prefix("w ") {
921            // Parse w line: w Bandwidth=X Measured=Y
922            if let Some(ref mut router) = current_router {
923                for part in stripped.split_whitespace() {
924                    if let Some((key, value)) = part.split_once('=') {
925                        if let Ok(v) = value.parse::<u64>() {
926                            match key {
927                                "Bandwidth" => router.bandwidth = Some(v),
928                                "Measured" => router.measured = Some(v),
929                                _ => {}
930                            }
931                        }
932                    }
933                }
934            }
935        }
936    }
937
938    // Don't forget the last router
939    if let Some(router) = current_router {
940        routers.push(router);
941    }
942
943    Ok(routers)
944}
945
946/// Decodes a base64-encoded fingerprint to hex.
947fn decode_base64_fingerprint(b64: &str) -> String {
948    // Add padding if needed
949    let padded = match b64.len() % 4 {
950        0 => b64.to_string(),
951        2 => format!("{}==", b64),
952        3 => format!("{}=", b64),
953        _ => b64.to_string(),
954    };
955
956    // Decode base64
957    let decoded = base64_decode(&padded).unwrap_or_default();
958
959    // Convert to hex
960    decoded.iter().map(|b| format!("{:02X}", b)).collect()
961}
962
/// Decodes standard-alphabet base64 (`A-Z`, `a-z`, `0-9`, `+`, `/`).
///
/// Trailing `=` characters are stripped before decoding and leftover bits that
/// do not fill a whole byte are discarded. Returns `None` as soon as a
/// character outside the alphabet is encountered.
fn base64_decode(input: &str) -> Option<Vec<u8>> {
    let payload = input.trim_end_matches('=');
    let mut decoded = Vec::new();
    let mut acc: u32 = 0;
    let mut pending_bits = 0;

    for byte in payload.bytes() {
        // Map the character to its 6-bit value.
        let sextet = match byte {
            b'A'..=b'Z' => byte - b'A',
            b'a'..=b'z' => byte - b'a' + 26,
            b'0'..=b'9' => byte - b'0' + 52,
            b'+' => 62,
            b'/' => 63,
            _ => return None,
        };

        acc = (acc << 6) | u32::from(sextet);
        pending_bits += 6;

        if pending_bits >= 8 {
            // Emit the top eight bits and keep only the remainder.
            pending_bits -= 8;
            decoded.push((acc >> pending_bits) as u8);
            acc &= (1 << pending_bits) - 1;
        }
    }

    Some(decoded)
}
986
987/// Handles a signal event from Tor.
988///
989/// Processes signals received by the Tor daemon and takes appropriate action.
990/// Currently handles SIGHUP/RELOAD to reapply vanguard configuration.
991///
992/// # Supported Signals
993///
994/// | Signal | Action |
995/// |--------|--------|
996/// | `RELOAD` (SIGHUP) | Reapply vanguard configuration to Tor |
997///
998/// # Arguments
999///
1000/// * `controller` - The Tor controller
1001/// * `state` - The vanguard state
1002/// * `config` - The vanguards configuration
1003/// * `signal` - The signal name (e.g., "RELOAD")
1004///
1005/// # Returns
1006///
1007/// Returns `Ok(())` on successful handling.
1008///
1009/// # Errors
1010///
1011/// Returns [`Error::Control`] if reconfiguring Tor fails.
1012///
1013/// # Example
1014///
1015/// ```rust,no_run
1016/// use stem_rs::controller::Controller;
1017/// use vanguards_rs::vanguards::VanguardState;
1018/// use vanguards_rs::config::Config;
1019/// use vanguards_rs::control::signal_event;
1020///
1021/// # async fn example() -> Result<(), vanguards_rs::error::Error> {
1022/// let mut controller = Controller::from_port("127.0.0.1:9051".parse().unwrap()).await?;
1023/// controller.authenticate(None).await?;
1024///
1025/// let state = VanguardState::new("/tmp/vanguards.state");
1026/// let config = Config::default();
1027///
1028/// signal_event(&mut controller, &state, &config, "RELOAD").await?;
1029/// # Ok(())
1030/// # }
1031/// ```
1032pub async fn signal_event(
1033    controller: &mut Controller,
1034    state: &VanguardState,
1035    config: &Config,
1036    signal: &str,
1037) -> Result<()> {
1038    if signal == "RELOAD" {
1039        plog(LogLevel::Notice, "Tor got SIGHUP. Reapplying vanguards.");
1040        configure_tor(controller, state, config).await?;
1041    }
1042    Ok(())
1043}
1044
/// Application state shared across event handlers.
///
/// `AppState` bundles every stateful component used by the main event loop
/// together with the configuration. The event handlers in this module take it
/// by mutable reference, read the `enable_*` switches from `config`, and update
/// the matching component.
///
/// # Components
///
/// ```text
/// ┌─────────────────────────────────────────────┐
/// │                  AppState                   │
/// │                                             │
/// │  ┌─────────────────┐  ┌─────────────────┐   │
/// │  │ VanguardState   │  │ BandwidthStats  │   │
/// │  │ • layer2 guards │  │ • circuit stats │   │
/// │  │ • layer3 guards │  │ • conn tracking │   │
/// │  │ • rendguard     │  │ • attack detect │   │
/// │  └─────────────────┘  └─────────────────┘   │
/// │                                             │
/// │  ┌─────────────────┐  ┌─────────────────┐   │
/// │  │ TimeoutStats    │  │ LogGuard        │   │
/// │  │ • CBT tracking  │  │ • log buffering │   │
/// │  │ • timeout rates │  │ • warn detection│   │
/// │  └─────────────────┘  └─────────────────┘   │
/// │                                             │
/// │  ┌─────────────────┐  ┌─────────────────┐   │
/// │  │ PathVerify      │  │ Config          │   │
/// │  │ • path checking │  │ • all settings  │   │
/// │  │ • guard verify  │  │ • thresholds    │   │
/// │  └─────────────────┘  └─────────────────┘   │
/// └─────────────────────────────────────────────┘
/// ```
///
/// # Thread Safety
///
/// `AppState` contains no synchronization of its own. It is designed to be
/// used within a single async task (the main event loop). For concurrent
/// access, wrap it in appropriate synchronization primitives.
///
/// # Example
///
/// ```rust,no_run
/// use vanguards_rs::control::AppState;
/// use vanguards_rs::vanguards::VanguardState;
/// use vanguards_rs::config::Config;
///
/// // Create state for the event loop
/// let vanguard_state = VanguardState::new("/var/lib/tor/vanguards.state");
/// let config = Config::default();
/// let app_state = AppState::new(vanguard_state, config);
/// ```
///
/// # See Also
///
/// - [`VanguardState`] - Guard layer management
/// - [`BandwidthStats`] - Bandwidth attack detection
/// - [`TimeoutStats`] - Circuit build timeout verification
/// - [`LogGuard`] - Log buffering and analysis
/// - [`PathVerify`] - Circuit path verification
pub struct AppState {
    /// Vanguard state containing guard layers and rendguard.
    pub vanguard_state: VanguardState,
    /// Bandwidth statistics for attack detection.
    pub bandwidth_stats: BandwidthStats,
    /// Circuit build timeout statistics.
    pub timeout_stats: TimeoutStats,
    /// Optional log guard for log buffering and analysis; `None` until it is
    /// set up.
    pub logguard: Option<LogGuard>,
    /// Optional path verifier for circuit path validation; `None` until it is
    /// set up.
    pub pathverify: Option<PathVerify>,
    /// Application configuration, including the `enable_*` switches the event
    /// handlers consult.
    pub config: Config,
}
1118
1119impl AppState {
1120    /// Creates a new application state with the given vanguard state and configuration.
1121    ///
1122    /// Initializes bandwidth and timeout statistics to empty state. LogGuard and
1123    /// PathVerify are initialized later in the control loop based on configuration.
1124    ///
1125    /// # Arguments
1126    ///
1127    /// * `vanguard_state` - The vanguard state (loaded from file or newly created)
1128    /// * `config` - The application configuration
1129    ///
1130    /// # Returns
1131    ///
1132    /// A new `AppState` with initialized statistics and the provided state/config.
1133    ///
1134    /// # Example
1135    ///
1136    /// ```rust,no_run
1137    /// use vanguards_rs::control::AppState;
1138    /// use vanguards_rs::vanguards::VanguardState;
1139    /// use vanguards_rs::config::Config;
1140    ///
1141    /// let state = VanguardState::new("/tmp/vanguards.state");
1142    /// let config = Config::default();
1143    /// let app_state = AppState::new(state, config);
1144    /// ```
1145    pub fn new(vanguard_state: VanguardState, config: Config) -> Self {
1146        Self {
1147            vanguard_state,
1148            bandwidth_stats: BandwidthStats::new(),
1149            timeout_stats: TimeoutStats::new(),
1150            logguard: None,
1151            pathverify: None,
1152            config,
1153        }
1154    }
1155}
1156
1157/// Connects to Tor's control port.
1158///
1159/// Attempts connection in this order:
1160/// 1. Unix socket if configured
1161/// 2. TCP port if configured
1162/// 3. Default Unix socket /run/tor/control
1163/// 4. Default TCP port 127.0.0.1:9051
1164async fn connect_to_tor(config: &Config) -> Result<Controller> {
1165    // Try configured socket first
1166    if let Some(ref socket_path) = config.control_socket {
1167        match Controller::from_socket_file(socket_path.as_path()).await {
1168            Ok(controller) => {
1169                plog(
1170                    LogLevel::Notice,
1171                    &format!("Connected to Tor via socket {}", socket_path.display()),
1172                );
1173                return Ok(controller);
1174            }
1175            Err(e) => {
1176                return Err(Error::Control(e));
1177            }
1178        }
1179    }
1180
1181    // Try configured port
1182    if let Some(port) = config.control_port {
1183        let addr = format!("{}:{}", config.control_ip, port);
1184        match Controller::from_port(
1185            addr.parse()
1186                .map_err(|e| Error::Config(format!("Invalid control address: {}", e)))?,
1187        )
1188        .await
1189        {
1190            Ok(controller) => {
1191                plog(
1192                    LogLevel::Notice,
1193                    &format!("Connected to Tor via control port {}", addr),
1194                );
1195                return Ok(controller);
1196            }
1197            Err(e) => {
1198                return Err(Error::Control(e));
1199            }
1200        }
1201    }
1202
1203    // Try default socket
1204    if let Ok(controller) = Controller::from_socket_file(Path::new("/run/tor/control")).await {
1205        plog(
1206            LogLevel::Notice,
1207            "Connected to Tor via /run/tor/control socket",
1208        );
1209        return Ok(controller);
1210    }
1211
1212    // Try default port
1213    let addr = format!("{}:9051", config.control_ip);
1214    match Controller::from_port(
1215        addr.parse()
1216            .map_err(|e| Error::Config(format!("Invalid control address: {}", e)))?,
1217    )
1218    .await
1219    {
1220        Ok(controller) => {
1221            plog(
1222                LogLevel::Notice,
1223                &format!("Connected to Tor via {} control port", addr),
1224            );
1225            Ok(controller)
1226        }
1227        Err(e) => Err(Error::Control(e)),
1228    }
1229}
1230
1231/// Gets the list of event types to subscribe to based on configuration.
1232fn get_event_types(config: &Config, tor_version: &Version) -> Vec<EventType> {
1233    let mut events = Vec::new();
1234
1235    // Always subscribe to these if vanguards or rendguard enabled
1236    if config.enable_vanguards || config.enable_rendguard {
1237        events.push(EventType::NewConsensus);
1238        events.push(EventType::Signal);
1239    }
1240
1241    // Rendguard needs CIRC events
1242    if config.enable_rendguard {
1243        events.push(EventType::Circ);
1244    }
1245
1246    // Bandguards events
1247    if config.enable_bandguards {
1248        events.push(EventType::Circ);
1249        events.push(EventType::Bw);
1250        events.push(EventType::OrConn);
1251        events.push(EventType::NetworkLiveness);
1252
1253        // CIRC_BW and CIRC_MINOR require Tor 0.3.4.10+
1254        let min_version = Version::new(0, 3, 4).with_patch(10);
1255        if *tor_version >= min_version {
1256            events.push(EventType::CircBw);
1257            events.push(EventType::CircMinor);
1258        } else {
1259            plog(
1260                LogLevel::Notice,
1261                "In order for bandwidth-based protections to be enabled, you must use Tor 0.3.4.10 or newer.",
1262            );
1263        }
1264    }
1265
1266    // CBT verify events
1267    if config.enable_cbtverify {
1268        events.push(EventType::Circ);
1269        events.push(EventType::BuildTimeoutSet);
1270    }
1271
1272    // Path verify events
1273    if config.enable_pathverify {
1274        events.push(EventType::Circ);
1275        events.push(EventType::CircMinor);
1276        events.push(EventType::OrConn);
1277        events.push(EventType::Guard);
1278        events.push(EventType::ConfChanged);
1279    }
1280
1281    // Log guard events
1282    if config.enable_logguard {
1283        events.push(EventType::Circ);
1284        events.push(EventType::Warn);
1285
1286        // Add log events based on dump level
1287        let log_events = LogGuard::get_log_event_types(config.logguard.dump_level);
1288        for event_name in log_events {
1289            match event_name {
1290                "DEBUG" => events.push(EventType::Debug),
1291                "INFO" => events.push(EventType::Info),
1292                "NOTICE" => events.push(EventType::Notice),
1293                "WARN" => events.push(EventType::Warn),
1294                "ERR" => events.push(EventType::Err),
1295                _ => {}
1296            }
1297        }
1298    }
1299
1300    // Deduplicate
1301    events.sort_by_key(|e| format!("{:?}", e));
1302    events.dedup();
1303
1304    events
1305}
1306
1307/// Handles a circuit event, dispatching to all enabled handlers.
1308fn handle_circ_event(state: &mut AppState, event: &stem_rs::events::CircuitEvent, arrived_at: f64) {
1309    let circ_id = &event.id.0;
1310    let status = format!("{:?}", event.status);
1311    let purpose = event.purpose.as_ref().map(|p| format!("{:?}", p));
1312    let hs_state = event.hs_state.as_ref().map(|s| format!("{:?}", s));
1313    let reason = event.reason.as_ref().map(|r| format!("{:?}", r));
1314    let path: Vec<String> = event.path.iter().map(|(fp, _)| fp.clone()).collect();
1315
1316    // Rendguard: check for HS_SERVICE_REND in HSSR_CONNECTING
1317    if state.config.enable_rendguard {
1318        if let (Some(ref p), Some(ref hs)) = (&purpose, &hs_state) {
1319            if p == "HS_SERVICE_REND" && hs == "HSSR_CONNECTING" {
1320                // Get the rendezvous point (last hop in path)
1321                if let Some(rp_fp) = path.last() {
1322                    let valid = state
1323                        .vanguard_state
1324                        .rendguard
1325                        .valid_rend_use(rp_fp, &state.config.rendguard);
1326                    if !valid {
1327                        let usage_rate = state.vanguard_state.rendguard.usage_rate(rp_fp);
1328                        let expected = state.vanguard_state.rendguard.expected_weight(rp_fp);
1329                        plog(
1330                            LogLevel::Warn,
1331                            &format!(
1332                                "Possible rendezvous point overuse attack: {} used {:.2}% vs expected {:.2}%",
1333                                rp_fp, usage_rate, expected
1334                            ),
1335                        );
1336                    }
1337                }
1338            }
1339        }
1340    }
1341
1342    // Bandguards
1343    if state.config.enable_bandguards {
1344        state.bandwidth_stats.circ_event(
1345            circ_id,
1346            &status,
1347            purpose.as_deref().unwrap_or("GENERAL"),
1348            hs_state.as_deref(),
1349            &path,
1350            reason.as_deref(),
1351            arrived_at,
1352        );
1353    }
1354
1355    // CBT verify
1356    if state.config.enable_cbtverify {
1357        state.timeout_stats.circ_event(
1358            circ_id,
1359            &status,
1360            purpose.as_deref().unwrap_or("GENERAL"),
1361            hs_state.as_deref(),
1362            reason.as_deref(),
1363        );
1364    }
1365
1366    // Log guard
1367    if state.config.enable_logguard {
1368        if let Some(ref mut lg) = state.logguard {
1369            lg.circ_event(circ_id, &status, reason.as_deref());
1370        }
1371    }
1372
1373    // Path verify
1374    if state.config.enable_pathverify {
1375        if let Some(ref mut pv) = state.pathverify {
1376            pv.circ_event(
1377                circ_id,
1378                &status,
1379                purpose.as_deref().unwrap_or("GENERAL"),
1380                hs_state.as_deref(),
1381                &event.path,
1382            );
1383        }
1384    }
1385}
1386
1387/// Handles a circuit bandwidth event.
1388fn handle_circbw_event(
1389    state: &mut AppState,
1390    event: &stem_rs::events::CircuitBandwidthEvent,
1391    arrived_at: f64,
1392) {
1393    if state.config.enable_bandguards {
1394        state.bandwidth_stats.circbw_event(
1395            &event.id.0,
1396            event.read,
1397            event.written,
1398            event.delivered_read.unwrap_or(0),
1399            event.delivered_written.unwrap_or(0),
1400            event.overhead_read.unwrap_or(0),
1401            event.overhead_written.unwrap_or(0),
1402            arrived_at,
1403        );
1404    }
1405}
1406
1407/// Handles a circuit minor event.
1408#[allow(dead_code)]
1409fn handle_circ_minor_event(state: &mut AppState, event: &stem_rs::events::CircuitEvent) {
1410    let circ_id = &event.id.0;
1411    let purpose = event.purpose.as_ref().map(|p| format!("{:?}", p));
1412    let hs_state = event.hs_state.as_ref().map(|s| format!("{:?}", s));
1413    let path: Vec<String> = event.path.iter().map(|(fp, _)| fp.clone()).collect();
1414
1415    // Bandguards
1416    if state.config.enable_bandguards {
1417        // For CIRC_MINOR, we need old_purpose and old_hs_state which aren't in the event
1418        // We'll pass None for now as the event doesn't provide these
1419        state.bandwidth_stats.circ_minor_event(
1420            circ_id,
1421            "PURPOSE_CHANGED",
1422            purpose.as_deref().unwrap_or("GENERAL"),
1423            hs_state.as_deref(),
1424            None, // old_purpose
1425            None, // old_hs_state
1426            &path,
1427        );
1428    }
1429
1430    // Path verify
1431    if state.config.enable_pathverify {
1432        if let Some(ref mut pv) = state.pathverify {
1433            pv.circ_minor_event(
1434                circ_id,
1435                purpose.as_deref().unwrap_or("GENERAL"),
1436                None, // old_purpose
1437                &event.path,
1438            );
1439        }
1440    }
1441}
1442
1443/// Handles a raw CIRC_MINOR event from Unknown variant.
1444///
1445/// CIRC_MINOR events indicate minor changes to circuits like purpose changes.
1446/// Format: CircuitID EVENT [Path] [PURPOSE=...] [HS_STATE=...] [OLD_PURPOSE=...] [OLD_HS_STATE=...]
1447fn handle_circ_minor_raw(state: &mut AppState, content: &str) {
1448    let parts: Vec<&str> = content.split_whitespace().collect();
1449    if parts.len() < 2 {
1450        return;
1451    }
1452
1453    let circ_id = parts[0];
1454    let _event_type = parts[1]; // e.g., "PURPOSE_CHANGED"
1455
1456    // Parse path and key-value pairs
1457    let mut path: Vec<(String, Option<String>)> = Vec::new();
1458    let mut purpose: Option<String> = None;
1459    let mut hs_state: Option<String> = None;
1460    let mut old_purpose: Option<String> = None;
1461    let mut old_hs_state: Option<String> = None;
1462
1463    for part in parts.iter().skip(2) {
1464        if let Some((key, value)) = part.split_once('=') {
1465            match key {
1466                "PURPOSE" => purpose = Some(value.to_string()),
1467                "HS_STATE" => hs_state = Some(value.to_string()),
1468                "OLD_PURPOSE" => old_purpose = Some(value.to_string()),
1469                "OLD_HS_STATE" => old_hs_state = Some(value.to_string()),
1470                _ => {}
1471            }
1472        } else if part.starts_with('$') || part.contains('~') || part.contains(',') {
1473            // Parse path
1474            for hop in part.split(',') {
1475                let hop = hop.trim_start_matches('$');
1476                if let Some((fp, nick)) = hop.split_once('~') {
1477                    path.push((fp.to_string(), Some(nick.to_string())));
1478                } else if let Some((fp, nick)) = hop.split_once('=') {
1479                    path.push((fp.to_string(), Some(nick.to_string())));
1480                } else if !hop.is_empty() {
1481                    path.push((hop.to_string(), None));
1482                }
1483            }
1484        }
1485    }
1486
1487    // Bandguards
1488    if state.config.enable_bandguards {
1489        let path_fps: Vec<String> = path.iter().map(|(fp, _)| fp.clone()).collect();
1490        state.bandwidth_stats.circ_minor_event(
1491            circ_id,
1492            _event_type,
1493            purpose.as_deref().unwrap_or("GENERAL"),
1494            hs_state.as_deref(),
1495            old_purpose.as_deref(),
1496            old_hs_state.as_deref(),
1497            &path_fps,
1498        );
1499    }
1500
1501    // Path verify
1502    if state.config.enable_pathverify {
1503        if let Some(ref mut pv) = state.pathverify {
1504            pv.circ_minor_event(
1505                circ_id,
1506                purpose.as_deref().unwrap_or("GENERAL"),
1507                old_purpose.as_deref(),
1508                &path,
1509            );
1510        }
1511    }
1512}
1513
1514/// Handles an OR connection event.
1515fn handle_orconn_event(
1516    state: &mut AppState,
1517    event: &stem_rs::events::OrConnEvent,
1518    arrived_at: f64,
1519) {
1520    let status = format!("{:?}", event.status);
1521    let reason = event.reason.as_ref().map(|r| format!("{:?}", r));
1522    let conn_id = event.id.as_deref().unwrap_or("");
1523
1524    // Bandguards
1525    if state.config.enable_bandguards {
1526        state.bandwidth_stats.orconn_event(
1527            conn_id,
1528            &event.target,
1529            &status,
1530            reason.as_deref(),
1531            arrived_at,
1532        );
1533    }
1534
1535    // Path verify
1536    if state.config.enable_pathverify {
1537        if let Some(ref mut pv) = state.pathverify {
1538            // Extract fingerprint from target (format: $fingerprint~nickname or $fingerprint=nickname)
1539            let guard_fp = if event.target.starts_with('$') {
1540                event.target[1..].split(['~', '=']).next().unwrap_or("")
1541            } else {
1542                &event.target
1543            };
1544            pv.orconn_event(guard_fp, &status);
1545        }
1546    }
1547}
1548
1549/// Handles a bandwidth event (1x/sec heartbeat).
1550fn handle_bw_event(
1551    state: &mut AppState,
1552    _event: &stem_rs::events::BandwidthEvent,
1553    arrived_at: f64,
1554) {
1555    if state.config.enable_bandguards {
1556        state
1557            .bandwidth_stats
1558            .check_connectivity(arrived_at, &state.config.bandguards);
1559    }
1560}
1561
1562/// Handles a network liveness event.
1563fn handle_network_liveness_event(
1564    state: &mut AppState,
1565    event: &stem_rs::events::NetworkLivenessEvent,
1566    arrived_at: f64,
1567) {
1568    if state.config.enable_bandguards {
1569        let status = format!("{:?}", event.status);
1570        state
1571            .bandwidth_stats
1572            .network_liveness_event(&status, arrived_at);
1573    }
1574}
1575
1576/// Handles a build timeout set event.
1577fn handle_buildtimeout_set_event(
1578    state: &mut AppState,
1579    event: &stem_rs::events::BuildTimeoutSetEvent,
1580) {
1581    if state.config.enable_cbtverify {
1582        let set_type = format!("{:?}", event.set_type);
1583        state.timeout_stats.cbt_event(&set_type, event.timeout_rate);
1584    }
1585}
1586
1587/// Handles a guard event.
1588fn handle_guard_event(state: &mut AppState, event: &stem_rs::events::GuardEvent) {
1589    if state.config.enable_pathverify {
1590        if let Some(ref mut pv) = state.pathverify {
1591            let status = format!("{:?}", event.status);
1592            // Use endpoint_fingerprint directly from the event
1593            pv.guard_event(&event.endpoint_fingerprint, &status);
1594        }
1595    }
1596}
1597
1598/// Handles a configuration changed event.
1599fn handle_conf_changed_event(state: &mut AppState, event: &stem_rs::events::ConfChangedEvent) {
1600    if state.config.enable_pathverify {
1601        if let Some(ref mut pv) = state.pathverify {
1602            pv.conf_changed_event(&event.changed);
1603        }
1604    }
1605}
1606
1607/// Handles a log event.
1608fn handle_log_event(state: &mut AppState, event: &stem_rs::events::LogEvent, arrived_at: f64) {
1609    if state.config.enable_logguard {
1610        if let Some(ref mut lg) = state.logguard {
1611            let runlevel = format!("{:?}", event.runlevel);
1612            lg.log_event_with_timestamp(&runlevel, &event.message, arrived_at);
1613
1614            // Also handle warn events specially
1615            if matches!(event.runlevel, stem_rs::Runlevel::Warn) {
1616                lg.log_warn_event(&event.message);
1617            }
1618        }
1619    }
1620}
1621
1622/// Handles a signal event.
1623async fn handle_signal_event(
1624    controller: &mut Controller,
1625    state: &mut AppState,
1626    event: &stem_rs::events::SignalEvent,
1627) -> Result<()> {
1628    let signal_name = format!("{:?}", event.signal);
1629    signal_event(
1630        controller,
1631        &state.vanguard_state,
1632        &state.config,
1633        &signal_name,
1634    )
1635    .await
1636}
1637
1638/// Main control loop for event processing.
1639///
1640/// Connects to Tor, authenticates, initializes state, and processes events
1641/// in a continuous loop until the connection is lost or an error occurs.
1642///
1643/// # Flow
1644///
1645/// 1. Connect to Tor's control port (socket or TCP)
1646/// 2. Authenticate using available methods
1647/// 3. Get Tor version for feature detection
1648/// 4. Initialize vanguard state from consensus
1649/// 5. Initialize optional components (logguard, pathverify)
1650/// 6. Subscribe to configured event types
1651/// 7. Process events until connection closes
1652///
1653/// # Arguments
1654///
1655/// * `state` - The application state containing all protection components
1656///
1657/// # Returns
1658///
1659/// Returns a status string:
1660/// - `"closed"` - Connection was closed normally
1661/// - `"failed: <reason>"` - Connection or operation failed
1662///
1663/// # Event Processing
1664///
1665/// The loop dispatches events to appropriate handlers:
1666///
1667/// | Event Type | Handlers |
1668/// |------------|----------|
1669/// | CIRC | RendGuard, BandGuards, CBTVerify, PathVerify, LogGuard |
1670/// | CIRC_BW | BandGuards |
1671/// | CIRC_MINOR | BandGuards, PathVerify |
1672/// | ORCONN | BandGuards, PathVerify |
1673/// | BW | BandGuards (connectivity check) |
1674/// | NEWCONSENSUS | VanguardState update |
1675/// | SIGNAL | Configuration reload (SIGHUP) |
1676///
1677/// # Example
1678///
1679/// ```rust,no_run
1680/// use vanguards_rs::control::{AppState, control_loop};
1681/// use vanguards_rs::vanguards::VanguardState;
1682/// use vanguards_rs::config::Config;
1683///
1684/// # async fn example() {
1685/// let state = VanguardState::new("/tmp/vanguards.state");
1686/// let config = Config::default();
1687/// let mut app_state = AppState::new(state, config);
1688///
1689/// let result = control_loop(&mut app_state).await;
1690/// println!("Control loop exited: {}", result);
1691/// # }
1692/// ```
1693///
1694/// # See Also
1695///
1696/// - [`run_main`] - Higher-level entry point with reconnection support
1697/// - [`authenticate_any`] - Authentication implementation
1698/// - [`new_consensus_event`] - Consensus processing
1699pub async fn control_loop(state: &mut AppState) -> String {
1700    // Connect to Tor
1701    let mut controller = match connect_to_tor(&state.config).await {
1702        Ok(c) => c,
1703        Err(e) => return format!("failed: {}", e),
1704    };
1705
1706    // Authenticate
1707    if let Err(e) = authenticate_any(&mut controller, state.config.control_pass.as_deref()).await {
1708        return format!("failed: {}", e);
1709    }
1710
1711    // Get Tor version for feature detection
1712    let tor_version = match controller.get_version().await {
1713        Ok(v) => v,
1714        Err(e) => return format!("failed: {}", e),
1715    };
1716
1717    // Initialize vanguard state from consensus
1718    if state.config.enable_vanguards || state.config.enable_rendguard {
1719        match new_consensus_event(&mut controller, &mut state.vanguard_state, &state.config).await {
1720            Ok(()) => {}
1721            Err(Error::DescriptorUnavailable(msg)) => {
1722                plog(
1723                    LogLevel::Notice,
1724                    &format!("Tor needs descriptors: {}. Trying again...", msg),
1725                );
1726                return format!("failed: {}", msg);
1727            }
1728            Err(e) => return format!("failed: {}", e),
1729        }
1730    }
1731
1732    // Handle one-shot mode
1733    if state.config.one_shot_vanguards {
1734        // Note: SaveConf is not available in stem-rs Signal enum
1735        // We just exit after setting vanguards - user should save config manually if needed
1736        plog(
1737            LogLevel::Notice,
1738            "Updated vanguards. Exiting (one-shot mode).",
1739        );
1740        std::process::exit(0);
1741    }
1742
1743    // Initialize logguard if enabled
1744    if state.config.enable_logguard {
1745        state.logguard = Some(LogGuard::new(&state.config.logguard));
1746    }
1747
1748    // Initialize pathverify if enabled
1749    if state.config.enable_pathverify {
1750        state.pathverify = Some(PathVerify::new(
1751            state.config.enable_vanguards,
1752            state.config.vanguards.num_layer1_guards,
1753            state.config.vanguards.num_layer2_guards,
1754            state.config.vanguards.num_layer3_guards,
1755        ));
1756
1757        // Send NEWNYM to get fresh circuits
1758        if let Err(e) = controller.signal(stem_rs::Signal::Newnym).await {
1759            plog(LogLevel::Warn, &format!("Failed to send NEWNYM: {}", e));
1760        }
1761    }
1762
1763    // Subscribe to events
1764    let event_types = get_event_types(&state.config, &tor_version);
1765    if let Err(e) = controller.set_events(&event_types).await {
1766        return format!("failed: {}", e);
1767    }
1768
1769    // Main event loop
1770    loop {
1771        match controller.recv_event().await {
1772            Ok(event) => {
1773                let arrived_at = std::time::SystemTime::now()
1774                    .duration_since(std::time::UNIX_EPOCH)
1775                    .map(|d| d.as_secs_f64())
1776                    .unwrap_or(0.0);
1777
1778                match event {
1779                    ParsedEvent::Circuit(ref e) => {
1780                        handle_circ_event(state, e, arrived_at);
1781                    }
1782                    ParsedEvent::CircuitBandwidth(ref e) => {
1783                        handle_circbw_event(state, e, arrived_at);
1784                    }
1785                    ParsedEvent::OrConn(ref e) => {
1786                        handle_orconn_event(state, e, arrived_at);
1787                    }
1788                    ParsedEvent::Bandwidth(ref e) => {
1789                        handle_bw_event(state, e, arrived_at);
1790                    }
1791                    ParsedEvent::NetworkLiveness(ref e) => {
1792                        handle_network_liveness_event(state, e, arrived_at);
1793                    }
1794                    ParsedEvent::BuildTimeoutSet(ref e) => {
1795                        handle_buildtimeout_set_event(state, e);
1796                    }
1797                    ParsedEvent::Guard(ref e) => {
1798                        handle_guard_event(state, e);
1799                    }
1800                    ParsedEvent::ConfChanged(ref e) => {
1801                        handle_conf_changed_event(state, e);
1802                    }
1803                    ParsedEvent::Log(ref e) => {
1804                        handle_log_event(state, e, arrived_at);
1805                    }
1806                    ParsedEvent::Signal(ref e) => {
1807                        if let Err(err) = handle_signal_event(&mut controller, state, e).await {
1808                            plog(LogLevel::Warn, &format!("Signal event error: {}", err));
1809                        }
1810                    }
1811                    ParsedEvent::Unknown {
1812                        ref event_type,
1813                        ref content,
1814                    } => {
1815                        // Handle NEWCONSENSUS specially since it may not be in ParsedEvent
1816                        if event_type == "NEWCONSENSUS" {
1817                            if let Err(err) = new_consensus_event(
1818                                &mut controller,
1819                                &mut state.vanguard_state,
1820                                &state.config,
1821                            )
1822                            .await
1823                            {
1824                                plog(LogLevel::Warn, &format!("Consensus event error: {}", err));
1825                            }
1826                        } else if event_type == "CIRC_MINOR" {
1827                            // Parse CIRC_MINOR event manually
1828                            // Format: CircuitID EVENT [Path] [PURPOSE=...] [HS_STATE=...] [OLD_PURPOSE=...] [OLD_HS_STATE=...]
1829                            handle_circ_minor_raw(state, content);
1830                        }
1831                    }
1832                    _ => {
1833                        // Ignore other events
1834                    }
1835                }
1836
1837                // Check circuit limits after bandwidth events
1838                if state.config.enable_bandguards {
1839                    let circs_to_check: Vec<String> =
1840                        state.bandwidth_stats.circs.keys().cloned().collect();
1841                    for circ_id in circs_to_check {
1842                        let limit_result = state
1843                            .bandwidth_stats
1844                            .check_circuit_limits(&circ_id, &state.config.bandguards);
1845                        match limit_result {
1846                            crate::bandguards::CircuitLimitResult::Ok => {}
1847                            crate::bandguards::CircuitLimitResult::TorBug {
1848                                bug_id,
1849                                dropped_cells,
1850                            } => {
1851                                plog(
1852                                    LogLevel::Info,
1853                                    &format!(
1854                                        "Tor bug {} (dropped {} cells): {}",
1855                                        bug_id, dropped_cells, circ_id
1856                                    ),
1857                                );
1858                            }
1859                            crate::bandguards::CircuitLimitResult::DroppedCells {
1860                                dropped_cells,
1861                            } => {
1862                                plog(
1863                                    LogLevel::Warn,
1864                                    &format!(
1865                                        "Dropped cells attack ({} cells): {}",
1866                                        dropped_cells, circ_id
1867                                    ),
1868                                );
1869                                try_close_circuit(
1870                                    &mut controller,
1871                                    &circ_id,
1872                                    state.logguard.as_mut(),
1873                                )
1874                                .await;
1875                            }
1876                            crate::bandguards::CircuitLimitResult::MaxBytesExceeded {
1877                                bytes,
1878                                limit,
1879                            } => {
1880                                plog(
1881                                    LogLevel::Warn,
1882                                    &format!(
1883                                        "Circuit {} exceeded max bytes ({} > {})",
1884                                        circ_id, bytes, limit
1885                                    ),
1886                                );
1887                                try_close_circuit(
1888                                    &mut controller,
1889                                    &circ_id,
1890                                    state.logguard.as_mut(),
1891                                )
1892                                .await;
1893                            }
1894                            crate::bandguards::CircuitLimitResult::HsdirBytesExceeded {
1895                                bytes,
1896                                limit,
1897                            } => {
1898                                plog(
1899                                    LogLevel::Warn,
1900                                    &format!(
1901                                        "HSDIR circuit {} exceeded max bytes ({} > {})",
1902                                        circ_id, bytes, limit
1903                                    ),
1904                                );
1905                                try_close_circuit(
1906                                    &mut controller,
1907                                    &circ_id,
1908                                    state.logguard.as_mut(),
1909                                )
1910                                .await;
1911                            }
1912                            crate::bandguards::CircuitLimitResult::ServIntroBytesExceeded {
1913                                bytes,
1914                                limit,
1915                            } => {
1916                                plog(
1917                                    LogLevel::Warn,
1918                                    &format!(
1919                                        "Service intro circuit {} exceeded max bytes ({} > {})",
1920                                        circ_id, bytes, limit
1921                                    ),
1922                                );
1923                                try_close_circuit(
1924                                    &mut controller,
1925                                    &circ_id,
1926                                    state.logguard.as_mut(),
1927                                )
1928                                .await;
1929                            }
1930                        }
1931                    }
1932                }
1933            }
1934            Err(e) => {
1935                // Connection closed or error
1936                plog(LogLevel::Debug, &format!("Event receive error: {}", e));
1937                return "closed".to_string();
1938            }
1939        }
1940    }
1941}
1942
1943/// Runs the main application loop with reconnection support.
1944///
1945/// This is the primary entry point for the vanguards application. It manages
1946/// the complete lifecycle including connection, reconnection, and graceful shutdown.
1947///
1948/// # Lifecycle
1949///
1950/// ```text
1951/// ┌─────────────────────────────────────────────────────────────┐
1952/// │                      run_main()                             │
1953/// │                                                             │
1954/// │  1. Set up CTRL+C handler                                   │
1955/// │  2. Load/create vanguard state                              │
1956/// │  3. Enter reconnection loop:                                │
1957/// │     ┌─────────────────────────────────────────────────────┐ │
1958/// │     │  • Check shutdown flag                              │ │
1959/// │     │  • Check retry limit                                │ │
1960/// │     │  • Run control_loop()                               │ │
1961/// │     │  • Log disconnection                                │ │
1962/// │     │  • Wait 1 second                                    │ │
1963/// │     │  • Increment reconnect counter                      │ │
1964/// │     └─────────────────────────────────────────────────────┘ │
1965/// │  4. Exit when shutdown or retry limit reached               │
1966/// └─────────────────────────────────────────────────────────────┘
1967/// ```
1968///
1969/// # Arguments
1970///
1971/// * `config` - The application configuration
1972///
1973/// # Returns
1974///
1975/// Returns `Ok(())` on graceful shutdown, or an error if the application
1976/// fails to start or encounters an unrecoverable error.
1977///
1978/// # Errors
1979///
1980/// Returns [`Error::Config`] if:
1981/// - Failed to connect to Tor after all retry attempts
1982/// - Invalid configuration values
1983///
1984/// # Shutdown Behavior
1985///
1986/// The function handles graceful shutdown via:
1987/// - CTRL+C signal (sets shutdown flag)
1988/// - Retry limit reached (configurable via `config.retry_limit`)
1989///
1990/// # Example
1991///
1992/// ```rust,no_run
1993/// use vanguards_rs::config::Config;
1994/// use vanguards_rs::control::run_main;
1995///
1996/// #[tokio::main]
1997/// async fn main() -> Result<(), vanguards_rs::error::Error> {
1998///     // Load configuration from file or use defaults
1999///     let config = Config::default();
2000///     
2001///     // Run until shutdown signal or error
2002///     run_main(config).await
2003/// }
2004/// ```
2005///
2006/// # See Also
2007///
2008/// - [`control_loop`] - The inner event processing loop
2009/// - [`Config`] - Configuration options
2010/// - [`VanguardState`] - State persistence
2011pub async fn run_main(config: Config) -> Result<()> {
2012    // Set up CTRL+C handler
2013    let shutdown = Arc::new(AtomicBool::new(false));
2014    let shutdown_clone = shutdown.clone();
2015
2016    tokio::spawn(async move {
2017        if let Ok(()) = tokio::signal::ctrl_c().await {
2018            plog(LogLevel::Notice, "Got CTRL+C. Exiting.");
2019            shutdown_clone.store(true, Ordering::SeqCst);
2020        }
2021    });
2022
2023    // Set close circuits flag from config
2024    set_close_circuits(config.close_circuits);
2025
2026    // Load or create vanguard state
2027    let state_path = &config.state_file;
2028    let vanguard_state = match VanguardState::read_from_file(state_path) {
2029        Ok(mut state) => {
2030            plog(
2031                LogLevel::Info,
2032                &format!("Current layer2 guards: {}", state.layer2_guardset()),
2033            );
2034            plog(
2035                LogLevel::Info,
2036                &format!("Current layer3 guards: {}", state.layer3_guardset()),
2037            );
2038            state.enable_vanguards = config.enable_vanguards;
2039            state
2040        }
2041        Err(_) => {
2042            plog(
2043                LogLevel::Notice,
2044                &format!(
2045                    "Creating new vanguard state file at: {}",
2046                    state_path.display()
2047                ),
2048            );
2049            let mut state = VanguardState::new(&state_path.to_string_lossy());
2050            state.enable_vanguards = config.enable_vanguards;
2051            state
2052        }
2053    };
2054
2055    let mut app_state = AppState::new(vanguard_state, config.clone());
2056
2057    let mut reconnects = 0u32;
2058    let mut last_connected_at: Option<f64> = None;
2059    let mut connected = false;
2060
2061    loop {
2062        // Check for shutdown
2063        if shutdown.load(Ordering::SeqCst) {
2064            break;
2065        }
2066
2067        // Check retry limit
2068        if let Some(limit) = config.retry_limit {
2069            if reconnects >= limit {
2070                break;
2071            }
2072        }
2073
2074        let result = control_loop(&mut app_state).await;
2075
2076        if last_connected_at.is_none() {
2077            last_connected_at = Some(
2078                std::time::SystemTime::now()
2079                    .duration_since(std::time::UNIX_EPOCH)
2080                    .map(|d| d.as_secs_f64())
2081                    .unwrap_or(0.0),
2082            );
2083        }
2084
2085        if result == "closed" {
2086            connected = true;
2087        }
2088
2089        // Log reconnection attempts (every 10 seconds or on first close)
2090        if result == "closed" || reconnects.is_multiple_of(10) {
2091            let now = std::time::SystemTime::now()
2092                .duration_since(std::time::UNIX_EPOCH)
2093                .map(|d| d.as_secs_f64())
2094                .unwrap_or(0.0);
2095
2096            let disconnected_secs = now - last_connected_at.unwrap_or(now);
2097            let max_disconnected = config.bandguards.conn_max_disconnected_secs as f64;
2098
2099            if disconnected_secs > max_disconnected {
2100                plog(
2101                    LogLevel::Warn,
2102                    &format!("Tor daemon connection {}. Trying again...", result),
2103                );
2104            } else {
2105                plog(
2106                    LogLevel::Notice,
2107                    &format!("Tor daemon connection {}. Trying again...", result),
2108                );
2109            }
2110        }
2111
2112        reconnects += 1;
2113
2114        // Wait before reconnecting
2115        tokio::time::sleep(Duration::from_secs(1)).await;
2116    }
2117
2118    if !connected {
2119        return Err(Error::Config("Failed to connect to Tor".to_string()));
2120    }
2121
2122    Ok(())
2123}
2124
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file containing `contents` followed by a newline.
    fn temp_file_with(contents: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "{}", contents).unwrap();
        tmp
    }

    /// The `bandwidth-weights` line is parsed into a name -> value map.
    #[test]
    fn test_get_consensus_weights() {
        let consensus = temp_file_with(
            "network-status-version 3 microdesc\n\
             bandwidth-weights Wbd=0 Wbe=0 Wbg=4194 Wbm=10000 Wdb=10000 Wed=10000 Wee=10000 Weg=10000 Wem=10000 Wgb=10000 Wgd=0 Wgg=5806 Wgm=5806 Wmb=10000 Wmd=0 Wme=0 Wmg=4194 Wmm=10000",
        );

        let parsed = get_consensus_weights(consensus.path()).unwrap();

        for (name, expected) in [("Wmm", 10000), ("Wgg", 5806), ("Wbd", 0)] {
            assert_eq!(parsed.get(name), Some(&expected));
        }
    }

    /// A consensus without a `bandwidth-weights` line is an error.
    #[test]
    fn test_get_consensus_weights_missing() {
        let consensus = temp_file_with("network-status-version 3 microdesc");

        assert!(get_consensus_weights(consensus.path()).is_err());
    }

    /// Decoding works both with and without trailing `=` padding.
    #[test]
    fn test_base64_decode() {
        let padded = "SGVsbG8=";
        let unpadded = "SGVsbG8";

        for encoded in [padded, unpadded] {
            let bytes = base64_decode(encoded).unwrap();
            assert_eq!(bytes, b"Hello");
        }
    }

    /// An unpadded base64 fingerprint decodes to 40 hex characters.
    #[test]
    fn test_decode_base64_fingerprint() {
        // A 20-byte (160-bit) fingerprint takes ceil(160 / 6) = 27 base64
        // characters when the padding is omitted.
        let encoded = "AAAAAAAAAAAAAAAAAAAAAAAAAAA";

        let fingerprint = decode_base64_fingerprint(encoded);

        // 20 bytes -> 40 hex digits.
        assert_eq!(fingerprint.len(), 40);
        assert!(fingerprint.chars().all(|digit| digit.is_ascii_hexdigit()));
    }

    /// Two `r`/`s`/`w` groups yield two routers with flags and bandwidths.
    #[test]
    fn test_parse_network_statuses() {
        let listing = "\
r relay1 AAAAAAAAAAAAAAAAAAAAAAAAAAAA BBBBBBBBBBBBBBBBBBBBBBBBBBBB 2024-01-01 00:00:00 192.168.1.1 9001 0
s Fast Guard Running Stable Valid
w Bandwidth=1000 Measured=900
r relay2 CCCCCCCCCCCCCCCCCCCCCCCCCCCC DDDDDDDDDDDDDDDDDDDDDDDDDDDD 2024-01-01 00:00:00 192.168.1.2 9002 0
s Fast Running Stable Valid Exit
w Bandwidth=2000";

        let parsed = parse_network_statuses(listing).unwrap();
        assert_eq!(parsed.len(), 2);

        let (first, second) = (&parsed[0], &parsed[1]);

        // First relay: Guard flag, both bandwidth figures present.
        assert_eq!(first.nickname, "relay1");
        assert!(first.flags.contains(&"Guard".to_string()));
        assert_eq!(first.bandwidth, Some(1000));
        assert_eq!(first.measured, Some(900));

        // Second relay: Exit flag, no measured bandwidth on its `w` line.
        assert_eq!(second.nickname, "relay2");
        assert!(second.flags.contains(&"Exit".to_string()));
        assert_eq!(second.bandwidth, Some(2000));
        assert_eq!(second.measured, None);
    }

    /// The close-circuits flag reads back whatever was last stored.
    #[test]
    fn test_close_circuits_flag() {
        for wanted in [true, false] {
            set_close_circuits(wanted);
            assert_eq!(get_close_circuits(), wanted);
        }

        // Reset to default
        set_close_circuits(true);
    }
}