- Introduction
- Core Architecture
- Plugin Types Overview
- Quick Start Guide
- Problem Daemon Plugins
- Exporter Plugins
- Configuration System
- The Status Data Model
- Complete Integration Example
- Best Practices
- Testing Guide
- Performance Tuning
- Plugin Registration & Lifecycle
- Implementing Custom Monitor Types
- Problem Metrics Manager
- Troubleshooting
The Node Problem Detector (NPD) is a Kubernetes daemon designed to detect various node problems and report them to the Kubernetes control plane. At its core, NPD implements a highly modular plugin architecture that separates problem detection from problem reporting, enabling extensibility and customization for different environments and use cases.
This guide provides a comprehensive overview of the plugin system for developers familiar with Kubernetes who want to understand, extend, or customize NPD's behavior.
NPD implements a two-layer plugin architecture with clear separation of concerns:
Monitor plugins detect problems and emit status information through Go channels. They implement the Monitor interface and run independently in their own goroutines.
Exporter plugins consume status information from monitors and export it to various backends (Kubernetes API, Prometheus, cloud monitoring services, etc.).
The foundation of the plugin system rests on two key interfaces defined in pkg/types/types.go:
// Monitor detects problems and reports status
type Monitor interface {
Start() (<-chan *Status, error) // Returns channel for problem reporting
Stop() // Clean shutdown
}
// Exporter exports detected problems to external systems
type Exporter interface {
ExportProblems(*Status) // Process and export problem status
}
┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Monitor │───▶│ Problem Detector │───▶│ Exporter │
│ Plugins │ │ (Orchestrator) │ │ Plugins │
└─────────────┘ └──────────────────┘ └─────────────┘
The Status struct serves as the central data model exchanged between monitors and exporters:
type Status struct {
Source string // Problem daemon name (e.g., "kernel-monitor")
Events []Event // Temporary problems (generate Kubernetes Events)
Conditions []Condition // Permanent node conditions (update Node status)
}
| Plugin Type | Purpose | Configuration File | Key Features |
|---|---|---|---|
| System Log Monitor | Parse system logs using regex patterns | kernel-monitor.json | Multi-source log reading, pattern matching |
| Custom Plugin Monitor | Execute external scripts/binaries | custom-plugin-monitor.json | Script execution, exit code interpretation |
| System Stats Monitor | Collect system metrics | system-stats-monitor.json | CPU, memory, disk, network metrics (metrics-only) |
Note: HealthChecker and LogCounter are standalone helper binaries that work WITH the Custom Plugin Monitor, not separate plugin types.
| Exporter Type | Purpose | Output Format |
|---|---|---|
| Kubernetes Exporter | Report to K8s API server | Events & Node Conditions |
| Prometheus Exporter | Expose metrics endpoint | Prometheus metrics format |
| Stackdriver Exporter | Export to Google Cloud Monitoring | Cloud monitoring metrics |
This section provides a practical introduction for developers who want to get started with NPD plugins immediately.
- Monitors detect problems and send Status objects through channels
- Problem Detector orchestrates monitors and forwards status to exporters
- Exporters process status and send to external systems (Kubernetes API, Prometheus, etc.)
Let's create a simple custom plugin that checks if a service is running:
Step 1: Create the script (my-service-check.sh):
#!/bin/bash
readonly OK=0
readonly NONOK=1
readonly UNKNOWN=2
# Check if my-service is running
if systemctl -q is-active my-service; then
echo "my-service is running"
exit $OK
else
echo "my-service is not running"
exit $NONOK
fi
Step 2: Create configuration (my-custom-monitor.json):
{
"plugin": "custom",
"source": "my-service-monitor",
"conditions": [{
"type": "MyServiceHealthy",
"reason": "MyServiceIsHealthy",
"message": "my-service is functioning properly"
}],
"rules": [{
"type": "permanent",
"condition": "MyServiceHealthy",
"reason": "MyServiceUnhealthy",
"path": "/path/to/my-service-check.sh",
"timeout": "10s"
}]
}
Step 3: Run NPD with your plugin:
./node-problem-detector --config.custom-plugin-monitor=my-custom-monitor.json
Step 4: Check results:
# View node conditions (may take up to invoke_interval to appear)
kubectl describe node <node-name>
# View events
kubectl get events --field-selector involvedObject.name=<node-name>
- Exit codes matter: 0=healthy, 1=problem detected, 2+=error
- Status vs Events vs Conditions:
- Events: Temporary problems (like crashes)
- Conditions: Persistent node states (like "OutOfDisk")
- Status: Contains both events and conditions
- Configuration drives behavior: Rules define what to monitor and how to react
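To make the Events/Conditions/Status relationship concrete, here is a small, self-contained Go sketch. It defines local mirror types with the same shape as NPD's Status, Event, and Condition (see pkg/types/types.go) purely for illustration; it is not NPD code and the field values are made up.

package main

import (
    "fmt"
    "time"
)

// Local mirrors of the NPD data model, defined only so this sketch compiles on its own.
type Event struct {
    Severity  string
    Timestamp time.Time
    Reason    string
    Message   string
}

type Condition struct {
    Type       string
    Status     string // "True" means the problem is currently present
    Transition time.Time
    Reason     string
    Message    string
}

type Status struct {
    Source     string
    Events     []Event     // temporary problems -> Kubernetes Events
    Conditions []Condition // persistent state -> Node conditions
}

func main() {
    now := time.Now()
    s := Status{
        Source: "my-service-monitor",
        Events: []Event{{
            Severity: "Warn", Timestamp: now,
            Reason: "MyServiceRestarted", Message: "my-service restarted unexpectedly",
        }},
        Conditions: []Condition{{
            Type: "MyServiceHealthy", Status: "False", Transition: now,
            Reason: "MyServiceIsHealthy", Message: "my-service is functioning properly",
        }},
    }
    fmt.Printf("%s reported %d event(s) and %d condition(s)\n",
        s.Source, len(s.Events), len(s.Conditions))
}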
The System Log Monitor is NPD's most sophisticated plugin, designed to parse system logs and detect problems using configurable regex patterns.
┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Log Watcher │───▶│ Log Monitor │───▶│ Problem Detector │
│ (pluggable) │ │ (core logic) │ │ (orchestrator) │
└─────────────────┘ └─────────────────┘ └──────────────────┘
The System Log Monitor implements a two-level plugin architecture: the monitor itself is a plugin, and it has its own sub-plugin system for different log sources. This demonstrates how monitors can be internally extensible.
- kmsg: Reads kernel messages from /dev/kmsg (always available)
- filelog: Reads from log files (e.g., /var/log/syslog) (always available)
- journald: Reads from systemd journal (requires the journald build tag)
Log watchers have their own registration system separate from problem daemons:
// pkg/systemlogmonitor/logwatchers/log_watchers.go
var createFuncs = map[string]types.WatcherCreateFunc{}
func registerLogWatcher(name string, create types.WatcherCreateFunc) {
createFuncs[name] = create
}
Each log watcher registers in separate files with specific build tags:
// pkg/systemlogmonitor/logwatchers/register_kmsg.go
//go:build !disable_system_log_monitor
// +build !disable_system_log_monitor
package logwatchers
import (
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg"
)
func init() {
registerLogWatcher("kmsg", kmsg.NewKmsgWatcher)
}
// pkg/systemlogmonitor/logwatchers/register_journald.go
//go:build journald && !disable_system_log_monitor
// +build journald,!disable_system_log_monitor
package logwatchers
import (
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/journald"
)
func init() {
registerLogWatcher("journald", journald.NewJournaldWatcher)
}
Key Insights:
- Two-Level Architecture: Problem Daemon Plugins → Log Watcher Sub-Plugins
- Conditional Compilation: journald requires special build tag AND systemd libraries
- Extensible Pattern: This same pattern could be used for other monitors that need pluggable backends
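To show how this sub-plugin pattern could be extended, here is a hypothetical sketch of registering an additional log-source backend. The LogWatcher/WatcherConfig types and the "containerd" backend below are illustrative stand-ins, not NPD's actual logwatchers API.

package logwatchers

// Minimal sketch of the registration pattern applied to a new, hypothetical backend.

type LogWatcher interface {
    Watch() (<-chan string, error)
    Stop()
}

type WatcherConfig struct {
    Plugin  string
    LogPath string
}

type WatcherCreateFunc func(WatcherConfig) LogWatcher

var createFuncs = map[string]WatcherCreateFunc{}

func registerLogWatcher(name string, create WatcherCreateFunc) {
    createFuncs[name] = create
}

// containerdWatcher is a stub backend used only to show the shape of a sub-plugin.
type containerdWatcher struct {
    cfg  WatcherConfig
    logs chan string
}

func (w *containerdWatcher) Watch() (<-chan string, error) { return w.logs, nil }
func (w *containerdWatcher) Stop()                         { close(w.logs) }

// A build-tagged register_containerd.go file would carry this init(),
// mirroring register_kmsg.go and register_journald.go above.
func init() {
    registerLogWatcher("containerd", func(cfg WatcherConfig) LogWatcher {
        return &containerdWatcher{cfg: cfg, logs: make(chan string)}
    })
}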
The kernel monitor configuration (config/kernel-monitor.json) demonstrates the power and flexibility of pattern-based problem detection:
{
"plugin": "kmsg",
"logPath": "/dev/kmsg",
"lookback": "5m",
"source": "kernel-monitor",
"conditions": [
{
"type": "KernelDeadlock",
"reason": "KernelHasNoDeadlock",
"message": "kernel has no deadlock"
}
],
"rules": [
{
"type": "temporary",
"reason": "OOMKilling",
"pattern": "Killed process \\d+ (.+) total-vm:\\d+kB.*"
},
{
"type": "permanent",
"condition": "KernelDeadlock",
"reason": "DockerHung",
"pattern": "task docker:\\w+ blocked for more than \\w+ seconds\\."
}
]
}
Key features:
- Pattern Matching: Uses regex patterns to identify problems in log output
- Problem Types: Supports both temporary events and permanent conditions
- Buffer Management: Configurable buffer for multi-line pattern matching
- Lookback: Can process historical log entries on startup
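For example, the OOMKilling rule above can be exercised directly with Go's regexp package; the sample log line below is illustrative, not taken from a real node.

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // The OOMKilling pattern from kernel-monitor.json (Go regexp syntax).
    pattern := regexp.MustCompile(`Killed process \d+ (.+) total-vm:\d+kB.*`)

    // A sample kernel log line of the kind the kmsg watcher would deliver.
    line := "Killed process 1234 (chrome) total-vm:2048000kB, anon-rss:100kB, file-rss:0kB"

    if m := pattern.FindStringSubmatch(line); m != nil {
        // m[0] (the full match) becomes the event message,
        // reported with the configured reason "OOMKilling".
        fmt.Println("matched:", m[0])
    }
}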
Temporary Events from Log Patterns:
func (l *logMonitor) generateStatus(logs []*Log, rule Rule) *types.Status {
message := generateMessage(logs, rule.PatternGeneratedMessageSuffix)
if rule.Type == types.Temp {
// Temporary rule: generate event only
return &types.Status{
Source: l.config.Source,
Events: []types.Event{
{
Severity: types.Warn,
Timestamp: logs[0].Timestamp,
Reason: rule.Reason, // e.g., "TaskHung"
Message: message, // Extracted from log
},
},
Conditions: l.conditions, // Unchanged
}
} else {
// Permanent rule: update condition + generate event
updateConditionAndGenerateEvent(rule, message, timestamp)
}
}
Example Temporary Events:
- OOMKilling: Process killed due to out of memory
- TaskHung: Process blocked for extended time
- KernelOops: Kernel crash or error
Example Permanent Conditions:
- KernelDeadlock: System deadlock detected
- FilesystemCorruption: Filesystem errors found
The Custom Plugin Monitor enables integration of external scripts and binaries as problem detection plugins, following the Unix philosophy of small, composable tools.
Custom plugin monitors communicate with external helper binaries using a simple, Unix-philosophy protocol focused on exit codes and text output.
CLI Arguments Only: Helpers receive inputs exclusively through command-line arguments:
{
"rules": [{
"path": "/usr/local/bin/my-checker",
"args": ["--timeout=5s", "--component=kubelet", "--enable-repair=true"]
}]
}
No Other Input Channels:
- ❌ No stdin: NPD doesn't send data via stdin
- ❌ No custom environment variables: NPD doesn't set special env vars
- ✅ System environment: Helpers inherit standard environment (PATH, HOME, etc.)
Exit Codes (Primary Communication):
const (
OK Status = 0 // Healthy/working correctly
NonOK Status = 1 // Problem detected
Unknown Status = 2 // Error/timeout/unable to determine
)
Exit Code Mapping:
- Exit 0 (OK): Helper executed successfully, no problems → Condition Status = False (Healthy)
- Exit 1 (NonOK): Helper detected a problem → Condition Status = True (Problem)
- Exit 2+ (Unknown): Plugin error/timeout → Condition Status = Unknown (Error)
Stdout (Status Messages):
- Purpose: Human-readable status message for node conditions/events
- Max capture: 4KB total buffer per execution
- Max usage: 80 bytes by default (configurable via max_output_length)
- Processing: Trimmed of whitespace, truncated if it exceeds the limit
Stderr (Debug Only):
- Purpose: Debug logging only - NOT used in conditions
- Visibility: Only logged at debug verbosity levels
- Usage: Troubleshooting helper execution
Simple Health Check Script:
#!/bin/bash
readonly OK=0
readonly NONOK=1
readonly UNKNOWN=2
# Check if NTP service is running
if systemctl -q is-active ntp.service; then
echo "ntp.service is running"
exit $OK
else
echo "ntp.service is not running"
exit $NONOK
fi
Go-based Health Checker (cmd/healthchecker/health_checker.go):
func main() {
// Parse CLI arguments
hco := options.NewHealthCheckerOptions()
hco.AddFlags(pflag.CommandLine)
pflag.Parse()
// Perform health check
hc, err := healthchecker.NewHealthChecker(hco)
healthy, err := hc.CheckHealth()
// Output result and exit with appropriate code
if err != nil {
fmt.Printf("error checking %v health: %v\n", hco.Component, err)
os.Exit(int(types.Unknown))
}
if !healthy {
fmt.Printf("%v:%v was found unhealthy\n", hco.Component, hco.Service)
os.Exit(int(types.NonOK))
}
fmt.Printf("%v:%v is healthy\n", hco.Component, hco.Service)
os.Exit(int(types.OK))
}
Configuration for Health Checker:
{
"rules": [{
"type": "permanent",
"condition": "KubeletUnhealthy",
"reason": "KubeletUnhealthy",
"path": "/home/kubernetes/bin/health-checker",
"args": [
"--component=kubelet",
"--enable-repair=true",
"--cooldown-time=1m"
],
"timeout": "3m"
}]
}
Helper Execution (pkg/custompluginmonitor/plugin/plugin.go):
func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) {
// 1. Set up timeout context
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 2. Create command with arguments
cmd := util.Exec(rule.Path, rule.Args...)
// 3. Capture stdout/stderr
stdoutPipe, _ := cmd.StdoutPipe()
stderrPipe, _ := cmd.StderrPipe()
// 4. Start and wait for completion
cmd.Start()
cmd.Wait()
// 5. Parse output and exit code
output = strings.TrimSpace(string(stdout))
if len(output) > maxOutputLength {
output = output[:maxOutputLength] // Truncate long messages
}
exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
switch exitCode {
case 0: return cpmtypes.OK, output
case 1: return cpmtypes.NonOK, output
default: return cpmtypes.Unknown, output
}
}
Timing and Concurrency:
- Invoke Interval: pluginConfig.invoke_interval (default: 30s)
- Timeout: Per-rule timeout or global pluginConfig.timeout (default: 5s)
- Concurrency: Max concurrent executions via pluginConfig.concurrency (default: 3)
- Timeout Handling: Process group killed on timeout
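The sketch below illustrates the general shape of bounded, timeout-aware helper execution: a semaphore channel caps concurrency and a per-invocation context deadline cancels slow helpers. It is a simplified illustration, not NPD's actual plugin runner, and the helper paths are placeholders.

package main

import (
    "context"
    "fmt"
    "os/exec"
    "sync"
    "time"
)

// runHelpers executes each helper with at most `concurrency` running at once
// and a per-run timeout, loosely mirroring invoke_interval/timeout/concurrency.
func runHelpers(paths []string, concurrency int, timeout time.Duration) {
    sem := make(chan struct{}, concurrency)
    var wg sync.WaitGroup

    for _, path := range paths {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            sem <- struct{}{}        // acquire a concurrency slot
            defer func() { <-sem }() // release it when done

            ctx, cancel := context.WithTimeout(context.Background(), timeout)
            defer cancel()

            // exec.CommandContext kills the process when the deadline passes;
            // NPD additionally terminates the whole process group.
            out, err := exec.CommandContext(ctx, path).Output()
            if err != nil {
                fmt.Printf("%s: error: %v\n", path, err)
                return
            }
            fmt.Printf("%s: %s\n", path, out)
        }(path)
    }
    wg.Wait()
}

func main() {
    // Placeholder paths standing in for configured rule.path values.
    runHelpers([]string{"/bin/true", "/bin/true"}, 3, 5*time.Second)
}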
Complete Data Flow:
Helper Binary
├─ Receives: CLI args from rule.args
├─ Outputs: Exit code + stdout message + stderr (debug)
└─ Timeout: Per-rule or global
↓
Plugin.run()
├─ Captures output (max 4KB buffer)
├─ Maps exit code to Status enum
├─ Trims and truncates message (max 80 bytes used)
└─ Returns Result{ExitStatus, Message}
↓
CustomPluginMonitor.generateStatus()
├─ Converts Status to ConditionStatus
├─ Updates node condition if changed
├─ Generates Kubernetes event if needed
└─ Returns Status for exporters
All custom plugins and helper binaries use standardized exit codes defined in pkg/custompluginmonitor/types/types.go:
type Status int
const (
// OK means everything is fine.
OK Status = 0
// NonOK means error or unhealthy.
NonOK Status = 1
// Unknown means plugin returns unknown error.
Unknown Status = 2
)
These codes are used by:
- Custom Plugin Monitor (executes and interprets exit codes)
- HealthChecker binary (cmd/healthchecker/health_checker.go)
- LogCounter binary (cmd/logcounter/log_counter.go)
Any external binary that follows this convention can be used as a custom plugin helper.
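As a minimal illustration of the convention, the following hypothetical Go helper checks whether a file exists and exits with the matching code. The check itself and the binary name are made up for the example; only the exit-code mapping reflects the protocol described above.

package main

import (
    "fmt"
    "os"
)

const (
    ok      = 0 // healthy
    nonOK   = 1 // problem detected
    unknown = 2 // unable to determine
)

func main() {
    if len(os.Args) < 2 {
        fmt.Println("usage: check-file <path>")
        os.Exit(unknown)
    }
    path := os.Args[1]

    switch _, err := os.Stat(path); {
    case err == nil:
        fmt.Printf("%s is present\n", path) // stdout becomes the condition/event message
        os.Exit(ok)
    case os.IsNotExist(err):
        fmt.Printf("%s is missing\n", path)
        os.Exit(nonOK)
    default:
        fmt.Printf("unable to check %s: %v\n", path, err)
        os.Exit(unknown)
    }
}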
#!/bin/bash
# Exit code constants (MUST match NPD protocol)
readonly OK=0
readonly NONOK=1
readonly UNKNOWN=2
# Best Practice: Make your check idempotent and safe to run repeatedly
# NPD will invoke this script at every invoke_interval
# Function to output message and exit
die() {
echo "$1"
exit "$2"
}
# Parse arguments (optional)
while [[ $# -gt 0 ]]; do
case $1 in
--verbose) VERBOSE=true; shift ;;
--timeout) TIMEOUT=$2; shift 2 ;;
*) die "Unknown option: $1" $UNKNOWN ;;
esac
done
# Perform your health check logic here
if check_system_health; then
echo "System is healthy"
exit $OK
else
echo "Health check failed: $(get_failure_reason)"
exit $NONOK
fi
Critical Requirements:
- Always explicitly exit with 0, 1, or 2+
- Keep stdout messages concise (under 80 bytes recommended)
- Use CLI arguments for input, not stdin or custom environment variables
- Handle timeouts gracefully
- Return meaningful status messages for debugging
NPD includes several standalone helper binaries that work WITH the custom plugin monitor:
Health Checker: Monitors the health of critical Kubernetes components (kubelet, container runtime, kube-proxy) with auto-repair capabilities.
LogCounter: Counts occurrences of log patterns and reports whether thresholds are exceeded.
Both follow the standard exit code convention and are executed by the custom plugin monitor.
{
"plugin": "custom",
"pluginConfig": {
"invoke_interval": "30s",
"timeout": "5s",
"max_output_length": 80,
"concurrency": 3
},
"rules": [
{
"type": "permanent",
"condition": "NTPProblem",
"reason": "NTPIsDown",
"path": "./config/plugin/check_ntp.sh",
"timeout": "3s"
}
]
}
Key features:
- Concurrent Execution: Configurable concurrency for plugin execution
- Timeout Management: Global and per-rule timeout configuration
- Output Management: Configurable output length limits
- Process Lifecycle: Graceful termination with escalation to SIGKILL
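A minimal sketch of the graceful-termination idea (SIGTERM to the helper's process group, then SIGKILL after a grace period) is shown below. It is illustrative and Linux-specific, not NPD's actual implementation.

package main

import (
    "fmt"
    "os/exec"
    "syscall"
    "time"
)

// terminateGracefully signals the helper's process group with SIGTERM and
// escalates to SIGKILL if it does not exit within the grace period.
func terminateGracefully(cmd *exec.Cmd, grace time.Duration) {
    pgid := -cmd.Process.Pid // negative PID targets the whole process group
    _ = syscall.Kill(pgid, syscall.SIGTERM)

    done := make(chan error, 1)
    go func() { done <- cmd.Wait() }()

    select {
    case <-done:
        fmt.Println("helper exited after SIGTERM")
    case <-time.After(grace):
        _ = syscall.Kill(pgid, syscall.SIGKILL)
        <-done
        fmt.Println("helper killed after grace period")
    }
}

func main() {
    cmd := exec.Command("sleep", "60")
    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // give the helper its own process group
    if err := cmd.Start(); err != nil {
        fmt.Println("start failed:", err)
        return
    }
    terminateGracefully(cmd, 2*time.Second)
}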
Many teams have existing monitoring scripts that run via cron, systemd timers, or standalone processes. This guide helps you migrate these scripts to work seamlessly with NPD's Custom Plugin Monitor.
Ideal Candidates:
- ✅ Health checks: Scripts that test component availability
- ✅ Resource monitors: Scripts checking disk space, memory, network
- ✅ Service validation: Scripts testing service responsiveness
- ✅ Configuration audits: Scripts validating system configuration
Requires Modification:
- ⚠️ Complex workflows: Multi-step processes with intermediate state
- ⚠️ Interactive scripts: Scripts requiring user input
- ⚠️ Long-running processes: Continuous monitoring (consider a native monitor instead)
Not Suitable:
- ❌ Deployment scripts: One-time setup operations
- ❌ Data processing pipelines: ETL or batch processing jobs
- ❌ High-frequency polling: Sub-second monitoring intervals
Start by understanding your existing script's behavior:
# Example: Existing cron-based disk monitor
#!/bin/bash
# /usr/local/bin/old-disk-monitor.sh
# Runs every 5 minutes via cron
THRESHOLD=90
USAGE=$(df / | awk 'NR==2 {print $5}' | sed 's/%//')
if [ "$USAGE" -gt "$THRESHOLD" ]; then
echo "CRITICAL: Disk usage at ${USAGE}% (threshold: ${THRESHOLD}%)" >&2
logger "Disk space critical on $(hostname)"
exit 1
else
echo "OK: Disk usage at ${USAGE}%"
exit 0
fi
Analysis:
- ✅ Clear exit codes: 0 for success, 1 for problem
- ✅ Descriptive output: Human-readable status messages
- ✅ Configurable threshold: Parameterizable via environment or args
- ⚠️ Hardcoded values: Threshold and mount point need parameterization
Modify the script to follow NPD's helper binary protocol:
#!/bin/bash
# /usr/local/bin/npd-disk-monitor.sh
# NPD-compatible disk space monitor
# Parse command-line arguments (NPD helper protocol requirement)
THRESHOLD=90
MOUNT_POINT="/"
while [[ $# -gt 0 ]]; do
case $1 in
--threshold=*)
THRESHOLD="${1#*=}"
shift
;;
--mount=*)
MOUNT_POINT="${1#*=}"
shift
;;
--help)
echo "Usage: $0 [--threshold=N] [--mount=PATH]"
echo "Monitors disk usage and reports problems to NPD"
exit 0
;;
*)
echo "Unknown option: $1" >&2
exit 2 # NPD Unknown status
;;
esac
done
# Validate inputs
if ! [[ "$THRESHOLD" =~ ^[0-9]+$ ]] || [ "$THRESHOLD" -lt 1 ] || [ "$THRESHOLD" -gt 100 ]; then
echo "Error: threshold must be 1-100" >&2
exit 2 # NPD Unknown status
fi
if [ ! -d "$MOUNT_POINT" ]; then
echo "Error: mount point '$MOUNT_POINT' not found" >&2
exit 2 # NPD Unknown status
fi
# Perform the actual check
USAGE=$(df "$MOUNT_POINT" | awk 'NR==2 {print $5}' | sed 's/%//')
if ! [[ "$USAGE" =~ ^[0-9]+$ ]]; then
echo "Error: unable to determine disk usage for $MOUNT_POINT" >&2
exit 2 # NPD Unknown status
fi
# Best Practice: Make your check idempotent and safe to run repeatedly
# NPD will invoke this script at every invoke_interval
# Return status based on NPD protocol
if [ "$USAGE" -gt "$THRESHOLD" ]; then
echo "Disk usage at ${USAGE}% exceeds threshold ${THRESHOLD}% for $MOUNT_POINT"
exit 1 # NPD NonOK status (problem detected)
else
echo "Disk usage at ${USAGE}% within threshold ${THRESHOLD}% for $MOUNT_POINT"
exit 0 # NPD OK status (healthy)
fi
Key Changes Made:
- Argument parsing: Uses the --key=value format NPD expects
- Error handling: Uses exit code 2 for NPD Unknown status
- Input validation: Validates all parameters before proceeding
- Idempotency: Safe to run repeatedly at configured intervals
- Clear output: Status messages for both success and failure cases
Create the JSON configuration for your converted script:
{
"plugin": "custom",
"pluginConfig": {
"invoke_interval": "300s",
"timeout": "30s",
"max_output_length": 200,
"concurrency": 1,
"enable_message_change_based_condition_update": false
},
"source": "disk-monitor",
"conditions": [
{
"type": "DiskPressure",
"reason": "DiskSpaceRunningLow",
"message": "Disk usage exceeds configured threshold"
}
],
"rules": [
{
"type": "permanent",
"condition": "DiskPressure",
"reason": "DiskSpaceRunningLow",
"path": "/usr/local/bin/npd-disk-monitor.sh",
"args": ["--threshold=85", "--mount=/"]
},
{
"type": "permanent",
"condition": "DiskPressure",
"reason": "DiskSpaceRunningLow",
"path": "/usr/local/bin/npd-disk-monitor.sh",
"args": ["--threshold=90", "--mount=/var"]
}
]
}
Configuration Notes:
- invoke_interval: Matches your original cron frequency (5 minutes = 300s)
- timeout: Reasonable timeout for script execution
- Multiple rules: Monitor different mount points with different thresholds
- Permanent condition: Disk pressure persists until resolved
Before deploying, thoroughly test your converted script:
# Test script directly with NPD-style arguments
/usr/local/bin/npd-disk-monitor.sh --threshold=85 --mount=/
echo "Exit code: $?"
# Test error conditions
/usr/local/bin/npd-disk-monitor.sh --threshold=invalid
echo "Exit code: $?" # Should be 2 (Unknown)
# Test NPD integration (if running locally)
node-problem-detector --config.custom-plugin-monitor=/etc/node-problem-detector/config/disk-monitor.json --logtostderr --v=3
Validation Checklist:
- ✅ Script handles all expected argument combinations
- ✅ Exit codes follow NPD protocol (0, 1, 2)
- ✅ Output messages are informative and concise
- ✅ Script executes within configured timeout
- ✅ No hanging processes or resource leaks
Once validated, replace your original monitoring setup:
# Disable old cron job
crontab -e
# Comment out: */5 * * * * /usr/local/bin/old-disk-monitor.sh
# Or disable systemd timer
systemctl disable old-disk-monitor.timer
systemctl stop old-disk-monitor.timer
# Deploy NPD configuration
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
name: disk-monitor-config
namespace: kube-system
data:
disk-monitor.json: |
$(cat /path/to/your/disk-monitor.json)
EOF
# Update NPD DaemonSet to include new config
kubectl patch daemonset node-problem-detector -n kube-system --patch='...'
Before (standalone health check):
#!/bin/bash
curl -f http://localhost:8080/health || exit 1
After (NPD-compatible):
#!/bin/bash
SERVICE_URL="http://localhost:8080/health"
TIMEOUT=5
while [[ $# -gt 0 ]]; do
case $1 in
--url=*) SERVICE_URL="${1#*=}"; shift ;;
--timeout=*) TIMEOUT="${1#*=}"; shift ;;
*) echo "Unknown option: $1" >&2; exit 2 ;;
esac
done
if curl -f --max-time "$TIMEOUT" "$SERVICE_URL" >/dev/null 2>&1; then
echo "Service at $SERVICE_URL is healthy"
exit 0
else
echo "Service at $SERVICE_URL is unhealthy or unreachable"
exit 1
fi
Before (memory monitor):
#!/bin/bash
USED=$(free | awk 'NR==2{print int($3/$2*100)}')
[ "$USED" -gt 80 ] && exit 1 || exit 0
After (NPD-compatible):
#!/bin/bash
THRESHOLD=80
while [[ $# -gt 0 ]]; do
case $1 in
--threshold=*) THRESHOLD="${1#*=}"; shift ;;
*) echo "Unknown option: $1" >&2; exit 2 ;;
esac
done
USED=$(free | awk 'NR==2{print int($3/$2*100)}')
if [ -z "$USED" ] || ! [[ "$USED" =~ ^[0-9]+$ ]]; then
echo "Error: unable to determine memory usage"
exit 2
fi
if [ "$USED" -gt "$THRESHOLD" ]; then
echo "Memory usage at ${USED}% exceeds threshold ${THRESHOLD}%"
exit 1
else
echo "Memory usage at ${USED}% within threshold ${THRESHOLD}%"
exit 0
fi
Before (config checker):
#!/bin/bash
nginx -t && exit 0 || exit 1
After (NPD-compatible):
#!/bin/bash
CONFIG_FILE="/etc/nginx/nginx.conf"
while [[ $# -gt 0 ]]; do
case $1 in
--config=*) CONFIG_FILE="${1#*=}"; shift ;;
*) echo "Unknown option: $1" >&2; exit 2 ;;
esac
done
if [ ! -f "$CONFIG_FILE" ]; then
echo "Error: configuration file '$CONFIG_FILE' not found"
exit 2
fi
if nginx -t -c "$CONFIG_FILE" >/dev/null 2>&1; then
echo "Nginx configuration is valid"
exit 0
else
echo "Nginx configuration validation failed"
exit 1
fi
- Start Small: Migrate one script at a time to validate the process
- Preserve Semantics: Maintain the same detection logic and thresholds
- Add Observability: Include relevant context in output messages
- Handle Edge Cases: Robust error handling for NPD Unknown status
- Test Thoroughly: Validate all exit code paths and argument combinations
- Document Changes: Update runbooks and documentation
- Monitor Metrics: Ensure migrated checks appear in Problem Metrics Manager
Issue: Script works standalone but fails in NPD
- Check: File permissions, SELinux contexts, AppArmor profiles
- Solution: Ensure NPD user can execute script and access dependencies
Issue: Conditions not appearing in kubectl
- Check: Configuration syntax, condition/reason name consistency
- Solution: Validate JSON config with jq or a similar tool
Issue: Script execution timeouts
- Check: Realistic timeout values, network dependencies
- Solution: Optimize script performance or increase timeout
Issue: Different behavior in NPD vs standalone
- Check: Environment variables, working directory, PATH
- Solution: Make script environment-independent or set explicit paths
This migration approach ensures your existing monitoring logic is preserved while gaining the benefits of NPD's centralized problem detection, Kubernetes integration, and metrics export capabilities.
The System Stats Monitor focuses purely on metrics collection without problem detection, following the principle that metrics and alerting should be separate concerns.
The monitor implements a collector pattern with specialized collectors for different metric categories:
// Core interface for all collectors
type Collector interface {
Collect() (map[string]interface{}, error)
}
| Collector | Metrics | Source |
|---|---|---|
| CPU Collector | Load averages, CPU usage, process counts | /proc/loadavg, /proc/stat |
| Disk Collector | I/O statistics, usage | /proc/diskstats, filesystem info |
| Memory Collector | Memory usage, swap | /proc/meminfo |
| Network Collector | Interface statistics | /proc/net/dev |
| Host Collector | System uptime | /proc/uptime |
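Building on the Collector interface above, here is a simplified, hypothetical uptime collector. The metric name and parsing are illustrative, not NPD's actual host collector.

package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
)

// Collector mirrors the interface shown above.
type Collector interface {
    Collect() (map[string]interface{}, error)
}

type uptimeCollector struct {
    procPath string
}

func (c *uptimeCollector) Collect() (map[string]interface{}, error) {
    data, err := os.ReadFile(c.procPath)
    if err != nil {
        return nil, err
    }
    // /proc/uptime: "<uptime seconds> <idle seconds>"
    fields := strings.Fields(string(data))
    if len(fields) < 1 {
        return nil, fmt.Errorf("unexpected format in %s", c.procPath)
    }
    uptime, err := strconv.ParseFloat(fields[0], 64)
    if err != nil {
        return nil, err
    }
    return map[string]interface{}{"host/uptime": uptime}, nil
}

func main() {
    var c Collector = &uptimeCollector{procPath: "/proc/uptime"}
    metrics, err := c.Collect()
    if err != nil {
        fmt.Println("collect failed:", err)
        return
    }
    fmt.Println(metrics)
}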
{
"cpu": {
"metricsConfigs": {
"cpu/load_1m": {"displayName": "cpu/load_1m"},
"cpu/usage_time": {"displayName": "cpu/usage_time"}
}
},
"disk": {
"includeAllAttachedBlk": true,
"metricsConfigs": {
"disk/io_read_bytes": {"displayName": "disk/io_read_bytes"}
}
},
"invokeInterval": "60s"
}
func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) {
go ssm.monitorLoop() // Collects CPU, memory, disk metrics
return nil, nil // No status channel - metrics only!
}
Important: The System Stats Monitor returns a nil status channel since it only collects metrics and doesn't detect problems. Metrics are exposed through the Prometheus exporter and Problem Metrics Manager.
Exporter plugins consume status information from monitors and export it to various backends. NPD includes three main exporter types with different integration patterns.
The Kubernetes Exporter is the primary mechanism for integrating NPD with Kubernetes, translating internal problem status into Kubernetes API objects.
- Event Creation: Temporary problems become Kubernetes Events
- Node Condition Management: Permanent problems update Node conditions
- Condition Heartbeat: Maintains condition freshness
- Health Endpoint: Provides /healthz for liveness probes
The exporter maintains a condition manager that handles the lifecycle of node conditions:
// pkg/exporters/k8sexporter/condition/manager.go
type ConditionManager interface {
// Start starts the condition manager.
Start(ctx context.Context)
// UpdateCondition updates a specific condition.
UpdateCondition(types.Condition)
// GetConditions returns all current conditions.
GetConditions() []types.Condition
}
The condition manager uses an RWMutex for thread-safe concurrent access and implements synchronization logic that prevents flooding the API server while still ensuring timely updates.
func (ke *k8sExporter) ExportProblems(status *types.Status) {
// Convert events to Kubernetes Events
for _, event := range status.Events {
ke.client.Eventf(
util.ConvertToAPIEventType(event.Severity), // Normal/Warning
status.Source, // Event source
event.Reason, // Event reason
event.Message, // Event message
)
}
// Update node conditions (batched for efficiency)
for _, condition := range status.Conditions {
ke.conditionManager.UpdateCondition(condition)
}
}
// Batches condition updates to prevent API server flooding
func (c *conditionManager) UpdateCondition(condition types.Condition) {
c.Lock()
defer c.Unlock()
// Only keep newest condition per type (deduplication)
c.updates[condition.Type] = condition
}
// Syncs every 1 second with collected updates
func (c *conditionManager) sync(ctx context.Context) {
conditions := []v1.NodeCondition{}
for _, condition := range c.conditions {
apiCondition := problemutil.ConvertToAPICondition(condition)
conditions = append(conditions, apiCondition)
}
// Single API call to update all conditions
c.client.SetConditions(ctx, conditions)
}
Integrates with OpenCensus/OpenTelemetry for metrics export, providing a /metrics endpoint in Prometheus format.
- Metrics Integration: Automatically exposes metrics from System Stats Monitor
- Problem Metrics: Converts problem counters and gauges to Prometheus format
- Standard Format: Uses standard Prometheus exposition format
- Auto-discovery: Metrics from Problem Metrics Manager are automatically available
The Prometheus exporter is enabled by default and requires no additional configuration. Metrics are exposed at:
- Default endpoint: http://localhost:20257/metrics
- Configurable port: Set via command-line flags
Note: The Prometheus exporter processes metrics from the System Stats Monitor and Problem Metrics Manager. Problem events and conditions are not converted to Prometheus metrics - they go to the Kubernetes exporter.
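For a quick check that metrics are flowing, the following sketch scrapes the default endpoint and prints the problem counter/gauge series. The URL is the default noted above and the metric-name prefixes follow this guide's example output; adjust both if your deployment differs.

package main

import (
    "bufio"
    "fmt"
    "net/http"
    "strings"
)

func main() {
    resp, err := http.Get("http://localhost:20257/metrics")
    if err != nil {
        fmt.Println("scrape failed:", err)
        return
    }
    defer resp.Body.Close()

    // Print only the problem-related series from the exposition output.
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "problem_counter") || strings.HasPrefix(line, "problem_gauge") {
            fmt.Println(line)
        }
    }
}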
Exports metrics to Google Cloud Monitoring with automatic GCE metadata integration.
- Cloud Integration: Automatic GCE instance metadata detection
- Resource Mapping: Maps NPD metrics to Cloud Monitoring resource types
- Authentication: Uses default service account credentials
- Conditional Compilation: Can be disabled at compile time
The Stackdriver exporter uses the registry pattern and can be configured via command-line options:
--exporter.stackdriver.project-id=my-project
--exporter.stackdriver.cluster-name=my-cluster
--exporter.stackdriver.zone=us-central1-a
This exporter is optional and can be disabled with build tags.
NPD uses three different patterns for exporter initialization, reflecting their importance and dependencies:
Kubernetes and Prometheus exporters are considered essential and are initialized directly in cmd/nodeproblemdetector/node_problem_detector.go:
// Direct initialization - always attempted
defaultExporters := []types.Exporter{}
if ke := k8sexporter.NewExporterOrDie(ctx, npdo); ke != nil {
defaultExporters = append(defaultExporters, ke)
}
if pe := prometheusexporter.NewExporterOrDie(npdo); pe != nil {
defaultExporters = append(defaultExporters, pe)
}
Optional exporters like Stackdriver use the registry pattern similar to problem daemons:
// pkg/exporters/stackdriver/stackdriver_exporter.go
func init() {
clo := commandLineOptions{}
exporters.Register(exporterName, types.ExporterHandler{
CreateExporterOrDie: NewExporterOrDie,
Options: &clo,
})
}
Pluggable exporters are initialized from the registry:
// Initialize from registry
plugableExporters := exporters.NewExporters()
The final exporter list combines both patterns:
allExporters := append(defaultExporters, plugableExporters...)
Why This Architecture?
- K8s & Prometheus: Core functionality, always enabled
- Stackdriver: Optional, cloud-specific, can be disabled with build tags
- Future Exporters: Can use either pattern based on importance
The configuration system in NPD provides a consistent, flexible way to configure different plugin types while allowing plugin-specific customization.
All monitor plugins follow a consistent configuration schema:
{
"plugin": "plugin_type", // Plugin type identifier
"source": "monitor_name", // Source name for status reports
"metricsReporting": true, // Enable metrics reporting
"conditions": [...], // Default node conditions
"rules": [...], // Problem detection rules
"pluginConfig": {...} // Plugin-specific configuration
}
Each plugin type has a specific identifier used in the plugin field:
| Plugin Type | Identifier | Purpose |
|---|---|---|
| System Log Monitor | "kmsg", "filelog", "journald" | Log source type |
| Custom Plugin Monitor | "custom" | External script execution |
| System Stats Monitor | "system-stats" | Metrics collection |
The source field identifies the monitor in status reports and logs:
- Must be unique across all monitors
- Used as the source field in types.Status
- Appears in Kubernetes events and node conditions
- Examples: "kernel-monitor", "custom-plugin-monitor", "system-stats-monitor"
The metricsReporting field controls integration with the Problem Metrics Manager:
{
"metricsReporting": true // Enable problem counter/gauge reporting
}
When enabled:
- Problem events increment counters
- Condition changes update gauges
- Metrics are exposed via Prometheus exporter
NPD supports two fundamental rule types across all monitor plugins:
Generate Kubernetes Events for transient problems:
{
"type": "temporary",
"reason": "OOMKilling",
"pattern": "Killed process \\d+ (.+).*"
}
Characteristics:
- Create Kubernetes Events only
- Don't update node conditions
- Used for alerts and notifications
- Examples: Process crashes, resource spikes, kernel warnings
Update Node Conditions for persistent issues:
{
"type": "permanent",
"condition": "KernelDeadlock",
"reason": "DockerHung",
"pattern": "task docker:\\w+ blocked.*"
}
Characteristics:
- Update node conditions AND generate events
- Reflect persistent node state
- Used for scheduling decisions
- Examples: Service failures, persistent resource issues, hardware problems
{
"plugin": "kmsg",
"logPath": "/dev/kmsg",
"lookback": "5m",
"bufferSize": 10,
"source": "kernel-monitor",
"conditions": [
{
"type": "KernelDeadlock",
"reason": "KernelHasNoDeadlock",
"message": "kernel has no deadlock"
}
],
"rules": [
{
"type": "temporary",
"reason": "OOMKilling",
"pattern": "Killed process \\d+ (.+) total-vm:\\d+kB.*",
"patternGeneratedMessageSuffix": " - Check memory usage"
}
]
}
Key Fields:
- logPath: Path to log source
- lookback: How far back to read on startup
- bufferSize: Lines to buffer for multi-line patterns
- patternGeneratedMessageSuffix: Appended to matched patterns
{
"plugin": "custom",
"source": "health-monitor",
"metricsReporting": true,
"pluginConfig": {
"invoke_interval": "30s",
"timeout": "5s",
"max_output_length": 80,
"concurrency": 3,
"enable_message_change_based_condition_update": false
},
"conditions": [
{
"type": "KubeletHealthy",
"reason": "KubeletIsHealthy",
"message": "kubelet is functioning properly"
}
],
"rules": [
{
"type": "permanent",
"condition": "KubeletHealthy",
"reason": "KubeletUnhealthy",
"path": "/home/kubernetes/bin/health-checker",
"args": ["--component=kubelet", "--enable-repair=true"],
"timeout": "3m"
}
]
}
Key Fields:
- invoke_interval: How often to run checks
- timeout: Global timeout for helper execution
- max_output_length: Truncate helper output
- concurrency: Max parallel helper executions
- path: Path to helper binary/script
- args: Command-line arguments for helper
{
"invokeInterval": "60s",
"cpu": {
"metricsConfigs": {
"cpu/load_1m": {
"displayName": "cpu/load_1m"
},
"cpu/usage_time": {
"displayName": "cpu/usage_time"
}
}
},
"disk": {
"includeAllAttachedBlk": true,
"includeRootBlk": true,
"lsblkTimeout": "5s",
"metricsConfigs": {
"disk/io_read_bytes": {
"displayName": "disk/io_read_bytes"
},
"disk/io_write_bytes": {
"displayName": "disk/io_write_bytes"
}
}
},
"memory": {
"metricsConfigs": {
"memory/anonymous_bytes": {
"displayName": "memory/anonymous_bytes"
},
"memory/available_bytes": {
"displayName": "memory/available_bytes"
}
}
},
"network": {
"interfaceIncludeRegexp": "^(en|eth|wlan)\\\\d+",
"interfaceExcludeRegexp": "^(docker|br-|veth)",
"metricsConfigs": {
"network/rx_bytes": {
"displayName": "network/rx_bytes"
},
"network/tx_bytes": {
"displayName": "network/tx_bytes"
}
}
}
}
Key Fields:
- invokeInterval: Metrics collection frequency
- includeAllAttachedBlk: Include all block devices
- interfaceIncludeRegexp: Network interfaces to monitor
- metricsConfigs: Which metrics to collect and expose
Each plugin implements configuration validation in its factory function, ensuring invalid configurations are caught at startup rather than runtime.
// Validate required fields
if config.Source == "" {
return nil, fmt.Errorf("source field is required")
}
// Validate rule patterns
for _, rule := range config.Rules {
if _, err := regexp.Compile(rule.Pattern); err != nil {
return nil, fmt.Errorf("invalid pattern %q: %v", rule.Pattern, err)
}
}
// Validate timeouts
if config.PluginConfig.Timeout <= 0 {
return nil, fmt.Errorf("timeout must be positive")
}
Each plugin validates its specific configuration requirements:
- System Log Monitor: Validates regex patterns, log paths, buffer sizes
- Custom Plugin Monitor: Validates script paths, timeouts, concurrency limits
- System Stats Monitor: Validates metric configurations, intervals, regex patterns
- Use Meaningful Source Names: Source names appear in events and logs
- Test Regex Patterns: Use tools like grep to test patterns before deployment
- Set Appropriate Timeouts: Balance responsiveness with reliability
- Enable Metrics Reporting: Provides valuable observability data
- Provide Default Conditions: Establish baseline health states
- Document Custom Scripts: Include clear documentation for helper binaries
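One way to follow the pattern-testing advice is a small Go test that compiles each rule pattern and checks it against a representative sample line. The patterns below are taken from the kernel-monitor example in this guide; the sample lines are illustrative.

package config_test

import (
    "regexp"
    "testing"
)

func TestRulePatterns(t *testing.T) {
    cases := []struct {
        pattern string
        sample  string
    }{
        {
            pattern: `Killed process \d+ (.+) total-vm:\d+kB.*`,
            sample:  "Killed process 1234 (chrome) total-vm:2048000kB",
        },
        {
            pattern: `task docker:\w+ blocked for more than \w+ seconds\.`,
            sample:  "task docker:1234 blocked for more than 120 seconds.",
        },
    }
    for _, c := range cases {
        re, err := regexp.Compile(c.pattern)
        if err != nil {
            t.Fatalf("pattern %q does not compile: %v", c.pattern, err)
        }
        if !re.MatchString(c.sample) {
            t.Errorf("pattern %q did not match sample %q", c.pattern, c.sample)
        }
    }
}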
The types.Status struct is the unified interface between problem detection (monitors) and problem reporting (exporters). Understanding its structure and usage patterns is crucial for working with NPD plugins.
// pkg/types/types.go
type Status struct {
// Source identifies which monitor generated this status
Source string `json:"source"`
// Events are temporary problems (sorted oldest to newest)
Events []Event `json:"events"`
// Conditions are permanent node states (ALL conditions for this monitor)
Conditions []Condition `json:"conditions"`
}
type Event struct {
Severity Severity `json:"severity"` // Info or Warn
Timestamp time.Time `json:"timestamp"` // When event occurred
Reason string `json:"reason"` // Short identifier
Message string `json:"message"` // Human-readable description
}
type Condition struct {
Type string `json:"type"` // e.g., "KernelDeadlock"
Status ConditionStatus `json:"status"` // True/False/Unknown
Transition time.Time `json:"transition"` // When status last changed
Reason string `json:"reason"` // Short cause description
Message string `json:"message"` // Detailed explanation
}
- Immutable Events: Events represent point-in-time occurrences
- Stateful Conditions: Conditions represent current node state
- Complete Condition Sets: Status always contains ALL conditions for a monitor
- Chronological Events: Events are sorted from oldest to newest
Every monitor sends an initial status establishing default conditions:
func (l *logMonitor) initializeStatus() {
// Initialize all conditions to False (healthy state)
l.conditions = initialConditions(l.config.DefaultConditions)
l.output <- &types.Status{
Source: l.config.Source, // e.g., "kernel-monitor"
Events: nil, // No events on startup
Conditions: l.conditions, // All default conditions
}
}
For transient issues that don't affect persistent node state:
// System Log Monitor detecting OOM kill from kernel logs
status := &types.Status{
Source: "kernel-monitor",
Events: []types.Event{
{
Severity: types.Warn,
Timestamp: time.Now(),
Reason: "OOMKilling",
Message: "Killed process 1234 (chrome) total-vm:2048MB",
},
},
Conditions: l.conditions, // Unchanged from previous state
}
For persistent issues that affect node health status:
// Custom Plugin Monitor detecting component failure
// 1. Update the affected condition
for i := range c.conditions {
condition := &c.conditions[i]
if condition.Type == "KubeletHealthy" {
condition.Status = types.True // Problem detected
condition.Transition = time.Now()
condition.Reason = "KubeletUnhealthy"
condition.Message = "Kubelet health check failed: timeout"
break
}
}
// 2. Generate event for the condition change
changeEvent := util.GenerateConditionChangeEvent(
"KubeletHealthy", types.True,
"KubeletUnhealthy", "Kubelet health check failed: timeout",
time.Now(),
)
// 3. Send status with both event and all current conditions
status := &types.Status{
Source: "custom-plugin-monitor",
Events: []types.Event{changeEvent},
Conditions: c.conditions, // ALL conditions, not just the changed one
}
// types.Condition → v1.NodeCondition
func ConvertToAPICondition(condition types.Condition) v1.NodeCondition {
return v1.NodeCondition{
Type: v1.NodeConditionType(condition.Type),
Status: ConvertToAPIConditionStatus(condition.Status),
LastTransitionTime: ConvertToAPITimestamp(condition.Transition),
Reason: condition.Reason,
Message: condition.Message,
}
}
// types.ConditionStatus → v1.ConditionStatus
func ConvertToAPIConditionStatus(status types.ConditionStatus) v1.ConditionStatus {
switch status {
case types.True: return v1.ConditionTrue // Problem present
case types.False: return v1.ConditionFalse // Healthy state
case types.Unknown: return v1.ConditionUnknown // Cannot determine
}
}
// types.Event severity → Kubernetes event type
func ConvertToAPIEventType(severity types.Severity) string {
switch severity {
case types.Info: return v1.EventTypeNormal // Informational
case types.Warn: return v1.EventTypeWarning // Problem detected
}
}
func (p *problemDetector) Run(ctx context.Context) error {
// Collect status channels from all monitors
var chans []<-chan *types.Status
for _, monitor := range p.monitors {
if ch, err := monitor.Start(); ch != nil {
chans = append(chans, ch) // nil channels filtered out
}
}
// Multiplex all status channels
statusCh := groupChannel(chans)
// Fan out to all exporters
for {
select {
case status := <-statusCh:
for _, exporter := range p.exporters {
exporter.ExportProblems(status) // Parallel processing
}
case <-ctx.Done():
return nil
}
}
}-
Always send ALL conditions: Don't send only changed conditions
// ✅ Correct status := &types.Status{ Source: source, Conditions: allConditions, // All conditions for this monitor } // ❌ Incorrect status := &types.Status{ Source: source, Conditions: []types.Condition{changedCondition}, // Only changed one }
- Sort events chronologically: Oldest to newest
  sort.Slice(events, func(i, j int) bool {
      return events[i].Timestamp.Before(events[j].Timestamp)
  })
- Use appropriate severities:
  - types.Info: Normal operations, condition resolved
  - types.Warn: Problems detected, requires attention
- Update transition time only when needed:
  if condition.Status != newStatus || condition.Reason != newReason {
      condition.Transition = time.Now() // Only update on actual change
  }
- Handle nil/empty arrays gracefully:
  for _, event := range status.Events { // Safe even if Events is nil
      processEvent(event)
  }
- Process conditions efficiently:
  // Batch condition updates rather than individual API calls
  for _, condition := range status.Conditions {
      conditionManager.UpdateCondition(condition) // Batched internally
  }
- Convert types appropriately: Use utility functions for type conversions
// Every monitor should send initial status
func (m *monitor) Start() (<-chan *types.Status, error) {
// 1. Initialize conditions to healthy state
m.conditions = initializeConditions(m.config.DefaultConditions)
// 2. Send initial status
m.output <- &types.Status{
Source: m.config.Source,
Conditions: m.conditions,
}
// 3. Start monitoring loop
go m.monitorLoop()
return m.output, nil
}
// Only generate events when conditions actually change
func needsUpdate(old, new types.Condition) bool {
return old.Status != new.Status ||
old.Reason != new.Reason ||
(enableMessageChanges && old.Message != new.Message)
}
// Report to Problem Metrics Manager for Prometheus/Stackdriver
if metricsEnabled {
for _, event := range status.Events {
problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(
event.Reason, 1)
}
for _, condition := range status.Conditions {
problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
condition.Type, condition.Reason, condition.Status == types.True)
}
}
This types.Status design enables NPD's flexible architecture, where different monitors can report different types of problems through a unified interface while multiple exporters process the same information in different ways (Kubernetes API, metrics, external systems, etc.).
This section walks through creating a complete custom monitor from start to finish, demonstrating all aspects of NPD plugin development.
Let's create a monitor that checks network connectivity to critical services and reports failures as node conditions.
Goal: Monitor network connectivity to essential services Problem Types:
- Temporary: Connection timeouts or transient failures
- Permanent: Persistent connectivity issues that affect node scheduling
Services to Monitor:
- DNS servers
- Kubernetes API server
- Container registry
File: /usr/local/bin/network-connectivity-check.sh
#!/bin/bash
# Exit code constants (MUST match NPD protocol)
readonly OK=0
readonly NONOK=1
readonly UNKNOWN=2
# Configuration
DEFAULT_TIMEOUT=5
DEFAULT_DNS="8.8.8.8"
DEFAULT_REGISTRY="gcr.io"
# Parse arguments
TIMEOUT=$DEFAULT_TIMEOUT
DNS_SERVER=$DEFAULT_DNS
REGISTRY=$DEFAULT_REGISTRY
VERBOSE=false
while [[ $# -gt 0 ]]; do
case $1 in
--timeout) TIMEOUT="$2"; shift 2 ;;
--dns) DNS_SERVER="$2"; shift 2 ;;
--registry) REGISTRY="$2"; shift 2 ;;
--verbose) VERBOSE=true; shift ;;
--help)
echo "Usage: $0 [--timeout SECONDS] [--dns DNS_IP] [--registry REGISTRY] [--verbose]"
exit $OK ;;
*)
echo "Unknown option: $1"
exit $UNKNOWN ;;
esac
done
# Function to log verbose messages
log() {
if [[ "$VERBOSE" == "true" ]]; then
echo "[DEBUG] $*" >&2
fi
}
# Function to test connectivity
test_connectivity() {
local service="$1"
local test_command="$2"
log "Testing connectivity to $service..."
if timeout "$TIMEOUT" bash -c "$test_command" >/dev/null 2>&1; then
log "$service connectivity: OK"
return 0
else
log "$service connectivity: FAILED"
return 1
fi
}
# Track failures
failures=()
# Test DNS connectivity
if ! test_connectivity "DNS ($DNS_SERVER)" "nslookup google.com $DNS_SERVER"; then
failures+=("DNS")
fi
# Test Kubernetes API server (if we can find it)
if [[ -n "$KUBERNETES_SERVICE_HOST" ]]; then
if ! test_connectivity "Kubernetes API" "curl -k -s --connect-timeout $TIMEOUT https://$KUBERNETES_SERVICE_HOST:${KUBERNETES_SERVICE_PORT:-443}/healthz"; then
failures+=("K8s API")
fi
fi
# Test container registry connectivity
if ! test_connectivity "Registry ($REGISTRY)" "curl -s --connect-timeout $TIMEOUT https://$REGISTRY/v2/"; then
failures+=("Registry")
fi
# Generate output and exit
if [[ ${#failures[@]} -eq 0 ]]; then
echo "All network services reachable"
exit $OK
else
# Join failures with commas
failed_services=$(IFS=','; echo "${failures[*]}")
echo "Network connectivity issues: $failed_services"
exit $NONOK
fi
Make the script executable:
chmod +x /usr/local/bin/network-connectivity-check.sh
File: /etc/node-problem-detector/config/network-monitor.json
{
"plugin": "custom",
"source": "network-connectivity-monitor",
"metricsReporting": true,
"pluginConfig": {
"invoke_interval": "60s",
"timeout": "30s",
"max_output_length": 200,
"concurrency": 1,
"enable_message_change_based_condition_update": false
},
"conditions": [
{
"type": "NetworkConnectivity",
"reason": "NetworkConnectivityHealthy",
"message": "All critical network services are reachable"
}
],
"rules": [
{
"type": "permanent",
"condition": "NetworkConnectivity",
"reason": "NetworkConnectivityUnhealthy",
"path": "/usr/local/bin/network-connectivity-check.sh",
"args": [
"--timeout=10",
"--dns=8.8.8.8",
"--registry=gcr.io",
"--verbose"
],
"timeout": "25s"
}
]
}
Test the helper script independently first:
# Test successful case
/usr/local/bin/network-connectivity-check.sh --verbose
echo "Exit code: $?"
# Test with unreachable DNS
/usr/local/bin/network-connectivity-check.sh --dns=192.0.2.1 --timeout=2 --verbose
echo "Exit code: $?"
# Test help
/usr/local/bin/network-connectivity-check.sh --help
Validate the JSON configuration:
# Check JSON syntax
cat /etc/node-problem-detector/config/network-monitor.json | jq .
# Test NPD with dry-run (if available)
node-problem-detector --config.custom-plugin-monitor=/etc/node-problem-detector/config/network-monitor.json --dry-run
Deploy the monitor:
# Run NPD with the network monitor
node-problem-detector \
--config.custom-plugin-monitor=/etc/node-problem-detector/config/network-monitor.json \
--v=2
Check that the monitor is working:
# 1. Check NPD logs for successful startup
kubectl logs <npd-pod> | grep "network-connectivity-monitor"
# 2. Check node conditions
kubectl describe node <node-name> | grep -A 5 "NetworkConnectivity"
# 3. Check events
kubectl get events --field-selector involvedObject.name=<node-name> | grep NetworkConnectivity
# 4. Check metrics (if Prometheus is configured)
curl http://<node>:20257/metrics | grep problem
Create a more sophisticated configuration with different checks:
File: /etc/node-problem-detector/config/network-monitor-advanced.json
{
"plugin": "custom",
"source": "network-connectivity-monitor",
"metricsReporting": true,
"pluginConfig": {
"invoke_interval": "30s",
"timeout": "25s",
"max_output_length": 200,
"concurrency": 2
},
"conditions": [
{
"type": "DNSConnectivity",
"reason": "DNSConnectivityHealthy",
"message": "DNS resolution is working"
},
{
"type": "RegistryConnectivity",
"reason": "RegistryConnectivityHealthy",
"message": "Container registry is reachable"
}
],
"rules": [
{
"type": "permanent",
"condition": "DNSConnectivity",
"reason": "DNSConnectivityUnhealthy",
"path": "/usr/local/bin/dns-check.sh",
"args": ["--timeout=5"],
"timeout": "10s"
},
{
"type": "permanent",
"condition": "RegistryConnectivity",
"reason": "RegistryConnectivityUnhealthy",
"path": "/usr/local/bin/registry-check.sh",
"args": ["--registry=gcr.io", "--timeout=10"],
"timeout": "15s"
},
{
"type": "temporary",
"reason": "NetworkTimeout",
"path": "/usr/local/bin/network-latency-check.sh",
"args": ["--threshold=1000"],
"timeout": "20s"
}
]
}
After successful deployment, you should see:
- Node Conditions: New conditions appearing in kubectl describe node
  NetworkConnectivity   False   NetworkConnectivityHealthy   All critical network services are reachable
- Events: Condition change events when connectivity issues occur
  Warning  NetworkConnectivityUnhealthy  Network connectivity issues: DNS,Registry
  Normal   NetworkConnectivityHealthy    All critical network services are reachable
- Metrics: Problem metrics available via Prometheus
  problem_gauge{type="NetworkConnectivity",reason="NetworkConnectivityHealthy"} 0
  problem_counter_total{reason="NetworkConnectivityUnhealthy"} 3
- Logs: Monitor execution logs in NPD output
  I0101 12:00:00.000000       1 custom_plugin_monitor.go:123] network-connectivity-monitor: All network services reachable
The helper script handles common edge cases:
- Command not found: Returns UNKNOWN status
- Timeout scenarios: Uses the timeout command for reliability
- Argument validation: Provides help and validates inputs
- Verbose logging: Enables debugging via stderr
- Multiple failure reporting: Reports all failed services
This complete example demonstrates:
- Problem Detection: Automated network connectivity monitoring
- Kubernetes Integration: Node conditions affect pod scheduling
- Observability: Events and metrics provide visibility
- Flexibility: Configurable timeouts, services, and thresholds
- Reliability: Proper error handling and timeout management
This pattern can be extended for any custom monitoring requirement, from hardware health to application-specific checks.
- Use the Tomb Pattern: Implement graceful shutdown using the tomb library (pkg/util/tomb) for goroutine lifecycle management. NPD's tomb provides a simple pattern: call tomb.Stop() to signal shutdown, check tomb.Stopping() in select loops, and call tomb.Done() when cleanup is complete. Example:
  type monitor struct {
      tomb   *tomb.Tomb
      output chan *types.Status
  }

  func (m *monitor) Start() (<-chan *types.Status, error) {
      m.output = make(chan *types.Status)
      // Start goroutine manually
      go m.monitorLoop()
      return m.output, nil
  }

  func (m *monitor) monitorLoop() {
      // Always defer Done() to signal completion
      defer func() {
          close(m.output)
          m.tomb.Done()
      }()
      for {
          select {
          case <-m.tomb.Stopping():
              // Stop signal received - cleanup and return
              return
              // ... other monitoring cases
          }
      }
  }

  func (m *monitor) Stop() {
      m.tomb.Stop() // Blocks until Done() is called
  }
- Handle Context Cancellation: Respect context cancellation for clean shutdown
- Validate Configuration: Fail fast with clear error messages for invalid configurations
- Log Appropriately: Use structured logging with appropriate log levels
- Metrics Integration: Expose relevant metrics through the problem metrics manager
- Default Conditions: Always provide sensible default conditions for permanent rules
- Timeout Configuration: Provide both global and per-rule timeout settings
- Resource Limits: Configure appropriate limits for output length, concurrency, etc.
- Pattern Testing: Thoroughly test regex patterns with sample log data
- Channel Buffering: Use appropriately sized buffers for status channels
- Regex Compilation: Pre-compile regex patterns for efficiency
- Resource Monitoring: Monitor CPU and memory usage of monitor plugins
- Rate Limiting: Implement rate limiting for high-frequency events
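As one way to implement the rate-limiting advice above, the sketch below suppresses repeated events with the same reason inside a configurable interval. It is illustrative, not an NPD component.

package main

import (
    "fmt"
    "sync"
    "time"
)

// eventLimiter forwards an event with a given reason at most once per interval.
type eventLimiter struct {
    mu       sync.Mutex
    interval time.Duration
    lastSeen map[string]time.Time
}

func newEventLimiter(interval time.Duration) *eventLimiter {
    return &eventLimiter{interval: interval, lastSeen: map[string]time.Time{}}
}

// Allow reports whether an event with this reason should be emitted now.
func (l *eventLimiter) Allow(reason string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    now := time.Now()
    if last, ok := l.lastSeen[reason]; ok && now.Sub(last) < l.interval {
        return false
    }
    l.lastSeen[reason] = now
    return true
}

func main() {
    limiter := newEventLimiter(10 * time.Second)
    for i := 0; i < 3; i++ {
        fmt.Println("emit OOMKilling:", limiter.Allow("OOMKilling")) // true, then false, false
    }
}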
- Monitor Internal State: Monitors run in their own goroutines
  type monitor struct {
      mutex      sync.RWMutex
      conditions []types.Condition
      tomb       *tomb.Tomb
      output     chan *types.Status
  }

  func (m *monitor) updateConditions() {
      m.mutex.Lock()
      defer m.mutex.Unlock()
      // Safe to modify conditions
  }

  func (m *monitor) getConditions() []types.Condition {
      m.mutex.RLock()
      defer m.mutex.RUnlock()
      // Safe to read conditions
      return append([]types.Condition(nil), m.conditions...)
  }
- Status Channel: Only one goroutine should send to the status channel
  // ✅ Correct: Single goroutine sends
  func (m *monitor) monitorLoop() {
      for {
          select {
          case <-m.tomb.Stopping():
              return
          case <-ticker.C:
              status := m.generateStatus()
              m.output <- status // Only this goroutine sends
          }
      }
  }

  // ❌ Incorrect: Multiple goroutines sending
  go func() { m.output <- status1 }()
  go func() { m.output <- status2 }() // Race condition
- Exporter Concurrency: Multiple exporters process the same status
  // Safe: Each exporter receives a copy, no coordination needed
  for _, exporter := range p.exporters {
      exporter.ExportProblems(status) // Concurrent execution is safe
  }
- Problem Metrics Manager: Thread-safe (uses sync.Mutex internally)
  // Safe: Can be called from multiple goroutines
  problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(reason, 1)
  problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(type, reason, value)
- Configuration Access: Read-only after initialization
  // Safe: Configuration is read-only after monitor creation
  func (m *monitor) someMethod() {
      timeout := m.config.Timeout // No synchronization needed
  }
- Common Patterns for Shared State:
  // Pattern 1: Atomic operations for simple counters
  type monitor struct {
      eventCount int64
  }

  func (m *monitor) incrementEvents() {
      atomic.AddInt64(&m.eventCount, 1)
  }

  // Pattern 2: Channel-based communication
  type monitor struct {
      configUpdates chan Config
  }

  func (m *monitor) updateConfig(newConfig Config) {
      select {
      case m.configUpdates <- newConfig:
      default: // Non-blocking send
      }
  }
- Input Validation: Validate all external inputs, especially in custom plugins
- Process Isolation: Run custom plugin scripts with minimal privileges
- Resource Limits: Enforce timeout and output limits to prevent resource exhaustion
- Path Validation: Validate and sanitize file paths in configurations
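A sketch of the path-validation idea for custom plugin rules is shown below; the specific policy (absolute, regular, executable, not world-writable) is an assumption to adapt to your environment rather than NPD's built-in behavior.

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// validateHelperPath rejects helper paths that are relative, missing,
// non-regular, non-executable, or world-writable.
func validateHelperPath(path string) error {
    if !filepath.IsAbs(path) {
        return fmt.Errorf("helper path %q must be absolute", path)
    }
    info, err := os.Stat(filepath.Clean(path))
    if err != nil {
        return fmt.Errorf("helper path %q: %v", path, err)
    }
    if !info.Mode().IsRegular() {
        return fmt.Errorf("helper path %q is not a regular file", path)
    }
    if info.Mode().Perm()&0o111 == 0 {
        return fmt.Errorf("helper path %q is not executable", path)
    }
    if info.Mode().Perm()&0o002 != 0 {
        return fmt.Errorf("helper path %q is world-writable", path)
    }
    return nil
}

func main() {
    // Hypothetical helper path used only to demonstrate the check.
    if err := validateHelperPath("/usr/local/bin/my-checker"); err != nil {
        fmt.Println("config rejected:", err)
    }
}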
func TestMonitorInterface(t *testing.T) {
monitor := NewMyCustomMonitor(validConfig)
// Test Start() returns proper channel
statusCh, err := monitor.Start()
assert.NoError(t, err)
assert.NotNil(t, statusCh)
// Test status generation
select {
case status := <-statusCh:
assert.Equal(t, "my-monitor", status.Source)
assert.NotEmpty(t, status.Conditions)
case <-time.After(5 * time.Second):
t.Fatal("No status received within timeout")
}
// Test Stop() cleans up properly
monitor.Stop()
// Channel should be closed
select {
case _, ok := <-statusCh:
assert.False(t, ok, "Channel should be closed")
default:
t.Fatal("Channel not closed")
}
}
func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config MyConfig
expectError bool
}{
{
name: "valid config",
config: MyConfig{
Source: "test-monitor",
Rules: []Rule{{Pattern: "valid.*pattern"}},
},
expectError: false,
},
{
name: "empty source",
config: MyConfig{
Source: "",
Rules: []Rule{{Pattern: "valid.*pattern"}},
},
expectError: true,
},
{
name: "invalid regex",
config: MyConfig{
Source: "test-monitor",
Rules: []Rule{{Pattern: "[invalid"}},
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewMyCustomMonitor(tt.config)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
func TestPluginRegistration(t *testing.T) {
// Clear registry for clean test
handlers := problemdaemon.GetRegisteredHandlers()
originalCount := len(handlers)
// Register test plugin
problemdaemon.Register("test-plugin", types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: func(string) types.Monitor {
return &mockMonitor{}
},
})
// Verify registration
newHandlers := problemdaemon.GetRegisteredHandlers()
assert.Equal(t, originalCount+1, len(newHandlers))
assert.Contains(t, newHandlers, "test-plugin")
}

func TestEndToEndFlow(t *testing.T) {
// Create test monitor
monitor := &testMonitor{
statusCh: make(chan *types.Status, 1),
}
// Create test exporter
exporter := &testExporter{
receivedStatus: make([]*types.Status, 0),
}
// Create problem detector
pd := NewProblemDetector([]types.Monitor{monitor}, []types.Exporter{exporter})
// Start in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pd.Run(ctx)
// Send test status
testStatus := &types.Status{
Source: "test-monitor",
Events: []types.Event{{Reason: "TestEvent"}},
}
monitor.statusCh <- testStatus
// Verify exporter received status
assert.Eventually(t, func() bool {
return len(exporter.receivedStatus) > 0
}, 5*time.Second, 100*time.Millisecond)
assert.Equal(t, testStatus, exporter.receivedStatus[0])
}

#!/bin/bash
# test-helper-script.sh - Framework for testing helper binaries
set -euo pipefail
readonly SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
readonly TEMP_DIR="$(mktemp -d)"
readonly OK=0
readonly NONOK=1
readonly UNKNOWN=2
cleanup() {
rm -rf "$TEMP_DIR"
}
trap cleanup EXIT
# Test function that verifies exit code and output
test_helper() {
local test_name="$1"
local expected_exit="$2"
local expected_output="$3"
shift 3
local args=("$@")
echo "Testing: $test_name"
local output
local exit_code
# Capture output and exit code
set +e
output=$("${SCRIPT_DIR}/my-helper-script.sh" "${args[@]}" 2>&1)
exit_code=$?
set -e
# Verify exit code
if [[ $exit_code -ne $expected_exit ]]; then
echo "FAIL: Expected exit code $expected_exit, got $exit_code"
return 1
fi
# Verify output contains expected text
if [[ ! "$output" =~ $expected_output ]]; then
echo "FAIL: Output '$output' doesn't match expected pattern '$expected_output'"
return 1
fi
echo "PASS: $test_name"
}
# Test cases
test_helper "successful check" $OK "System is healthy" --component=test
test_helper "failed check" $NONOK "System unhealthy" --component=failing-test
test_helper "invalid argument" $UNKNOWN "Unknown option" --invalid-arg
test_helper "help message" $OK "Usage:" --help
echo "All tests passed!"#!/bin/bash
# mock-service.sh - Create mock services for testing
# Start mock HTTP server
start_mock_server() {
local port="$1"
local response="$2"
# Simple HTTP server using nc (netcat)
# Note: netcat might not be available on all systems
# Consider Python's http.server or httpbin for production testing
while true; do
echo -e "HTTP/1.1 200 OK\r\n\r\n$response" | nc -l -p "$port"
done &
echo $! # Return PID
}
# Usage in tests
mock_pid=$(start_mock_server 8080 "OK")
trap "kill $mock_pid" EXIT
# Test helper script against mock server
test_helper "mock server check" $OK "Service available" --url=http://localhost:8080func BenchmarkMonitorPerformance(b *testing.B) {
monitor := NewMyCustomMonitor(benchConfig)
statusCh, _ := monitor.Start()
defer monitor.Stop()
// Drain status channel in background
go func() {
for range statusCh {
// Consume status updates
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Trigger monitor activity
monitor.TriggerCheck()
}
}

func TestMonitorMemoryLeak(t *testing.T) {
var m1, m2 runtime.MemStats
runtime.GC()
runtime.ReadMemStats(&m1)
// Run monitor for extended period
monitor := NewMyCustomMonitor(testConfig)
statusCh, _ := monitor.Start()
// Simulate extended operation
for i := 0; i < 1000; i++ {
select {
case <-statusCh:
// Process status
case <-time.After(10 * time.Millisecond):
// Continue if no status
}
}
monitor.Stop()
runtime.GC()
runtime.ReadMemStats(&m2)
// Check for significant memory increase
memIncrease := int64(m2.Alloc) - int64(m1.Alloc)
if memIncrease > 1024*1024 { // 1MB threshold
t.Errorf("Potential memory leak: %d bytes increase", memIncrease)
}
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
config string
valid bool
}{
{
name: "valid config",
config: `{
"plugin": "custom",
"source": "test-monitor",
"rules": [{"type": "temporary", "reason": "Test"}]
}`,
valid: true,
},
{
name: "invalid JSON",
config: `{
"plugin": "custom",
"source": "test-monitor"
"rules": []
}`,
valid: false,
},
{
name: "missing required field",
config: `{
"plugin": "custom",
"rules": []
}`,
valid: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var config MyConfig
err := json.Unmarshal([]byte(tc.config), &config)
if tc.valid {
assert.NoError(t, err)
assert.NoError(t, validateConfig(config))
} else {
assert.Error(t, err)
}
})
}
}

// Size buffers based on expected throughput
func (m *monitor) Start() (<-chan *types.Status, error) {
// For high-frequency monitors, use larger buffers
bufferSize := 100
if m.config.HighFrequency {
bufferSize = 1000
}
m.output = make(chan *types.Status, bufferSize)
go m.monitorLoop()
return m.output, nil
}

type logMonitor struct {
compiledPatterns map[string]*regexp.Regexp
}
func (l *logMonitor) compilePatterns() error {
l.compiledPatterns = make(map[string]*regexp.Regexp)
for _, rule := range l.config.Rules {
compiled, err := regexp.Compile(rule.Pattern)
if err != nil {
return fmt.Errorf("failed to compile pattern %q: %v", rule.Pattern, err)
}
l.compiledPatterns[rule.Pattern] = compiled
}
return nil
}
// Use pre-compiled patterns in hot path
func (l *logMonitor) matchPattern(line string, pattern string) bool {
compiled := l.compiledPatterns[pattern]
return compiled.MatchString(line)
}

// Pool objects to reduce GC pressure
type statusPool struct {
pool sync.Pool
}
func newStatusPool() *statusPool {
return &statusPool{
pool: sync.Pool{
New: func() interface{} {
return &types.Status{
Events: make([]types.Event, 0, 10),
Conditions: make([]types.Condition, 0, 5),
}
},
},
}
}
func (p *statusPool) Get() *types.Status {
status := p.pool.Get().(*types.Status)
// Reset for reuse
status.Source = ""
status.Events = status.Events[:0]
status.Conditions = status.Conditions[:0]
return status
}
func (p *statusPool) Put(status *types.Status) {
p.pool.Put(status)
}

func (m *monitor) monitorLoop() {
defer func() {
// Cleanup resources
close(m.output)
m.tomb.Done()
}()
// Use bounded channels and proper cleanup
ticker := time.NewTicker(m.config.Interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
status := m.generateStatus()
select {
case m.output <- status:
// Status sent successfully
default:
// Channel full - log warning but don't block
klog.Warning("Status channel full, dropping status")
}
case <-m.tomb.Stopping():
return
}
}
}

// Avoid string concatenation in hot paths
func (m *monitor) formatMessage(template string, args ...interface{}) string {
var buf strings.Builder
buf.Grow(len(template) + 100) // Pre-allocate capacity
fmt.Fprintf(&buf, template, args...)
return buf.String()
}
// Use byte slices for log processing
func (l *logMonitor) processLogLine(line []byte) {
// Process directly on byte slice to avoid string allocation
if bytes.Contains(line, []byte("ERROR")) {
// Only convert to string when necessary
errorLine := string(line)
l.handleError(errorLine)
}
}

// Use efficient string matching for simple patterns
func (l *logMonitor) fastMatch(line string, pattern string) bool {
switch pattern {
case "OOM":
return strings.Contains(line, "Out of memory")
case "PANIC":
return strings.Contains(line, "kernel panic")
default:
// Fall back to regex for complex patterns
return l.compiledPatterns[pattern].MatchString(line)
}
}

func (m *monitor) processRulesParallel(line string) []types.Event {
if len(m.config.Rules) <= 1 {
// Not worth parallelizing for small rule sets
return m.processRulesSequential(line)
}
var wg sync.WaitGroup
results := make(chan types.Event, len(m.config.Rules))
for _, rule := range m.config.Rules {
wg.Add(1)
go func(r Rule) {
defer wg.Done()
if event := m.processRule(line, r); event != nil {
results <- *event
}
}(rule)
}
// Close channel when all goroutines complete
go func() {
wg.Wait()
close(results)
}()
var events []types.Event
for event := range results {
events = append(events, event)
}
return events
}

func (l *logMonitor) readLogs() error {
// Use buffered I/O for better performance
file, err := os.Open(l.config.LogPath)
if err != nil {
return err
}
defer file.Close()
// Large buffer for reading
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line
for scanner.Scan() {
line := scanner.Bytes() // Get bytes directly
l.processLogLine(line)
}
return scanner.Err()
}

func (m *monitor) processBatch(items []LogItem) {
const batchSize = 100
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
m.processBatchItems(batch)
// Yield CPU periodically
if i%1000 == 0 {
runtime.Gosched()
}
}
}

{
"pluginConfig": {
"invoke_interval": "30s", // Balance between responsiveness and overhead
"timeout": "10s", // Prevent hanging helper scripts
"max_output_length": 200, // Limit memory per helper execution
"concurrency": 3, // Match CPU cores for I/O bound tasks
"enable_message_change_based_condition_update": false // Reduce noise
},
"bufferSize": 50, // Match expected log volume
"lookback": "5m" // Reasonable startup history
}

# Kubernetes resource limits for NPD
resources:
requests:
cpu: 100m # Baseline CPU
memory: 128Mi # Baseline memory
limits:
cpu: 500m # Burst capacity
    memory: 512Mi # Prevent OOM

// Add performance metrics to monitors
type performanceMetrics struct {
processedLines *prometheus.CounterVec
processingLatency *prometheus.HistogramVec
memoryUsage *prometheus.GaugeVec
}
func (m *monitor) recordMetrics(duration time.Duration, lineCount int) {
m.metrics.processedLines.WithLabelValues(m.config.Source).Add(float64(lineCount))
m.metrics.processingLatency.WithLabelValues(m.config.Source).Observe(duration.Seconds())
}

# Monitor resource usage
kubectl top pods -l name=node-problem-detector
# Check memory usage over time
kubectl logs -l name=node-problem-detector | grep "memory usage"
# Monitor processing latency
curl http://node:20257/metrics | grep processing_latency

- Unbuffered Channels: Can cause goroutine blocking
- Large Regex Sets: Pre-compile and optimize patterns
- String Operations: Use byte slices in hot paths
- Memory Leaks: Ensure proper cleanup in defer statements
- Excessive Logging: Use appropriate log levels
- I/O Blocking: Use timeouts and buffered I/O
Following these performance tuning guidelines ensures NPD operates efficiently even under high load conditions.
NPD uses a centralized registry pattern with Go's init() function for plugin registration:
// pkg/problemdaemon/problem_daemon.go
var handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
func Register(problemDaemonType types.ProblemDaemonType, handler types.ProblemDaemonHandler) {
handlers[problemDaemonType] = handler
}

Each plugin registers itself during package initialization using string constants:
// pkg/systemlogmonitor/log_monitor.go
const SystemLogMonitorName = "system-log-monitor"
func init() {
problemdaemon.Register(
SystemLogMonitorName,
types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: NewLogMonitorOrDie,
CmdOptionDescription: "Set to config file paths.",
})
}

All three monitor types use this pattern with their respective constants:
- `SystemLogMonitorName = "system-log-monitor"`
- `CustomPluginMonitorName = "custom-plugin-monitor"`
- `SystemStatsMonitorName = "system-stats-monitor"`
Plugins are imported through separate files with build tags that use blank imports to trigger registration. Each plugin has its own file to enable selective compilation:
// cmd/nodeproblemdetector/problemdaemonplugins/custom_plugin_monitor_plugin.go
//go:build !disable_custom_plugin_monitor
// +build !disable_custom_plugin_monitor
package problemdaemonplugins
import (
_ "k8s.io/node-problem-detector/pkg/custompluginmonitor"
)

// cmd/nodeproblemdetector/problemdaemonplugins/system_log_monitor_plugin.go
//go:build !disable_system_log_monitor
// +build !disable_system_log_monitor
package problemdaemonplugins
import (
_ "k8s.io/node-problem-detector/pkg/systemlogmonitor"
)

// cmd/nodeproblemdetector/problemdaemonplugins/system_stats_monitor_plugin.go
//go:build !disable_system_stats_monitor
// +build !disable_system_stats_monitor
package problemdaemonplugins
import (
_ "k8s.io/node-problem-detector/pkg/systemstatsmonitor"
)

Plugins can be selectively excluded using build tags. NPD uses both modern and legacy build tag syntax for compatibility:
# Disable specific plugins
BUILD_TAGS="disable_custom_plugin_monitor disable_stackdriver_exporter" make
# Only include system log monitor
BUILD_TAGS="disable_custom_plugin_monitor disable_system_stats_monitor" make
# Enable journald support (opt-in)
BUILD_TAGS="journald" makedisable_system_log_monitor- Exclude system log monitordisable_custom_plugin_monitor- Exclude custom plugin monitordisable_system_stats_monitor- Exclude system stats monitordisable_stackdriver_exporter- Exclude Stackdriver exporterjournald- Enable journald log watcher (requires systemd libraries)
NPD uses both formats for Go version compatibility:
//go:build !disable_custom_plugin_monitor // Modern format (Go 1.17+)
// +build !disable_custom_plugin_monitor // Legacy format (compatibility)

# Check what's included in a build
go list -tags "disable_custom_plugin_monitor" ./...
# Verbose build to see what's excluded
go build -v -x -tags "disable_system_stats_monitor"

The Problem Detector orchestrates plugin lifecycle through a robust, channel-based architecture with error handling:
// Actual implementation from pkg/problemdetector/problem_detector.go
func (p *problemDetector) Run(ctx context.Context) error {
// Start the log monitors one by one.
var chans []<-chan *types.Status
failureCount := 0
for _, m := range p.monitors {
ch, err := m.Start()
if err != nil {
// Do not return error and keep on trying the following config files.
klog.Errorf("Failed to start problem daemon %v: %v", m, err)
failureCount++
continue
}
if ch != nil {
chans = append(chans, ch)
}
}
allMonitors := p.monitors
if len(allMonitors) == failureCount {
return fmt.Errorf("no problem daemon is successfully setup")
}
defer func() {
for _, m := range allMonitors {
m.Stop()
}
}()
ch := groupChannel(chans)
klog.Info("Problem detector started")
for {
select {
case <-ctx.Done():
return nil
case status := <-ch:
for _, exporter := range p.exporters {
exporter.ExportProblems(status)
}
}
}
}

Key aspects of the actual implementation:
- Error Handling: Logs errors and continues, doesn't silently skip failed monitors
- Failure Counting: Returns error only when ALL monitors fail to start
- Cleanup: Uses defer to ensure Stop() is called on all monitors
- Graceful Degradation: Continues operation with partially failed monitor setup
- Nil Channel Filtering: Metrics-only monitors return nil channels which are excluded
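The `groupChannel` helper used by `Run` above fans the per-monitor channels into the single channel the loop consumes. Its implementation is not reproduced here; a minimal fan-in sketch (which may differ in detail from the actual helper in pkg/problemdetector) looks like this:

```go
import "k8s.io/node-problem-detector/pkg/types"

// Sketch of a fan-in helper: one goroutine per monitor channel forwards
// every status into a single merged channel that the Run loop reads from.
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
    statuses := make(chan *types.Status)
    for _, ch := range chans {
        go func(c <-chan *types.Status) {
            for status := range c {
                statuses <- status
            }
        }(ch)
    }
    return statuses
}
```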
This section provides a comprehensive guide for advanced developers who need to create entirely new monitor types from scratch, extending NPD's core monitoring capabilities beyond the existing System Log, Custom Plugin, and System Stats monitors.
NPD supports three core monitor types out of the box:
- System Log Monitor: Parses system logs (syslog, journald) using configurable patterns
- Custom Plugin Monitor: Executes external scripts/binaries for custom monitoring
- System Stats Monitor: Collects system metrics (CPU, memory, disk) for Prometheus export
When these don't meet your needs, you can implement a completely new monitor type.
Consider implementing a new monitor type when:
- Unique Data Sources: You need to monitor data sources not covered by existing monitors (e.g., hardware sensors, network interfaces, container runtimes)
- Complex Logic: The monitoring logic is too complex for script-based custom plugins
- Performance Requirements: You need high-performance monitoring with minimal overhead
- Integration Needs: You need deep integration with NPD's lifecycle and status reporting
A new monitor type consists of several components:
// Core interface that all monitors must implement
type Monitor interface {
Start() (<-chan *types.Status, error)
Stop()
}
// Configuration parser for your monitor type
type ConfigParser interface {
Parse(config []byte) (interface{}, error)
}
// Factory function for creating monitor instances
type CreateMonitorFunc func(config interface{}) Monitor

Create a new package for your monitor type:
// pkg/monitors/networkmonitor/network_monitor.go
package networkmonitor
import (
"context"
"sync"
"time"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
type networkMonitor struct {
config *NetworkMonitorConfig
conditions []types.Condition
tomb *tomb.Tomb
output chan *types.Status
mutex sync.RWMutex
// Monitor-specific fields
interfaces map[string]*InterfaceState
lastCheck time.Time
}
type NetworkMonitorConfig struct {
Source string `json:"source"`
MetricsReporting bool `json:"metricsReporting"`
CheckInterval time.Duration `json:"checkInterval"`
Interfaces []string `json:"interfaces"`
// Monitor-specific configuration
PacketLossThreshold float64 `json:"packetLossThreshold"`
BandwidthThreshold int64 `json:"bandwidthThreshold"`
ErrorRateThreshold float64 `json:"errorRateThreshold"`
}
type InterfaceState struct {
Name string
PacketsSent uint64
PacketsRecv uint64
Errors uint64
LastUpdated time.Time
}

func (nm *networkMonitor) Start() (<-chan *types.Status, error) {
klog.Info("Starting network monitor")
// Initialize monitoring state
if err := nm.initialize(); err != nil {
return nil, fmt.Errorf("failed to initialize network monitor: %v", err)
}
// Start the monitoring goroutine
go nm.monitorLoop()
return nm.output, nil
}
func (nm *networkMonitor) Stop() {
klog.Info("Stopping network monitor")
nm.tomb.Stop()
}
func (nm *networkMonitor) monitorLoop() {
defer nm.tomb.Done()
defer close(nm.output)
ticker := time.NewTicker(nm.config.CheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
nm.checkNetworkHealth()
case <-nm.tomb.Stopping():
klog.Info("Network monitor stopping")
return
}
}
}
func (nm *networkMonitor) checkNetworkHealth() {
nm.mutex.Lock()
defer nm.mutex.Unlock()
var problems []types.Event
var conditions []types.Condition
// Check each configured interface
for _, ifName := range nm.config.Interfaces {
if state, exists := nm.interfaces[ifName]; exists {
// Perform network health checks
if issues := nm.analyzeInterface(state); len(issues) > 0 {
problems = append(problems, issues...)
conditions = append(conditions, nm.createCondition(ifName, issues))
}
}
}
// Send status update if problems detected
if len(problems) > 0 || len(conditions) > 0 {
status := &types.Status{
Source: nm.config.Source,
Events: problems,
Conditions: conditions,
}
select {
case nm.output <- status:
case <-nm.tomb.Stopping():
return
}
}
}
func (nm *networkMonitor) analyzeInterface(state *InterfaceState) []types.Event {
var events []types.Event
// Read current network statistics
stats, err := nm.readInterfaceStats(state.Name)
if err != nil {
events = append(events, types.Event{
Severity: types.Warn,
Timestamp: time.Now(),
Reason: "NetworkInterfaceReadError",
Message: fmt.Sprintf("Failed to read stats for interface %s: %v", state.Name, err),
})
return events
}
// Calculate error rate
totalPackets := stats.PacketsSent + stats.PacketsRecv
if totalPackets > 0 {
errorRate := float64(stats.Errors) / float64(totalPackets)
if errorRate > nm.config.ErrorRateThreshold {
events = append(events, types.Event{
Severity: types.Warn,
Timestamp: time.Now(),
Reason: "HighNetworkErrorRate",
Message: fmt.Sprintf("Interface %s error rate %.2f%% exceeds threshold %.2f%%",
state.Name, errorRate*100, nm.config.ErrorRateThreshold*100),
})
}
}
// Update state for next check
state.PacketsSent = stats.PacketsSent
state.PacketsRecv = stats.PacketsRecv
state.Errors = stats.Errors
state.LastUpdated = time.Now()
return events
}

// pkg/monitors/networkmonitor/config.go
package networkmonitor
import (
"encoding/json"
"fmt"
"time"
)
type NetworkConfigParser struct{}
func (p *NetworkConfigParser) Parse(config []byte) (interface{}, error) {
var networkConfig NetworkMonitorConfig
if err := json.Unmarshal(config, &networkConfig); err != nil {
return nil, fmt.Errorf("failed to parse network monitor config: %v", err)
}
// Set defaults
if networkConfig.CheckInterval == 0 {
networkConfig.CheckInterval = 30 * time.Second
}
if networkConfig.ErrorRateThreshold == 0 {
networkConfig.ErrorRateThreshold = 0.01 // 1%
}
if len(networkConfig.Interfaces) == 0 {
networkConfig.Interfaces = []string{"eth0"} // Default interface
}
// Validate configuration
if err := p.validateConfig(&networkConfig); err != nil {
return nil, err
}
return &networkConfig, nil
}
func (p *NetworkConfigParser) validateConfig(config *NetworkMonitorConfig) error {
if config.Source == "" {
return fmt.Errorf("source field is required")
}
if config.CheckInterval < time.Second {
return fmt.Errorf("checkInterval must be at least 1 second")
}
if config.ErrorRateThreshold < 0 || config.ErrorRateThreshold > 1 {
return fmt.Errorf("errorRateThreshold must be between 0 and 1")
}
return nil
}

// pkg/monitors/networkmonitor/factory.go
package networkmonitor
import (
"fmt"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
func CreateNetworkMonitor(config interface{}) types.Monitor {
networkConfig, ok := config.(*NetworkMonitorConfig)
if !ok {
panic(fmt.Sprintf("invalid config type for network monitor: %T", config))
}
return &networkMonitor{
config: networkConfig,
conditions: []types.Condition{},
tomb: tomb.NewTomb(),
output: make(chan *types.Status),
interfaces: make(map[string]*InterfaceState),
}
}

// pkg/monitors/networkmonitor/register.go
package networkmonitor
import (
"k8s.io/node-problem-detector/pkg/problemdaemon"
)
func init() {
// Register the monitor type with NPD
problemdaemon.RegisterMonitorFactory(
"network", // Monitor type name
&NetworkConfigParser{}, // Configuration parser
CreateNetworkMonitor, // Factory function
)
}

Add your monitor to the build tags and imports:
// pkg/problemdaemon/monitors.go
//go:build !disable_network_monitor
// +build !disable_network_monitor
package problemdaemon
import (
// Existing monitors
_ "k8s.io/node-problem-detector/pkg/systemlogmonitor"
_ "k8s.io/node-problem-detector/pkg/custompluginmonitor"
_ "k8s.io/node-problem-detector/pkg/systemstatsmonitor"
// Your new monitor
_ "k8s.io/node-problem-detector/pkg/monitors/networkmonitor"
)

Create a configuration file for your new monitor:
{
"plugin": "network",
"pluginConfig": {
"invoke_interval": "30s",
"timeout": "5s",
"max_output_length": 80,
"concurrency": 3,
"enable_message_change_based_condition_update": false
},
"source": "network-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "NetworkNotReady",
"reason": "NetworkInterfaceDown",
"message": "Network interface is down"
},
{
"type": "NetworkNotReady",
"reason": "HighNetworkErrorRate",
"message": "Network interface error rate is high"
}
],
"rules": [
{
"type": "permanent",
"condition": "NetworkNotReady",
"reason": "NetworkInterfaceDown",
"pattern": "Interface .* is down"
},
{
"type": "permanent",
"condition": "NetworkNotReady",
"reason": "HighNetworkErrorRate",
"pattern": "Interface .* error rate .* exceeds threshold"
}
],
// Monitor-specific configuration
"checkInterval": "30s",
"interfaces": ["eth0", "eth1"],
"packetLossThreshold": 0.05,
"bandwidthThreshold": 1000000000,
"errorRateThreshold": 0.01
}

Create comprehensive tests for your monitor:
// pkg/monitors/networkmonitor/network_monitor_test.go
package networkmonitor
import (
"testing"
"time"
"k8s.io/node-problem-detector/pkg/types"
)
func TestNetworkMonitor_HighErrorRate(t *testing.T) {
config := &NetworkMonitorConfig{
Source: "test-network-monitor",
CheckInterval: 100 * time.Millisecond,
Interfaces: []string{"test0"},
ErrorRateThreshold: 0.01,
}
monitor := &networkMonitor{
config: config,
tomb: tomb.NewTomb(),
output: make(chan *types.Status, 10),
interfaces: make(map[string]*InterfaceState),
}
// Initialize test interface with high error rate
monitor.interfaces["test0"] = &InterfaceState{
Name: "test0",
PacketsSent: 1000,
PacketsRecv: 1000,
Errors: 50, // 2.5% error rate, above 1% threshold
}
statusChan, err := monitor.Start()
if err != nil {
t.Fatalf("Failed to start monitor: %v", err)
}
defer monitor.Stop()
// Wait for status update
select {
case status := <-statusChan:
if len(status.Events) == 0 {
t.Error("Expected events for high error rate, got none")
}
found := false
for _, event := range status.Events {
if event.Reason == "HighNetworkErrorRate" {
found = true
break
}
}
if !found {
t.Error("Expected HighNetworkErrorRate event")
}
case <-time.After(200 * time.Millisecond):
t.Error("Timeout waiting for status update")
}
}

If your monitor produces metrics:
func (nm *networkMonitor) reportMetrics() {
if !nm.config.MetricsReporting {
return
}
for ifName, state := range nm.interfaces {
// Report interface-specific metrics
metrics.RecordInterfacePackets(ifName, state.PacketsSent, state.PacketsRecv)
metrics.RecordInterfaceErrors(ifName, state.Errors)
}
}

Connect to the Problem Metrics Manager:
import "k8s.io/node-problem-detector/pkg/problemmetrics"
func (nm *networkMonitor) reportProblem(reason string) {
if nm.config.MetricsReporting {
problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(reason, 1)
}
}- Resource Management: Always implement proper cleanup in Stop()
- Error Handling: Log errors but continue monitoring when possible
- Thread Safety: Use mutexes for shared state access
- Configuration Validation: Validate all configuration parameters
- Graceful Degradation: Handle partial failures in multi-component monitoring
- Testing: Create unit tests for all monitoring logic
- Documentation: Document configuration options and expected behavior
type MonitorState struct {
lastUpdate time.Time
counters map[string]uint64
mutex sync.RWMutex
}
func (s *MonitorState) Update(key string, value uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.counters[key] = value
s.lastUpdate = time.Now()
}func (nm *networkMonitor) updateConfig(newConfig *NetworkMonitorConfig) error {
nm.mutex.Lock()
defer nm.mutex.Unlock()
// Validate new configuration
parser := &NetworkConfigParser{}
if err := parser.validateConfig(newConfig); err != nil {
return err
}
// Apply configuration changes
nm.config = newConfig
return nil
}This comprehensive guide provides the foundation for implementing custom monitor types that integrate seamlessly with NPD's architecture while maintaining the same reliability and performance standards as the built-in monitors.
The Problem Metrics Manager serves as the bridge between monitors and metric exporters, providing a standardized way to expose problem-related metrics without direct coupling between monitors and exporters.
// pkg/problemmetrics/problem_metrics.go
type ProblemMetricsManager struct {
problemCounter metrics.Int64MetricInterface
problemGauge metrics.Int64MetricInterface
problemTypeToReason map[string]string
problemTypeToReasonMutex sync.Mutex
}
// Key methods:
func (pmm *ProblemMetricsManager) SetProblemGauge(problemType, reason string, value bool) error
func (pmm *ProblemMetricsManager) IncrementProblemCounter(reason string, count int64) error
// Global singleton instance
var GlobalProblemMetricsManager *ProblemMetricsManager

Monitors use the global metrics manager to report both events and conditions:
// Example from pkg/systemlogmonitor/log_monitor.go
if *l.config.EnableMetricsReporting {
initializeProblemMetricsOrDie(l.config.Rules)
}
// When problems are detected:
problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(event.Reason, 1)
problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(condition.Type, condition.Reason, condition.Status == types.True)

- Problem Gauges: Track the current state of conditions
  - Value: `true` (condition present) or `false` (condition resolved)
  - Used for: Permanent problems like "KernelDeadlock", "OutOfDisk"
- Problem Counters: Count occurrences of events
  - Value: Incremented each time an event occurs
  - Used for: Temporary problems like "OOMKilling", "TaskHung"
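Putting the two metric types together: a permanent condition flips the gauge when it is raised or resolved, while each temporary event bumps the counter. The reason strings below are illustrative examples only, using the method signatures shown above:

```go
// Permanent problem: the gauge mirrors whether the condition is currently set.
problemmetrics.GlobalProblemMetricsManager.SetProblemGauge("KernelDeadlock", "DockerHung", true)  // condition raised
problemmetrics.GlobalProblemMetricsManager.SetProblemGauge("KernelDeadlock", "DockerHung", false) // condition resolved

// Temporary problem: each occurrence adds one to the counter.
problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter("OOMKilling", 1)
```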
The metrics manager uses OpenCensus/OpenTelemetry under the hood, making metrics automatically available to:
- Prometheus Exporter: Exposes metrics at the `/metrics` endpoint
- Future Metric Exporters: Automatic integration through OpenCensus
This architecture ensures that problem detection (monitors) remains decoupled from metric export (exporters) while providing a consistent interface for all monitoring integrations.
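As a concrete illustration of that decoupling, a metrics-only monitor can report exclusively through the global manager and never emit Status objects. The sketch below is hypothetical; it follows the nil-channel behavior described in the lifecycle section rather than any existing NPD monitor:

```go
import (
    "time"

    "k8s.io/node-problem-detector/pkg/problemmetrics"
    "k8s.io/node-problem-detector/pkg/types"
)

// metricsOnlyMonitor illustrates the "nil channel" path: Start() returns a nil
// channel, so the Problem Detector excludes it when grouping channels, and all
// reporting flows through the metrics manager instead of Status objects.
type metricsOnlyMonitor struct {
    stop chan struct{}
}

func newMetricsOnlyMonitor() *metricsOnlyMonitor {
    return &metricsOnlyMonitor{stop: make(chan struct{})}
}

func (m *metricsOnlyMonitor) Start() (<-chan *types.Status, error) {
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                // Hypothetical check; in NPD the counter must be initialized first
                // (see initializeProblemMetricsOrDie above). Error ignored for brevity.
                _ = problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter("ExampleProblem", 1)
            case <-m.stop:
                return
            }
        }
    }()
    return nil, nil // nil channel: treated as a metrics-only monitor
}

func (m *metricsOnlyMonitor) Stop() {
    close(m.stop)
}
```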
Given the complexity of plugin registration, build tags, and initialization, developers often encounter issues when working with NPD. This section covers common problems and their solutions.
When encountering issues with NPD plugins, follow this systematic debugging approach:
┌─────────────────────────────────────┐
│ NPD Plugin Issue? │
└─────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Does NPD start successfully? │
├─────────────┬───────────────────────┤
│ NO │ YES │
└─────────────┼───────────────────────┘
│ │
▼ ▼
┌─────────────────────────┐ ┌──────────────────────────┐
│ Build/Startup Issues │ │ Is your plugin loaded? │
│ • Check build tags │ │ grep "Registered.*plugin"│
│ • Check dependencies │ │ /var/log/npd.log │
│ • Check system logs │ └─────────┬────────────────┘
└─────────────────────────┘ │
▼
┌─────────────────────────────┐
│ Plugin loaded? │
├──────────┬──────────────────┤
│ NO │ YES │
└──────────┼──────────────────┘
│ │
▼ ▼
┌─────────────────────┐ ┌─────────────────────────┐
│ Registration Issues │ │ Does plugin generate │
│ • Check init() call │ │ status messages? │
│ • Check build tags │ │ Monitor output channel │
│ • Check imports │ └─────────┬───────────────┘
└─────────────────────┘ │
▼
┌─────────────────────────┐
│ Status messages? │
├──────────┬──────────────┤
│ NO │ YES │
└──────────┼──────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Monitor Issues │ │ Are conditions │
│ • Check config │ │ appearing in │
│ • Check logic │ │ kubectl? │
│ • Check inputs │ └─────────┬───────┘
└─────────────────┘ │
▼
┌─────────────────┐
│ Conditions? │
├─────────┬───────┤
│ NO │ YES │
└─────────┼───────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────┐
│ Exporter │ │ SUCCESS │
│ Issues │ │ ✓ │
│ • K8s API │ └─────────┘
│ • RBAC │
│ • Network │
└─────────────┘
# 1. Check NPD pod status
kubectl get pods -n kube-system | grep node-problem-detector
# 2. Check recent logs
kubectl logs -n kube-system daemonset/node-problem-detector --tail=50
# 3. Check node conditions
kubectl describe node $(hostname) | grep -A 10 "Conditions:"
# 4. Check for custom conditions
kubectl get nodes -o json | jq '.items[].status.conditions[] | select(.type | contains("Custom"))'

# Check what plugins are compiled in
strings /usr/bin/node-problem-detector | grep -i "registered.*monitor"
# Verify build tags
go list -tags "$BUILD_TAGS" ./pkg/monitors/...
# Check plugin registration in logs
grep -i "register" /var/log/node-problem-detector.log# Monitor status channel output (requires debug build)
kubectl logs -n kube-system node-problem-detector-xyz | grep "status.*channel"
# Check configuration loading
kubectl logs -n kube-system node-problem-detector-xyz | grep "config.*loaded"
# Monitor problem detection
kubectl logs -n kube-system node-problem-detector-xyz | grep -E "(problem|condition|event)"

Symptoms: Plugin code exists but never runs, no registration messages in logs
Common Causes & Solutions:
-
Missing Blank Import
# Check if import exists in plugin files
grep -r "your-plugin-package" cmd/nodeproblemdetector/problemdaemonplugins/
Solution: Add blank import in appropriate plugin file:
import _ "your-module/pkg/yourcustomplugin"
-
Missing init() Function
# Verify init() function exists and calls Register()
grep -A 5 "func init()" pkg/yourplugin/
Solution: Ensure init() function calls problemdaemon.Register()
-
Wrong Build Tags
# Check what's included in build
go list -tags "$BUILD_TAGS" ./...
Solution: Verify BUILD_TAGS environment variable doesn't exclude your plugin
Symptoms: Linker errors about missing functions, undefined references
Common Causes & Solutions:
-
Missing Build Tags for Dependencies
# Example: journald requires special build tag
BUILD_TAGS="journald" make
-
Missing System Dependencies
# For journald support
sudo apt-get install libsystemd-dev
-
Platform-Specific Code Issues
# Check for platform-specific files
find . -name "*_unix.go" -o -name "*_windows.go"
Symptoms: Monitor shows in logs but no events/conditions generated
Diagnostic Steps:
-
Check Channel Return Value
// In your monitor's Start() method
func (m *monitor) Start() (<-chan *types.Status, error) {
    // Metrics-only monitors return a nil channel:
    //     return nil, nil
    // Problem-detecting monitors return an actual channel:
    statusCh := make(chan *types.Status)
    return statusCh, nil
}
-
Add Debug Logging
// Add debug logs in monitor loop
klog.V(2).Infof("Monitor loop iteration, checking conditions...")
-
Test Configuration Patterns
# For log monitors, test regex patterns
echo "sample log line" | grep -E "your-regex-pattern"
-
Check Metrics Reporting Flag
// In configuration file
{
  "metricsReporting": true, // Enables problem metrics
  "rules": [...]
}
Symptoms: Monitor detects problems but exporter doesn't process them
Diagnostic Steps:
-
Check Exporter Initialization
# Look for exporter startup logs
kubectl logs <pod> | grep -i "exporter"
-
Verify Problem Detector Startup
# Look for successful startup message
kubectl logs <pod> | grep "Problem detector started"
-
Check Exporter List
# Enable verbose logging to see exporter initialization
/node-problem-detector -v=5
Symptoms: Startup errors, configuration validation failures
Common Problems:
-
JSON Syntax Errors
# Validate JSON syntax
cat config.json | jq .
-
Wrong Plugin Type
// Ensure plugin type matches available log watchers
{
  "plugin": "kmsg", // Must be: kmsg, filelog, or journald
}
-
Invalid Regex Patterns
# Test regex patterns
echo "test log line" | grep -E "your-pattern-here"
Debug Techniques:
-
Add Registration Logging
func init() {
    klog.Info("Registering my custom plugin") // Add this
    problemdaemon.Register(...)
}
-
Verify Plugin Handler
# Check if handler is registered
# Add debug code to list registered handlers
go run -tags debug cmd/debug/list_handlers.go
-
Check Factory Function
func NewMyPluginOrDie(configPath string) types.Monitor {
    klog.Infof("Creating plugin with config: %s", configPath) // Add this
    // ... rest of function
}
# Check what plugins are compiled in
go list -tags "$(echo $BUILD_TAGS)" ./...
# Verbose build to see compilation details
go build -v -x -tags "$(echo $BUILD_TAGS)"
# Run with verbose logging
/node-problem-detector -v=5 --config.custom-plugin-monitor=config.json
# Check systemd status (for journald issues)
systemctl --version
systemctl is-active systemd-journald
# Test custom plugin scripts directly
/path/to/your/script; echo "Exit code: $?"

Investigation Steps:
-
Profile the Application
# Profile via the pprof endpoint
go tool pprof http://localhost:6060/debug/pprof/profile
Check Regex Compilation
// Pre-compile regex patterns
var compiledPattern = regexp.MustCompile("your-pattern")
-
Monitor Channel Buffers
// Add buffered channels for high-throughput scenarios
statusCh := make(chan *types.Status, 100)
-
Check Log Volume
# Monitor log file growth
tail -f /var/log/messages | wc -l
When debugging NPD plugin issues, start with the most common causes (missing imports, build tags, configuration syntax) and work your way through the more complex scenarios. The verbose logging flag (-v=5) is particularly helpful for understanding initialization flow.