1
Fork 0
mirror of https://github.com/thegeneralist01/p2p-failover synced 2026-03-07 12:29:54 +01:00

make checking alive nodes multithreaded

This commit is contained in:
TheGeneralist 2025-03-30 15:41:25 +02:00
parent 5b4bc54c1f
commit bc39e5caaf
No known key found for this signature in database
GPG key ID: C391D4D52D630F45
4 changed files with 113 additions and 37 deletions

51
Cargo.lock generated
View file

@ -254,6 +254,21 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -261,6 +276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
@ -269,6 +285,34 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -287,10 +331,16 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@ -882,6 +932,7 @@ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
"dateparser", "dateparser",
"futures",
"notify", "notify",
"reqwest", "reqwest",
"serde", "serde",

View file

@ -12,3 +12,4 @@ reqwest = "0.12.12"
tokio = { version = "=1.40.0", features = ["full"] } tokio = { version = "=1.40.0", features = ["full"] }
notify = "8.0.0" notify = "8.0.0"
anyhow = "1.0.97" anyhow = "1.0.97"
futures = "0.3.31"

View file

@ -1,11 +1,14 @@
use crate::{config::Config, log, node_connections::NodeConnections, process::Process}; use crate::{config::Config, log, node_connections::NodeConnections, process::Process};
use futures::future::join_all;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::task;
#[derive(Clone)]
pub struct Node { pub struct Node {
alive: bool, alive: bool,
pub config: Arc<Mutex<Config>>, pub config: Arc<Mutex<Config>>,
alives: Vec<bool>, alives: Vec<bool>,
process: Option<Process>, process: Option<Arc<Mutex<Process>>>,
pub node_connections: NodeConnections, pub node_connections: NodeConnections,
// tick: u8, // tick: u8,
// tick_dir: u8, // tick_dir: u8,
@ -28,47 +31,67 @@ impl Node {
/// Returns the amount of alive hosts /// Returns the amount of alive hosts
pub async fn check_hosts(&mut self) -> u8 { pub async fn check_hosts(&mut self) -> u8 {
let mut alives: u8 = 0; let config_metadata_name = self.config.lock().unwrap().config_metadata.name.clone();
let config = self.config.lock().unwrap(); let alives = Arc::new(Mutex::new(0u8));
for host in config.ddns.iter().enumerate() { let mut handles = Vec::new();
if host.1.name == config.config_metadata.name {
for (index, host) in self.config.lock().unwrap().ddns.iter().enumerate() {
if host.name == config_metadata_name {
continue; continue;
} }
self.alives[host.0] = false; self.alives[index] = false;
let host_clone = host.clone();
let alives_clone = Arc::clone(&alives);
let mut node_connections = self.node_connections.clone();
let handle = task::spawn(async move {
let host_name = host_clone.name.clone();
let host_priority = host_clone.priority;
log!( log!(
"Checking: {}:{}", "Checking: {}:{}",
if host.1.preference == 0 { if host_clone.preference == 0 {
&host.1.ddns &host_clone.ddns
} else { } else {
&host.1.ip &host_clone.ip
}, },
&host.1.port, &host_clone.port
); );
let alive = self.node_connections.ping(host.1);
let alive = task::spawn_blocking(move || node_connections.ping(&host_clone))
.await
.unwrap();
if alive { if alive {
log!( log!(
"-> Alive: host \"{}\" with priority {}", "-> Alive: host \"{}\" with priority {}",
host.1.name, host_name,
host.1.priority host_priority,
); );
alives += 1; let mut count = alives_clone.lock().unwrap();
*count += 1;
} else { } else {
log!( log!(
"-> Host \"{}\" with priority {} is dead", "-> Host \"{}\" with priority {} is dead",
host.1.name, host_name,
host.1.priority host_priority,
); );
} }
self.alives[host.0] = alive; });
handles.push(handle);
} }
join_all(handles).await;
let alives = *alives.lock().unwrap();
alives alives
} }
fn spawn(&mut self) { fn spawn(&mut self) {
let process = Process::new(&self.config.lock().unwrap()); let process = Process::new(&self.config.lock().unwrap());
self.process = Some(process); self.process = Some(Arc::new(Mutex::new(process)));
} }
/// Check for config updates and update /// Check for config updates and update
@ -152,8 +175,11 @@ impl Node {
{ {
// Clean up // Clean up
self.alive = false; self.alive = false;
if let Some(ref mut p) = self.process { if let Some(p) = &self.process {
p.kill(); {
let mut process = p.lock().unwrap();
process.kill();
}
self.process = None; self.process = None;
} }
} }

View file

@ -46,10 +46,7 @@ impl NodeInfo {
} }
} }
pub fn update_config( pub fn update_config(&mut self, config_self_mutex: Arc<Mutex<Config>>) -> Result<()> {
&mut self,
config_self_mutex: Arc<Mutex<Config>>,
) -> Result<()> {
if let Some(ref mut streamp) = self.streamp { if let Some(ref mut streamp) = self.streamp {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let read_stream = streamp.try_clone().unwrap(); let read_stream = streamp.try_clone().unwrap();
@ -145,6 +142,7 @@ impl NodeInfo {
} }
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Clone)]
pub struct NodeConnections { pub struct NodeConnections {
connections: Vec<Arc<Mutex<NodeInfo>>>, connections: Vec<Arc<Mutex<NodeInfo>>>,
} }