|
# -*- coding: utf-8 -*- |
|
""" |
|
# publisher and subscriber helpers for the pub/sub pattern (PostgreSQL LISTEN/NOTIFY) |
|
""" |
|
import os, sys, types, psycopg2 |
|
def is_str(s):
    """
    return True when <s> is a string instance
    """
    # The previous test was `s in types.StringTypes`, which asks whether <s>
    # IS one of the string types, so it was False for every actual string.
    # types.StringTypes is missing on some interpreters; fall back to str.
    return isinstance(s, getattr(types, 'StringTypes', (str,)))


def wopen(f):
    """
    return a writable file for <f>

    a falsy <f> (None, '') means sys.stdout, a path string is opened
    for writing, anything else is returned unchanged
    """
    if not f:
        return sys.stdout
    return open(f, 'w') if is_str(f) else f


def ropen(f):
    """
    return a readable file for <f>

    a falsy <f> (None, '') means sys.stdin, a path string is opened
    for reading, anything else is returned unchanged
    """
    if not f:
        return sys.stdin
    return open(f, 'r') if is_str(f) else f
|
# Version string of this module.
__version__ = "1.0.0.1"

# Connection string handed to psycopg2; the DSN environment variable
# overrides the built-in default.
DSN = os.environ.get("DSN", "dbname=postgres")

# Module-wide connection, opened when the module is imported and switched
# to the autocommit isolation level before any statement is issued.
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

# Single shared cursor used by pub(), sub() and uns().
curs = conn.cursor()
|
def pub(channel, nickname, message):
    """
    publish <message> to <channel> as <nickname>

    the payload sent with the notification is '[<nickname>]:<message>'
    """
    payload = '[%s]:%s' % (nickname, message)
    # NOTE(review): <channel> is spliced into the statement as a bare SQL
    # identifier, exactly as before, so it must come from a trusted source.
    # Any literal '%' in it is doubled so that the only placeholder the
    # driver sees is the payload one.
    statement = "NOTIFY %s, %%s;" % ('%s' % (channel,)).replace('%', '%%')
    # Bind the payload as a query parameter so the cursor quotes it for this
    # connection, instead of hand-quoting with an unprepared QuotedString
    # (whose getquoted() also returns bytes on Python 3).
    curs.execute(statement, (payload,))
|
def sub(channel):
    """
    start listening on <channel>

    prints a notice, then issues LISTEN for <channel> on the shared cursor
    """
    notice = 'SUBSCRIBE TO channel #%s' % channel
    print(notice)
    statement = "LISTEN %s;" % channel
    curs.execute(statement)
|
def uns(channel):
    """
    stop listening on <channel>

    prints a notice, then issues UNLISTEN for <channel> on the shared cursor
    """
    notice = 'UNSUBSCRIBE FROM channel #%s' % channel
    print(notice)
    statement = "UNLISTEN %s;" % channel
    curs.execute(statement)
|
def notifications():
    """
    iterator for notifications

    polls the shared connection once, then yields every pending
    notification, removing each one from conn.notifies as it is yielded
    """
    conn.poll()
    while conn.notifies:
        # Take from the front of the list: a bare pop() removes the last
        # element and would hand the notifications out newest-first.
        yield conn.notifies.pop(0)
|
def subs_loop(channels, callback_func):
    """
    subscribe to <channels> and send notifies to <callback_func>

    never returns normally: after subscribing it loops forever, calling
    <callback_func> once per notification
    """
    import select

    # Plain loop: the subscriptions are side effects, not a list to build.
    for channel in channels:
        sub(channel)

    while True:
        # Drain first so anything already queued is delivered immediately.
        for n in notifications():
            callback_func(n)
        # Sleep until the connection has data to read instead of polling in
        # a tight loop; the timeout is only a safety net.
        select.select([conn], [], [], 1.0)
|
def tap(channels, out_file_spec=None):
    """
    tap <channels> and send to out_file

    <out_file_spec> is resolved with wopen(); each notification is written
    as '[<pid>] #<channel> - <payload>' with no separator added
    """
    out_file = wopen(out_file_spec)
    # Only a file opened here is closed here; stdout and a file object
    # supplied by the caller are left alone.
    opened_here = (out_file is not out_file_spec
                   and out_file is not sys.stdout)
    flush = getattr(out_file, 'flush', None)

    def cb(n):
        out_file.write("[%s] #%s - %s" % (n.pid, n.channel, n.payload))
        # The loop below never ends, so without a flush the output would
        # stay in the file buffer.
        if flush is not None:
            flush()

    try:
        return subs_loop(channels, cb)
    finally:
        # Reached when the loop is interrupted or raises.
        if opened_here:
            out_file.close()
|
|
|
def drain(channel, nickname, in_file_spec=None):
    """
    drain (publish in_file) as <nickname> to <channel>

    <in_file_spec> is resolved with ropen(); every line read is published
    unchanged with pub()
    """
    print("PUBLISH to channel #%s" % channel)
    in_file = ropen(in_file_spec)
    # Only a file opened here is closed here; stdin and a file object
    # supplied by the caller are left alone.
    opened_here = (in_file is not in_file_spec
                   and in_file is not sys.stdin)
    try:
        for line in in_file:
            pub(channel, nickname, line)
    finally:
        if opened_here:
            in_file.close()