1
Fork 0
mirror of https://github.com/thegeneralist01/p2p-failover synced 2026-03-07 12:29:54 +01:00

keep IP, remove DDNS wording

This commit is contained in:
TheGeneralist 2025-04-20 13:28:54 +02:00
parent 2a87dabc55
commit 4308950b01
No known key found for this signature in database
GPG key ID: C391D4D52D630F45
7 changed files with 56 additions and 91 deletions

View file

@ -27,19 +27,15 @@ cargo build --release
Create a `p2p-failover.config.yaml` file in your project directory. Here's an example configuration: Create a `p2p-failover.config.yaml` file in your project directory. Here's an example configuration:
```yaml ```yaml
ddns: nodes:
- name: pc - name: pc
ddns: ''
ip: 127.0.0.1 ip: 127.0.0.1
port: 8080 port: 8080
preference: 1
priority: 100 priority: 100
last_updated: 2025-02-04 19:19:18 UTC last_updated: 2025-02-04 19:19:18 UTC
- name: phone - name: phone
ddns: ''
ip: 100.11.111.111 ip: 100.11.111.111
port: 8081 port: 8081
preference: 1
priority: 20 priority: 20
last_updated: 2025-01-09 16:45:00 UTC last_updated: 2025-01-09 16:45:00 UTC
config_metadata: config_metadata:
@ -52,12 +48,10 @@ execution:
### Configuration Fields ### Configuration Fields
- `ddns`: List of nodes in the network - `nodes`: List of nodes in the network
- `name`: Unique identifier for the node - `name`: Unique identifier for the node
- `ddns`: Domain name (optional)
- `ip`: IP address - `ip`: IP address
- `port`: TCP port for node communication - `port`: TCP port for node communication
- `preference`: Connection preference (0 for DDNS, 1 for IP)
- `priority`: Node priority (higher number = higher priority) - `priority`: Node priority (higher number = higher priority)
- `last_updated`: Timestamp of last update - `last_updated`: Timestamp of last update
- `config_metadata`: Node-specific metadata - `config_metadata`: Node-specific metadata

View file

@ -1,16 +1,12 @@
ddns: nodes:
- name: pc - name: pc
ddns: ''
ip: 127.0.0.1 ip: 127.0.0.1
port: 8080 port: 8080
preference: 1
priority: 100 priority: 100
last_updated: 2025-02-04 19:19:18 UTC last_updated: 2025-02-04 19:19:18 UTC
- name: phone - name: phone
ddns: ''
ip: 100.11.111.111 ip: 100.11.111.111
port: 8081 port: 8081
preference: 1
priority: 20 priority: 20
last_updated: 2025-01-09 16:45:00 UTC last_updated: 2025-01-09 16:45:00 UTC
config_metadata: config_metadata:

View file

@ -5,10 +5,8 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ProviderNode { pub struct ProviderNode {
pub name: String, pub name: String,
pub ddns: String,
pub ip: String, pub ip: String,
pub port: u32, pub port: u32,
pub preference: u8,
pub priority: u32, pub priority: u32,
pub last_updated: Timestamp, pub last_updated: Timestamp,
} }
@ -27,7 +25,7 @@ pub struct ExecutionInstructions {
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config { pub struct Config {
pub ddns: Vec<ProviderNode>, pub nodes: Vec<ProviderNode>,
pub config_metadata: ConfigMetadata, pub config_metadata: ConfigMetadata,
pub execution: ExecutionInstructions, pub execution: ExecutionInstructions,
} }

View file

@ -14,7 +14,7 @@ pub struct Node {
impl Node { impl Node {
pub fn new(config: Arc<Mutex<Config>>) -> Node { pub fn new(config: Arc<Mutex<Config>>) -> Node {
let alives = vec![false; config.lock().unwrap().ddns.len()]; let alives = vec![false; config.lock().unwrap().nodes.len()];
Node { Node {
alive: false, alive: false,
@ -31,7 +31,7 @@ impl Node {
let alives = Arc::new(Mutex::new(0u8)); let alives = Arc::new(Mutex::new(0u8));
let mut handles = Vec::new(); let mut handles = Vec::new();
for (index, host) in self.config.lock().unwrap().ddns.iter().enumerate() { for (index, host) in self.config.lock().unwrap().nodes.iter().enumerate() {
if host.name == config_metadata_name { if host.name == config_metadata_name {
continue; continue;
} }
@ -47,11 +47,7 @@ impl Node {
log!( log!(
"Checking: {}:{}", "Checking: {}:{}",
if host_clone.preference == 0 { &host_clone.ip,
&host_clone.ddns
} else {
&host_clone.ip
},
&host_clone.port &host_clone.port
); );
@ -101,16 +97,16 @@ impl Node {
&& (alives == 0 || { && (alives == 0 || {
// There are nodes alive with less priority // There are nodes alive with less priority
let config_guard = self.config.lock().unwrap(); let config_guard = self.config.lock().unwrap();
assert!(config_guard.ddns.len() == self.alives.len()); assert!(config_guard.nodes.len() == self.alives.len());
let local_priority = config_guard let local_priority = config_guard
.ddns .nodes
.iter() .iter()
.find(|d| d.name == config_guard.config_metadata.name) .find(|d| d.name == config_guard.config_metadata.name)
.map(|d| d.priority) .map(|d| d.priority)
.unwrap_or(0); .unwrap_or(0);
!config_guard !config_guard
.ddns .nodes
.iter() .iter()
.zip(self.alives.iter()) .zip(self.alives.iter())
.any(|(host, &alive)| alive && host.priority > local_priority) .any(|(host, &alive)| alive && host.priority > local_priority)
@ -124,7 +120,7 @@ impl Node {
// First check configs and then kill or otherwise? // First check configs and then kill or otherwise?
let config_guard = self.config.lock().unwrap(); let config_guard = self.config.lock().unwrap();
let local_priority = config_guard let local_priority = config_guard
.ddns .nodes
.iter() .iter()
.find(|d| d.name == config_guard.config_metadata.name) .find(|d| d.name == config_guard.config_metadata.name)
.map(|d| d.priority) .map(|d| d.priority)
@ -132,7 +128,7 @@ impl Node {
if self.process.is_some() if self.process.is_some()
&& config_guard && config_guard
.ddns .nodes
.iter() .iter()
.any(|d| d.priority > local_priority) .any(|d| d.priority > local_priority)
{ {

View file

@ -22,10 +22,8 @@ use crate::{
#[derive(Debug)] #[derive(Debug)]
pub struct NodeInfo { pub struct NodeInfo {
pub target_name: String, pub target_name: String,
/// Either the IP or the DDNS
pub target: String, pub target: String,
pub port: u32, pub port: u32,
pub preference: u8,
stream: Option<TcpStream>, stream: Option<TcpStream>,
} }
@ -34,22 +32,20 @@ impl NodeInfo {
target_name: String, target_name: String,
target: String, target: String,
port: u32, port: u32,
preference: u8, stream: Option<TcpStream>,
streamp: Option<TcpStream>,
) -> NodeInfo { ) -> NodeInfo {
NodeInfo { NodeInfo {
target_name, target_name,
target, target,
port, port,
preference, stream,
stream: streamp,
} }
} }
pub fn update_config(&mut self, config_self_mutex: Arc<Mutex<Config>>) -> Result<()> { pub fn update_config(&mut self, config_self_mutex: Arc<Mutex<Config>>) -> Result<()> {
if let Some(ref mut streamp) = self.stream { if let Some(ref mut stream) = self.stream {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let read_stream = streamp.try_clone().unwrap(); let read_stream = stream.try_clone().unwrap();
thread::spawn(move || { thread::spawn(move || {
let mut reader = BufReader::new(read_stream); let mut reader = BufReader::new(read_stream);
@ -66,7 +62,7 @@ impl NodeInfo {
} }
}); });
streamp.write_all(b"GET CONFIG\n")?; stream.write_all(b"GET CONFIG\n")?;
let s = match rx.recv_timeout(Duration::from_secs(2)) { let s = match rx.recv_timeout(Duration::from_secs(2)) {
Ok(response) => { Ok(response) => {
@ -104,19 +100,19 @@ impl NodeInfo {
let node_self_name = config_self.config_metadata.name.clone(); let node_self_name = config_self.config_metadata.name.clone();
// Add new Nodes (that do not exist in our config, but exist in the other config) // Add new Nodes (that do not exist in our config, but exist in the other config)
for ddns in &cfg.ddns { for node in &cfg.nodes {
if ddns.name == node_self_name { if node.name == node_self_name {
continue; continue;
} }
if !config_self.ddns.iter().any(|d| d.name == ddns.name) { if !config_self.nodes.iter().any(|d| d.name == node.name) {
config_self.ddns.push(ddns.clone()); config_self.nodes.push(node.clone());
} }
} }
config_self.config_metadata.last_updated = cfg.config_metadata.last_updated.clone(); config_self.config_metadata.last_updated = cfg.config_metadata.last_updated.clone();
// Wondering if we should update the last updated // Wondering if we should update the last updated
config_self config_self
.ddns .nodes
.iter_mut() .iter_mut()
.find(|d| d.name == node_self_name) .find(|d| d.name == node_self_name)
.unwrap() .unwrap()
@ -128,8 +124,8 @@ impl NodeInfo {
return Ok(()); return Ok(());
} }
debug!("No streamp for {}", self.target_name); debug!("No stream for {}", self.target_name);
bail!("No streamp"); bail!("No stream");
} }
} }
@ -165,24 +161,18 @@ impl NodeConnections {
&self.connections &self.connections
} }
pub fn ping(&mut self, ddns: &ProviderNode) -> bool { pub fn ping(&mut self, node: &ProviderNode) -> bool {
let target = { let target = node.ip.clone();
if ddns.preference == 0 {
ddns.ddns.clone()
} else {
ddns.ip.clone()
}
};
let mut connection: Option<Arc<Mutex<NodeInfo>>> = let mut connection: Option<Arc<Mutex<NodeInfo>>> =
self.get_node_connection(ddns.name.clone()); self.get_node_connection(node.name.clone());
if connection.is_none() if connection.is_none()
|| (connection.is_some() && !is_connection_alive(connection.clone().unwrap())) || (connection.is_some() && !is_connection_alive(connection.clone().unwrap()))
{ {
if connection.is_some() { if connection.is_some() {
self.remove_node_connection(ddns.name.clone()); self.remove_node_connection(node.name.clone());
} }
connection = self.create_node_connection(ddns); connection = self.create_node_connection(node);
if connection.is_none() { if connection.is_none() {
return false; return false;
} }
@ -195,7 +185,7 @@ impl NodeConnections {
return false; return false;
} }
let mut streamp = connection_guard let mut stream = connection_guard
.stream .stream
.as_ref() .as_ref()
.unwrap() .unwrap()
@ -203,7 +193,7 @@ impl NodeConnections {
.unwrap(); .unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let read_stream = streamp.try_clone().unwrap(); let read_stream = stream.try_clone().unwrap();
thread::spawn(move || { thread::spawn(move || {
let mut reader = BufReader::new(read_stream); let mut reader = BufReader::new(read_stream);
@ -221,9 +211,9 @@ impl NodeConnections {
}); });
// Write PING // Write PING
let _ = streamp.write_all(b"PING\n"); let _ = stream.write_all(b"PING\n");
let _ = streamp.flush(); let _ = stream.flush();
let reply = rx.recv_timeout(Duration::from_secs(2)).unwrap_or_default(); let reply = rx.recv_timeout(Duration::from_secs(2)).unwrap_or_default();
if reply == -1 { if reply == -1 {
@ -233,24 +223,18 @@ impl NodeConnections {
} }
pub fn create_node_connection(&mut self, node: &ProviderNode) -> Option<Arc<Mutex<NodeInfo>>> { pub fn create_node_connection(&mut self, node: &ProviderNode) -> Option<Arc<Mutex<NodeInfo>>> {
// TODO: DDNS let stream = TcpStream::connect_timeout(
let streamp = TcpStream::connect_timeout(
&std::net::SocketAddr::new(node.ip.clone().parse().unwrap(), node.port as u16), &std::net::SocketAddr::new(node.ip.clone().parse().unwrap(), node.port as u16),
Duration::from_millis(500), Duration::from_millis(500),
); );
match streamp { match stream {
Ok(streamp) => { Ok(stream) => {
let connection = Arc::new(Mutex::new(NodeInfo::new( let connection = Arc::new(Mutex::new(NodeInfo::new(
node.name.clone(), node.name.clone(),
if node.preference == 0 { node.ip.clone(),
node.ddns.clone()
} else {
node.ip.clone()
},
node.port, node.port,
node.preference, Some(stream),
Some(streamp),
))); )));
self.connections.push(connection.clone()); self.connections.push(connection.clone());
@ -259,7 +243,7 @@ impl NodeConnections {
Err(error) => { Err(error) => {
if error.kind() != std::io::ErrorKind::ConnectionRefused { if error.kind() != std::io::ErrorKind::ConnectionRefused {
log!("-> Problem creating the streamp: {:?}", error); log!("-> Problem creating the stream: {:?}", error);
} }
None None
} }
@ -283,12 +267,12 @@ impl NodeConnections {
continue; continue;
} }
let mut streamp = conn.stream.as_ref().unwrap(); let mut stream = conn.stream.as_ref().unwrap();
streamp stream
.write_all(format!("CONFIRM:{}:{}\n", is_ip as u8, source).as_bytes()) .write_all(format!("CONFIRM:{}:{}\n", is_ip as u8, source).as_bytes())
.unwrap(); .unwrap();
let reader = BufReader::new(streamp); let reader = BufReader::new(stream);
let sis_ip = is_ip.to_string(); let sis_ip = is_ip.to_string();
@ -333,7 +317,6 @@ impl NodeConnections {
pub fn get_config_for( pub fn get_config_for(
&mut self, &mut self,
source: &str, source: &str,
is_ip: bool,
target_name: String, target_name: String,
) -> Option<ProviderNode> { ) -> Option<ProviderNode> {
for connection in &self.connections { for connection in &self.connections {
@ -342,10 +325,10 @@ impl NodeConnections {
continue; continue;
} }
let mut streamp = conn.stream.as_ref().unwrap(); let mut stream = conn.stream.as_ref().unwrap();
streamp.write_all(b"GET CONFIG\n").unwrap(); stream.write_all(b"GET CONFIG\n").unwrap();
let reader = BufReader::new(streamp); let reader = BufReader::new(stream);
for line in reader.lines() { for line in reader.lines() {
if line.is_err() { if line.is_err() {
@ -358,12 +341,12 @@ impl NodeConnections {
let cfg = match parser.parse(None) { let cfg = match parser.parse(None) {
Ok(cfg) => cfg, Ok(cfg) => cfg,
Err(_) => { Err(_) => {
streamp.write_all(b"AUTH FAIL: BAD CONFIG\n").unwrap(); stream.write_all(b"AUTH FAIL: BAD CONFIG\n").unwrap();
continue; continue;
} }
}; };
if let Some(provider) = cfg.ddns.iter().find(|d| {if is_ip { d.ip.clone() } else { d.ddns.clone() } } == source) { if let Some(provider) = cfg.nodes.iter().find(|d| d.ip.clone() == source) {
return Some(provider.clone()); return Some(provider.clone());
} else { } else {
return None; return None;
@ -380,8 +363,8 @@ fn is_connection_alive(connection: Arc<Mutex<NodeInfo>>) -> bool {
return false; return false;
} }
let mut streamp = connection_guard.stream.as_ref().unwrap(); let mut stream = connection_guard.stream.as_ref().unwrap();
match streamp.write(&[]) { match stream.write(&[]) {
Ok(_) => true, Ok(_) => true,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => true, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => true,
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => false, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => false,

View file

@ -39,12 +39,10 @@ mod tests {
#[test] #[test]
fn test_parser() { fn test_parser() {
let yaml = r#" let yaml = r#"
ddns: nodes:
- name: test - name: test
ddns: ''
ip: 127.0.0.1 ip: 127.0.0.1
port: 8080 port: 8080
preference: 1
priority: 100 priority: 100
last_updated: 2024-03-20 00:00:00 UTC last_updated: 2024-03-20 00:00:00 UTC
config_metadata: config_metadata:
@ -59,10 +57,10 @@ execution:
assert!(result.is_ok()); assert!(result.is_ok());
let config = result.unwrap(); let config = result.unwrap();
assert_eq!(config.ddns.len(), 1); assert_eq!(config.nodes.len(), 1);
assert_eq!(config.ddns[0].ip, "127.0.0.1"); assert_eq!(config.nodes[0].ip, "127.0.0.1");
assert_eq!(config.ddns[0].priority, 100); assert_eq!(config.nodes[0].priority, 100);
assert_eq!(config.ddns[0].name, "test"); assert_eq!(config.nodes[0].name, "test");
assert_eq!(config.ddns[0].name, config.config_metadata.name); assert_eq!(config.nodes[0].name, config.config_metadata.name);
} }
} }

View file

@ -12,7 +12,7 @@ pub fn start_tcp_listener(config: Arc<Mutex<Config>>, config_string: Arc<Mutex<S
let port = { let port = {
let cfg = config.lock().unwrap(); let cfg = config.lock().unwrap();
let self_name = &cfg.config_metadata.name; let self_name = &cfg.config_metadata.name;
cfg.ddns.iter().find(|d| d.name == *self_name).unwrap().port cfg.nodes.iter().find(|d| d.name == *self_name).unwrap().port
}; };
let listener = match TcpListener::bind(format!("0.0.0.0:{}", port)) { let listener = match TcpListener::bind(format!("0.0.0.0:{}", port)) {