Skip to content

Instantly share code, notes, and snippets.

@threez
Created September 6, 2013 14:11

Revisions

  1. threez created this gist Sep 6, 2013.
    124 changes: 124 additions & 0 deletions pipe-rpc.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,124 @@
    class Message < Struct.new(:type, :payload)
    REQUEST = ?<.freeze
    RESPONSE = ?>.freeze
    HEADER = 'NA'.freeze
    CODER = Marshal

    def self.request(payload)
    new(REQUEST, payload)
    end

    def self.response(payload)
    new(RESPONSE, payload)
    end

    def self.from_wire(io, expected_type)
    size, type = io.read(5).unpack(HEADER)
    unless type == expected_type
    type_name = (type == REQUEST) ? :request : :response
    raise ArgumentError, "Response is not of type #{type_name}!"
    end
    payload = CODER.load(io.read(size))
    new(type, payload)
    end

    def to_wire(io)
    data = CODER.dump(payload)
    io.write([data.size, type].pack(HEADER) + data)
    end
    end

    class Protocol
    def initialize(client, server, message_type = Message)
    @message_type = message_type
    @server_read, @client_write = IO.pipe
    @client_read, @server_write = IO.pipe
    end

    def send(*arguments)
    @message_type.request(arguments).to_wire(@client_write)
    @message_type.from_wire(@client_read, @message_type::RESPONSE).payload
    end

    def listen(object)
    message = @message_type.from_wire(@server_read, @message_type::REQUEST)
    result = object.send(*message.payload)
    @message_type.response(result).to_wire(@server_write)
    end

    # close all server streams
    def client!
    @server_write.close
    @server_read.close
    end

    # close all client streams
    def server!
    @client_write.close
    @client_read.close
    end
    end

    class Client
    def initialize(server)
    @protocol = Protocol.new(self, server)
    server.connected(@protocol)
    end

    def connect!
    @protocol.client!
    end

    def method_missing(*args)
    @protocol.send(*args)
    end
    end

    class Server
    def connected(protocol)
    @protocol = protocol
    end

    def run
    @protocol.server!
    loop do
    @protocol.listen(self)
    end
    end
    end

    #
    # Example
    #
    require 'redis'

    class CacheServer < Server
    def initialize
    @redis = Redis.new path: "redis.sock"
    end

    def store(name, value)
    @redis.set(name, value)
    end

    def fetch(name)
    @redis.get(name)
    end
    end

    server = CacheServer.new
    client = Client.new(server)

    fork { server.run }

    client.connect!
    begin
    print "> "

    case cmd = gets.chomp
    when /s ([^ ]+) (.*)/ # store
    client.store($1, $2)
    when /f ([^ ]+)/ # fetch
    puts client.fetch($1)
    end
    end while !STDIN.eof?