In InfluxDB 2.0 Cloud, you can write tasks that perform common actions when conditions of interest appear in your data.
These examples are written against data produced by the Telegraf
system plugin.
If a sensor gets stuck, it might start sending the same value over and over. In that case, you might want to send an alert to notify someone.
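To run a check like this on a schedule, you would save the script as a task. A minimal sketch of the task option block that would go near the top of the script (after the imports) is shown below; the task name and interval here are placeholders, not part of the original example.

option task = {name: "CPU Fluctuation Check", every: 5m}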
import "experimental"
import "http"
import "influxdata/influxdb/v1"
import "json"
// This script is intended to be a task that runs at regular intervals.
// It runs against the data produced by the "system" plugin in Telegraf.
//
// At a high level, this is what it does:
// - Gets the most recent 5m of CPU data from the telegraf bucket
// - Determines whether the "usage_idle" field has stayed the same for the past 5m;
//   if so, a CRIT status is created. Otherwise, the status is OK.
// - Writes this most recent status to the "statuses" measurement
//
// Then we fetch the status from the previous run of the script and compare this
// with the current status. If we detect a transition from either ok->crit or crit->ok,
// then we generate a notification, and write a point to the notifications measurement.
// how far back to look to find the most recent status
timeRange = 5m
// where to write a record of this check/notification event
monitoringBucket = "chrisw_monitoring"
// name to use in the monitoringBucket
statusMeas = "statuses"
// Name of the check being performed by this script
statusCheckName = "CPU Fluctuation"
// name to use in the monitoringBucket for notifications
notificationMeas = "notifications"
// Configuration for forming a payload and sending a notification message to an outside service.
destinationURL = "URL GOES HERE"
destinationHeaders = {"Content-Type": "application/json"}
notifyOkToCrit = (r) => {
// This function can be customized to send whatever payload the endpoint needs, e.g.:
// alertMsg = "CRIT: fluctuation is ${string(v: r.fluctuation)}"
// return http.post(url: destinationURL, headers: destinationHeaders, data: json.encode(v: {r with message: alertMsg}))
return 200
}
notifyCritToOk = (r) => {
// return http.post(url: destinationURL, headers: destinationHeaders, data: json.encode(v: {r with message: "OK"}))
return 200
}
rawData = from(bucket: "telegraf")
|> range(start: -timeRange)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_idle" and r.cpu == "cpu0")
// CHECK LOGIC:
// Look at the last 5m of data; if the net change over the window is zero
// (the value appears stuck), create a CRIT status. Otherwise, OK.
curStatus = rawData
|> difference() // subtract adjacent samples from one another
|> sum()
|> map(fn: (r) =>
({
_measurement: statusMeas,
check_name: statusCheckName,
fluctuation: r._value,
status: if r._value == 0 then "crit" else "ok",
_time: now(),
}))
|> group(columns: ["_measurement", "check_name"])
// SAVE NEW STATUS
curStatus
|> experimental.to(bucket: monitoringBucket)
// The logic in this function could be common to many different kinds of notifications
notifyOnStatusChange = (curStatus, timeRange, checkName) => {
// GET LAST STATUS
lastStatus = from(bucket: monitoringBucket)
|> range(start: -timeRange)
|> filter(fn: (r) =>
(r._measurement == statusMeas
and r.check_name == checkName)
)
|> last()
|> v1.fieldsAsCols()
|> drop(columns: ["_start", "_stop"])
// Put most recent status and previous status into the same table
// and sort by time
twoStatuses = union(tables: [curStatus, lastStatus])
|> sort(columns: ["_time"])
twoStatuses |> yield(name: "two statuses")
// SEND A NOTIFICATION IF STATUS CHANGES
return twoStatuses
|> map(fn: (r) =>
({r with __state_num: if r.status == "ok" then 0 else 1}))
|> difference(columns: ["__state_num"])
|> filter(fn: (r) =>
(r.__state_num != 0))
|> drop(columns: ["__state_num"])
|> map(fn: (r) =>
({
_measurement: notificationMeas,
check_name: checkName,
notification_kind: if r.status == "ok" then "crit to ok" else "ok to crit",
http_response_code: if r.status == "ok" then notifyCritToOk(r) else notifyOkToCrit(r),
_time: now(),
}))
|> group(columns: ["_measurement", "check_name", "notification_kind"])
|> experimental.to(bucket: monitoringBucket)
|> yield(name: "alert on status change")
}
notifyOnStatusChange(curStatus: curStatus, timeRange: timeRange, checkName: statusCheckName)
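The notification functions above return a hard-coded 200 so the script runs without any external dependency. For reference, a concrete implementation might look like the following sketch; the example function name and the payload shape are assumptions, not part of the original script, and http.post() returns the HTTP response code that the notification point records.

// A possible notification implementation (sketch; payload shape is an assumption):
notifyOkToCritExample = (r) =>
    http.post(
        url: destinationURL,
        headers: destinationHeaders,
        data: json.encode(v: {
            check_name: r.check_name,
            status: r.status,
            message: "CRIT: fluctuation is ${string(v: r.fluctuation)}"
        })
    )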
It's very common to want to send an alert when a metric moves outside a given range.
// threshold function for determining the status of a record
threshold = (r) =>
(r._value < 5 or r._value > 10)
// Name of the check being performed by this script
statusCheckName = "Threshold"
// CHECK LOGIC:
// Look at the last 5m of data.
// For every row that falls outside the threshold, create a CRIT status; otherwise an OK status.
// (In practice, you would usually add a filter() before the map() to limit the check to the series of interest.)
curStatus = from(bucket: "telegraf")
|> range(start: -timeRange)
|> map(fn: (r) =>
({
_measurement: statusMeas,
check_name: statusCheckName,
fluctuation: r._value,
status: if threshold(r) then "crit" else "ok",
_time: now(),
}))
|> group(columns: ["_measurement", "check_name"])
|> experimental.to(bucket: monitoringBucket)
// Note: this reuses the notifyOnStatusChange function defined above.
notifyOnStatusChange(curStatus: curStatus, timeRange: timeRange, checkName: statusCheckName)
Sometimes you may want to look at more than one measurement within the same check.
Or, you may wish to perform a certain check only if the outcome of an initial check is OK.
statusCheckName = "Compound check"
// A compound check:
// If we are low on memory, then we expect a lot of processes to be blocked
// waiting for swap.
//
// In that case, low memory will cause a large number of blocked processes.
// So only go to crit on a large number of blocked processes if the percentage of used memory is high.
rawMem = from(bucket: "telegraf")
|> range(start: -timeRange)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
|> v1.fieldsAsCols()
|> aggregateWindow(fn: last, every: timeRange)
rawProcs = from(bucket: "telegraf")
|> range(start: -timeRange)
|> filter(fn: (r) => r._measurement == "procs" and r._field == "blocked")
|> v1.fieldsAsCols()
|> aggregateWindow(fn: last, every: timeRange)
curStatus =
experimental.join(left: rawMem, right: rawProcs, fn: (left, right) => ({left with blocked: right.blocked}))
|> last()
|> map(fn: (r) => ({
_measurement: statusMeas,
check_name: statusCheckName,
mem_used_percent: r.used_percent,
blocked_processes: r.blocked,
status: if r.used_percent > 90 and r.blocked > 10 then "crit" else "ok",
_time: now(),
}))
|> group(columns: ["_measurement", "check_name"])
// SAVE NEW STATUS
curStatus
|> experimental.to(bucket: monitoringBucket)
notifyOnStatusChange(curStatus: curStatus, timeRange: timeRange, checkName: statusCheckName)
If a system stops sending data to InfluxDB, this can indicate a problem. Here's a script that performs a "deadman" check, which alerts when particular series stop receiving updates.
import "experimental"
import "http"
import "json"
// how far back to look to find the most recent status
timeRange = 10m
// how far back to look for dead series
deadRange = 5m
// where to write a record of this check/notification event
monitoring_bucket = "monitoring_bucket"
// name to use in the monitoring_bucket
status_meas = "statuses"
// name to use for the status column
status_field = "status"
// Name of the check being performed by this script
status_check_name = "Deadman"
// name to use in the monitoring_bucket for notifications
notification_meas = "notifications"
// field to record for the response code
notification_field = "http_response_code"
// Configuration for forming a payload and sending a notification message to an outside service.
destinationURL = "URL GOES HERE"
destinationHeaders = {"Content-Type": "application/json"}
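// Stub notification functions: they simply return 200 as a placeholder response code.
// Replace the bodies with http.post calls (as in the earlier examples) to send real notifications.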
notifyOkToCrit = (r) =>
(200)
notifyCritToOk = (r) =>
(200)
// CHECK LOGIC:
// Look at the last 10m and 5m of data.
// If a new series appears in the last 5m, create an OK status.
// If a series is missing from the last 5m, create a CRIT status.
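// First, find series that (re)appeared within deadRange: their earliest point in the
// window is newer than deadRange ago, so write an OK status and a "crit to ok" notification.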
from(bucket: "telegraf")
|> range(start: -timeRange)
|> first()
|> filter(fn: (r) =>
(r._time > experimental.subDuration(d: deadRange, from: now())))
|> map(fn: (r) =>
({
_measurement: status_meas,
check_name: status_check_name,
status: "ok",
_time: now(),
}))
|> group(columns: ["_measurement", "check_name"])
|> experimental.to(bucket: monitoring_bucket)
|> map(fn: (r) =>
({
_measurement: notification_meas,
notification_kind: "crit to ok",
http_response_code: notifyCritToOk(r),
_time: now(),
}))
|> group(columns: ["_measurement", "notification_kind"])
|> experimental.to(bucket: monitoring_bucket)
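// Next, find dead series: their most recent point in the window is older than deadRange
// ago, so write a CRIT status and an "ok to crit" notification.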
from(bucket: "telegraf")
|> range(start: -timeRange)
|> last()
|> filter(fn: (r) =>
(r._time < experimental.subDuration(d: deadRange, from: now())))
|> map(fn: (r) =>
({
_measurement: status_meas,
check_name: status_check_name,
status: "crit",
_time: now(),
}))
|> group(columns: ["_measurement", "check_name"])
|> experimental.to(bucket: monitoring_bucket)
|> map(fn: (r) =>
({
_measurement: notification_meas,
notification_kind: "ok to crit",
http_response_code: notifyOkToCrit(r),
_time: now(),
}))
|> group(columns: ["_measurement", "notification_kind"])
|> experimental.to(bucket: monitoring_bucket)