@craig8
Last active October 16, 2015 18:34
VOLTTRON Weather Agent test
# Example file using the weather agent.
#
# Requirements
# - A VOLTTRON instance must be started
# - A weather agent must be running before this code is run.
#
# Author: Craig Allwardt
from volttron.platform.vip.agent import Agent
import gevent


def onmessage(peer, sender, bus, topic, headers, message):
    # Print every message delivered on the subscribed topic prefix.
    print('received: peer=%r, sender=%r, bus=%r, topic=%r, headers=%r, message=%r' % (
        peer, sender, bus, topic, headers, message))

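# Create a bare agent and start its core loop in a greenlet.
# join(0) yields control so the loop can begin running.
a = Agent()
gevent.spawn(a.core.run).join(0)

# Subscribe to weather responses before publishing the request.
a.vip.pubsub.subscribe(peer='pubsub',
                       prefix='weather/response',
                       callback=onmessage).get(timeout=5)

# Request the weather for a zip code.
a.vip.pubsub.publish(peer='pubsub',
                     topic='weather/request',
                     headers={'requesterID': 'agent1'},
                     message={'zipcode': '99336'}).get(timeout=5)

# Give the weather agent time to respond, then shut down.
gevent.sleep(5)
a.core.stop()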