Last active
August 29, 2015 14:11
-
-
Save mariusv/7c7dd25c052232a10a1a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# coding: UTF-8
# elb-logs.rb
#
# fetch and analyze ELB access logs
# (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html)
#
# created on : 2014.03.07
# last update: 2014.03.10
#
# by [email protected]
require 'bundler/setup'

require 'time'

require 'my_aws'
require 'my_sqlite'
require 'thor'
require 'geocoder'
# File name of the SQLite database; Database.instance joins it with this script's directory.
DB_FILENAME = 'elb_access_logs.sqlite'

# Settings handed to MyAws::S3.config; :bucket is also used to pick the bucket to scan.
# The values are placeholders and must be replaced with real ones before running.
S3_CONFIGS = {
  :access_key_id     => '__aws_access_key_id__',
  :secret_access_key => '__aws_secret_access_key__',
  :bucket            => '__s3_bucket_name__'
}
module AWS | |
class ELB::AccessLog | |
attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status | |
# Parse one ELB access log line into its fields.
#
# Positional fields are taken from the whitespace-split line:
# 0 timestamp, 1 ELB name, 2 client ip:port, 3 backend ip:port,
# 5 processing time, 8 response status, 9 received bytes, 10 sent bytes.
# The request is taken from the first double-quoted field, so lines that
# carry additional trailing fields after the request (e.g. a quoted user
# agent) are parsed as well. '-' placeholders in the request and status
# become nil.
#
# @param line [String] a single access log line
def initialize(line)
  components = line.split(' ')

  @timestamp = Time.parse(components[0])
  @elb = components[1]
  @client_ip, @client_port = components[2].split(':')
  # backend (EC2 instance)
  @backend_ip, @backend_port = components[3].split(':')
  @process_time = components[5].to_f
  @received_bytes = components[9].to_i
  @sent_bytes = components[10].to_i

  # request (HTTP only): first quoted field; a missing field yields an empty
  # request instead of raising, leaving method/url/protocol nil
  request = line[/"(.*?)"(?:\s|$)/, 1].to_s
  @request_method, @request_url, @request_protocol =
    request.split(' ').map { |part| part == '-' ? nil : part }

  # response (HTTP only)
  @response_status = components[8]
  @response_status = nil if @response_status == '-'
end
# One-line human readable summary of the entry.
#
# The "(method protocol url => status)" suffix is only appended when a
# request method was parsed.
#
# @return [String]
def to_s
  stamp = @timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')
  summary = "[#{@elb}] #{stamp} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, " \
            "#{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes"
  return summary if @request_method.nil?

  "#{summary} (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})"
end
# Helpers for working with the S3 keys of ELB access log files.
class Helper
  # Split the file name of a log key into its underscore-separated parts.
  #
  # The directory part and the extension of the key are ignored; the
  # remaining name is read as
  # account_<ignored>_region_elb_datetime_elbip[_...].
  #
  # @param key [String] S3 object key of a log file
  # @return [Hash] :aws_account, :region, :elb, :datetime (Time), :elb_ip
  #   and the unmodified :key
  def self.parse_key(key)
    filename = key.split('/').last
    account, _service, region, elb_name, stamp, ip = File.basename(filename, '.*').split('_')

    {
      aws_account: account,
      region: region,
      elb: elb_name,
      datetime: Time.parse(stamp),
      elb_ip: ip,
      key: key
    }
  end
end
class Database | |
attr_accessor :filepath | |
@@db = nil | |
private | |
# Open the SQLite database at +filepath+ and make sure all tables and
# indices used by this class exist.
#
# Every statement is guarded by "if not exists" and they are executed in
# the order listed: fetched, access_logs, geo_locations.
#
# @param filepath [String] path of the SQLite database file
def initialize(filepath)
  @filepath = filepath
  @db = MySqlite.open(@filepath)

  schema = [
    # table: fetched
    'create table if not exists fetched(
      id integer primary key autoincrement,
      aws_account text not null,
      region text not null,
      elb_ip text not null,
      log_time text not null
    )',
    'create index if not exists idx_fetched on fetched(
      aws_account, region, elb_ip, log_time
    )',

    # table: access_logs
    'create table if not exists access_logs(
      id integer primary key autoincrement,
      aws_account text not null,
      region text not null,
      elb_ip text not null,
      log_time text not null,
      elb text not null,
      timestamp text not null,
      client_ip text not null,
      client_port integer default null,
      backend_ip text not null,
      backend_port integer default null,
      process_time real not null,
      received_bytes integer not null,
      sent_bytes integer not null,
      request_method text default null,
      request_url text default null,
      request_protocol text default null,
      response_status text default null
    )',
    'create index if not exists idx_access_logs_1 on access_logs(
      aws_account, region, elb_ip
    )',
    'create index if not exists idx_access_logs_2 on access_logs(
      aws_account, region, elb_ip, log_time
    )',
    'create index if not exists idx_access_logs_3 on access_logs(
      timestamp
    )',
    'create index if not exists idx_access_logs_4 on access_logs(
      client_ip
    )',
    'create index if not exists idx_access_logs_5 on access_logs(
      response_status
    )',

    # table: geo_locations
    'create table if not exists geo_locations(
      ip text primary key,
      country_code text default null,
      country_name text default null,
      city text default null,
      latitude real default null,
      longitude real default null
    )',
    'create index if not exists idx_geo_locations_1 on geo_locations(
      country_code
    )',
    'create index if not exists idx_geo_locations_2 on geo_locations(
      country_name
    )',
    'create index if not exists idx_geo_locations_999 on geo_locations(
      latitude,
      longitude
    )'
  ]

  schema.each { |statement| @db.execute_query(statement) }
end
public | |
# Return the shared Database, creating it on first use.
#
# The database file is DB_FILENAME inside the directory of this script.
#
# @return [Database]
def self.instance
  @@db ||= Database.new(File.join(File.dirname(__FILE__), DB_FILENAME))
end
# Tell whether a log file with the given identity is already recorded in
# the "fetched" table.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time (compared with second precision)
#
# @return [true,false]
def log_exists?(aws_account, region, elb_ip, log_time)
  formatted_time = log_time.strftime('%Y-%m-%d %H:%M:%S')
  rows = @db.execute_query(
    'select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
    [aws_account, region, elb_ip, formatted_time]
  )
  matches = rows[0][0]
  matches > 0
end
# Insert one parsed log entry into the "access_logs" table.
#
# The first four columns identify the log file the entry came from; the
# remaining ones are read from the entry itself.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time of the file (stored with second precision)
# @param log_entry [AWS::ELB::AccessLog] log entry
def save_log(aws_account, region, elb_ip, log_time, log_entry)
  insert_sql = 'insert into access_logs(
    aws_account, region, elb_ip, log_time,
    elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,
    request_method, request_url, request_protocol, response_status
  ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'

  file_values = [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
  entry_values = [
    log_entry.elb,
    log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
    log_entry.client_ip,
    log_entry.client_port,
    log_entry.backend_ip,
    log_entry.backend_port,
    log_entry.process_time,
    log_entry.received_bytes,
    log_entry.sent_bytes,
    log_entry.request_method,
    log_entry.request_url,
    log_entry.request_protocol,
    log_entry.response_status
  ]

  @db.execute_query(insert_sql, file_values + entry_values)
end
# Record the given log file in the "fetched" table so that later runs can
# recognise it as already imported.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ip address
# @param log_time [Time] log time (stored with second precision)
def mark_log_fetched(aws_account, region, elb_ip, log_time)
  formatted_time = log_time.strftime('%Y-%m-%d %H:%M:%S')
  values = [aws_account, region, elb_ip, formatted_time]
  @db.execute_query('insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)', values)
end
# List the client ips of all saved access logs, ordered by timestamp.
#
# @param option [Hash, nil] listing options; pass unique: true to list
#   every ip only once. nil (the default) behaves like an empty hash.
# @return [Array<String>] array of ips
def client_ips(option = nil)
  # option defaults to nil, so it must not be indexed unconditionally
  distinct = option && option[:unique] ? 'distinct' : ''
  rows = @db.execute_query("select #{distinct} client_ip from access_logs order by timestamp")
  rows.map { |row| row[0] }
end
# Look up the geo location of an ip and store it in "geo_locations",
# unless the ip is already cached.
#
# Errors raised during lookup or insert are reported on stdout and turned
# into a nil result, so one bad ip does not abort a batch.
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] the newly cached location; nil when
#   the ip was already cached, no location was found, or an error occurred
def cache_geo(ip)
  return nil if cached_geo(ip)

  info = Geocoder.search(ip).first
  if info.nil?
    # an empty search result is an expected outcome, not an exception
    puts "* no geo location found for ip: #{ip}"
    return nil
  end

  fields = [info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
  @db.execute_query(
    'insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
    fields
  )
  GeoLocation.new(*fields)
rescue StandardError => e
  puts "* exception while caching ip: #{ip} - #{e}"
  nil
end
# Get the cached geo location of an ip.
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] nil if not cached
def cached_geo(ip)
  row = @db.execute_query('select * from geo_locations where ip = ?', [ip]).first
  return nil if row.nil?

  # the query result is a list of rows; the location is built from the
  # columns of the first row (ip, country_code, country_name, city,
  # latitude, longitude - the column order of the geo_locations table)
  GeoLocation.new(row[0], row[1], row[2], row[3], row[4], row[5])
end
# List the distinct ELB names found in the saved access logs, sorted by name.
#
# @return [Array<String>] ELB names
def elbs
  rows = @db.execute_query('select distinct elb from access_logs order by elb')
  rows.map(&:first)
end
# Count the geo-cached client ips of one ELB, grouped by country and
# ordered by descending count.
#
# @param elb [String] ELB name
# @return [Array<Hash>] one {country:, num_ips:} hash per country
def num_ips_per_country(elb)
  rows = @db.execute_query(
    'select country_name as country, count(ip) as num_ips
    from geo_locations
    where ip in (select distinct client_ip from access_logs where elb = ?)
    group by country_name
    order by num_ips desc',
    [elb]
  )

  rows.map do |country, num_ips|
    { country: country, num_ips: num_ips }
  end
end
# Summarise the contents of the database.
#
# :from / :to are the smallest and largest stored timestamps and are nil
# while no access logs are stored. The counts are hashes keyed by ELB name
# plus an :all entry; ip counts are taken from geo_locations, so they only
# cover ips whose geo location has been cached.
#
# @return [Hash] :from, :to, :num_access_logs, :num_ips
def info
  # first column of the first row, or nil when the query returns no rows
  scalar = lambda do |sql, params = nil|
    rows = params ? @db.execute_query(sql, params) : @db.execute_query(sql)
    row = rows.first
    row && row[0]
  end

  # from ~ to
  from = scalar.call('select timestamp from access_logs order by timestamp limit 1')
  to = scalar.call('select timestamp from access_logs order by timestamp desc limit 1')

  # query the ELB names once and reuse them for both counts
  elb_names = elbs

  # number of access logs
  num_access_logs = {}
  elb_names.each do |elb|
    num_access_logs[elb] = scalar.call('select count(id) from access_logs where elb = ?', [elb])
  end
  num_access_logs[:all] = scalar.call('select count(id) from access_logs')

  # number of ips
  num_ips = {}
  elb_names.each do |elb|
    num_ips[elb] = scalar.call(
      'select count(ip) from geo_locations
      where ip in (select distinct client_ip from access_logs where elb = ?)',
      [elb]
    )
  end
  num_ips[:all] = scalar.call('select count(distinct ip) from geo_locations')

  {
    from: from,
    to: to,
    num_access_logs: num_access_logs,
    num_ips: num_ips
  }
end
# Geo location information of one ip address, mirroring a row of the
# geo_locations table.
class GeoLocation
  attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude

  # @param ip [String] ip address
  # @param country_code [String, nil]
  # @param country_name [String, nil]
  # @param city [String, nil]
  # @param latitude [Float, nil]
  # @param longitude [Float, nil]
  def initialize(ip, country_code, country_name, city, latitude, longitude)
    @ip = ip
    @country_code = country_code
    @country_name = country_name
    @city = city
    @latitude = latitude
    @longitude = longitude
  end

  # @return [String] "[ip] country(code), city (lat, lon)" with the
  #   coordinates rounded to four decimals; a missing coordinate is
  #   printed as "n/a"
  def to_s
    "[%s] %s(%s), %s (%s, %s)" % [
      @ip, @country_name, @country_code, @city,
      format_coordinate(@latitude), format_coordinate(@longitude)
    ]
  end

  private

  # Format one coordinate; nil is allowed because the table stores
  # latitude/longitude as nullable columns.
  def format_coordinate(value)
    value.nil? ? 'n/a' : '%.4f' % value
  end
end
end | |
end | |
end | |
class Exec < Thor | |
default_task :fetch_logs | |
desc "fetch_logs", "fetch all ELB access log files from S3" | |
method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips' | |
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages' | |
# Import every not-yet-fetched ELB access log file of the configured S3
# bucket into the database.
#
# Only objects whose key matches /elasticloadbalancing.*?\.log$/ are
# considered. With --cache_geo the geo locations of the logged client ips
# are cached afterwards; --verbose prints one line per log file.
def fetch_logs
  puts "> fetching logs..."

  unless MyAws::S3.config(S3_CONFIGS)
    puts "* S3 configuration failed"
    return
  end

  db = AWS::ELB::AccessLog::Database.instance
  num_logs = 0

  MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each do |o|
    # parse only actual log files
    next unless o.key =~ /elasticloadbalancing.*?\.log$/

    parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
    aws_account = parsed[:aws_account]
    elb = parsed[:elb]
    region = parsed[:region]
    elb_ip = parsed[:elb_ip]
    log_time = parsed[:datetime]
    label = "#{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}"

    # check if this log is already fetched/saved
    if db.log_exists?(aws_account, region, elb_ip, log_time)
      puts "* skipping already-fetched log: #{label}" if options.verbose?
      next
    end
    num_logs += 1

    puts "> processing logs for: #{label}" if options.verbose?

    # Parse the whole file before saving anything: a line that cannot be
    # parsed then raises before any row of this file is inserted, so a
    # later run does not insert the same entries again. Blank lines are
    # skipped because they carry no fields to parse.
    entries = o.read.lines.map(&:strip).reject(&:empty?).map { |l| AWS::ELB::AccessLog.new(l) }
    entries.each { |entry| db.save_log(aws_account, region, elb_ip, log_time, entry) }

    # mark log as fetched
    db.mark_log_fetched(aws_account, region, elb_ip, log_time)
  end

  puts ">>> fetched #{num_logs} new log file(s)"

  # also cache geo locations if option is provided
  cache_geo if options.cache_geo?
end
desc "cache_geo", "cache geo location info for client ips in saved logs" | |
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages' | |
# Cache the geo location of every distinct client ip found in the saved
# logs and report how many new locations were cached.
#
# With --verbose each newly cached location is printed.
def cache_geo
  puts "> caching geo locations for logged ips..."

  db = AWS::ELB::AccessLog::Database.instance
  num_ips = db.client_ips({unique: true}).count do |ip|
    cached = db.cache_geo(ip)
    puts "> cached geo ip: #{cached}" if cached && options.verbose?
    cached
  end

  puts ">>> cached #{num_ips} new geo location(s)"
end
desc "count_ips", "list number of unique ips per country" | |
# Print, for every ELB in the database, the number of client ips per
# country, followed by an empty line.
def count_ips
  puts "> counting ips per country..."

  db = AWS::ELB::AccessLog::Database.instance
  db.elbs.each do |elb_name|
    puts ">>> ELB: #{elb_name}"
    db.num_ips_per_country(elb_name).each do |row|
      puts "#{row[:country]}: #{row[:num_ips]}"
    end
    puts
  end
end
desc "info", "show info on database file" | |
# Print the database file path, the covered time range and the access/ip
# counts as returned by Database#info.
def info
  puts "> printing info of database..."

  db = AWS::ELB::AccessLog::Database.instance
  puts ">>> database: #{db.filepath}"

  summary = db.info
  puts ">>> #{summary[:from]} ~ #{summary[:to]}"

  sections = [
    [">>> number of accesses", :num_access_logs],
    [">>> number of ips", :num_ips]
  ]
  sections.each do |title, key|
    puts title
    summary[key].each do |name, count|
      puts " #{name}: #{count}"
    end
  end
end
end | |
# On SIGINT print a newline and exit with status 1.
trap('SIGINT') do
  puts
  exit 1
end

# Dispatch the command line arguments to the Thor commands of Exec.
Exec.start(ARGV)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment