Skip to content

Instantly share code, notes, and snippets.

@charlescui
Created May 8, 2015 10:30

Revisions

  1. charlescui created this gist May 8, 2015.
    82 changes: 82 additions & 0 deletions fluentd-plugin-sls
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,82 @@
    require "cgi"
    require "uri"

    require "aliyun_sls"

    module Fluent
    class SlsOutput < Output
    Plugin.register_output('sls', self)

    # Sets up the SLS connection and the static log-group metadata.
    #
    # Connection settings are read from the PROJECT, REGION, SECRET and KEY
    # environment variables (passed to AliyunSls::Connection.new in that
    # order). The topic is this machine's hostname as reported by the
    # `hostname` command; the source comes from the SOURCE environment variable.
    def initialize
      super
      project, region, secret, key = ENV.values_at("PROJECT", "REGION", "SECRET", "KEY")
      @sls_con = AliyunSls::Connection.new(project, region, secret, key)
      @source = ENV["SOURCE"]
      @topic = `hostname`.strip
    end

    # Converts a stream of events into one SLS log group and uploads it.
    #
    # A fresh AliyunSls::Protobuf::Log is built for every record and handed to
    # pack_log_item, which decides whether that log joins the group. The group
    # is sent through @sls_con to the logstore named by ENV["STORE"]; nothing
    # is sent when no record was accepted. Delivery is best effort: any
    # StandardError is logged and swallowed, while process-level exceptions
    # (Interrupt, SystemExit, ...) propagate. chain.next is called in all cases.
    #
    # @param tag event tag (not used here)
    # @param es [#each] event stream yielding (time, record) pairs
    # @param chain [#next] output chain, always advanced before returning
    def emit(tag, es, chain)
      log_list = AliyunSls::Protobuf::LogGroup.new(:logs => [], :topic => @topic, :source => @source)
      es.each do |_time, record|
        # NOTE(review): the event's own time is ignored and every log is
        # stamped with the wall clock at send time - confirm this is intended.
        entry = AliyunSls::Protobuf::Log.new(:time => Time.now.to_i, :contents => [])
        # Parse the record and load its fields into the log group.
        pack_log_item(log_list, entry, record)
      end
      # Avoid a pointless upload when every record was filtered out.
      @sls_con.puts_logs(ENV["STORE"], log_list) unless log_list.logs.empty?
    rescue StandardError => e
      log.error("sls: failed to emit logs: #{e.class}: #{e.message}")
    ensure
      chain.next
    end

    private

    # This plugin is intended for Nginx access_log records emitted by the in_tail plugin.
    # in_tail parses each Nginx log line into a Hash like the following:
    # {"host"=>"10.41.255.156",
    # "user"=>"-",
    # "method"=>"GET",
    # "path"=>
    # "/analyse/analyse.gif?referrer=&j_nocache=1431069320258&entry=http%3A%2F%2Futc.hzdtv.tv%2Findex.jsp&stb_id=1304001091900024C140A5C6&uid=2489593&tvbuy_ad_id=553070ca77656225fe8f0000&tvbuy_area_code=00010001&tvbuy_channel_no=39&title=&type=page_view",
    # "code"=>"200",
    # "size"=>"1095",
    # "referer"=>"http://21.254.47.227:8087/tvbuy/item/acr_query",
    # "agent"=>
    # "Wasu/1.0(mwver 41617.0.0.0.10025;hwver 0371666f;swver 20571005;uiver 3.14;caver 5.2.2.2)"}
    # Fills +log+ from +record+ and appends it to +log_list+, but only for
    # records whose "path" matches /analyse/analyse.gif (a filter so that not
    # every access-log line is uploaded).
    #
    # The "path" and "referer" URLs are expanded by pack_url_log_item; all
    # remaining fields are copied by pack_hash_log_item. +record+ itself is
    # left untouched, since the same Hash may be handed to other outputs.
    #
    # @param log_list [#logs] log group that receives the packed log
    # @param log the log entry to fill
    # @param record [Hash] parsed access-log fields keyed by String
    # @return the updated log list contents, or nil when the record is skipped
    def pack_log_item(log_list, log, record)
      # Filter first so that unrelated requests are not recorded.
      return unless /\/analyse\/analyse\.gif.*/ =~ record["path"]

      pack_url_log_item(log, record["path"], 'path')
      pack_url_log_item(log, record["referer"], 'referer')
      # Copy instead of delete: do not mutate the caller's record.
      remaining = record.reject { |field, _| field == "path" || field == "referer" }
      pack_hash_log_item(log, remaining)
      # Add the finished entry to the log group.
      log_list.logs << log
    end

    # Splits +url+ into log contents: one item per query parameter, keyed
    # "<prefix>_<name>", plus one "<prefix>_@_path" item holding the URL path.
    # Parameter names and values are URL-decoded. A parameter with no value
    # ("k=" or a bare "k") is stored with an empty string, values keep any
    # '=' they contain, and empty pairs (as in "a=1&&b=2") are ignored.
    #
    # @param log [#contents] log entry whose contents are appended to
    # @param url [String, nil] URL or path-with-query; nil is a no-op
    # @param prefix [String] key prefix identifying which field the URL came from
    # @return the log's contents, or nil when +url+ is nil
    # @raise [URI::InvalidURIError] when the URL cannot be parsed
    def pack_url_log_item(log, url, prefix='')
      return unless url

      # The bytes are re-read as GB2312 and converted to UTF-8, dropping
      # anything invalid or unmappable. Per the original author, URLs that
      # carry Chinese characters otherwise make URI.parse raise.
      url = url.strip.encode('UTF-8', 'GB2312', invalid: :replace, undef: :replace, replace: '')
      uri = URI.parse(url)
      (uri.query || '').split('&').each do |pair|
        next if pair.empty?

        # Limit 2 so that '=' inside a value is preserved; value is nil for "k".
        name, value = pair.split('=', 2)
        log.contents << AliyunSls::Protobuf::Log::Content.new(
          :key => "#{prefix}_#{CGI.unescape(name)}",
          :value => CGI.unescape(value || '')
        )
      end
      log.contents << AliyunSls::Protobuf::Log::Content.new(:key => "#{prefix}_@_path", :value => uri.path || "")
    end

    # Copies every key/value pair of +hash+ into +log+ as a content item keyed
    # "<prefix>_<key>". A nil (or false) value is stored as an empty string.
    #
    # @param log [#contents] log entry whose contents are appended to
    # @param hash [Hash, nil] fields to copy; nil is a no-op
    # @param prefix [String] text placed before the underscore in each key
    # @return the given hash, or nil when +hash+ is nil
    def pack_hash_log_item(log, hash, prefix='')
      return unless hash

      hash.each do |field, raw|
        log.contents << AliyunSls::Protobuf::Log::Content.new(:key => "#{prefix}_#{field}", :value => raw || "")
      end
    end
    end
    end