Skip to content

Instantly share code, notes, and snippets.

@bufrr
Created August 21, 2024 10:11
Show Gist options
  • Save bufrr/6e555c72f0f5793ad9df98b93634818f to your computer and use it in GitHub Desktop.
Save bufrr/6e555c72f0f5793ad9df98b93634818f to your computer and use it in GitHub Desktop.
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
struct Transaction {
vector_clock: HashMap<String, u64>,
timestamp: f64,
content: String,
}
#[derive(Debug)]
struct Node {
id: String,
vector_clock: HashMap<String, u64>,
transactions: Vec<Transaction>,
}
impl Node {
fn new(id: String) -> Self {
Node {
id,
vector_clock: HashMap::new(),
transactions: Vec::new(),
}
}
fn update_vector_clock(&mut self, other_clock: &HashMap<String, u64>) {
for (node, &count) in other_clock {
let entry = self.vector_clock.entry(node.clone()).or_insert(0);
*entry = (*entry).max(count);
}
*self.vector_clock.entry(self.id.clone()).or_insert(0) += 1;
}
fn record_transaction(&mut self, content: String) {
let unix_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs_f64();
self.update_vector_clock(&HashMap::new());
self.transactions.push(Transaction {
vector_clock: self.vector_clock.clone(),
timestamp: unix_timestamp,
content,
});
}
fn receive_transaction(&mut self, sender_clock: HashMap<String, u64>, unix_timestamp: f64, content: String) {
self.update_vector_clock(&sender_clock);
self.transactions.push(Transaction {
vector_clock: self.vector_clock.clone(),
timestamp: unix_timestamp,
content,
});
}
}
fn compare_vector_clocks(a: &HashMap<String, u64>, b: &HashMap<String, u64>) -> Option<std::cmp::Ordering> {
if a == b {
return Some(std::cmp::Ordering::Equal);
}
if a.iter().all(|(k, &v)| v <= *b.get(k).unwrap_or(&0)) {
return Some(std::cmp::Ordering::Less);
}
if a.iter().all(|(k, &v)| v >= *b.get(k).unwrap_or(&0)) {
return Some(std::cmp::Ordering::Greater);
}
None // Concurrent events
}
fn sequence_transactions(nodes: &[Node]) -> Vec<Transaction> {
let mut all_transactions: Vec<Transaction> = nodes
.iter()
.flat_map(|node| node.transactions.clone())
.collect();
all_transactions.sort_by(|a, b| {
let sum_a: u64 = a.vector_clock.values().sum();
let sum_b: u64 = b.vector_clock.values().sum();
sum_a.cmp(&sum_b)
.then_with(|| a.timestamp.partial_cmp(&b.timestamp).unwrap())
});
let mut final_sequence: Vec<Transaction> = Vec::new();
for transaction in all_transactions {
if final_sequence.is_empty() ||
compare_vector_clocks(&final_sequence.last().unwrap().vector_clock, &transaction.vector_clock) != Some(std::cmp::Ordering::Equal) {
final_sequence.push(transaction);
} else if transaction.timestamp < final_sequence.last().unwrap().timestamp {
*final_sequence.last_mut().unwrap() = transaction;
}
}
final_sequence
}
fn main() {
let mut node1 = Node::new("A".to_string());
let mut node2 = Node::new("B".to_string());
let mut node3 = Node::new("C".to_string());
node1.record_transaction("T1 from A".to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
node2.record_transaction("T2 from B".to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
node3.record_transaction("T3 from C".to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
let t2_from_b = node2.transactions.last().unwrap().clone();
node1.receive_transaction(t2_from_b.vector_clock, t2_from_b.timestamp, t2_from_b.content);
node1.record_transaction("T4 from A".to_string());
let nodes = vec![node1, node2, node3];
let sequenced_transactions = sequence_transactions(&nodes);
println!("Sequenced Transactions:");
for t in sequenced_transactions {
println!("Vector Clock: {:?}, Timestamp: {}, Transaction: {}", t.vector_clock, t.timestamp, t.content);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment