@jakebiesinger-storyhealth
Last active April 17, 2025 19:04
Graphile worker + OpenTelemetry metrics
import { Observable } from '@opentelemetry/api';
import debug from 'debug';
import { deepEqual } from 'fast-equals';
import { DateTime, Duration } from 'luxon';
const debugLog = debug('graphile-worker:metrics');
/**
 * A wrapper around an OpenTelemetry gauge, allowing synchronous reporting of values (which are still collected asynchronously).
 *
 * OpenTelemetry added synchronous gauges to the spec in
 * [this PR](https://github.com/open-telemetry/opentelemetry-specification/issues/2318) but the JS API doesn't yet
 * support them, so this is a stopgap until it's implemented.
 *
 * The gauge reports the most recently-recorded value in memory for each distinct set of recorded attributes.
 * The `ttl` parameter keeps each value and re-reports it until that duration has elapsed since it was last recorded.
 * If not specified, a value is not retained beyond its first report (possibly causing gaps in the reported metric).
 */
export class OpenTelemetryGauge<T extends Record<string, string | boolean> | undefined> {
  private currentValues: GaugeValue<T>[];

  constructor(private gauge: Observable, ttl?: Duration) {
    this.currentValues = [];
    this.gauge.addCallback((result) => {
      // Report the values and remove them if their ttl has elapsed.
      debugLog(`Starting gauge reporting callback`);
      // Iterate backwards so entries can be removed in place without skipping any elements.
      for (let i = this.currentValues.length - 1; i >= 0; i--) {
        const { attributes, value, observedAt } = this.currentValues[i];
        debugLog(`otel observing`, value, JSON.stringify(attributes));
        result.observe(value, attributes);
        // diffNow() is negative for past timestamps; negate it to get the elapsed duration.
        if (!ttl || observedAt.diffNow().negate() > ttl) {
          // splice removes the entry at index i; pop() would drop whatever entry happens to be last.
          const [entry] = this.currentValues.splice(i, 1);
          debugLog(`Dropped entry ${JSON.stringify(entry)} due to no ttl / ttl elapsed`);
        }
      }
    });
  }
  observeValue(value: number, attributes?: GaugeValue<T>['attributes'], observedAt?: DateTime) {
    const finalObservedAt = observedAt ?? DateTime.utc();
    const entry = this.currentValues.find((el) => deepEqual(el.attributes, attributes));
    if (entry) {
      entry.value = value;
      entry.observedAt = finalObservedAt;
      debugLog(`Updated existing metric for observation: ${value} ${JSON.stringify(attributes)}`);
    } else {
      this.currentValues.push({ value, attributes, observedAt: finalObservedAt });
      debugLog(`Added new metric for observation: ${value} ${JSON.stringify(attributes)}`);
    }
  }
  getValue(attributes: GaugeValue<T>['attributes']): number | undefined {
    return this.currentValues.find((el) => deepEqual(el.attributes, attributes))?.value;
  }

  getAllValues(): GaugeValue<T>[] {
    return [...this.currentValues];
  }

  clearAllValues(): void {
    this.currentValues = [];
  }
}
interface GaugeValue<T extends Record<string, string | boolean> | undefined> {
  attributes?: T;
  value: number;
  observedAt: DateTime;
}
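
// Usage sketch (not part of the original gist): given a `meter` from your own MeterProvider, create a gauge
// with a ttl and record values synchronously; the metric name and attribute shape below are illustrative.
//
//   const inFlightGauge = new OpenTelemetryGauge<{ region: string }>(
//     meter.createObservableGauge('example.in_flight', { valueType: ValueType.INT }),
//     Duration.fromObject({ minutes: 5 }) // keep re-reporting each value for 5 minutes
//   );
//   inFlightGauge.observeValue(3, { region: 'us-east1' }); // exported on the next collection cycle
//   inFlightGauge.observeValue(5, { region: 'us-east1' }); // replaces the previous value for these attributes
//   inFlightGauge.getValue({ region: 'us-east1' }); // => 5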
import { Meter, ValueType } from '@opentelemetry/api';
import EventEmitter from 'events';
import { Job, WorkerEvents } from 'graphile-worker';
import { DateTime, Duration } from 'luxon';
import { Pool, QueryResult } from 'pg';
import { getMeterProvider } from './your-otel-meter';
import { OpenTelemetryGauge } from './gauge';
/** Time to wait before polling. This is also passed in to graphile worker. */
const POLL_INTERVAL_JOB_COUNT_SEC = Number(process.env.POLL_INTERVAL_JOB_COUNT_SEC ?? 60);
/** Only report job attempt delays that take longer than this. */
const DELAY_REPORTING_THRESHOLD_SECONDS = POLL_INTERVAL_JOB_COUNT_SEC + 1;
export type WorkerJobStatus =
  | 'leased'
  | 'waiting_on_queue'
  | 'permanently_failed'
  | 'future'
  | 'waiting_to_retry'
  | 'ready';
export type JobStatusCount = Partial<Record<WorkerJobStatus, number>> & { ready: number };
const meter: Meter = getMeterProvider().getMeter('graphile_worker');

export const counters = {
  activeConnections: meter.createUpDownCounter('graphile_worker.db.active_connections', {
    description: 'The number of active DB connections',
    valueType: ValueType.INT,
  }),
  jobAttempts: meter.createCounter('graphile_worker.job.attempts', {
    description:
      'The number of jobs that have been attempted, whether succeeded, errored temporarily or failed completely',
    valueType: ValueType.INT,
  }),
} as const;
const histograms = {
  jobAttemptDelayTime: meter.createHistogram('graphile_worker.job.attempt_delay', {
    description: `How long after a job was scheduled to run before it actually started.
Large delays in this metric may indicate that there aren't enough workers for the current queue depth, or that the database
is overloaded.
Note that failed/retrying jobs have a schedule time in the future; this metric does NOT capture those "delays" from the
originally scheduled time -- this is *only* worker run latency.`,
    unit: 's',
    valueType: ValueType.DOUBLE,
  }),
} as const;
// In OpenTelemetry, gauges are supposed to monitor things that cannot be aggregated (e.g., CPU temp).
export const gauges = {
  // Reports the age of the oldest job (in minutes) based on the run_at time.
  oldestStaleJob: new OpenTelemetryGauge<{}>(
    meter.createObservableGauge('graphile_worker.job.oldest-job', {
      description: 'The age of the oldest stale job in the queue.',
      valueType: ValueType.INT,
    }),
    Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC * 2 })
  ),
  // Although the overall queue depth could be combined into a single total, it makes no sense to aggregate multiple
  // observations in a given time window.
  queueDepth: new OpenTelemetryGauge<{ status: WorkerJobStatus }>(
    meter.createObservableGauge('graphile_worker.queue.depth', {
      description: 'The number of jobs currently in the queue.',
      valueType: ValueType.INT,
    }),
    Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC * 2 })
  ),
};
const DISCONNECTED = Symbol.for('DISCONNECTED');
const ERROR = Symbol.for('ERROR');

// Annotate the type explicitly so the field can hold either a DateTime or one of the sentinel symbols above.
const connectivityMeta: { lastPoolConnectTs: DateTime | symbol } = {
  lastPoolConnectTs: DISCONNECTED,
};
/** Create a new event emitter which can be passed in to graphile worker `run` as the `events` parameter */
export function registerEventHandler(): WorkerEvents {
  const emitter: WorkerEvents = new EventEmitter();
  emitter.on('pool:listen:success', () => {
    connectivityMeta.lastPoolConnectTs = DateTime.utc();
    // This will only be a rough estimate of the number of workers
    counters.activeConnections.add(1);
  });
  emitter.on('pool:listen:error', ({ error }) => {
    connectivityMeta.lastPoolConnectTs = ERROR;
    // This will only be a rough estimate of the number of workers
    counters.activeConnections.add(-1);
  });
  emitter.on('job:success', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'success' };
    recordAttempt(job, meta);
  });
  emitter.on('job:error', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'error' };
    recordAttempt(job, meta);
  });
  emitter.on('job:failed', ({ job }) => {
    const meta = { ...jobMeta(job), status: 'failed' };
    recordAttempt(job, meta);
  });
  return emitter;
}
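
// Wiring sketch (not part of the original gist): pass the emitter to graphile-worker's `run` via the `events`
// option; the connection string and task list here are placeholders.
//
//   import { run } from 'graphile-worker';
//
//   const runner = await run({
//     connectionString: process.env.DATABASE_URL,
//     taskList: { /* your tasks */ },
//     pollInterval: POLL_INTERVAL_JOB_COUNT_SEC * 1000, // graphile-worker expects milliseconds
//     events: registerEventHandler(),
//   });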
function recordAttempt(job: Job, meta: { status: string }) {
  counters.jobAttempts.add(1, meta);
  const delay = jobAttemptDelaySec(job);
  // Histogram reporting can be expensive, so only report jobs that are delayed past normal latencies.
  // If you can afford to report everything, you'll get accurate p50, p95, etc., but the volume may be very high.
  if (delay > DELAY_REPORTING_THRESHOLD_SECONDS) {
    histograms.jobAttemptDelayTime.record(delay, meta);
  }
}
/**
 * Helper method that periodically reports queue depth via otel metrics.
 *
 * Use this method with caution and don't use a short interval, since the extra polling will reduce overall
 * graphile-worker performance.
 */
export function periodicallyUpdateQueueDepthMetrics(
  pool: Pool,
  interval: Duration
): { promise: Promise<void>; cancel: () => void } {
  return periodicallyRun(pool, interval, updateQueueDepth);
}
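
// Usage sketch (not part of the original gist; `pool` is assumed to be the same pg Pool used by your app):
//
//   const queueDepthMetrics = periodicallyUpdateQueueDepthMetrics(
//     pool,
//     Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC })
//   );
//   queueDepthMetrics.promise.catch((err) => console.error('queue depth polling failed', err));
//   process.on('SIGTERM', () => queueDepthMetrics.cancel());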
export async function getReadyQueueDepth(pool: Pool): Promise<number> {
  const count = gauges.queueDepth.getValue({ status: 'ready' });
  if (count !== undefined) {
    return count;
  }
  const counts = await queryQueueDepthByStatus(pool);
  return counts.ready;
}

async function updateQueueDepth(pool: Pool): Promise<void> {
  const counts = await queryQueueDepthByStatus(pool);
  for (const [status, count] of Object.entries(counts)) {
    gauges.queueDepth.observeValue(count, { status: status as WorkerJobStatus });
  }
}
async function queryQueueDepthByStatus(pool: Pool): Promise<JobStatusCount> {
  // Note: This query is cribbed from the v0.13.0 schema https://github.com/graphile/worker/blob/v0.13.0/sql/000009.sql
  // and will likely need to be updated if we switch to a new version, as it directly queries a "private" schema.
  // count(*) comes back from pg as a string (bigint), hence the `+` conversion below.
  const result: QueryResult<{ num_jobs: string; status: string }> = await pool.query(`
    select
      count(*) as num_jobs,
      case
        when jobs.locked_at is not null and jobs.locked_at >= (now() - interval '4 hours') then 'leased'
        -- A queue blocks its jobs only while it holds an unexpired lock (mirrors the 'leased' check above).
        when jobs.queue_name is not null and exists (
          select 1
          from graphile_worker.job_queues
          where job_queues.queue_name = jobs.queue_name
            and (job_queues.locked_at is not null and job_queues.locked_at >= (now() - interval '4 hours'))
        ) then 'waiting_on_queue'
        when attempts >= max_attempts then 'permanently_failed'
        when attempts = 0 and run_at >= now() then 'future'
        -- Any job with at least one failed attempt and a future run_at is waiting out its retry backoff.
        when attempts > 0 and run_at >= now() then 'waiting_to_retry'
        else 'ready'
      end as status
    from graphile_worker.jobs
    group by 2
  `);
  const counts: Partial<JobStatusCount> = Object.fromEntries(result.rows.map((row) => [row.status, +row.num_jobs]));
  const completeCounts: JobStatusCount = {
    ...counts,
    ready: counts.ready ?? 0, // always report the `ready` count.
  };
  return completeCounts;
}
export function periodicallyUpdateOldestStaleJobMetrics(
  pool: Pool,
  interval: Duration
): { promise: Promise<void>; cancel: () => void } {
  return periodicallyRun(pool, interval, updateOldestStaleJob);
}

async function updateOldestStaleJob(pool: Pool): Promise<void> {
  const result: QueryResult<{ mins: number | null }> = await pool.query(`
    select
      max(extract(epoch from now() - run_at) / 60) as mins
    from graphile_worker.jobs
  `);
  const row = result.rows[0];
  // max() returns null when the jobs table is empty; only observe real values.
  if (row != null && row.mins != null) {
    gauges.oldestStaleJob.observeValue(row.mins);
  }
}
function jobMeta(job: Job): Record<string, string> {
  return {
    task_identifier: job.task_identifier,
  };
}

function jobAttemptDelaySec(job: Job): number {
  const scheduledTime = DateTime.fromISO(job.run_at as unknown as string);
  const actualStartTime = DateTime.fromISO(job.locked_at as unknown as string);
  return actualStartTime.diff(scheduledTime).as('seconds');
}
function periodicallyRun(
  pool: Pool,
  interval: Duration,
  work: (pool: Pool) => Promise<void>
): { promise: Promise<void>; cancel: () => void } {
  let cancel: (() => void) | undefined = undefined;
  const promise = new Promise<void>((resolve, reject) => {
    const handle = setInterval(() => {
      work(pool).catch(reject);
    }, interval.as('milliseconds'));
    cancel = () => {
      clearInterval(handle);
      resolve();
    };
  });
  return {
    // The Promise executor runs synchronously, so `cancel` is guaranteed to be assigned by the time we return.
    cancel: cancel!,
    promise,
  };
}
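
// Note (not part of the original gist): the returned promise rejects on the first `work` error, but the interval
// keeps firing until `cancel()` is called, so callers should both handle the rejection and cancel on shutdown.
// periodicallyUpdateOldestStaleJobMetrics is wired up the same way as the queue-depth example above, e.g.:
//
//   const staleJobMetrics = periodicallyUpdateOldestStaleJobMetrics(
//     pool,
//     Duration.fromObject({ seconds: POLL_INTERVAL_JOB_COUNT_SEC })
//   );
//   staleJobMetrics.promise.catch((err) => console.error('oldest-job metrics failed', err));
//   process.on('SIGTERM', () => staleJobMetrics.cancel());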