Graphile worker + OpenTelemetry metrics
The first file, gauge.ts (imported below as './gauge'), wraps an OpenTelemetry observable gauge so that values can be recorded synchronously:
import { Observable } from '@opentelemetry/api';
import debug from 'debug';
import { deepEqual } from 'fast-equals';
import { DateTime, Duration } from 'luxon';

const debugLog = debug('graphile-worker:metrics');
/**
 * A wrapper around an OpenTelemetry gauge, allowing synchronous reporting of values (which are still collected asynchronously).
 *
 * OpenTelemetry added synchronous gauges to the spec in
 * [this issue](https://github.com/open-telemetry/opentelemetry-specification/issues/2318) but the JS API doesn't yet
 * support them, so this is a stopgap until it's implemented.
 *
 * The gauge reports the most recently recorded in-memory value for each set of recorded attributes.
 * The `ttl` parameter causes a reported value to be retained and re-reported until that duration has elapsed since the
 * value was recorded. If not specified, a value is dropped after its first collection (possibly causing gaps in the reported metric).
 */
export class OpenTelemetryGauge<T extends Record<string, string | boolean> | undefined> {
  private currentValues: GaugeValue<T>[];

  constructor(private gauge: Observable, ttl?: Duration) {
    this.currentValues = [];
    // `private gauge` is a parameter property, so no explicit assignment is needed.
    this.gauge.addCallback((result) => {
      // Report the values and remove them if their ttl has elapsed.
      debugLog(`Starting gauge reporting callback`);
      for (let i = this.currentValues.length - 1; i >= 0; i--) {
        const { attributes, value, observedAt } = this.currentValues[i];
        debugLog(`otel observing`, value, JSON.stringify(attributes));
        result.observe(value, attributes);
        if (!ttl || observedAt.diffNow().negate() > ttl) {
          // Remove the entry at this index (not `pop()`, which could drop a still-valid entry
          // sitting at the end of the array).
          const [entry] = this.currentValues.splice(i, 1);
          debugLog(`Dropped entry ${JSON.stringify(entry)} due to no ttl / ttl elapsed`);
        }
      }
    });
  }

  observeValue(value: number, attributes: GaugeValue<T>['attributes'], observedAt?: DateTime) {
    const finalObservedAt = observedAt ?? DateTime.utc();
    const entry = this.currentValues.find((el) => deepEqual(el.attributes, attributes));
    if (entry) {
      entry.value = value;
      entry.observedAt = finalObservedAt;
      debugLog(`Updated existing metric for observation: ${value} ${JSON.stringify(attributes)}`);
    } else {
      this.currentValues.push({ value, attributes, observedAt: finalObservedAt });
      debugLog(`Added new metric for observation: ${value} ${JSON.stringify(attributes)}`);
    }
  }

  getValue(attributes: GaugeValue<T>['attributes']): number | undefined {
    return this.currentValues.find((el) => deepEqual(el.attributes, attributes))?.value;
  }

  getAllValues(): GaugeValue<T>[] {
    return [...this.currentValues];
  }

  clearAllValues(): void {
    this.currentValues = [];
  }
}

interface GaugeValue<T extends Record<string, string | boolean> | undefined> {
  attributes?: T;
  value: number;
  observedAt: DateTime;
}
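
A minimal, hypothetical usage sketch of the wrapper (the meter and metric names are illustrative; it assumes a global MeterProvider has already been registered by your OpenTelemetry SDK setup):

import { metrics } from '@opentelemetry/api';
import { Duration } from 'luxon';
import { OpenTelemetryGauge } from './gauge';

const meter = metrics.getMeter('example');

// Values recorded here are re-reported on every collection for up to five minutes.
const inFlightGauge = new OpenTelemetryGauge<{ queue: string }>(
  meter.createObservableGauge('example.jobs.in_flight', { description: 'Jobs currently in flight.' }),
  Duration.fromObject({ minutes: 5 })
);

inFlightGauge.observeValue(3, { queue: 'emails' });
inFlightGauge.observeValue(7, { queue: 'webhooks' });
// Reads return the last value recorded in memory for a given attribute set.
inFlightGauge.getValue({ queue: 'emails' }); // => 3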
The second file wires graphile-worker events and periodic database polls into OpenTelemetry counters, histograms, and the gauge wrapper above (imported as './gauge'):
import { Meter, ValueType } from '@opentelemetry/api';
import EventEmitter from 'events';
import { Job, WorkerEvents } from 'graphile-worker';
import { DateTime, Duration } from 'luxon';
import { Pool, QueryResult } from 'pg';
import { getMeterProvider } from './your-otel-meter';
import { OpenTelemetryGauge } from './gauge';

/** Time to wait (in seconds) before polling. This is also passed in to graphile-worker. */
const POLL_INTERVAL_JOB_COUNT_SEC = Number(process.env.POLL_INTERVAL_JOB_COUNT_SEC ?? 60);

/** Only report job attempt delays that take longer than this. */
const DELAY_REPORTING_THRESHOLD_SECONDS = POLL_INTERVAL_JOB_COUNT_SEC + 1;

export type WorkerJobStatus =
  | 'leased'
  | 'waiting_on_queue'
  | 'permanently_failed'
  | 'future'
  | 'waiting_to_retry'
  | 'ready';

export type JobStatusCount = Partial<Record<WorkerJobStatus, number>> & { ready: number };

const meter: Meter = getMeterProvider().getMeter('graphile_worker');

export const counters = {
  activeConnections: meter.createUpDownCounter('graphile_worker.db.active_connections', {
    description: 'The number of active DB connections',
    valueType: ValueType.INT,
  }),
  jobAttempts: meter.createCounter('graphile_worker.job.attempts', {
    description:
      'The number of jobs that have been attempted, whether succeeded, errored temporarily or failed completely',
    valueType: ValueType.INT,
  }),
} as const;

const histograms = {
  jobAttemptDelayTime: meter.createHistogram('graphile_worker.job.attempt_delay', {
    description: `How long after a job was scheduled to run before it actually started.
      Large delays in this metric may indicate that there aren't enough workers for the current queue depth, or that the database
      is overloaded.
      Note that failed/retrying jobs have a schedule time in the future; this metric does NOT capture those "delays" from the
      originally scheduled time -- this is *only* worker run latency.`,
    unit: 's',
    valueType: ValueType.DOUBLE,
  }),
} as const;

// In OpenTelemetry, gauges are meant for values that cannot be meaningfully aggregated (e.g., CPU temperature).
export const gauges = {
  // Reports the age of the oldest job (in minutes) based on the run_at time.
  oldestStaleJob: new OpenTelemetryGauge<{}>(
    meter.createObservableGauge('graphile_worker.job.oldest-job', {
      description: 'The age of the oldest stale job in the queue, in minutes.',
      valueType: ValueType.INT,
    }),
    Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC * 2 })
  ),
  // Although the overall queue depth could be combined into a single total, it makes no sense to aggregate multiple
  // observations within a given time window.
  queueDepth: new OpenTelemetryGauge<{ status: WorkerJobStatus }>(
    meter.createObservableGauge('graphile_worker.queue.depth', {
      description: 'The number of jobs currently in the queue, by status.',
      valueType: ValueType.INT,
    }),
    Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC * 2 })
  ),
};

const DISCONNECTED = Symbol.for('DISCONNECTED');
const ERROR = Symbol.for('ERROR');

// Explicitly typed so the DateTime assignment in the listen handlers type-checks.
const connectivityMeta: { lastPoolConnectTs: DateTime | symbol } = {
  lastPoolConnectTs: DISCONNECTED,
};

/**
 * Creates a new event emitter that can be passed to graphile-worker's `run` as the `events` option
 * (see the usage sketch after this file).
 */
export function registerEventHandler(): WorkerEvents {
  const emitter: WorkerEvents = new EventEmitter();
  emitter.on('pool:listen:success', () => {
    connectivityMeta.lastPoolConnectTs = DateTime.utc();
    // This will only be a rough estimate of the number of workers.
    counters.activeConnections.add(1);
  });
  emitter.on('pool:listen:error', () => {
    connectivityMeta.lastPoolConnectTs = ERROR;
    // This will only be a rough estimate of the number of workers.
    counters.activeConnections.add(-1);
  });
  emitter.on('job:success', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'success' };
    recordAttempt(job, meta);
  });
  emitter.on('job:error', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'error' };
    recordAttempt(job, meta);
  });
  emitter.on('job:failed', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'failed' };
    recordAttempt(job, meta);
  });
  return emitter;
}

function recordAttempt(job: Job, meta: { status: string }) {
  counters.jobAttempts.add(1, meta);
  const delay = jobAttemptDelaySec(job);
  // Histogram reporting can be expensive, so only report jobs that are delayed past normal latencies.
  // If you can afford to report everything, you'll get accurate p50, p95, etc., but the volume may be very high.
  if (delay > DELAY_REPORTING_THRESHOLD_SECONDS) {
    histograms.jobAttemptDelayTime.record(delay, meta);
  }
}

/**
 * Helper that periodically reports queue depth via OpenTelemetry metrics.
 *
 * Use this with caution and don't use a short interval: each run queries the jobs table, so overall
 * graphile-worker performance will be reduced.
 */
export function periodicallyUpdateQueueDepthMetrics(
  pool: Pool,
  interval: Duration
): { promise: Promise<void>; cancel: () => void } {
  return periodicallyRun(pool, interval, updateQueueDepth);
}

export async function getReadyQueueDepth(pool: Pool): Promise<number> {
  const count = gauges.queueDepth.getValue({ status: 'ready' });
  if (count !== undefined) {
    return count;
  }
  const counts = await queryQueueDepthByStatus(pool);
  return counts.ready;
}

async function updateQueueDepth(pool: Pool): Promise<void> {
  const counts = await queryQueueDepthByStatus(pool);
  for (const [status, count] of Object.entries(counts)) {
    gauges.queueDepth.observeValue(count, { status: status as WorkerJobStatus });
  }
}

async function queryQueueDepthByStatus(pool: Pool): Promise<JobStatusCount> {
  // Note: This query is cribbed from the v0.13.0 schema https://github.com/graphile/worker/blob/v0.13.0/sql/000009.sql
  // and will likely need to be updated if we switch to a new version, as it directly queries a "private" schema.
  const result: QueryResult<{ num_jobs: number; status: string }> = await pool.query(`
    select
      count(*) as num_jobs,
      case
        when jobs.locked_at is not null and jobs.locked_at >= (now() - interval '4 hours') then 'leased'
        when jobs.queue_name is not null and exists (
          select 1
          from graphile_worker.job_queues
          where job_queues.queue_name = jobs.queue_name
            and (job_queues.locked_at is not null and job_queues.locked_at >= (now() - interval '4 hours'))
        ) then 'waiting_on_queue'
        when attempts >= max_attempts then 'permanently_failed'
        when attempts = 0 and run_at >= now() then 'future'
        when attempts >= 1 and run_at >= now() then 'waiting_to_retry'
        else 'ready'
      end as status
    from graphile_worker.jobs
    group by 2
  `);
  const counts: Partial<JobStatusCount> = Object.fromEntries(result.rows.map((row) => [row.status, +row.num_jobs]));
  const completeCounts: JobStatusCount = {
    ...counts,
    ready: counts.ready ?? 0, // Always report the `ready` count, even when it's zero.
  };
  return completeCounts;
}

export function periodicallyUpdateOldestStaleJobMetrics(
  pool: Pool,
  interval: Duration
): { promise: Promise<void>; cancel: () => void } {
  return periodicallyRun(pool, interval, updateOldestStaleJob);
}

async function updateOldestStaleJob(pool: Pool): Promise<void> {
  const result: QueryResult<{ mins: number }> = await pool.query(`
    select
      max(extract(epoch from now() - run_at) / 60) as mins
    from graphile_worker.jobs
  `);
  const row = result.rows[0];
  // `max()` returns null when the jobs table is empty; skip the observation in that case.
  if (row?.mins != null) {
    gauges.oldestStaleJob.observeValue(row.mins, {});
  }
}

function jobMeta(job: Job): Record<string, string> {
  return {
    task_identifier: job.task_identifier,
  };
}

function jobAttemptDelaySec(job: Job): number {
  // The Job type declares run_at / locked_at as Dates, but in this setup they arrive as ISO strings
  // (hence the casts) and are parsed with luxon.
  const scheduledTime = DateTime.fromISO(job.run_at as unknown as string);
  const actualStartTime = DateTime.fromISO(job.locked_at as unknown as string);
  return actualStartTime.diff(scheduledTime).as('seconds');
}

function periodicallyRun(
  pool: Pool,
  interval: Duration,
  work: (pool: Pool) => Promise<void>
): { promise: Promise<void>; cancel: () => void } {
  let cancel: (() => void) | undefined = undefined;
  const promise = new Promise<void>((resolve, reject) => {
    const handle = setInterval(() => {
      // If `work` rejects, the promise rejects but the interval keeps firing until `cancel()` is called.
      work(pool).catch(reject);
    }, interval.as('milliseconds'));
    cancel = () => {
      clearInterval(handle);
      resolve();
    };
  });
  return {
    // The Promise executor runs synchronously, so `cancel` is always assigned by this point.
    cancel: cancel!,
    promise,
  };
}
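
For reference, here is a hedged sketch of how these pieces might be wired together. The './your-otel-meter' module imported above is not part of the gist; the stub below simply returns the global provider and assumes your OpenTelemetry SDK registered one at startup. The file name './worker-metrics', the task directory, and the connection settings are illustrative assumptions, while `events` and the millisecond `pollInterval` are standard graphile-worker `run` options.

// your-otel-meter.ts (hypothetical contents)
import { metrics, MeterProvider } from '@opentelemetry/api';

export function getMeterProvider(): MeterProvider {
  // Assumes a global MeterProvider was registered elsewhere (e.g. via @opentelemetry/sdk-metrics or the NodeSDK).
  return metrics.getMeterProvider();
}

// main.ts (illustrative wiring; './worker-metrics' stands in for the second file above)
import { run } from 'graphile-worker';
import { Duration } from 'luxon';
import { Pool } from 'pg';
import {
  periodicallyUpdateOldestStaleJobMetrics,
  periodicallyUpdateQueueDepthMetrics,
  registerEventHandler,
} from './worker-metrics';

async function main() {
  const pool = new Pool({ connectionString: process.env.DATABASE_URL });

  const runner = await run({
    pgPool: pool,
    taskDirectory: `${__dirname}/tasks`,
    concurrency: 5,
    pollInterval: Number(process.env.POLL_INTERVAL_JOB_COUNT_SEC ?? 60) * 1000,
    events: registerEventHandler(),
  });

  // Background metric updaters; keep the intervals generous since each run queries the jobs table.
  const queueDepth = periodicallyUpdateQueueDepthMetrics(pool, Duration.fromObject({ seconds: 60 }));
  const oldestJob = periodicallyUpdateOldestStaleJobMetrics(pool, Duration.fromObject({ minutes: 5 }));

  await runner.promise;
  queueDepth.cancel();
  oldestJob.cancel();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});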