use anyhow::{bail, Result}; use std::{ io::{BufRead, BufReader}, sync::mpsc, thread, time::Duration, }; #[allow(unused_imports)] use std::{ io::{Read, Write}, net::TcpStream, sync::{Arc, Mutex}, }; use crate::{ config::{Config, ProviderNode}, debug, log, parser::Parser, timestamp::Timestamp, }; #[derive(Debug)] pub struct NodeInfo { pub target_name: String, /// Either the IP or the DDNS pub target: String, pub port: u32, pub preference: u8, streamp: Option, } impl NodeInfo { pub fn new( target_name: String, target: String, port: u32, preference: u8, streamp: Option, ) -> NodeInfo { NodeInfo { target_name, target, port, preference, streamp, } } pub fn update_config( &mut self, config_self_mutex: Arc>, ) -> Result<()> { if let Some(ref mut streamp) = self.streamp { let (tx, rx) = mpsc::channel(); let read_stream = streamp.try_clone().unwrap(); thread::spawn(move || { let mut reader = BufReader::new(read_stream); let mut response = String::new(); match reader.read_line(&mut response) { Ok(_) => { debug!("Inside: Received response: {:?}", response); tx.send(response.trim().to_string()).unwrap_or_default(); } Err(e) => { debug!("Error reading response: {:?}", e); tx.send(String::new()).unwrap_or_default(); } } // let mut response = String::new(); // for line in reader.lines().map_while(Result::ok) { // if line == "DONE" { // break; // } // response.push_str(&line); // } // tx.send(response).unwrap_or_default(); }); streamp.write_all(b"GET CONFIG\n")?; let s = match rx.recv_timeout(Duration::from_secs(2)) { Ok(response) => { debug!("Received response: {:?}", response); response.replace("\\n", "\n") } Err(e) => { debug!("Timeout waiting for response: {:?}", e); String::new() } }; if s.is_empty() { debug!("Empty response: {:?}", s); bail!("No response"); } let cfg: Config = match serde_yaml::from_str(&s) { Ok(cfg) => cfg, Err(e) => { debug!("Error parsing config: {:?}", e); bail!(e); } }; let mut config_self = config_self_mutex.lock().unwrap(); if config_self.config_metadata.last_updated > cfg.config_metadata.last_updated { debug!("Local config is newer, aborting"); return Ok(()); } // Update the config // Execution instructions config_self.execution.instructions = cfg.execution.instructions; let node_self_name = config_self.config_metadata.name.clone(); // Add new Nodes (that do not exist in our config, but exist in the other config) for ddns in &cfg.ddns { if ddns.name == node_self_name { continue; } if !config_self.ddns.iter().any(|d| d.name == ddns.name) { config_self.ddns.push(ddns.clone()); } } config_self.config_metadata.last_updated = cfg.config_metadata.last_updated.clone(); // Wondering if we should update the last updated config_self .ddns .iter_mut() .find(|d| d.name == node_self_name) .unwrap() .last_updated = Timestamp::now(); config_self.write(); log!("Updated config successfully"); return Ok(()); } debug!("No streamp for {}", self.target_name); bail!("No streamp"); } } #[allow(dead_code)] pub struct NodeConnections { connections: Vec>>, } impl Default for NodeConnections { fn default() -> Self { Self::new() } } impl NodeConnections { pub fn new() -> NodeConnections { NodeConnections { connections: vec![], } } pub fn get_node_connection(&self, node_name: String) -> Option>> { for connection in &self.connections { let conn = connection.lock().unwrap(); if conn.target_name == node_name && conn.streamp.is_some() { return Some(connection.clone()); } } None } pub fn get_alive_connections(&self) -> &Vec>> { &self.connections } pub fn ping(&mut self, ddns: &ProviderNode) -> bool { let target = { if ddns.preference == 0 { ddns.ddns.clone() } else { ddns.ip.clone() } }; let mut connection: Option>> = self.get_node_connection(ddns.name.clone()); if connection.is_none() || (connection.is_some() && !is_connection_alive(connection.clone().unwrap())) { if connection.is_some() { self.remove_node_connection(ddns.name.clone()); } connection = self.create_node_connection(ddns); if connection.is_none() { return false; } } let connection = connection.unwrap(); let connection_guard = connection.lock().unwrap(); if connection_guard.streamp.is_none() { return false; } let mut streamp = connection_guard .streamp .as_ref() .unwrap() .try_clone() .unwrap(); let (tx, rx) = mpsc::channel(); let read_stream = streamp.try_clone().unwrap(); thread::spawn(move || { let mut reader = BufReader::new(read_stream); let mut response = String::new(); match reader.read_line(&mut response) { Ok(_) => { let is_pong = response.trim() == "PONG"; tx.send(is_pong as i8).unwrap_or_default(); } Err(e) => { debug!("Error reading response (in fn `ping`): {:?}", e); tx.send(-1).unwrap_or_default(); } } }); // Write PING let _ = streamp.write_all(b"PING\n"); let _ = streamp.flush(); let reply = rx.recv_timeout(Duration::from_secs(2)).unwrap_or_default(); if reply == -1 { self.remove_node_connection(target.clone()); } reply == 1 } pub fn create_node_connection(&mut self, node: &ProviderNode) -> Option>> { // TODO: DDNS let streamp = TcpStream::connect_timeout( &std::net::SocketAddr::new(node.ip.clone().parse().unwrap(), node.port as u16), Duration::from_millis(500), ); match streamp { Ok(streamp) => { let connection = Arc::new(Mutex::new(NodeInfo::new( node.name.clone(), if node.preference == 0 { node.ddns.clone() } else { node.ip.clone() }, node.port, node.preference, Some(streamp), ))); self.connections.push(connection.clone()); Some(connection.clone()) } Err(error) => { if error.kind() != std::io::ErrorKind::ConnectionRefused { log!("-> Problem creating the streamp: {:?}", error); } None } } } pub fn remove_node_connection(&mut self, target_name: String) { if let Some(pos) = self .connections .iter() .position(|conn| conn.lock().unwrap().target_name == target_name) { self.connections.remove(pos); } } pub fn confirm(&mut self, source: &str, is_ip: bool) -> Option { for connection in &self.connections { let conn = connection.lock().unwrap(); if conn.streamp.is_none() { continue; } let mut streamp = conn.streamp.as_ref().unwrap(); streamp .write_all(format!("CONFIRM:{}:{}\n", is_ip as u8, source).as_bytes()) .unwrap(); let reader = BufReader::new(streamp); // let mut writer = &streamp; let sis_ip = is_ip.to_string(); for line in reader.lines() { if line.is_err() { log!("Error reading line: {:?}", line.err()); return None; }; // Template: CONFIRM:_:_:bool // bool is 0/1 let line = line.unwrap(); let parts: Vec<&str> = line.split(':').collect(); if parts.len() != 4 { log!("Invalid response: {}", line); return None; } if parts[0] != "CONFIRM" { log!("Invalid response: {}", line); return None; } if parts[1] != sis_ip { log!("Invalid response: {}", line); return None; } if parts[2] != source { log!("Invalid response: {}", line); return None; } if parts[3] == "1" { return Some(conn.target_name.clone()); } } } None } pub fn get_config_for( &mut self, source: &str, is_ip: bool, target_name: String, ) -> Option { for connection in &self.connections { let conn = connection.lock().unwrap(); if conn.streamp.is_none() || conn.target_name != target_name { continue; } let mut streamp = conn.streamp.as_ref().unwrap(); streamp.write_all(b"GET CONFIG\n").unwrap(); let reader = BufReader::new(streamp); for line in reader.lines() { if line.is_err() { log!("Error reading line: {:?}", line.err()); continue; }; let line = line.unwrap(); let mut parser = Parser::new(line.as_bytes()); let cfg = match parser.parse(None) { Ok(cfg) => cfg, Err(_) => { streamp.write_all(b"AUTH FAIL: BAD CONFIG\n").unwrap(); continue; } }; if let Some(provider) = cfg.ddns.iter().find(|d| {if is_ip { d.ip.clone() } else { d.ddns.clone() } } == source) { return Some(provider.clone()); } else { return None; } } } None } } fn is_connection_alive(connection: Arc>) -> bool { let connection_guard = connection.lock().unwrap(); if connection_guard.streamp.is_none() { return false; } let mut streamp = connection_guard.streamp.as_ref().unwrap(); match streamp.write(&[]) { Ok(_) => true, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => true, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => false, Err(_) => false, } }