Last active
January 19, 2017 13:26
-
-
Save boltzj/586f654358da2f56cc4d358870a5b148 to your computer and use it in GitHub Desktop.
Publisher.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var zmq = require('zmq'); | |
/** | |
* Publish Message to Subscriber | |
*/ | |
var publisher = zmq.socket('pub'); | |
publisher.bind('tcp://*:8688', function(err) { | |
if (err) { | |
return console.log(err); | |
} | |
console.log('Listening on 8688…'); | |
// TODO: Send message to subscriber | |
setInterval(function loop() { | |
var rand = { | |
id: Math.ceil(Math.random() * 5) | |
} | |
publisher.send(JSON.stringify(rand)); | |
}, 1000); | |
}); | |
/** | |
* Handle Subscriber Responses | |
*/ | |
var pull = zmq.socket('pull'); | |
pull.bind('tcp://*:8888', function(err) { | |
pull.on('message', function(msg) { | |
// TODO: Handle Message here | |
console.log(msg.toString()); | |
}) | |
}); | |
process.on('SIGINT', function() { | |
publisher.close() | |
pull.close() | |
process.exit() | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sys | |
import random | |
import time | |
import zmq | |
sub_port = "8688" | |
push_port = "8888" | |
# Init context | |
context = zmq.Context() | |
# Subscriber socket | |
sub = context.socket(zmq.SUB) | |
# Socket to respond to publisher | |
push = zmq.Context().socket(zmq.PUSH) | |
# Connect to Subscriber and it's push respond | |
sub.connect("tcp://localhost:%s" % sub_port) | |
push.connect("tcp://localhost:%s" % push_port) | |
rand = random.randrange(1, 5) | |
print('Random id: %s' % rand) | |
# topicfilter = "%datasetId%" | |
sub.setsockopt(zmq.SUBSCRIBE, b'') | |
ttl = time.time() + 20 | |
while 42: | |
try: | |
# Read from publisher socket | |
buf = sub.recv(zmq.DONTWAIT).decode('utf-8') | |
# Parse receipt message | |
msg = json.loads(buf) | |
# Message Processing | |
if msg["id"] == rand: | |
print('Message Id %s! Update TTL' % rand) | |
# Send response to Publisher | |
push.send_json({"id": rand, "message": "Hello from %s" % rand}) | |
# Update TTL | |
ttl = time.time() + 20 | |
# Break TTL | |
except zmq.Again: | |
# No message | |
if time.time() > ttl: | |
print('TTL exceeded.. Bye bye') | |
sys.exit(0) | |
else: | |
# Sleep for 33ms | |
time.sleep(0.033) | |
# Error Handling | |
except zmq.ZMQError as e: | |
if e.errno == zmq.ETERM: | |
break # Socked Interrupted | |
else: | |
raise | |
except AttributeError as e: | |
print("Malformed Message: AttributeError") | |
except KeyboardInterrupt as e: | |
print('\n', 'Catch KeyboardInterrupt...') | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment