Skip to content

Instantly share code, notes, and snippets.

@mariusv
Last active August 29, 2015 14:11

Revisions

  1. mariusv renamed this gist Dec 13, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. mariusv revised this gist Dec 13, 2014. 1 changed file with 20 additions and 0 deletions.
    20 changes: 20 additions & 0 deletions README
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,20 @@
    ##Requirements

    - git
    - json (1.8.1)
    - mysql (2.9.1)
    - sqlite3 (1.3.10)

    ### How to:

    Deb based OS:
    - apt-get install libmysqlclient-dev libsqlite3-dev

    OSX:

    - brew install mysql
    - brew install sqlite3


    - gem install mysql -v '2.9.1'
    - gem install sqlite3 -v '1.3.10'
  3. Sungjin Han revised this gist Mar 10, 2014. No changes.
  4. Sungjin Han revised this gist Mar 10, 2014. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions Gemfile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,6 @@
    source 'http://rubygems.org'

    gem 'meinside-ruby', github: 'meinside/meinside-ruby' # my ruby scripts and libraries

    gem 'thor'
    gem 'geocoder'
  5. Sungjin Han created this gist Mar 10, 2014.
    469 changes: 469 additions & 0 deletions elb-logs.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,469 @@
    #!/usr/bin/env ruby
    # coding: UTF-8

    # elb-logs.rb
    #
    # fetch and analyze ELB access logs
    # (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html)
    #
    # created on : 2014.03.07
    # last update: 2014.03.10
    #
    # by [email protected]

    require 'bundler/setup'

    require 'my_aws'
    require 'my_sqlite'

    require 'thor'
    require 'geocoder'

    DB_FILENAME = 'elb_access_logs.sqlite'

    S3_CONFIGS = {
    access_key_id: '__aws_access_key_id__',
    secret_access_key: '__aws_secret_access_key__',
    bucket: '__s3_bucket_name__',
    }

    module AWS
    class ELB::AccessLog
    attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status

    def initialize(line)
    components = line.split(' ')

    # timestamp
    @timestamp = Time.parse(components[0])

    # elb name
    @elb = components[1]

    # client
    @client_ip, @client_port = components[2].split(':')

    # backend (EC2 instance)
    @backend_ip, @backend_port = components[3].split(':')

    # time taken
    @process_time = components[5].to_f

    # recv/send
    @received_bytes = components[9].to_i
    @sent_bytes = components[10].to_i

    # request (HTTP only)
    request = line.scan(/"(.*?)"$/)[0][0]
    @request_method, @request_url, @request_protocol = request.split(/\s/).map(&:strip)
    @request_method = nil if @request_method == '-'
    @request_url = nil if @request_url == '-'
    @request_protocol = nil if @request_protocol == '-'

    # response (HTTP only)
    @response_status = components[8]
    @response_status = nil if @response_status == '-'
    end

    def to_s
    "[#{@elb}] #{@timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, #{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes" + (@request_method.nil? ? '' : " (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})")
    end

    class Helper
    def self.parse_key(key)
    components = File.basename(key.split('/')[-1], '.*').split('_')
    {
    aws_account: components[0],
    region: components[2],
    elb: components[3],
    datetime: Time.parse(components[4]),
    elb_ip: components[5],
    key: key,
    }
    end
    end

    class Database
    attr_accessor :filepath

    @@db = nil

    private

    def initialize(filepath)
    @filepath = filepath
    @db = MySqlite.open(@filepath)

    # table: fetched
    @db.execute_query('create table if not exists fetched(
    id integer primary key autoincrement,
    aws_account text not null,
    region text not null,
    elb_ip text not null,
    log_time text not null
    )')
    @db.execute_query('create index if not exists idx_fetched on fetched(
    aws_account, region, elb_ip, log_time
    )')

    # table: access_logs
    @db.execute_query('create table if not exists access_logs(
    id integer primary key autoincrement,
    aws_account text not null,
    region text not null,
    elb_ip text not null,
    log_time text not null,
    elb text not null,
    timestamp text not null,
    client_ip text not null,
    client_port integer default null,
    backend_ip text not null,
    backend_port integer default null,
    process_time real not null,
    received_bytes integer not null,
    sent_bytes integer not null,
    request_method text default null,
    request_url text default null,
    request_protocol text default null,
    response_status text default null
    )')
    @db.execute_query('create index if not exists idx_access_logs_1 on access_logs(
    aws_account, region, elb_ip
    )')
    @db.execute_query('create index if not exists idx_access_logs_2 on access_logs(
    aws_account, region, elb_ip, log_time
    )')
    @db.execute_query('create index if not exists idx_access_logs_3 on access_logs(
    timestamp
    )')
    @db.execute_query('create index if not exists idx_access_logs_4 on access_logs(
    client_ip
    )')
    @db.execute_query('create index if not exists idx_access_logs_5 on access_logs(
    response_status
    )')

    # table: geo_locations
    @db.execute_query('create table if not exists geo_locations(
    ip text primary key,
    country_code text default null,
    country_name text default null,
    city text default null,
    latitude real default null,
    longitude real default null
    )')
    @db.execute_query('create index if not exists idx_geo_locations_1 on geo_locations(
    country_code
    )')
    @db.execute_query('create index if not exists idx_geo_locations_2 on geo_locations(
    country_name
    )')
    @db.execute_query('create index if not exists idx_geo_locations_999 on geo_locations(
    latitude,
    longitude
    )')
    end

    public

    def self.instance
    @@db = Database.new(File.join(File.dirname(__FILE__), DB_FILENAME)) unless @@db
    @@db
    end

    # check if logs with given aws_account/region/elb_ip/log_time already exist
    #
    # @param aws_account [String] aws account
    # @param region [String] region
    # @param elb_ip [String] ELB's ip address
    # @param log_time [Date] log time
    #
    # @return [true,false]
    def log_exists?(aws_account, region, elb_ip, log_time)
    @db.execute_query('select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
    [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
    )[0][0] > 0
    end

    # save log entry
    #
    # @param aws_account [String] aws account
    # @param region [String] region
    # @param elb_ip [String] ELB's ip address
    # @param log_time [Date] log time
    # @param log_entry [AWS::ELB::LogEntry] log entry
    def save_log(aws_account, region, elb_ip, log_time, log_entry)
    # insert to the table
    @db.execute_query(
    'insert into access_logs(
    aws_account, region, elb_ip, log_time,
    elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,
    request_method, request_url, request_protocol, response_status
    ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
    [
    aws_account,
    region,
    elb_ip,
    log_time.strftime('%Y-%m-%d %H:%M:%S'),
    log_entry.elb,
    log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
    log_entry.client_ip,
    log_entry.client_port,
    log_entry.backend_ip,
    log_entry.backend_port,
    log_entry.process_time,
    log_entry.received_bytes,
    log_entry.sent_bytes,
    log_entry.request_method,
    log_entry.request_url,
    log_entry.request_protocol,
    log_entry.response_status
    ]
    )
    end

    # mark given log as fetched
    #
    # @param aws_account [String] aws account
    # @param region [String] region
    # @param elb_ip [String] ip address
    # @param log_time [Time] log time
    def mark_log_fetched(aws_account, region, elb_ip, log_time)
    # mark this log as fetched
    @db.execute_query(
    'insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)',
    [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
    )
    end

    # list all ips from saved logs
    #
    # @param option [Hash] option for listing
    # @return [Array<String>] array of ips
    def client_ips(option = nil)
    @db.execute_query("select #{option[:unique] ? 'distinct' : ''} client_ip from access_logs order by timestamp").map{|x| x[0]}
    end

    # cache ip and its geo location information for future use
    #
    # @param ip [String] ip address
    # @return [Database::GeoLocation, nil] successfully cached or not
    def cache_geo(ip)
    unless cached = cached_geo(ip)
    info = Geocoder.search(ip).first
    @db.execute_query('insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
    [info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
    )
    return GeoLocation.new(info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude)
    end
    nil
    rescue
    puts "* exception while caching ip: #{ip} - #{$!}"
    nil
    end

    # get cached geo location information
    #
    # @param ip [String] ip address
    # @return [Database::GeoLocation, nil] nil if not cached
    def cached_geo(ip)
    geo = @db.execute_query('select * from geo_locations where ip = ?',
    [ip]
    )
    if geo.count > 0
    GeoLocation.new(geo[0], geo[1], geo[2], geo[3], geo[4], geo[5])
    else
    nil
    end
    end

    # get ELB names
    #
    # @return [Array<String>] ELB names
    def elbs
    @db.execute_query('select distinct elb from access_logs order by elb').map{|x| x[0]}
    end

    # get number of ips per country for given ELB name
    #
    # @param elb [String] ELB name
    def num_ips_per_country(elb)
    @db.execute_query('select country_name as country, count(ip) as num_ips
    from geo_locations
    where ip in (select distinct client_ip from access_logs where elb = ?)
    group by country_name
    order by num_ips desc',
    [elb]
    ).map{|x| {country: x[0], num_ips: x[1]}}
    end

    def info
    # from ~ to
    from = @db.execute_query('select timestamp from access_logs order by timestamp limit 1').first[0]
    to = @db.execute_query('select timestamp from access_logs order by timestamp desc limit 1').first[0]

    # number of access logs
    num_access_logs = {}
    elbs.each{|elb|
    num_access_logs[elb] = @db.execute_query('select count(id) from access_logs where elb = ?', [elb]).first[0]
    }
    num_access_logs[:all] = @db.execute_query('select count(id) from access_logs').first[0]

    # number of ips
    num_ips = {}
    elbs.each{|elb|
    num_ips[elb] = @db.execute_query('select count(ip) from geo_locations
    where ip in (select distinct client_ip from access_logs where elb = ?)',
    [elb]
    ).first[0]
    }
    num_ips[:all] = @db.execute_query('select count(distinct ip) from geo_locations').first[0]

    # return
    {
    from: from,
    to: to,
    num_access_logs: num_access_logs,
    num_ips: num_ips,
    }
    end

    class GeoLocation
    attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude

    def initialize(ip, country_code, country_name, city, latitude, longitude)
    @ip = ip
    @country_code = country_code
    @country_name = country_name
    @city = city
    @latitude = latitude
    @longitude = longitude
    end

    def to_s
    "[%s] %s(%s), %s (%.4f, %.4f)" %[@ip, @country_name, @country_code, @city, @latitude, @longitude]
    end
    end
    end
    end
    end

    class Exec < Thor
    default_task :fetch_logs

    desc "fetch_logs", "fetch all ELB access log files from S3"
    method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips'
    method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
    def fetch_logs
    puts "> fetching logs..."

    # configure
    if MyAws::S3.config(S3_CONFIGS)
    num_logs = 0

    # fetch objects from the bucket
    MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each{|o|
    # get the database instance
    db = AWS::ELB::AccessLog::Database.instance

    # parse only actual log files
    if o.key =~ /elasticloadbalancing.*?\.log$/
    parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
    aws_account = parsed[:aws_account]
    elb = parsed[:elb]
    region = parsed[:region]
    elb_ip = parsed[:elb_ip]
    log_time = parsed[:datetime]

    # check if this log is already fetched/saved
    if db.log_exists?(aws_account, region, elb_ip, log_time)
    puts "* skipping alread-fetched log: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose?
    next
    else
    num_logs += 1
    end

    # read all log lines
    puts "> processing logs for: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose?
    o.read.lines.map{|x| x.strip}.each{|l|
    entry = AWS::ELB::AccessLog.new(l)

    db.save_log(aws_account, region, elb_ip, log_time, entry)
    }

    # mark log as fetched
    db.mark_log_fetched(aws_account, region, elb_ip, log_time)
    end
    }

    puts ">>> fetched #{num_logs} new log file(s)"

    # also cache geo locations if option is provided
    cache_geo if options.cache_geo?
    else
    puts "* S3 configuration failed"
    end
    end

    desc "cache_geo", "cache geo location info for client ips in saved logs"
    method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
    def cache_geo
    puts "> caching geo locations for logged ips..."

    db = AWS::ELB::AccessLog::Database.instance

    num_ips = 0
    db.client_ips({unique: true}).each{|ip|
    if cached = db.cache_geo(ip)
    puts "> cached geo ip: #{cached}" if options.verbose?

    num_ips += 1
    end
    }

    puts ">>> cached #{num_ips} new geo location(s)"
    end

    desc "count_ips", "list number of unique ips per country"
    def count_ips
    puts "> counting ips per country..."

    db = AWS::ELB::AccessLog::Database.instance

    db.elbs.each{|elb|
    puts ">>> ELB: #{elb}"

    db.num_ips_per_country(elb).each{|counts|
    puts "#{counts[:country]}: #{counts[:num_ips]}"
    }

    puts
    }
    end

    desc "info", "show info on database file"
    def info
    puts "> printing info of database..."

    db = AWS::ELB::AccessLog::Database.instance

    puts ">>> database: #{db.filepath}"

    info = db.info

    puts ">>> #{info[:from]} ~ #{info[:to]}"

    puts ">>> number of accesses"
    info[:num_access_logs].each{|k, v|
    puts " #{k}: #{v}"
    }

    puts ">>> number of ips"
    info[:num_ips].each{|k, v|
    puts " #{k}: #{v}"
    }
    end
    end

    trap('SIGINT') { puts; exit 1 }
    Exec.start(ARGV)