This is code associated with the blog post ...
Last active
February 1, 2021 21:57
-
-
Save copiousfreetime/e6ea5c901270706271c763fd2fbd355e to your computer and use it in GitHub Desktop.
Fault Tolerant TCP Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'socket' | |
require 'zlib' | |
require 'logger' | |
require 'fcntl' | |
# Print the command-line synopsis to standard error and terminate the
# process with exit status 1. Never returns.
def usage
  $stderr.puts "#{$0} host port [output-location]"
  exit 1
end
# Logger formatter that renders each record as a single line:
#
#   <UTC timestamp> <pid> <severity> : <message>
#
class LogFormatter < ::Logger::Formatter
  # timestamp, pid right-aligned in 5 columns, severity right-aligned in 5 columns, message
  FORMAT = "%s %5d %5s : %s\n".freeze
  # The trailing literal "Z" is only correct because #call converts the time to UTC first.
  DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze

  def initialize
    super
    self.datetime_format = DATETIME_FORMAT
  end

  # Build the log line for one record.
  #
  # severity  - severity label, e.g. "INFO"
  # time      - Time the record was created; it is NOT modified
  # _progname - ignored
  # msg       - the message object, stringified by Logger::Formatter#msg2str
  #
  # Returns the formatted String, terminated by a newline.
  def call(severity, time, _progname, msg)
    # getutc returns a new Time; Time#utc would convert the caller's object in place.
    FORMAT % [format_datetime(time.getutc), Process.pid, severity, msg2str(msg)]
  end
end
# Script body: connect to host:port, read a little over 1 MiB of raw bytes from
# the socket (or until the peer closes the connection) and report what was read.
host = ARGV.shift || usage

# `nil.to_i` is 0 and 0 is truthy in Ruby, so `ARGV.shift.to_i || usage` could never
# reach `usage`. Validate the raw argument first, then convert it.
port_arg = ARGV.shift || usage
usage unless port_arg.match?(/\A\d+\z/)
port = port_arg.to_i

# The optional third argument is consumed here but nothing below writes to it.
output = ARGV.shift || "/dev/null"

logger = ::Logger.new($stderr, formatter: LogFormatter.new)

# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
begin
  # OR the flag into the current status flags; F_SETFL with a bare O_NONBLOCK would
  # overwrite whatever other status flags were already set on the descriptor.
  status_flags = socket.fcntl(Fcntl::F_GETFL)
  socket.fcntl(Fcntl::F_SETFL, status_flags | Fcntl::O_NONBLOCK)
  logger.info "Socket created"

  read_buffer = String.new # A reusable string for use by readpartial
  bufsize     = 64 * 1024   # How much maximum data to read from the socket at a go
  stop_after  = 1024 * 1024 # 1 megabyte of data
  total_bytes = 0
  read_count  = 0

  logger.info "Reading..."
  loop do
    begin
      bytes = socket.readpartial(bufsize, read_buffer)
    rescue EOFError
      # readpartial signals end-of-stream by raising; treat it as a normal stop.
      logger.warn "Connection closed by peer before #{stop_after} bytes were read"
      break
    end
    total_bytes += bytes.bytesize
    read_count += 1
    break if total_bytes > stop_after
  end

  logger.info "Stopped after #{total_bytes} bytes read in #{read_count} reads"
ensure
  socket.close
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'socket' | |
require 'zlib' | |
require 'logger' | |
require 'fcntl' | |
# Print the command-line synopsis to standard error and terminate the
# process with exit status 1. Never returns.
def usage
  $stderr.puts "#{$0} host port [output-location]"
  exit 1
end
# Logger formatter that renders each record as a single line:
#
#   <UTC timestamp> <pid> <severity> : <message>
#
class LogFormatter < ::Logger::Formatter
  # timestamp, pid right-aligned in 5 columns, severity right-aligned in 5 columns, message
  FORMAT = "%s %5d %5s : %s\n".freeze
  # The trailing literal "Z" is only correct because #call converts the time to UTC first.
  DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze

  def initialize
    super
    self.datetime_format = DATETIME_FORMAT
  end

  # Build the log line for one record.
  #
  # severity  - severity label, e.g. "INFO"
  # time      - Time the record was created; it is NOT modified
  # _progname - ignored
  # msg       - the message object, stringified by Logger::Formatter#msg2str
  #
  # Returns the formatted String, terminated by a newline.
  def call(severity, time, _progname, msg)
    # getutc returns a new Time; Time#utc would convert the caller's object in place.
    FORMAT % [format_datetime(time.getutc), Process.pid, severity, msg2str(msg)]
  end
end
# Script body: connect to host:port, read a little over 1 MiB of compressed bytes
# (or until the peer closes the connection), inflate them as they arrive and write
# the decompressed data to the chosen output ("-" means standard output).
host = ARGV.shift || usage

# `nil.to_i` is 0 and 0 is truthy in Ruby, so `ARGV.shift.to_i || usage` could never
# reach `usage`. Validate the raw argument first, then convert it.
port_arg = ARGV.shift || usage
usage unless port_arg.match?(/\A\d+\z/)
port = port_arg.to_i

output_to = ARGV.shift || "-"
logger = ::Logger.new($stderr, formatter: LogFormatter.new)

# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
inflater = nil
output = nil
begin
  # OR the flag into the current status flags; F_SETFL with a bare O_NONBLOCK would
  # overwrite whatever other status flags were already set on the descriptor.
  status_flags = socket.fcntl(Fcntl::F_GETFL)
  socket.fcntl(Fcntl::F_SETFL, status_flags | Fcntl::O_NONBLOCK)
  logger.info "Socket created"

  bufsize     = 64 * 1024   # How much maximum data to read from the socket at a go
  stop_after  = 1024 * 1024 # 1 megabyte of data
  total_bytes = 0
  read_count  = 0

  compressed_buffer = String.new # A reusable string for use by readpartial for compressed bytes
  # Adding 32 to the window bits asks zlib to auto-detect a zlib or gzip header.
  inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)
  uncompressed_bytes = 0

  logger.info "Reading..."
  logger.info "Writing to #{output_to}"
  # Only writes happen below, and the payload is raw bytes, so open write-only/binary.
  output = output_to == "-" ? $stdout : File.open(output_to, "wb")

  loop do
    begin
      chunk = socket.readpartial(bufsize, compressed_buffer)
    rescue EOFError
      # readpartial signals end-of-stream by raising; treat it as a normal stop.
      logger.warn "Connection closed by peer before #{stop_after} bytes were read"
      break
    end
    total_bytes += chunk.bytesize
    read_count += 1

    uncompressed_buffer = inflater.inflate(chunk)
    uncompressed_bytes += uncompressed_buffer.bytesize
    output.write(uncompressed_buffer)
    break if total_bytes > stop_after
  end

  logger.info "Read #{read_count} times from data source"
  logger.info "Received #{total_bytes} of compressed data"
  logger.info "Resulting in #{uncompressed_bytes} of decompressed data"
ensure
  inflater.close if inflater
  if output
    # Flush, but do not close, the process-wide standard output stream.
    output.equal?($stdout) ? output.flush : output.close
  end
  socket.close
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'socket' | |
require 'zlib' | |
require 'logger' | |
require 'fcntl' | |
require 'json' | |
require 'thread' | |
# Print the command-line synopsis to standard error and terminate the
# process with exit status 1. Never returns.
#
# This script reads only `host` and `port` from ARGV, so the synopsis no
# longer advertises an output-location argument.
def usage
  $stderr.puts "#{$0} host port"
  exit 1
end
# Logger formatter that renders each record as a single line:
#
#   <UTC timestamp> <pid> <severity> : <message>
#
class LogFormatter < ::Logger::Formatter
  # timestamp, pid right-aligned in 5 columns, severity right-aligned in 5 columns, message
  FORMAT = "%s %5d %5s : %s\n".freeze
  # The trailing literal "Z" is only correct because #call converts the time to UTC first.
  DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze

  def initialize
    super
    self.datetime_format = DATETIME_FORMAT
  end

  # Build the log line for one record.
  #
  # severity  - severity label, e.g. "INFO"
  # time      - Time the record was created; it is NOT modified
  # _progname - ignored
  # msg       - the message object, stringified by Logger::Formatter#msg2str
  #
  # Returns the formatted String, terminated by a newline.
  def call(severity, time, _progname, msg)
    # getutc returns a new Time; Time#utc would convert the caller's object in place.
    FORMAT % [format_datetime(time.getutc), Process.pid, severity, msg2str(msg)]
  end
end
#
# Reads compressed data from an input IO, inflates it, and writes the result to
# an output IO. Statistics (bytes in, bytes out, number of reads) are collected
# along the way and exposed through readers.
#
class Decompressor
  attr_reader :input
  attr_reader :output
  attr_reader :stop_after
  attr_reader :buffer_size
  attr_reader :compressed_bytes
  attr_reader :uncompressed_bytes
  attr_reader :read_count

  # The reader used to be declared under the misspelled name `top_after` (and so always
  # returned nil); the old name is kept as an alias so existing callers keep working.
  alias top_after stop_after

  # input      - object responding to `readpartial(maxlen, outbuf)`
  # output     - object responding to `write` and `close`
  # stop_after - stop once more than this many compressed bytes have been read;
  #              the default never stops on byte count
  def initialize(input:, output:, stop_after: Float::INFINITY)
    @input              = input
    @output             = output
    @stop_after         = stop_after
    @buffer_size        = 64 * 1024 # How much maximum data to read from the input at a go
    @compressed_bytes   = 0
    @uncompressed_bytes = 0
    @read_count         = 0
  end

  # Pump data from input to output until the byte limit is exceeded or the input
  # reaches end-of-stream. The output is always closed before returning, including
  # when an error is raised, so a reader on the other end sees EOF.
  #
  # Returns nil.
  def call
    compressed_buffer = String.new
    # Adding 32 to the window bits asks zlib to auto-detect a zlib or gzip header.
    inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)

    loop do
      begin
        chunk = input.readpartial(@buffer_size, compressed_buffer)
      rescue EOFError
        # readpartial signals end-of-stream by raising; treat it as a normal stop.
        break
      end
      @compressed_bytes += chunk.bytesize
      @read_count += 1

      inflated = inflater.inflate(chunk)
      @uncompressed_bytes += inflated.bytesize
      output.write(inflated)
      break if @compressed_bytes > @stop_after
    end
  ensure
    inflater.close if inflater
    output.close
  end
end
#
# Reads newline-delimited JSON from an input, parses each line, and appends the
# parsed object to an output that responds to `<<`.
#
class Parser
  attr_reader :item_count
  attr_reader :input_bytes

  # input  - object responding to `gets`
  # output - object responding to `<<`
  def initialize(input:, output:)
    @item_count  = 0
    @input_bytes = 0
    @stop        = false
    @input       = input
    @output      = output
  end

  # Ask #call to return. The flag is checked before each line is read, so a #call
  # that is currently waiting on input notices it only after that read returns.
  def stop
    @stop = true
  end

  # Parse lines until #stop has been requested or the input reaches end-of-stream.
  #
  # Raises JSON::ParserError for a malformed line that is newline-terminated.
  # Returns nil.
  def call
    until @stop
      # gets returns nil at end-of-stream, where readline would raise EOFError.
      line = @input.gets
      break if line.nil?

      @input_bytes += line.bytesize
      begin
        event = JSON.parse(line)
      rescue JSON::ParserError
        # A line is only unterminated when the stream ended mid-record; that fragment
        # is dropped. A complete line that fails to parse is still an error.
        raise if line.end_with?("\n")
        break
      end
      @output << event
      @item_count += 1
    end
  end
end
# Script body: connect to host:port, decompress the stream on one thread, parse the
# newline-delimited JSON on another, discard the events on a third, then report stats.
host = ARGV.shift || usage

# `nil.to_i` is 0 and 0 is truthy in Ruby, so `ARGV.shift.to_i || usage` could never
# reach `usage`. Validate the raw argument first, then convert it.
port_arg = ARGV.shift || usage
usage unless port_arg.match?(/\A\d+\z/)
port = port_arg.to_i

logger = ::Logger.new($stderr, formatter: LogFormatter.new)

# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
# OR the flag into the current status flags; F_SETFL with a bare O_NONBLOCK would
# overwrite whatever other status flags were already set on the descriptor.
status_flags = socket.fcntl(Fcntl::F_GETFL)
socket.fcntl(Fcntl::F_SETFL, status_flags | Fcntl::O_NONBLOCK)

# Create a pipe to buffer the uncompressed data from the socket so that the text may be
# split into newline-terminated lines.
read_io, write_io = IO.pipe
write_io.set_encoding("BINARY") # to handle multibyte-character splitting

stop_after = 1024 * 1024 # 1 megabyte of data (was mistyped as 1024*1014)

# final output to this location
events = Queue.new

# setup the decompressor and parser objects
decompressor = Decompressor.new(input: socket, output: write_io, stop_after: stop_after)
parser       = Parser.new(input: read_io, output: events)

# spawn threads for each of the objects
decompressor_thread = Thread.new { decompressor.call }
parser_thread       = Thread.new { parser.call }

# spawn a thread to consume all the events from the parser and throw them away
consumed_count = 0
consumer_thread = Thread.new do
  loop do
    # deq returns nil once the queue is closed and empty; a parsed event may also be
    # nil, so nil is not used as the loop terminator.
    event = events.deq
    consumed_count += 1 unless event.nil?
    break if events.closed? && events.empty?
  end
end

# wait for the decompressor to stop reading from input
decompressor_thread.join

# tell the parser to stop
parser.stop
parser_thread.join

# close the events queue so the consumer thread will drain the queue and stop
events.close
consumer_thread.join

# every worker has finished; release the descriptors this script opened
socket.close
read_io.close

logger.info "Decompressor: read #{decompressor.read_count} times"
logger.info "Decompressor: received #{decompressor.compressed_bytes} bytes"
logger.info "Decompressor: forwarded on #{decompressor.uncompressed_bytes} bytes"
logger.info "Parser : received #{parser.input_bytes}"
logger.info "Parser : forwarded on #{parser.item_count} events"
logger.info "Consumer : threw away #{consumed_count} events"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment