Created
August 29, 2013 16:37
Broadcast TCP traffic. Spike
require 'celluloid/io'

module Duplex
  class Server
    include Celluloid::IO
    finalizer :finalize

    def initialize(name, host, port)
      @name = name
      puts "[#{name}] *** Starting server on #{host}:#{port}"
      @server = TCPServer.new(host, port)
      async.run
    end

    def finalize
      @server.close if @server
    end

    def run
      loop { async.handle_connection @server.accept }
    end

    def handle_connection(socket)
      _, port, host = socket.peeraddr
      puts "[#{name}] *** Received connection from #{host}:#{port}"
      reply(socket)
    rescue EOFError
      puts "[#{name}] *** #{host}:#{port} disconnected"
      socket.close
    end

    private

    attr_reader :name
  end

  class Proxy < Server
    def initialize(host, port, *subscribers)
      @subscribers = subscribers
      super("PROXY", host, port)
    end

    def reply(socket)
      sockets = subscribers.map { |subscriber|
        host, port = subscriber
        TCPSocket.new(host, port)
      }

      loop do
        content = socket.readpartial(4096)
        sockets.each do |subscriber|
          subscriber.write(content)
        end
      end
    end

    private

    attr_reader :subscribers
  end

  class Backend < Server
    def reply(socket)
      puts "[#{name}] RECEIVED:"
      loop { puts socket.readpartial(4096) }
    end
  end
end

production = Duplex::Backend.supervise(:PRODUCTION, "127.0.0.1", 1234)
test = Duplex::Backend.supervise(:TEST, "127.0.0.1", 1235)
proxy = Duplex::Proxy.supervise("127.0.0.1", 2000,
                                ["127.0.0.1", 1234],
                                ["127.0.0.1", 1235])

# A signal has only one handler, so each trap("INT") call replaces the
# previous one. Terminate all three from a single handler.
trap("INT") do
  [production, test, proxy].each(&:terminate)
  exit
end

sleep
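
The fan-out step in `Proxy#reply` can also be tried without Celluloid. What follows is only a rough sketch under assumptions: it uses plain blocking stdlib sockets, and `broadcast` is a hypothetical helper name that does not appear in the gist. Unlike the original loop, it drops a subscriber whose write fails instead of letting the error propagate.

```ruby
require 'socket'

# Hypothetical helper (not part of the gist): write one chunk to every
# subscriber socket. A subscriber whose write fails is closed and dropped.
# Returns the subscribers that are still usable.
def broadcast(content, sockets)
  sockets.select do |subscriber|
    begin
      subscriber.write(content)
      true
    rescue IOError, SystemCallError
      subscriber.close unless subscriber.closed?
      false
    end
  end
end
```

Calling `sockets = broadcast(content, sockets)` inside the read loop would keep one dead backend from taking the others down with it.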
Hehe "production" and "test" here are just stubs though. In the real world, only Duplex::Proxy would exist -- that's why it takes host and port directly, because it would broadcast to other machines. I have some other features planned!! I should write them in a Readme and push it soon.
@txus once you have something more solid, post to the ML.
I'd like to see where this heads and what we can change in celluloid to make this more of a contained unit.
The reverse path is missing, though I guess this is a PoC.
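
One possible shape for the reverse path, as a sketch only: relay replies from a single "primary" backend back to the client and discard whatever the other backends send. This assumes plain stdlib sockets and `IO.select` rather than Celluloid::IO, and `pump`, `primary` and `others` are hypothetical names, not anything from the gist.

```ruby
require 'socket'

# Hypothetical helper (not part of the gist): copy client traffic to every
# backend and relay only the primary backend's replies to the client.
# Replies from the other backends are read and thrown away.
def pump(client, primary, others = [])
  backends = [primary, *others]
  loop do
    readable, = IO.select([client, *backends])
    readable.each do |io|
      data = io.readpartial(4096)
      if io.equal?(client)
        backends.each { |backend| backend.write(data) }
      elsif io.equal?(primary)
        client.write(data)
      end
    end
  end
rescue EOFError
  # Any peer hung up: close every socket handled here.
  [client, *backends].each { |io| io.close unless io.closed? }
end
```

Which backend counts as primary is a design choice. In the stub setup above it would presumably be the production one, so the client only ever sees production's answers.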
To make it extra awesome, you could create a supervision group for each one so it creates a pool using all the cores on the machine :) - eat that, node!