Created
March 19, 2015 18:51
-
-
Save jasisk/c17583dd2dfd99200f95 to your computer and use it in GitHub Desktop.
Heka aggregation question
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Poll the local test server and hand every response to the Kappa status
# decoder defined below.
[HttpInput]
url = "http://localhost:8000"
ticker_interval = 2
success_severity = 6
error_severity = 1
decoder = "KappaStatusDecoder"

# Sandbox decoder that splits the polled JSON body into one message per row.
[KappaStatusDecoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/kappa_status.lua"

    [KappaStatusDecoder.config]
    # Read by the decoder script through read_config("type") and used as the
    # Type of each message it injects; it has to agree with the
    # message_matcher of KappaAggregatorFilter.
    type = "KappaStatus"

# Sandbox filter that aggregates the decoded rows per host and raises alerts.
[KappaAggregatorFilter]
type = "SandboxFilter"
script_type = "lua"
filename = "lua_filters/kappa_aggregate.lua"
message_matcher = "Type == 'KappaStatus'"
ticker_interval = 5

    [KappaAggregatorFilter.config]
    # rows * sec_per_row is the time span kept by each per-host circular
    # buffer (40 * 6 s here).
    sec_per_row = 6
    rows = 40
    # Passed straight to alert.set_throttle() by the filter script. The
    # script's own default is 5 * 60 * 1e9 (commented "5 mins"), which
    # suggests the unit is nanoseconds, so 1 presumably disables throttling
    # in practice -- TODO confirm this is intended.
    alert_throttle = 1
    anomaly_config = 'mww_nonparametric("4xx", 4, 2, 5, 0.55)'

[RstEncoder]

# Print only the alert payloads produced by the sandbox filter.
[LogOutput]
encoder = "RstEncoder"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert'"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local annotation = require "annotation"
local anomaly = require "anomaly"
local alert = require "alert"
require "circular_buffer"
require "string"

-- Per-host aggregation state.
--   hosts[hostname] -> {last_update = <timestamp>, index = <n>}
--   cbufs[n]        -> circular buffer holding that host's status counts
--   titles[n]       -> payload name / annotation key used for that host
-- hosts_size is the number of hosts seen so far and the source of <n>.
local hosts = {}
local cbufs = {}
local titles = {}
local hosts_size = 0

-- Plugin configuration, with defaults used when a key is not set.
local rows = read_config("rows") or 288 -- 24 hours @ 5 mins
local sec_per_row = read_config("sec_per_row") or 300 -- 5 mins
local alert_throttle = read_config("alert_throttle") or 5 * 60 * 1e9 -- 5 mins
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))
alert.set_throttle(alert_throttle)
--- Create the circular buffer used for a single host.
-- The buffer has `rows` rows of `sec_per_row` seconds each and six columns:
-- columns 1-5 are labelled 100, 200, 300, 400 and 500, and column 6 is
-- labelled "Unknown".
-- @return the newly created circular buffer
function init_cb()
    local cb = circular_buffer.new(rows, 6, sec_per_row)
    for i = 1, 5 do
        cb:set_header(i, i * 100)
    end
    cb:set_header(6, "Unknown")
    return cb
end
--- Record one decoded status sample in the sending host's circular buffer.
-- Reads Timestamp, Hostname, Fields[status] and Fields[count] from the
-- current message. The first message from a host registers it (index, title,
-- annotation pruning window) and lazily creates its circular buffer.
-- @return 0 when the sample was recorded; -1 when the message has no
--         hostname or its status/count is not numeric (assumed to be
--         counted as a processing failure by the host -- TODO confirm)
function process_message ()
    local ts = read_message("Timestamp")
    local hostname = read_message("Hostname")
    local status = tonumber(read_message("Fields[status]"))
    local count = tonumber(read_message("Fields[count]"))

    -- A nil hostname cannot be used as a table key and a nil status cannot
    -- be divided; reject the message instead of raising a runtime error.
    if hostname == nil or status == nil or count == nil then
        return -1
    end

    local host = hosts[hostname]
    if not host then
        hosts_size = hosts_size + 1
        host = {last_update = ts, index = hosts_size}
        hosts[hostname] = host
        titles[hosts_size] = string.format("%s http status", hostname)
        annotation.set_prune(titles[hosts_size], rows * sec_per_row * 1e9)
    end

    local statuses = cbufs[host.index]
    if not statuses then
        statuses = init_cb()
        cbufs[host.index] = statuses
    end

    -- Map the status code onto its class column (1xx..5xx -> 1..5); anything
    -- else is counted in column 6 ("Unknown").
    local col = status / 100
    if col < 1 or col >= 6 then
        col = 6
    end
    statuses:add(ts, col, count) -- col will be truncated to an int
    return 0
end
--- Periodic callback: publish every host's circular buffer and queue alerts.
-- For each known host, when an anomaly configuration is present and alerts
-- are not currently throttled, run anomaly detection on the "4xx" payload;
-- on a hit, attach the returned annotations to the host's title and queue an
-- alert reporting the column-4 sum over the last minute. The buffer is then
-- injected as a "cbuf" payload (with pruned annotations when anomaly
-- detection is configured). Finally the queued alerts are sent.
-- @param ns current time in nanoseconds, as supplied by the host
function timer_event(ns)
    for host, meta in pairs(hosts) do
        local cbuf = cbufs[meta.index]
        local title = titles[meta.index]
        if anomaly_config then
            if not alert.throttled(ns) then
                local msg4xx, annos4xx = anomaly.detect(ns, "4xx", cbuf, anomaly_config)
                if msg4xx then
                    annotation.concat(title, annos4xx)
                    -- Only the first result is needed; a second local named
                    -- `rows` here would shadow the module-level setting.
                    local sum = cbuf:compute("sum", 4, ns - (1 * 60 * 1e9))
                    -- compute() is documented to return nil when the range
                    -- falls outside the buffer; report 0 rather than letting
                    -- string.format("%d", nil) raise.
                    alert.queue(ns, string.format("*%s* has seen *%d* 4xx statuses in the last *1* minute", host, sum or 0))
                end
            end
            inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
        else
            inject_payload("cbuf", title, cbuf)
        end
    end
    alert.send_queue(ns)
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local alert = require("alert")
require "string"
require "cjson"
require "table"

-- Type given to every per-row message injected below (plugin config "type").
-- Deliberately not named `type`, so the built-in type() stays usable.
local msg_type = read_config("type")
-- True when the previously processed message was an HTTP input error.
local previously_error = false

--- Decode one HttpInput message.
-- Error messages (Type 'heka.httpinput.error') are re-injected with the
-- Logger value redacted from the payload and a `previous_error` field.
-- Any other message is expected to carry a JSON array whose first element
-- has a `points` array of rows; each row is injected as its own message with
-- count = row[2], Hostname = row[3] and status = row[4].
-- @return 0 on success (including a body with no first element); -1 when
--         the body is not valid JSON, does not have the expected shape, or a
--         message could not be injected
function process_message()
    local raw_message = read_message("Payload")
    local original_type = read_message("Type")

    if original_type == 'heka.httpinput.error' then
        -- get the query out of the payload because it might contain sensitive
        -- information like credentials
        local logger = read_message("Logger")
        local new_payload = raw_message
        -- An empty pattern would match between every character, so only
        -- redact when there is an actual logger value to look for.
        if type(logger) == "string" and logger ~= ""
            and type(raw_message) == "string" then
            -- Escape punctuation so the logger value is matched literally.
            local pattern = string.gsub(logger, "%p", "%%%1")
            new_payload = string.gsub(raw_message, pattern, "[REDACTED]")
        end
        local m = {
            Payload = new_payload,
            Fields = {
                previous_error = previously_error
            }
        }
        previously_error = true
        -- change the original payload ...
        if not pcall(inject_message, m) then return -1 end
        return 0
    end

    previously_error = false

    local ok, json = pcall(cjson.decode, raw_message)
    if not ok or type(json) ~= "table" then
        return -1 -- if we can't parse the body, that's a problem
    end

    local first = json[1]
    if first == nil then
        return 0 -- nothing to report
    end
    if type(first) ~= "table" or type(first["points"]) ~= "table" then
        return -1 -- valid JSON, but not the structure we expect
    end

    for _, row in ipairs(first["points"]) do
        if type(row) ~= "table" then
            return -1
        end
        local count, host, status = row[2], row[3], row[4]
        -- Prefer the JSON form of the row as payload; fall back to a
        -- colon-joined string when it cannot be encoded.
        local encoded, payload = pcall(cjson.encode, row)
        if not encoded then
            payload = table.concat(row, ':')
        end
        local msg = {
            Type = msg_type,
            Payload = payload,
            Hostname = host,
            Fields = {
                count = count,
                status = status
            }
        }
        -- inject a new message into the pipeline
        if not pcall(inject_message, msg) then return -1 end
    end
    return 0
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Minimal test server for the pipeline: every request is answered with a
// single JSON row and logged to the console.
var http = require('http');
var server = http.createServer();
var count = 1; // number of the request being served, starting at 1

server.on('request', function (req, res) {
  // Report 1 on most requests; whenever the request number is a multiple of
  // 43, report count + 43 instead to produce a spike.
  var x = count % 43 ? 1 : count + 43;
  res.writeHead(200, {'content-type': 'application/json'});
  // Row layout as consumed by the decoder: [_, count, host, status].
  res.end(JSON.stringify([{points: [[1, x, 'A', 404]]}]));
  console.log('Req', count++, x);
});

server.listen(8000, function () {
  console.log('Listening on %d ...', server.address().port);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment