Last active
October 3, 2023 10:14
-
-
Save ExFed/5f061330377551d5c72e6c2cdcf96cc5 to your computer and use it in GitHub Desktop.
Stupid Gossip Network
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env groovy | |
import groovy.transform.* | |
import java.util.concurrent.atomic.* | |
@Immutable(knownImmutableClasses = [InetSocketAddress]) | |
class Peer { | |
private static final SELF_HOSTS = (InetAddress.getAllByName('localhost') as List | |
+ InetAddress.localHost | |
+ InetAddress.loopbackAddress | |
+ InetAddress.getByName('::0') | |
+ InetAddress.getByName('0')).asUnmodifiable() | |
static Peer decode(String str) { | |
// localhost is all that matters | |
return new Peer(new InetSocketAddress('localhost', str as int)) | |
} | |
InetSocketAddress socketAddress | |
String encode() { | |
"$socketAddress.port" // we only care about localhost | |
} | |
boolean equals(o) { | |
if (this === o) return true | |
if (o !instanceof Peer) return false | |
def that = o as Peer | |
if (this.socketAddress.port != that.socketAddress.port) return false | |
if (this.socketAddress in SELF_HOSTS && that.socketAddress in SELF_HOSTS) return true | |
return socketAddress == that.socketAddress | |
} | |
int hashCode() { | |
if (socketAddress in SELF_HOSTS) return socketAddress.port | |
return socketAddress.hashCode() | |
} | |
} | |
@Immutable | |
@ToString(includeNames = true, excludes = 'unbannedPeers') | |
class State { | |
static State init(int timestamp, String... peerStrs) { | |
def peers = peerStrs.collectEntries { | |
[Peer.decode(it), 0] | |
} | |
return new State(timestamp, peers, [] as Set) | |
} | |
static State decode(String str) { | |
def toks = str.split(' ') | |
def timestamp = toks.head() as int | |
def peers = toks.tail().collectEntries { | |
def (addrStr, tsStr) = it.split(';') | |
[Peer.decode(addrStr), tsStr as int] | |
} | |
return new State(timestamp, peers, [] as Set) | |
} | |
int timestamp | |
Map<Peer, Integer> peers | |
Set<Peer> banlist | |
Map<Peer, Integer> getUnbannedPeers() { peers.findAll { it.key !in banlist } } | |
State plus(Map<Peer, Integer> peers) { | |
def pm = this.peers + peers | |
def ts = pm.values().max() | |
return new State(Math.max(timestamp, ts), pm, banlist) | |
} | |
State plus(State that) { | |
def timestamp = Math.max(this.timestamp, that.timestamp) | |
def peers = this.peers + that.peers | |
return new State(timestamp, peers, banlist) | |
} | |
State minus(Peer peer) { | |
return new State(timestamp, peers.findAll { it.key != peer }, banlist) | |
} | |
State next() { | |
return new State(timestamp + 1, peers, banlist) | |
} | |
State ban(Peer peer) { | |
return peer in banlist ? this : new State(timestamp, peers, banlist + peer) - peer | |
} | |
State unban(Peer peer) { | |
return peer in banlist ? new State(timestamp, peers, banlist - peer) : this | |
} | |
String encode() { | |
def peerTokens = unbannedPeers.collect { p, ts -> "${p.encode()};$ts"} | |
return ([timestamp] + peerTokens).join(' ') | |
} | |
} | |
def peer = { int listenPort = 0, initPeers = [] -> | |
def sock = listenPort > 0 ? new DatagramSocket(listenPort) : new DatagramSocket() | |
def self = new Peer(new InetSocketAddress(InetAddress.loopbackAddress, sock.localPort)) | |
def state = new AtomicReference(State.init(0, initPeers)) | |
def log = { msg -> | |
println "$sock.localSocketAddress.port :: $msg" | |
} | |
def send = { String message, SocketAddress... dests -> | |
def buf = message.bytes | |
assert buf.length <= 512 | |
dests.each { dest -> | |
sock.send(new DatagramPacket(buf, buf.length, dest)) | |
} | |
} | |
def broadcast = { String message -> | |
def peerAddrs = state.get().peers.keySet()*.socketAddress | |
send(message, *peerAddrs) | |
} | |
def receive = { int timeout = 0 -> | |
def pack = new DatagramPacket(new byte[512], 512) | |
sock.soTimeout = timeout | |
try { | |
sock.receive(pack) | |
} catch (SocketTimeoutException e) { | |
return null | |
} | |
return [pack.socketAddress, new String(pack.data, 0, pack.length)] | |
} | |
def postReceive = { SocketAddress sender, String msg -> | |
try { | |
def peer = new Peer(sender) | |
def peerState = State.decode(msg) | |
def peerTimestamp = peerState.timestamp | |
def peers = peerState.peers + [(peer): peerTimestamp] | |
peers.remove(self) | |
return state.updateAndGet { (it + peers).unban(peer).next() } | |
} catch (Exception e) { | |
e.printStackTrace(System.err) | |
// remove peer that sent a faulty packet | |
return state.updateAndGet { it.ban(peer) - peer } | |
} | |
} | |
Thread.startDaemon { | |
def grace = 8 | |
def rand = new Random() | |
def calcSleep = { n, base, jitter -> n * base + jitter - rand.nextInt(jitter * 2) } | |
def cleanPeers = { State currState -> | |
def threshold = currState.timestamp - grace * (1 + currState.peers.size()) | |
def delinquent = currState.unbannedPeers.findAll { k, v -> v < threshold }.keySet() | |
return delinquent.inject(currState) { acc, p -> acc.ban(p) } | |
} | |
while (true) { | |
def currState = state.getAndUpdate(cleanPeers) | |
broadcast(currState.encode()) | |
Thread.sleep(calcSleep(currState.peers.size() + 1, 500, 250)) | |
} | |
} | |
def currState = state.get() | |
log(currState.encode()) | |
while (currState = receive()?.with(postReceive)) { | |
log(currState.encode()) | |
} | |
} | |
if (args) { | |
peer(args.head() as int, args.tail()) | |
} else { | |
peer() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT | |
for N in $(seq $(($1 + 1)) $2) | |
do | |
groovy gossip.groovy $N $1 > gossip.$N.out & | |
done | |
groovy gossip.groovy $1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment