## Requirements

- git
- json (1.8.1)
- mysql (2.9.1)
- sqlite3 (1.3.10)

Debian-based OS:

- `apt-get install libmysqlclient-dev libsqlite3-dev`

OS X:

- `brew install mysql`
- `brew install sqlite3`
- `gem install mysql -v '2.9.1'`
- `gem install sqlite3 -v '1.3.10'`
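The script loads its gems through Bundler (`require 'bundler/setup'` below), so the dependencies from the Gemfile at the bottom of this page also need to be installed, roughly:

- `gem install bundler`
- `bundle install`

Example invocations of the script are shown after the Gemfile. The script itself, elb-logs.rb: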
#!/usr/bin/env ruby
# coding: UTF-8
#
# elb-logs.rb
#
# fetch and analyze ELB access logs
# (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html)
#
# created on : 2014.03.07
# last update: 2014.03.10
#
# by [email protected]

require 'bundler/setup'

require 'time' # for Time.parse
require 'my_aws'
require 'my_sqlite'
require 'thor'
require 'geocoder'
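
# sqlite database file for fetched/parsed logs (created next to this script; see Database.instance below)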
DB_FILENAME = 'elb_access_logs.sqlite'

S3_CONFIGS = {
  access_key_id: '__aws_access_key_id__',
  secret_access_key: '__aws_secret_access_key__',
  bucket: '__s3_bucket_name__',
}
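# NOTE: replace the placeholder values in S3_CONFIGS above with your own AWS credentials and S3 bucket name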

module AWS
  class ELB::AccessLog
    attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status

    def initialize(line)
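      # a (classic) ELB access log line is space-separated, roughly:
      #   timestamp elb client:port backend:port request_processing_time backend_processing_time response_processing_time
      #   elb_status_code backend_status_code received_bytes sent_bytes "request"
      # (see the documentation linked in the header); individual fields are picked out by index below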
      components = line.split(' ')

      # timestamp
      @timestamp = Time.parse(components[0])

      # elb name
      @elb = components[1]

      # client
      @client_ip, @client_port = components[2].split(':')

      # backend (EC2 instance)
      @backend_ip, @backend_port = components[3].split(':')

      # time taken
      @process_time = components[5].to_f

      # recv/send
      @received_bytes = components[9].to_i
      @sent_bytes = components[10].to_i

      # request (HTTP only)
      request = line.scan(/"(.*?)"$/)[0][0]
      @request_method, @request_url, @request_protocol = request.split(/\s/).map(&:strip)
      @request_method = nil if @request_method == '-'
      @request_url = nil if @request_url == '-'
      @request_protocol = nil if @request_protocol == '-'

      # response (HTTP only)
      @response_status = components[8]
      @response_status = nil if @response_status == '-'
    end

    def to_s
      "[#{@elb}] #{@timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, #{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes" + (@request_method.nil? ? '' : " (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})")
    end

    class Helper
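      # an access log's S3 key ends with a file named roughly like
      # (per the documentation linked in the header):
      #   {aws-account-id}_elasticloadbalancing_{region}_{elb-name}_{end-time}_{elb-ip}_{random-string}.log
      # parse_key splits that basename on '_' to recover the account, region, ELB name, time and ip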
      def self.parse_key(key)
        components = File.basename(key.split('/')[-1], '.*').split('_')
        {
          aws_account: components[0],
          region: components[2],
          elb: components[3],
          datetime: Time.parse(components[4]),
          elb_ip: components[5],
          key: key,
        }
      end
    end

    class Database
      attr_accessor :filepath

      @@db = nil

      private

      def initialize(filepath)
        @filepath = filepath
        @db = MySqlite.open(@filepath)

        # table: fetched
        @db.execute_query('create table if not exists fetched(
            id integer primary key autoincrement,
            aws_account text not null,
            region text not null,
            elb_ip text not null,
            log_time text not null
          )')
        @db.execute_query('create index if not exists idx_fetched on fetched(
            aws_account, region, elb_ip, log_time
          )')

        # table: access_logs
        @db.execute_query('create table if not exists access_logs(
            id integer primary key autoincrement,
            aws_account text not null,
            region text not null,
            elb_ip text not null,
            log_time text not null,
            elb text not null,
            timestamp text not null,
            client_ip text not null,
            client_port integer default null,
            backend_ip text not null,
            backend_port integer default null,
            process_time real not null,
            received_bytes integer not null,
            sent_bytes integer not null,
            request_method text default null,
            request_url text default null,
            request_protocol text default null,
            response_status text default null
          )')
        @db.execute_query('create index if not exists idx_access_logs_1 on access_logs(
            aws_account, region, elb_ip
          )')
        @db.execute_query('create index if not exists idx_access_logs_2 on access_logs(
            aws_account, region, elb_ip, log_time
          )')
        @db.execute_query('create index if not exists idx_access_logs_3 on access_logs(
            timestamp
          )')
        @db.execute_query('create index if not exists idx_access_logs_4 on access_logs(
            client_ip
          )')
        @db.execute_query('create index if not exists idx_access_logs_5 on access_logs(
            response_status
          )')

        # table: geo_locations
        @db.execute_query('create table if not exists geo_locations(
            ip text primary key,
            country_code text default null,
            country_name text default null,
            city text default null,
            latitude real default null,
            longitude real default null
          )')
        @db.execute_query('create index if not exists idx_geo_locations_1 on geo_locations(
            country_code
          )')
        @db.execute_query('create index if not exists idx_geo_locations_2 on geo_locations(
            country_name
          )')
        @db.execute_query('create index if not exists idx_geo_locations_999 on geo_locations(
            latitude,
            longitude
          )')
      end

      public

      def self.instance
        @@db = Database.new(File.join(File.dirname(__FILE__), DB_FILENAME)) unless @@db
        @@db
      end

      # check if logs with given aws_account/region/elb_ip/log_time already exist
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ELB's ip address
      # @param log_time [Time] log time
      #
      # @return [true,false]
      def log_exists?(aws_account, region, elb_ip, log_time)
        @db.execute_query('select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
          [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
        )[0][0] > 0
      end

      # save log entry
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ELB's ip address
      # @param log_time [Time] log time
      # @param log_entry [AWS::ELB::AccessLog] log entry
      def save_log(aws_account, region, elb_ip, log_time, log_entry)
        # insert to the table
        @db.execute_query(
          'insert into access_logs(
            aws_account, region, elb_ip, log_time,
            elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,
            request_method, request_url, request_protocol, response_status
          ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
          [
            aws_account,
            region,
            elb_ip,
            log_time.strftime('%Y-%m-%d %H:%M:%S'),
            log_entry.elb,
            log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
            log_entry.client_ip,
            log_entry.client_port,
            log_entry.backend_ip,
            log_entry.backend_port,
            log_entry.process_time,
            log_entry.received_bytes,
            log_entry.sent_bytes,
            log_entry.request_method,
            log_entry.request_url,
            log_entry.request_protocol,
            log_entry.response_status
          ]
        )
      end

      # mark given log as fetched
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ip address
      # @param log_time [Time] log time
      def mark_log_fetched(aws_account, region, elb_ip, log_time)
        # mark this log as fetched
        @db.execute_query(
          'insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)',
          [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
        )
      end

      # list all ips from saved logs
      #
      # @param option [Hash] option for listing
      # @return [Array<String>] array of ips
      def client_ips(option = {})
        @db.execute_query("select #{option[:unique] ? 'distinct' : ''} client_ip from access_logs order by timestamp").map{|x| x[0]}
      end

      # cache ip and its geo location information for future use
      #
      # @param ip [String] ip address
      # @return [Database::GeoLocation, nil] newly cached location, or nil if already cached or on error
      def cache_geo(ip)
        unless cached = cached_geo(ip)
          info = Geocoder.search(ip).first
          @db.execute_query('insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
            [info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
          )
          return GeoLocation.new(info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude)
        end
        nil
      rescue
        puts "* exception while caching ip: #{ip} - #{$!}"
        nil
      end

      # get cached geo location information
      #
      # @param ip [String] ip address
      # @return [Database::GeoLocation, nil] nil if not cached
      def cached_geo(ip)
        geo = @db.execute_query('select * from geo_locations where ip = ?',
          [ip]
        )
        if geo.count > 0
          GeoLocation.new(*geo[0]) # first row's columns: ip, country_code, country_name, city, latitude, longitude
        else
          nil
        end
      end

      # get ELB names
      #
      # @return [Array<String>] ELB names
      def elbs
        @db.execute_query('select distinct elb from access_logs order by elb').map{|x| x[0]}
      end

      # get number of ips per country for given ELB name
      #
      # @param elb [String] ELB name
      def num_ips_per_country(elb)
        @db.execute_query('select country_name as country, count(ip) as num_ips
            from geo_locations
            where ip in (select distinct client_ip from access_logs where elb = ?)
            group by country_name
            order by num_ips desc',
          [elb]
        ).map{|x| {country: x[0], num_ips: x[1]}}
      end

      def info
        # from ~ to
        from = @db.execute_query('select timestamp from access_logs order by timestamp limit 1').first[0]
        to = @db.execute_query('select timestamp from access_logs order by timestamp desc limit 1').first[0]

        # number of access logs
        num_access_logs = {}
        elbs.each{|elb|
          num_access_logs[elb] = @db.execute_query('select count(id) from access_logs where elb = ?', [elb]).first[0]
        }
        num_access_logs[:all] = @db.execute_query('select count(id) from access_logs').first[0]

        # number of ips
        num_ips = {}
        elbs.each{|elb|
          num_ips[elb] = @db.execute_query('select count(ip) from geo_locations
              where ip in (select distinct client_ip from access_logs where elb = ?)',
            [elb]
          ).first[0]
        }
        num_ips[:all] = @db.execute_query('select count(distinct ip) from geo_locations').first[0]

        # return
        {
          from: from,
          to: to,
          num_access_logs: num_access_logs,
          num_ips: num_ips,
        }
      end

      class GeoLocation
        attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude

        def initialize(ip, country_code, country_name, city, latitude, longitude)
          @ip = ip
          @country_code = country_code
          @country_name = country_name
          @city = city
          @latitude = latitude
          @longitude = longitude
        end

        def to_s
          "[%s] %s(%s), %s (%.4f, %.4f)" % [@ip, @country_name, @country_code, @city, @latitude, @longitude]
        end
      end
    end
  end
end

class Exec < Thor
  default_task :fetch_logs

  desc "fetch_logs", "fetch all ELB access log files from S3"
  method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips'
  method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
  def fetch_logs
    puts "> fetching logs..."

    # configure
    if MyAws::S3.config(S3_CONFIGS)
      num_logs = 0

      # fetch objects from the bucket
      MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each{|o|
        # get the database instance
        db = AWS::ELB::AccessLog::Database.instance

        # parse only actual log files
        if o.key =~ /elasticloadbalancing.*?\.log$/
          parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
          aws_account = parsed[:aws_account]
          elb = parsed[:elb]
          region = parsed[:region]
          elb_ip = parsed[:elb_ip]
          log_time = parsed[:datetime]

          # check if this log is already fetched/saved
          if db.log_exists?(aws_account, region, elb_ip, log_time)
puts "* skipping alread-fetched log: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose? | |
            next
          else
            num_logs += 1
          end

          # read all log lines
          puts "> processing logs for: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose?
          o.read.lines.map{|x| x.strip}.each{|l|
            entry = AWS::ELB::AccessLog.new(l)
            db.save_log(aws_account, region, elb_ip, log_time, entry)
          }

          # mark log as fetched
          db.mark_log_fetched(aws_account, region, elb_ip, log_time)
        end
      }
      puts ">>> fetched #{num_logs} new log file(s)"

      # also cache geo locations if option is provided
      cache_geo if options.cache_geo?
    else
      puts "* S3 configuration failed"
    end
  end

  desc "cache_geo", "cache geo location info for client ips in saved logs"
  method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
  def cache_geo
    puts "> caching geo locations for logged ips..."

    db = AWS::ELB::AccessLog::Database.instance
    num_ips = 0
    db.client_ips({unique: true}).each{|ip|
      if cached = db.cache_geo(ip)
        puts "> cached geo ip: #{cached}" if options.verbose?
        num_ips += 1
      end
    }
    puts ">>> cached #{num_ips} new geo location(s)"
  end

  desc "count_ips", "list number of unique ips per country"
  def count_ips
    puts "> counting ips per country..."

    db = AWS::ELB::AccessLog::Database.instance
    db.elbs.each{|elb|
      puts ">>> ELB: #{elb}"
      db.num_ips_per_country(elb).each{|counts|
        puts "#{counts[:country]}: #{counts[:num_ips]}"
      }
      puts
    }
  end

  desc "info", "show info on database file"
  def info
    puts "> printing info of database..."

    db = AWS::ELB::AccessLog::Database.instance
    puts ">>> database: #{db.filepath}"
    info = db.info
    puts ">>> #{info[:from]} ~ #{info[:to]}"
    puts ">>> number of accesses"
    info[:num_access_logs].each{|k, v|
      puts " #{k}: #{v}"
    }
    puts ">>> number of ips"
    info[:num_ips].each{|k, v|
      puts " #{k}: #{v}"
    }
  end
end

trap('SIGINT') { puts; exit 1 }
Exec.start(ARGV)
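
Gemfile: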
source 'http://rubygems.org'

gem 'meinside-ruby', github: 'meinside/meinside-ruby' # my ruby scripts and libraries
gem 'thor'
gem 'geocoder' |
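
With everything installed, the script is driven through its Thor tasks, for example (assuming it is saved as elb-logs.rb next to the Gemfile; `fetch_logs` is the default task):

- `ruby elb-logs.rb fetch_logs -v -g`
- `ruby elb-logs.rb cache_geo -v`
- `ruby elb-logs.rb count_ips`
- `ruby elb-logs.rb info`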