From 5b4bc54c1f634a6960bcf442160fd31ae490af4e Mon Sep 17 00:00:00 2001 From: TheGeneralist <180094941+thegeneralist01@users.noreply.github.com> Date: Sun, 23 Mar 2025 21:21:10 +0100 Subject: [PATCH] use anyhow for errors --- Cargo.lock | 5 +-- Cargo.toml | 1 + src/main.rs | 3 +- src/node_connections.rs | 79 +++++++++++++++++++---------------------- 4 files changed, 43 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba98de9..2eaee2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" [[package]] name = "atomic-waker" @@ -879,6 +879,7 @@ dependencies = [ name = "p2p-failover" version = "0.1.0" dependencies = [ + "anyhow", "chrono", "dateparser", "notify", diff --git a/Cargo.toml b/Cargo.toml index e4e0890..60f1a90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ serde_yaml = "0.9" reqwest = "0.12.12" tokio = { version = "=1.40.0", features = ["full"] } notify = "8.0.0" +anyhow = "1.0.97" diff --git a/src/main.rs b/src/main.rs index 1681190..ecf8505 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use p2p_failover::{file_watcher, node::Node, parser::Parser, tcp_listener}; use std::{ fs::File, @@ -6,7 +7,7 @@ use std::{ }; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<()> { let config_path = std::env::var("P2P_CONFIG_PATH").unwrap_or_else(|_| "p2p-failover.config.yaml".to_string()); diff --git a/src/node_connections.rs b/src/node_connections.rs index 4776002..32d6562 100644 --- a/src/node_connections.rs +++ b/src/node_connections.rs @@ -1,3 +1,4 @@ +use anyhow::{bail, Result}; use std::{ io::{BufRead, BufReader}, sync::mpsc, @@ -25,7 +26,7 @@ pub struct NodeInfo { pub target: String, pub port: u32, pub preference: u8, - stream: Option, + streamp: Option, } impl NodeInfo { @@ -34,24 +35,24 @@ impl NodeInfo { target: String, port: u32, preference: u8, - stream: Option, + streamp: Option, ) -> NodeInfo { NodeInfo { target_name, target, port, preference, - stream, + streamp, } } pub fn update_config( &mut self, config_self_mutex: Arc>, - ) -> Result<(), Box> { - if let Some(ref mut stream) = self.stream { + ) -> Result<()> { + if let Some(ref mut streamp) = self.streamp { let (tx, rx) = mpsc::channel(); - let read_stream = stream.try_clone().unwrap(); + let read_stream = streamp.try_clone().unwrap(); thread::spawn(move || { let mut reader = BufReader::new(read_stream); @@ -76,7 +77,7 @@ impl NodeInfo { // tx.send(response).unwrap_or_default(); }); - stream.write_all(b"GET CONFIG\n")?; + streamp.write_all(b"GET CONFIG\n")?; let s = match rx.recv_timeout(Duration::from_secs(2)) { Ok(response) => { @@ -90,17 +91,14 @@ impl NodeInfo { }; if s.is_empty() { debug!("Empty response: {:?}", s); - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "No response", - ))); + bail!("No response"); } let cfg: Config = match serde_yaml::from_str(&s) { Ok(cfg) => cfg, Err(e) => { debug!("Error parsing config: {:?}", e); - return Err(Box::new(e)); + bail!(e); } }; @@ -141,11 +139,8 @@ impl NodeInfo { return Ok(()); } - debug!("No stream for {}", self.target_name); - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "No stream", - ))) + debug!("No streamp for {}", self.target_name); + bail!("No streamp"); } } @@ -170,7 +165,7 @@ impl NodeConnections { pub fn get_node_connection(&self, node_name: String) -> Option>> { for connection in &self.connections { let conn = connection.lock().unwrap(); - if conn.target_name == node_name && conn.stream.is_some() { + if conn.target_name == node_name && conn.streamp.is_some() { return Some(connection.clone()); } } @@ -207,19 +202,19 @@ impl NodeConnections { let connection = connection.unwrap(); let connection_guard = connection.lock().unwrap(); - if connection_guard.stream.is_none() { + if connection_guard.streamp.is_none() { return false; } - let mut stream = connection_guard - .stream + let mut streamp = connection_guard + .streamp .as_ref() .unwrap() .try_clone() .unwrap(); let (tx, rx) = mpsc::channel(); - let read_stream = stream.try_clone().unwrap(); + let read_stream = streamp.try_clone().unwrap(); thread::spawn(move || { let mut reader = BufReader::new(read_stream); @@ -237,9 +232,9 @@ impl NodeConnections { }); // Write PING - let _ = stream.write_all(b"PING\n"); + let _ = streamp.write_all(b"PING\n"); - let _ = stream.flush(); + let _ = streamp.flush(); let reply = rx.recv_timeout(Duration::from_secs(2)).unwrap_or_default(); if reply == -1 { @@ -250,13 +245,13 @@ impl NodeConnections { pub fn create_node_connection(&mut self, node: &ProviderNode) -> Option>> { // TODO: DDNS - let stream = TcpStream::connect_timeout( + let streamp = TcpStream::connect_timeout( &std::net::SocketAddr::new(node.ip.clone().parse().unwrap(), node.port as u16), Duration::from_millis(500), ); - match stream { - Ok(stream) => { + match streamp { + Ok(streamp) => { let connection = Arc::new(Mutex::new(NodeInfo::new( node.name.clone(), if node.preference == 0 { @@ -266,7 +261,7 @@ impl NodeConnections { }, node.port, node.preference, - Some(stream), + Some(streamp), ))); self.connections.push(connection.clone()); @@ -275,7 +270,7 @@ impl NodeConnections { Err(error) => { if error.kind() != std::io::ErrorKind::ConnectionRefused { - log!("-> Problem creating the stream: {:?}", error); + log!("-> Problem creating the streamp: {:?}", error); } None } @@ -295,17 +290,17 @@ impl NodeConnections { pub fn confirm(&mut self, source: &str, is_ip: bool) -> Option { for connection in &self.connections { let conn = connection.lock().unwrap(); - if conn.stream.is_none() { + if conn.streamp.is_none() { continue; } - let mut stream = conn.stream.as_ref().unwrap(); - stream + let mut streamp = conn.streamp.as_ref().unwrap(); + streamp .write_all(format!("CONFIRM:{}:{}\n", is_ip as u8, source).as_bytes()) .unwrap(); - let reader = BufReader::new(stream); - // let mut writer = &stream; + let reader = BufReader::new(streamp); + // let mut writer = &streamp; let sis_ip = is_ip.to_string(); @@ -355,14 +350,14 @@ impl NodeConnections { ) -> Option { for connection in &self.connections { let conn = connection.lock().unwrap(); - if conn.stream.is_none() || conn.target_name != target_name { + if conn.streamp.is_none() || conn.target_name != target_name { continue; } - let mut stream = conn.stream.as_ref().unwrap(); - stream.write_all(b"GET CONFIG\n").unwrap(); + let mut streamp = conn.streamp.as_ref().unwrap(); + streamp.write_all(b"GET CONFIG\n").unwrap(); - let reader = BufReader::new(stream); + let reader = BufReader::new(streamp); for line in reader.lines() { if line.is_err() { @@ -375,7 +370,7 @@ impl NodeConnections { let cfg = match parser.parse(None) { Ok(cfg) => cfg, Err(_) => { - stream.write_all(b"AUTH FAIL: BAD CONFIG\n").unwrap(); + streamp.write_all(b"AUTH FAIL: BAD CONFIG\n").unwrap(); continue; } }; @@ -393,12 +388,12 @@ impl NodeConnections { fn is_connection_alive(connection: Arc>) -> bool { let connection_guard = connection.lock().unwrap(); - if connection_guard.stream.is_none() { + if connection_guard.streamp.is_none() { return false; } - let mut stream = connection_guard.stream.as_ref().unwrap(); - match stream.write(&[]) { + let mut streamp = connection_guard.streamp.as_ref().unwrap(); + match streamp.write(&[]) { Ok(_) => true, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => true, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => false,