Skip to content

Instantly share code, notes, and snippets.

@charlescui
Created May 8, 2015 10:30
Show Gist options
  • Save charlescui/d2a231dbc85b11586fa0 to your computer and use it in GitHub Desktop.
Save charlescui/d2a231dbc85b11586fa0 to your computer and use it in GitHub Desktop.
require "aliyun_sls"
module Fluent
class SlsOutput < Output
Plugin.register_output('sls', self)
# Sets up the plugin from environment variables.
#
# Reads PROJECT, REGION, SECRET and KEY to build the SLS connection (passed to
# AliyunSls::Connection.new in exactly that order), uses SOURCE as the log
# source, and uses this machine's hostname as the log topic.
def initialize
  super
  project, region, secret, key = ENV.values_at("PROJECT", "REGION", "SECRET", "KEY")
  @sls_con = AliyunSls::Connection.new(project, region, secret, key)
  # The topic is whatever the `hostname` command prints, minus surrounding whitespace.
  @topic = `hostname`.strip
  @source = ENV["SOURCE"]
end
# Packs the records of one event stream into a single SLS log group and
# uploads it to the log store named by ENV["STORE"].
#
# @param tag [Object] event tag; not used by this plugin
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; +next+ is always called, even on failure
#
# Ordinary errors (StandardError) are printed and swallowed so one bad batch
# does not stop the pipeline. Non-StandardError exceptions (signals, exit
# requests, ...) are deliberately NOT rescued and propagate to the caller.
def emit(tag, es, chain)
  log_list = AliyunSls::Protobuf::LogGroup.new(:logs => [], :topic => @topic, :source => @source)
  es.each do |_time, record|
    # NOTE(review): the event's own time is ignored and every entry is stamped
    # with the processing time -- confirm this is intended.
    log = AliyunSls::Protobuf::Log.new(:time => Time.now.to_i, :contents => [])
    # Parse the record and, if it passes the filter, load it into log_list.
    pack_log_item(log_list, log, record)
  end
  # Every record may have been filtered out; skip the remote call in that case.
  @sls_con.puts_logs(ENV["STORE"], log_list) unless log_list.logs.empty?
rescue StandardError => e
  puts e
ensure
  chain.next
end
private
# Filters one parsed access-log record and, when it is an analytics hit,
# packs it into +log+ and appends +log+ to +log_list+.
#
# This plugin is meant for Nginx access_log lines coming out of the in_tail
# plugin, which parses each line into a Hash such as:
# {"host"=>"10.41.255.156",
# "user"=>"-",
# "method"=>"GET",
# "path"=>
# "/analyse/analyse.gif?referrer=&j_nocache=1431069320258&entry=http%3A%2F%2Futc.hzdtv.tv%2Findex.jsp&stb_id=1304001091900024C140A5C6&uid=2489593&tvbuy_ad_id=553070ca77656225fe8f0000&tvbuy_area_code=00010001&tvbuy_channel_no=39&title=&type=page_view",
# "code"=>"200",
# "size"=>"1095",
# "referer"=>"http://21.254.47.227:8087/tvbuy/item/acr_query",
# "agent"=>
# "Wasu/1.0(mwver 41617.0.0.0.10025;hwver 0371666f;swver 20571005;uiver 3.14;caver 5.2.2.2)"}
#
# @param log_list [#logs] log group whose +logs+ collection receives +log+
# @param log [Object] log entry that the URL and hash packers fill in
# @param record [Hash] parsed access-log record; it is left unmodified
# @return [Object, nil] the updated +logs+ collection, or nil when the record is filtered out
def pack_log_item(log_list, log, record)
  # Filter first so that not every access-log line gets recorded.
  return unless /\/analyse\/analyse\.gif.*/ =~ record["path"]

  # Work on a shallow copy so the caller's Hash keeps its "path"/"referer" keys.
  fields = record.dup
  path = fields.delete("path")
  referer = fields.delete("referer")
  pack_url_log_item(log, path, 'path')
  pack_url_log_item(log, referer, 'referer')
  # Everything that is not a URL is packed as plain key/value pairs.
  pack_hash_log_item(log, fields)
  # Attach the finished entry to the log group.
  log_list.logs << log
end
# Splits a URL into log content items and appends them to +log.contents+.
#
# Each query parameter becomes an item keyed "<prefix>_<name>" with its
# URL-decoded value; the URL's path is stored under "<prefix>_@_path".
# Parameters with no value (e.g. "title=") are stored as an empty string,
# values containing "=" are kept whole, and empty pairs ("&&") are ignored.
#
# @param log [#contents] log entry whose +contents+ collection is appended to
# @param url [String, nil] URL to unpack; nil is ignored
# @param prefix [String] prefix for every generated key
# @return [Object, nil] the updated +contents+ collection, or nil when +url+ is nil
# @raise [URI::InvalidURIError] when the URL cannot be parsed
def pack_url_log_item(log, url, prefix='')
  return unless url

  # If the URL carries raw (non-escaped) Chinese text, URI.parse raises,
  # so transcode it first and drop the bytes that cannot be converted.
  url = url.strip.encode('UTF-8', 'GB2312', invalid: :replace, undef: :replace, replace: '')
  uri = URI.parse(url)
  (uri.query || '').split('&').each do |pair|
    next if pair.empty?

    # Limit of 2 keeps any further "=" characters inside the value.
    k, v = pair.split('=', 2)
    log.contents << AliyunSls::Protobuf::Log::Content.new(
      :key => "#{prefix}_#{CGI.unescape(k)}",
      :value => CGI.unescape(v || "")
    )
  end
  log.contents << AliyunSls::Protobuf::Log::Content.new(:key => "#{prefix}_@_path", :value => uri.path || "")
end
# Appends one content item per key/value pair of +hash+ to +log.contents+.
#
# Keys are written as "<prefix>_<key>"; a nil or false value is stored as "".
#
# @param log [#contents] log entry whose +contents+ collection is appended to
# @param hash [Hash, nil] pairs to pack; nil is ignored
# @param prefix [String] prefix for every generated key
# @return [Hash, nil] +hash+ itself, or nil when +hash+ is nil
def pack_hash_log_item(log, hash, prefix='')
  return unless hash

  hash.each_pair do |name, value|
    log.contents << AliyunSls::Protobuf::Log::Content.new(:key => "#{prefix}_#{name}", :value => value || "")
  end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment