Created
July 9, 2010 19:57
-
-
Save lamikae/469967 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CoreService(Thread): | |
"""Every message in the CoreService goes through AMQP. | |
The CoreService initiates an Exchange, when a | |
* JSON request is received from remote host by RPC | |
* SOAP request is received from remote host by WSDL | |
These are internal methods that are executed in daemon mode. | |
The CoreService API has public methods to | |
* send JSON request to remote host by RPC | |
* send SOAP request to remote host by WSDL | |
Messages are stateless; they store no session state. | |
In that sense the CoreService is purely functional, | |
except for the fact that it triggers side-effects, | |
that are RPC and WSDL requests. | |
The CoreService uses the AMQP for internal message passing | |
between threads. This allows for easier threading and an | |
interface to add more services to the message bus. | |
An incoming JSON-RPC message is stored to "rpc:inbox" as such. | |
An incoming SOAP message is stored to "soap:inbox" queue parsed to JSON. | |
An outgoing JSON message to another RPC server is stored to "rpc:outbox". | |
An outgoing SOAP message to another SOAP server is stored to "soap:outbox", | |
""" | |
class Meta: | |
abstract = True | |
connection = None | |
def __init__(self, *args, **kwargs): | |
Thread.__init__(self, daemon=True) | |
self.connection = BrokerConnection( | |
hostname="localhost", port=5672, | |
userid="guest", password="guest", | |
virtual_host="/") | |
class ExchangeFilter(): | |
"""Message filter.""" | |
def rpc_exchange(func): | |
"""Filter RPC -> AMQP.""" | |
def do_filter(obj, message): | |
"""Preprocesses the message.""" | |
# do whatever | |
if obj.__class__ == 'amqp': | |
core_msg = AMQPTranslation.amqp2rpc(message) | |
else: | |
core_msg = AMQPTranslation.rpc2amqp(message) | |
return func(obj, *core_msg) | |
do_filter.__name__ = func.__name__ | |
do_filter.__dict__ = func.__dict__ | |
do_filter.__doc__ = func.__doc__ | |
return do_filter | |
class RPCService(CoreService): | |
""" RPC adapter for AMQP broker. | |
This daemon runs: | |
* RPC server for receiving incoming JSON requests from a remote host. | |
The request will be dispatched to AMQP: | |
exchange="CoreService", queue="rpc", routing_key="inbox". | |
* AMQP consumer exchange="CoreService", queue="rpc", routing_key="outbox" | |
will dispatch an JSON-RPC request to a pre-configured host. | |
The AMQP message content is in JSON, and can be filtered by business logic. | |
HOW to connect rpc:inbox -> soap:outbox ?? | |
""" | |
def __init__(self, *args, **kwargs): | |
CoreService.__init__(self, *args, **kwargs) | |
# AMQP -> RPC | |
self.consumer = Consumer( | |
connection=self.connection, | |
exchange="CoreService", | |
queue="rpc", | |
routing_key="outbox" | |
) | |
# send RPC when there are messages in the AMQP outbox | |
self.consumer.register_callback(self.dispatch_rpc) | |
# RPC -> AMQP | |
self.server = RPCServer() | |
self.publisher = Publisher( | |
connection=self.connection, | |
exchange="CoreService", | |
queue="rpc", | |
routing_key="inbox" | |
# define an RPC API call | |
self.server.register_function(dispatch_amqp, 'localAPICall') # XXX | |
def run(self): | |
self.server.run() # forks | |
it = self.consumer.iterconsume() | |
# Go into the consumer loop. | |
while True: | |
log.debug("consumer loop") | |
it.next() | |
@ExchangeFilter.rpc_exchange | |
def dispatch_amqp(self, json, **kwargs): | |
""" RPC -> AMQP dispatcher. | |
Uses persistant publisher stored in object state. | |
Receives (self<RPCService>, message) from RPC, | |
and the filter returns (self, json, **kwargs). | |
""" | |
return self.publisher.send(json) | |
@staticmethod | |
@ExchangeFilter.rpc_exchange | |
def dispatch_rpc(message_data, json, **kwargs): | |
""" AMQP -> RPC dispatcher. | |
Receives (message_data, message) from AMQP, | |
and the filter returns (message_data, json, **kwargs). | |
@kwarg rpcmethod | |
""" | |
log.debug('dispatching RPC: %s' % json) | |
try: | |
# some pre-configured server | |
server = jsonrpclib.Server('http://127.0.0.1:8181') | |
# call dynamic method | |
#reply = server.anotherAPICall(message_data) | |
func = getattr(server, kwargs['rpcmethod'], json) | |
if callable(func): | |
reply = func() | |
log.debug(jsonrpclib.history.response) | |
finally: | |
message.ack() | |
return reply | |
class SOAPService(CoreService): | |
""" SOAP/WSDL adapter for AMQP broker. | |
The daemon runs: | |
* WSGI server offering a SOAP interface. The methods should be explicitly | |
defined in the implementation. The method can dispatch the request to AMQP: | |
exchange="CoreService", queue="soap", routing_key="inbox". | |
* AMQP consumer exchange="CoreService", queue="soap", routing_key="outbox". | |
HOW to connect soap:inbox -> rpc:outbox ?? | |
""" | |
def __init__(self, *args, **kwargs): | |
CoreService.__init__(self, *args, **kwargs) | |
self.consumer = Consumer( | |
connection=self.connection, | |
exchange="CoreService", | |
queue="soap", | |
routing_key="outbox" | |
) | |
self.publisher = Publisher( | |
connection=self.connection, | |
exchange="CoreService", | |
queue="soap", | |
routing_key="inbox" | |
class Reactor(): # single-core Reactor | |
"""One Thread per one CoreService; the Reactor should start 2 Threads. | |
""" | |
core_services = None | |
def __init__(self,*args,**kwargs): | |
self.core_services = [ | |
RPCService(), | |
SOAPService() | |
] | |
def __unicode__(self): | |
return 'Reactor' | |
def run(self): | |
log.debug("running reactor thread: %s" % self) | |
for service in self.core_services: | |
log.debug('activating service %s' % service) | |
service.start() | |
class AMQPTranslation(): | |
"""The interface between adapters and AMQP.""" | |
@staticmethod | |
def amqp2rpc(message): | |
return (message, {'rpcmethod': 'remoteAPICall'} | |
@staticmethod | |
def rpc2amqp(self): | |
return message |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment