-
-
Save pilgrim2go/7ee3bc07bb474267cc65d395ab7500fd to your computer and use it in GitHub Desktop.
DataDog script for collecting PostgreSQL stats
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create the datadog user with select-only permissions:
# CREATE USER datadog WITH PASSWORD '<complex_password>';
#
# Grant select permissions on a table or view that you want to monitor:
# GRANT SELECT ON <schema>.<table> TO datadog;
#
# Grant permissions for a specific column on a table or view that you want to monitor:
# GRANT SELECT (id, name) ON <schema>.<table> TO datadog;
#
# Let non-superusers look at pg_stat_activity in a read-only fashion.
# Create this function as a superuser for the database:
#
# CREATE OR REPLACE FUNCTION public.pg_stat_activity() RETURNS SETOF pg_catalog.pg_stat_activity AS $BODY$
# DECLARE
#     rec RECORD;
# BEGIN
#     FOR rec IN SELECT * FROM pg_stat_activity
#     LOOP
#         RETURN NEXT rec;
#     END LOOP;
#     RETURN;
# END;
# $BODY$ LANGUAGE plpgsql SECURITY DEFINER;
#
# Enable stats collection for functions:
#     set track_functions to 'pl';
from checks import AgentCheck
import psycopg2 as pg


class PostgresqlCheck(AgentCheck):
    """Datadog agent check that collects PostgreSQL statistics.

    Emits table, index, user-function, database, connection,
    replication-delay and buffer-cache metrics under the configured
    metric prefix (default ``postgresql``), tagged with the database
    name plus any instance tags.
    """

    def __init__(self, name, init_config, agentConfig):
        AgentCheck.__init__(self, name, init_config, agentConfig)
        # Metric namespace prefix, e.g. 'postgresql.table.relsize'.
        # NOTE(review): this shadows whatever 'name' AgentCheck.__init__
        # set on self — presumably intentional; confirm against the base class.
        self.name = init_config.get('name', 'postgresql')

    def check(self, instance):
        """Run one collection cycle against the configured instance.

        instance keys: host, port, username, password, database, tags.
        """
        self.log.info('Starting PostgreSQL Check')
        tags = list(instance.get('tags', []))  # copy: never mutate the config list
        host = instance.get('host', 'localhost')
        port = instance.get('port', '5432')
        username = instance.get('username')
        password = instance.get('password')
        database = instance.get('database')
        tags.append('database:%s' % database)

        self.log.info('Connecting to PostgreSQL')
        db = pg.connect(host=host, port=port, user=username,
                        password=password, database=database)
        try:
            cu = db.cursor()
            try:
                self._collect_table_stats(cu, tags)
                self._collect_index_stats(cu, tags)
                self._collect_function_stats(cu, tags)
                self._collect_database_stats(cu, tags, database)
                self._collect_connection_stats(cu, tags, database)
                self._collect_replication_delay(cu, tags)
                self._collect_memory_stats(cu, tags)
            finally:
                cu.close()
        finally:
            # Always release the connection, even if a query above raised.
            self.log.info('Closing PostgreSQL Connection')
            db.close()

    def _collect_table_stats(self, cu, tags):
        """Report per-table size, scan, tuple and cache-hit metrics.

        Unpacks exactly the 21 columns produced by query_table_stats()
        (the original code unpacked only 20 and raised ValueError).
        """
        self.log.info('Collecting Table Stats')
        cu.execute(self.query_table_stats())
        for stat in cu.fetchall():
            (
                schemaname,
                relname,
                relsize,        # pg_table_size: heap + TOAST
                idxsize,        # pg_indexes_size: all indexes on the table
                total_relsize,  # pg_total_relation_size: table + indexes
                reltuples,
                relpages,
                avg_tuplesize,
                seq_scan,
                idx_scan,
                per_idx_scan,
                per_rel_hit,
                per_idx_hit,
                n_tup_ins,
                n_tup_upd,
                n_tup_hot_upd,
                per_hot_upd,
                n_tup_del,
                n_live_tup,
                n_dead_tup,
                per_deadfill,
            ) = stat
            ttags = tags + ['schema:%s' % schemaname, 'table:%s' % relname]
            self.gauge('%s.table.relsize' % self.name, relsize, tags=ttags)
            self.gauge('%s.table.idxsize' % self.name, idxsize, tags=ttags)
            self.gauge('%s.table.total_relsize' % self.name, total_relsize, tags=ttags)
            self.gauge('%s.table.reltuples' % self.name, reltuples, tags=ttags)
            self.gauge('%s.table.relpages' % self.name, relpages, tags=ttags)
            self.gauge('%s.table.avg_tuplesize' % self.name, avg_tuplesize, tags=ttags)
            # Monotonic counters go through rate(); point-in-time values are gauges.
            self.rate('%s.table.seq_scan' % self.name, seq_scan, tags=ttags)
            self.rate('%s.table.idx_scan' % self.name, idx_scan, tags=ttags)
            self.gauge('%s.table.per_idx_scan' % self.name, per_idx_scan, tags=ttags)
            self.gauge('%s.table.per_rel_hit' % self.name, per_rel_hit, tags=ttags)
            self.gauge('%s.table.per_idx_hit' % self.name, per_idx_hit, tags=ttags)
            self.rate('%s.table.n_tup_ins' % self.name, n_tup_ins, tags=ttags)
            self.rate('%s.table.n_tup_upd' % self.name, n_tup_upd, tags=ttags)
            self.rate('%s.table.n_tup_hot_upd' % self.name, n_tup_hot_upd, tags=ttags)
            self.gauge('%s.table.per_hot_upd' % self.name, per_hot_upd, tags=ttags)
            self.rate('%s.table.n_tup_del' % self.name, n_tup_del, tags=ttags)
            self.gauge('%s.table.n_live_tup' % self.name, n_live_tup, tags=ttags)
            self.gauge('%s.table.n_dead_tup' % self.name, n_dead_tup, tags=ttags)
            self.gauge('%s.table.per_deadfill' % self.name, per_deadfill, tags=ttags)

    def _collect_index_stats(self, cu, tags):
        """Report per-index scan and size metrics.

        Unpacks the 11 columns of query_index_stats() (the original
        unpacked only 6 and raised ValueError).
        """
        self.log.info('Collecting Index Stats')
        cu.execute(self.query_index_stats())
        for stat in cu.fetchall():
            (
                schemaname,
                relname,
                idxname,
                _idxsize_pretty,  # human-readable size; not reported
                idxsize,
                _idxtuples,       # selected by the query; not reported
                _reltuples,       # selected by the query; not reported
                per_idx_covered,
                idx_scan,
                idx_tup_read,
                idx_tup_fetch,
            ) = stat
            itags = tags + ['schema:%s' % schemaname, 'table:%s' % relname,
                            'index:%s' % idxname]
            self.gauge('%s.index.idxsize' % self.name, idxsize, tags=itags)
            self.gauge('%s.index.per_idx_covered' % self.name, per_idx_covered, tags=itags)
            self.rate('%s.index.idx_scan' % self.name, idx_scan, tags=itags)
            self.rate('%s.index.idx_tup_read' % self.name, idx_tup_read, tags=itags)
            self.rate('%s.index.idx_tup_fetch' % self.name, idx_tup_fetch, tags=itags)

    def _collect_function_stats(self, cu, tags):
        """Report pg_stat_user_functions call counters (needs track_functions)."""
        self.log.info('Collecting Function Stats')
        cu.execute(self.query_function_stats())
        for stat in cu.fetchall():
            schemaname, funcname, calls, self_time, total_time = stat
            ftags = tags + ['schema:%s' % schemaname, 'function:%s' % funcname]
            self.rate('%s.function.calls' % self.name, calls, tags=ftags)
            self.rate('%s.function.self_time' % self.name, self_time, tags=ftags)
            self.rate('%s.function.total_time' % self.name, total_time, tags=ftags)

    def _collect_database_stats(self, cu, tags, database):
        """Report database-wide transaction and block I/O counters.

        Unpacks the 6 columns of query_database_stats() (the original
        unpacked only 5 — it missed db_size — and raised ValueError).
        """
        self.log.info('Collecting Database Stats')
        cu.execute(self.query_database_stats(database))
        for stat in cu.fetchall():
            (_datname, db_size, xact_commit, xact_rollback,
             blks_read, blks_hit) = stat
            self.gauge('%s.database.size' % self.name, db_size, tags=tags)
            self.rate('%s.database.xact_commit' % self.name, xact_commit, tags=tags)
            self.rate('%s.database.xact_rollback' % self.name, xact_rollback, tags=tags)
            self.rate('%s.database.blks_read' % self.name, blks_read, tags=tags)
            self.rate('%s.database.blks_hit' % self.name, blks_hit, tags=tags)

    def _collect_connection_stats(self, cu, tags, database):
        """Report total and waiting backend counts for the database."""
        self.log.info('Collecting Database Connections')
        cu.execute(self.query_connections(database))
        # Single-row, single-column result: the trailing comma is required
        # to unpack the 1-tuple ('(x) = row' binds the whole tuple).
        (connections,) = cu.fetchone()
        self.gauge('%s.database.connections' % self.name, connections, tags=tags)

        self.log.info('Collecting Database Waiting Connections')
        cu.execute(self.query_waiting_connections(database))
        (waiting_connections,) = cu.fetchone()
        self.gauge('%s.database.waiting_connections' % self.name, waiting_connections, tags=tags)

    def _collect_replication_delay(self, cu, tags):
        """Report replica replay lag in milliseconds; skipped on a primary."""
        self.log.info('Collecting Database Replication Delay')
        cu.execute(self.query_replication_delay())
        (replication_delay,) = cu.fetchone()
        # pg_last_xact_replay_timestamp() is NULL when no WAL has been
        # replayed (i.e. on a primary) — don't submit a None gauge.
        if replication_delay is not None:
            self.gauge('%s.database.replication_delay' % self.name, replication_delay, tags=tags)

    def _collect_memory_stats(self, cu, tags):
        """Report aggregate heap and index buffer-cache hit statistics."""
        self.log.info('Collecting Heap Memory Stats')
        cu.execute(self.query_heap_memory_stats())
        heap_read, heap_hit, per_heap_ratio = cu.fetchone()
        self.rate('%s.database.heap_read' % self.name, heap_read, tags=tags)
        self.rate('%s.database.heap_hit' % self.name, heap_hit, tags=tags)
        self.gauge('%s.database.per_heap_ratio' % self.name, per_heap_ratio, tags=tags)

        self.log.info('Collecting Index Memory Stats')
        cu.execute(self.query_index_memory_stats())
        idx_read, idx_hit, per_idx_ratio = cu.fetchone()
        self.rate('%s.database.idx_read' % self.name, idx_read, tags=tags)
        self.rate('%s.database.idx_hit' % self.name, idx_hit, tags=tags)
        self.gauge('%s.database.per_idx_ratio' % self.name, per_idx_ratio, tags=tags)

    def query_table_stats(self):
        """SQL for per-table stats; 21 columns, one row per user table.

        NOTE(review): the joins and pg_*_size() calls key on bare relname,
        which is ambiguous across schemas and depends on search_path —
        works on single-schema databases; verify before multi-schema use.
        """
        return """
            select
              psut.schemaname,
              pc.relname,
              pg_table_size(pc.relname::varchar) tblsize,
              pg_indexes_size(pc.relname::varchar) idxsize,
              pg_total_relation_size(pc.relname::varchar) relsize,
              pc.reltuples::bigint,
              pc.relpages,
              coalesce(round((8192 / (nullif(pc.reltuples, 0) / nullif(pc.relpages, 0)))), 0) avg_tuplesize,
              psut.seq_scan,
              psut.idx_scan,
              coalesce(100 * psut.idx_scan / nullif((psut.idx_scan + psut.seq_scan), 0), 0)::int per_idx_scan,
              coalesce(100 * psiout.heap_blks_hit / nullif((psiout.heap_blks_hit + psiout.heap_blks_read), 0), 0)::int per_rel_hit,
              coalesce(100 * psiout.idx_blks_hit / nullif((psiout.idx_blks_hit + psiout.idx_blks_read), 0), 0)::int per_idx_hit,
              psut.n_tup_ins,
              psut.n_tup_upd,
              psut.n_tup_hot_upd,
              coalesce(100 * psut.n_tup_hot_upd / nullif(psut.n_tup_upd, 0), 0)::int per_hot_upd,
              psut.n_tup_del,
              psut.n_live_tup,
              psut.n_dead_tup,
              coalesce(100 * psut.n_dead_tup / nullif(psut.n_live_tup, 0), 0)::int per_deadfill
            from pg_stat_user_tables psut
            inner join pg_statio_user_tables psiout on psiout.relname = psut.relname
            inner join pg_class pc on pc.relname = psut.relname
            order by pc.relname asc
        """

    def query_index_stats(self):
        """SQL for per-index stats; 11 columns, one row per user index."""
        return """
            select
              pi.schemaname,
              pcr.relname as relname,
              pci.relname as idxname,
              pg_size_pretty(pg_total_relation_size(pci.relname::varchar)) idxsize_pret,
              pg_total_relation_size(pci.relname::varchar) idxsize,
              pci.reltuples::bigint idxtuples,
              pcr.reltuples::bigint reltuples,
              coalesce(100 * pci.reltuples / nullif(pcr.reltuples, 0), 0)::int per_idx_covered,
              pi.idx_scan,
              pi.idx_tup_read,
              pi.idx_tup_fetch
            from pg_stat_user_indexes pi
            inner join pg_class pci on pci.oid = pi.indexrelid
            inner join pg_class pcr on pcr.oid = pi.relid
            order by schemaname, relname, idxname
        """

    def query_function_stats(self):
        """SQL for pg_stat_user_functions; 5 columns per user function."""
        return """
            select
              schemaname,
              funcname,
              calls,
              self_time,
              total_time
            from pg_stat_user_functions
            where schemaname <> 'pg_catalog'
        """

    def query_database_stats(self, database):
        """SQL for pg_stat_database counters of one database; 6 columns.

        NOTE(review): database comes from the agent config and is spliced
        into the SQL as text — fine for trusted config, but parameterized
        execution (cursor.execute(sql, (database,))) would be safer.
        """
        return """
            select
              datname,
              pg_database_size('%s') db_size,
              xact_commit,
              xact_rollback,
              blks_read,
              blks_hit
            from pg_stat_database where datname = '%s'
        """ % (database, database)

    def query_connections(self, database):
        """SQL counting backends via the SECURITY DEFINER pg_stat_activity()."""
        return """
            select count(1) connections from pg_stat_activity() where datname = '%s'
        """ % (database)

    def query_waiting_connections(self, database):
        """SQL counting waiting backends.

        NOTE(review): relies on the boolean 'waiting' column, which only
        exists on PostgreSQL versions that provide it — confirm against
        the target server version.
        """
        return """
            select count(1) waiting_connections from pg_stat_activity() where waiting is true and datname = '%s'
        """ % (database)

    def query_replication_delay(self):
        """SQL for replay lag in ms; NULL when nothing has been replayed."""
        return """
            select extract(epoch from (now() - pg_last_xact_replay_timestamp())) * 1000 as replication_delay
        """

    def query_heap_memory_stats(self):
        """SQL aggregating heap block reads/hits across user tables; 3 columns."""
        return """
            select
              cast(sum(heap_blks_read) as bigint) heap_read,
              cast(sum(heap_blks_hit) as bigint) heap_hit,
              coalesce(cast(sum(heap_blks_hit) / nullif((sum(heap_blks_hit) + sum(heap_blks_read)), 0) * 100 as bigint), 0)::int per_heap_ratio
            from pg_statio_user_tables
        """

    def query_index_memory_stats(self):
        """SQL aggregating index block reads/hits across user indexes; 3 columns."""
        return """
            select
              cast(sum(idx_blks_read) as bigint) idx_read,
              cast(sum(idx_blks_hit) as bigint) idx_hit,
              coalesce(cast(sum(idx_blks_hit) / nullif((sum(idx_blks_hit) + sum(idx_blks_read)), 0) * 100 as bigint), 0)::int per_idx_ratio
            from pg_statio_user_indexes
        """
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
init_config:
  name: 'postgresql'

instances:
  - host: 'localhost'
    port: '5432'
    username: 'datadog'
    password: '<complex_password>'
    database: '<database_name>'
    tags:
      - 'env:<env>'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment