Skip to content

Instantly share code, notes, and snippets.

@josacar
Created June 19, 2019 12:50
Show Gist options
  • Save josacar/8dc92d746182627bc165517f349d276c to your computer and use it in GitHub Desktop.
Save josacar/8dc92d746182627bc165517f349d276c to your computer and use it in GitHub Desktop.
Cloudflare logs to Sumologic
# frozen_string_literal: true
require 'net/http'
require 'json'
require 'uri'
module Environment
SUMOURL = ENV.fetch('SUMO_ENDPOINT')
ZONEID = ENV.fetch('CLOUDFLARE_ZONE_ID')
CLOUDFLAREAUTHEMAIL = ENV.fetch('CLOUDFLARE_AUTH_EMAIL')
CLOUDFLAREAUTHKEY = ENV.fetch('CLOUDFLARE_AUTH_KEY')
LOGFIELDS = ENV.fetch('CLOUDFLARE_LOG_FIELDS',
'CacheCacheStatus,CacheResponseBytes,CacheResponseStatus,CacheTieredFill,ZoneID' \
'ClientASN,ClientCountry,ClientDeviceType,ClientIP,ClientIPClass,ClientRequestBytes,ClientRequestHost,' \
'ClientRequestMethod,ClientRequestProtocol,ClientRequestReferer,ClientRequestURI,ClientRequestUserAgent,' \
'ClientSSLCipher,ClientSSLProtocol,ClientSrcPort,EdgeColoID,EdgeEndTimestamp,EdgePathingOp,EdgePathingSrc,' \
'EdgePathingStatus,EdgeRateLimitAction,EdgeRateLimitID,EdgeRequestHost,EdgeResponseBytes,EdgeResponseCompressionRatio,' \
'EdgeResponseContentType,EdgeResponseStatus,EdgeServerIP,EdgeStartTimestamp,OriginIP,OriginResponseBytes,OriginResponseHTTPExpires,' \
'OriginResponseHTTPLastModified,OriginResponseStatus,OriginResponseTime,OriginSSLProtocol,ParentRayID,RayID,SecurityLevel,' \
'WAFAction,WAFFlags,WAFMatchedVar,WAFProfile,WAFRuleID,WAFRuleMessage,WorkerCPUTime,WorkerStatus,WorkerSubrequest,WorkerSubrequestCount'
)
SOURCECATEGORYOVERRIDE = ENV.fetch('SOURCE_CATEGORY_OVERRIDE', 'none')
SOURCEHOSTOVERRIDE = ENV.fetch('SOURCE_HOST_OVERRIDE', 'api.cloudflare.com')
SOURCENAMEOVERRIDE = ENV.fetch('SOURCE_NAME_OVERRIDE', ZONEID)
end
def variable_set?(var)
!var.nil? && var != '' && var != 'none'
end
def sumo_meta_key
source_category = Environment::SOURCECATEGORYOVERRIDE if variable_set?(Environment::SOURCECATEGORYOVERRIDE)
source_host = Environment::SOURCEHOSTOVERRIDE if variable_set?(Environment::SOURCEHOSTOVERRIDE)
source_name = Environment::SOURCENAMEOVERRIDE if variable_set?(Environment::SOURCENAMEOVERRIDE)
"#{source_name}:#{source_category}:#{source_host}"
end
def post_to_sumologic(context:, messages:)
messages_total = messages.keys.length
messages_sent = 0
message_errors = []
url_object = URI(Environment::SUMOURL)
finalize_context = lambda do
total = messages_sent + message_errors.length
if total == messages_total
puts "messages_sent: #{messages_sent} message_errors: #{message_errors.length}"
raise "errors: #{message_errors}" unless message_errors.empty?
end
end
messages.each do |key, logs|
header_array = key.split(':')
headers = {
'X-Sumo-Name': header_array[0],
'X-Sumo-Category': header_array[1],
'X-Sumo-Host': header_array[2]
}
Net::HTTP.start(url_object.host, url_object.port, use_ssl: true) do |http|
request = Net::HTTP::Post.new(url_object)
headers.each { |header, value| request.public_send(:[]=, header, value) }
request.body = logs.map do |msg|
JSON.dump(msg)
end.join("\n")
response = http.request(request)
if response.code == '200'
messages_sent += 1
else
message_errors.push('HTTP Return code ' + res.statusCode)
end
rescue StandardError => e
message_errors.push(e.message)
ensure
finalize_context.call
end
end
end
def sumo_endpoint_invalid?(url_object)
url_object.scheme != 'https' || url_object.host.nil? || url_object.path.nil?
end
def format_cloudflare_to_sumologic(str)
message_list = {}
logs = str.split("\n")
puts "Log events: #{logs.length}"
logs.each do |log|
next if log.empty?
parsed_log = JSON.parse(log)
# Sumo Logic only supports 13 digit epoch time convert original timestamp to a JSON Date object
parsed_log['EdgeStartTimestamp'] = parsed_log.fetch('EdgeStartTimestamp') / 1_000_000 if parsed_log['EdgeStartTimestamp']
parsed_log['EdgeEndTimestamp'] = parsed_log.fetch('EdgeEndTimestamp') / 1_000_000 if parsed_log['EdgeEndTimestamp']
metadata_key = sumo_meta_key
if message_list.key?(metadata_key)
message_list[metadata_key].push(parsed_log)
else
message_list[metadata_key] = [parsed_log]
end
end
message_list
end
def handler(event:, context:)
half_hour = 30 * 60
one_minute = 1 * 60
if sumo_endpoint_invalid?(URI(Environment::SUMOURL))
raise "Invalid SUMO_ENDPOINT environment variable: #{Environment::SUMOURL}"
end
end_time = Time.now.utc - half_hour
end_time = Time.utc(end_time.year, end_time.month, end_time.day, end_time.hour, end_time.min, 0)
start_time = end_time - one_minute
puts "start_time: #{start_time}"
puts "end_time: #{end_time}"
headers = {
'X-Auth-Email' => Environment::CLOUDFLAREAUTHEMAIL,
'X-Auth-Key' => Environment::CLOUDFLAREAUTHKEY
}
url_object = URI("https://api.cloudflare.com/client/v4/zones/#{Environment::ZONEID}/logs/received")
params = {
start: start_time.to_i,
end: end_time.to_i,
fields: Environment::LOGFIELDS
}
url_object.query = URI.encode_www_form(params)
Net::HTTP.start(url_object.host, url_object.port, use_ssl: true) do |http|
request = Net::HTTP::Get.new(url_object)
headers.each { |header, value| request.send(:[]=, header, value) }
response = http.request(request)
puts "response.code: #{response.code}"
message_list = format_cloudflare_to_sumologic(response.body)
post_to_sumologic(context: context, messages: message_list) if response.code == '200'
return 0
rescue StandardError => e
raise "Failed to push to Sumologic: #{e.message}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment