Created
February 10, 2018 04:00
-
-
Save adavidzh/9ba740baac541c225cfd587b8905f224 to your computer and use it in GitHub Desktop.
My little zmq DAQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""A client-server for simple DAQ operations. | |
The goal is to offload the event data as soon as possible from the acquisition machine so that it can focus on talking to the hardware.
This code was developed with the concrete case of: | |
- a Raspberry Pi performing the detector readout via GPIO (the server) and | |
- the event data being processed in a powerful computer (the client)
Flow control is performed over a synchronous REQuest-REPly connection and has three verbs:
- CONFIGUREBLING string - to configure the hardware. | |
- GIMMEEVENTS N_events - to request the acquisition on N_events. | |
- MICDROP - to signal the server that we're done with the session. | |
The client operates from a flight plan that is a list of commands to be executed. | |
The event data is sent from the server to the client using a PUSH-PULL topology. This means that more PULL clients can be added in which case they will receive events in a round-robin. This can be used to parallelise processing of the event data with more PULL clients. | |
""" | |
__version__ = '0.1' | |
__author__ = 'André David <[email protected]>' | |
import zmq | |
import time | |
import sys | |
import string | |
import random as rd | |
from functools import partial | |
from collections import defaultdict | |
import logging | |
# Root logging setup: INFO level, with each record showing the milliseconds
# since start-up, the level, the logger name and the emitting process name.
logging.basicConfig(
    format='%(relativeCreated)6d %(levelname)10s %(name)s %(processName)12s: %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by the server and client code below.
logger = logging.getLogger(__name__)
def random_string(length, chars=string.ascii_letters):
    """Return ``length`` characters picked at random from ``chars``.

    This is just a cheap way to fabricate event/configuration data.

    Args:
        length: number of characters to generate; 0 gives an empty string.
        chars: non-empty sequence of characters to draw from
            (ASCII letters by default).

    Returns:
        A string of exactly ``length`` characters.
    """
    # ``range`` (rather than the Python 2-only ``xrange``) keeps this working
    # on both Python 2 and Python 3.
    return ''.join(rd.choice(chars) for _ in range(length))
def decode_request(request):
    """Split a request of the form 'command [anything_else]'.

    The request is cut at the first space only, so the payload may itself
    contain spaces.

    Returns:
        A ``(command, payload)`` tuple; ``payload`` is ``None`` when the
        request contains no space at all.
    """
    command, separator, remainder = request.partition(' ')
    # An empty separator means no space was found, i.e. there is no payload.
    payload = remainder if separator else None
    return (command, payload)
def server(port=5556):
    """Skeleton of the server side, running close to the hardware.

    Binds a REP socket on ``port`` and answers every request with
    '<command> <result>' until a MICDROP request is received. Events asked
    for with GIMMEEVENTS are PUSH'ed on ``port + 1`` before the reply to
    that request is sent. Commands without a handler (MICDROP included) get
    an empty result.

    Args:
        port: TCP port of the command (REP) socket; the event (PUSH) socket
            uses ``port + 1``.
    """
    context = zmq.Context()

    # Create the command socket where requests are received from and replies are sent to
    rep = context.socket(zmq.REP)
    rep.bind("tcp://*:%d" % port)
    logger.info("REP'ing @%d", port)

    def acquire_event():
        """emulates the acquisition of an event from hardware"""
        logger.debug("(Quickly) acquiring one event from hardware")
        time.sleep(0.5)
        return random_string(10)

    def send_events(payload, port):
        """takes care of PUSHing events out asynchronously

        ``payload`` is the number of events to acquire, as text. Returns the
        'SENT <n>' result used in the reply to the client.
        """
        nevents = int(payload)
        # Reuse the server's context instead of creating a throwaway one per
        # request that would never be terminated.
        push = context.socket(zmq.PUSH)
        try:
            push.bind("tcp://*:%d" % port)
            logger.info("PUSH'ing @%d", port)
            for event in range(nevents):
                event_data = acquire_event()
                # send_string encodes the text, so this works on Python 2 and 3
                push.send_string('%d %s' % (event, event_data))
                logger.info("PUSH'ed event %d [%s]", event, event_data)
        finally:
            # Release the event port even if acquisition fails midway
            push.close()
        logger.info("PUSH'ing @%d stopped", port)
        return 'SENT %d' % nevents

    def configure_hw(configuration):
        """emulates the configuration of hardware"""
        logger.info("(Slowly) configuring with [%s]", configuration)
        time.sleep(1)
        return 'AWESOME SAUCE'

    # Dispatch table for the server functions
    # the default returns a function (the second lambda) that returns something not obviously bad (empty string)
    commands = defaultdict(
        lambda: lambda x: '',
        {
            'CONFIGUREBLING': configure_hw,
            'GIMMEEVENTS': partial(send_events, port=port + 1),
        },
    )

    try:
        while True:
            # Wait for a command from the client
            request = rep.recv_string()
            logger.info("rx REQ [%s]", request)

            command, payload = decode_request(request)
            logger.debug("Dispatching [%s][%s]", command, payload)
            result = commands[command](payload)

            reply = ' '.join((command, result))
            rep.send_string(reply)
            logger.debug("tx REP [%s]", reply)

            # Least fancy way to terminate the loop
            if command == 'MICDROP':
                break
    finally:
        # Always give the command port back, also when a handler raises
        rep.close()

    context.term()
    logger.info("Mic dropped")
def client(server_host='localhost', port=5556, flight_plan=('MICDROP',)):
    """Skeleton of the client side, running anywhere in the network (in a fast machine).

    Sends each request of ``flight_plan`` to the server over a REQ socket,
    runs the matching local handler (if any) and then waits for the server's
    reply before moving on to the next request.

    Args:
        server_host: host name or address of the server.
        port: TCP port of the server's command socket; events are PULL'ed
            from ``port + 1``.
        flight_plan: iterable of request strings, executed in order.
    """
    context = zmq.Context()

    cmd_hostport = '%s:%d' % (server_host, port)
    evt_hostport = '%s:%d' % (server_host, port + 1)

    # Connect to the control socket of the server to where requests are sent
    logger.debug("REQ'ing @%s", cmd_hostport)
    req = context.socket(zmq.REQ)
    req.connect("tcp://%s" % cmd_hostport)

    def process_event(data):
        """emulates the (slow) processing of one event's data"""
        logger.debug("(Slowly) processing one event [%s]", data)
        time.sleep(1.5)

    def receive_events(payload, hostport):
        """takes care of PULLing events in and processing them

        ``payload`` is the number of events to wait for, as text.
        """
        nevents = int(payload)
        logger.info("PULL'ing @%s started", hostport)
        # Reuse the client's context instead of creating a throwaway one per
        # request that would never be terminated.
        pull = context.socket(zmq.PULL)
        try:
            pull.connect("tcp://%s" % hostport)
            for event in range(nevents):
                # recv_string decodes the bytes, so this works on Python 2 and 3
                message = pull.recv_string()
                logger.info("PULL'ed event %d [%s]", event, message)
                process_event(message)
        finally:
            # Drop the event connection even if processing fails midway
            pull.close()
        logger.info("PULL'ing @%s stopped", hostport)

    # Dispatch table for commands in the client
    # requests without a local handler fall back to a do-nothing function
    commands = defaultdict(
        lambda: lambda x: '',
        {
            'GIMMEEVENTS': partial(receive_events, hostport=evt_hostport),
        },
    )

    try:
        for request in flight_plan:
            req.send_string(request)
            logger.debug("tx REQ [%s]", request)

            # Run the local side of the command while the server works on it;
            # the reply only arrives once the server is done with the request.
            command, payload = decode_request(request)
            logger.debug("Dispatching [%s][%s]", command, payload)
            commands[command](payload)

            reply = req.recv_string()
            logger.info("rx REP [%s]", reply)
    finally:
        # Always close the command connection, also when a handler raises
        req.close()

    context.term()
    logger.info("All done with the flight plan")
if __name__ == "__main__":
    # This example code uses separate processes to emulate the two machines
    logger.setLevel(logging.DEBUG)

    from pprint import pformat
    import multiprocessing as mp

    server_host = 'localhost'
    # Events travel on server_port + 1, so the command port must stay below
    # 65535 for the event port to remain a valid TCP port.
    server_port = rd.randint(49152, 65534)

    flight_plan = (
        'CONFIGUREBLING %s' % random_string(20),
        'GIMMEEVENTS %d' % rd.randint(4, 12),
        'MICDROP',
    )

    # Do not rebind the names ``server``/``client``: the ``spawn`` start
    # method pickles the target functions by name, which fails once those
    # names point at the Process objects instead.
    server_process = mp.Process(
        target=server, args=(server_port,), name='Server',
    )
    client_process = mp.Process(
        target=client, args=(server_host, server_port, flight_plan), name='Client',
    )

    logger.info('The flight plan reads as follows:')
    logger.info(pformat(flight_plan))

    logger.info("Starting server process")
    server_process.start()

    # Give the server a head start so that its sockets are bound
    time.sleep(2)

    logger.info("Starting client process")
    client_process.start()

    logger.info("Waiting for processes to end")
    server_process.join()
    client_process.join()

    logger.info("All done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output, showing the server being done with taking events at 6546 ms and the client only being done processing events at 14045 ms: