Created
December 22, 2011 18:11
-
-
Save op/1511258 to your computer and use it in GitHub Desktop.
xpub/xsub assertion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <assert.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <zmq.h> | |
int main (int argc, char *argv[]) | |
{ | |
const char *msg[] = {"0", "1", "2"}; | |
unsigned char buf[128]; | |
int i, rc; | |
void *ctx = zmq_init(1); | |
void *pub = zmq_socket(ctx, ZMQ_PUB); | |
rc = zmq_bind(pub, "tcp://127.0.0.1:5561"); | |
assert(rc == 0); | |
void *sub = zmq_socket(ctx, ZMQ_SUB); | |
rc = zmq_bind(sub, "tcp://127.0.0.1:5560"); | |
assert(rc == 0); | |
rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0); | |
assert(rc == 0); | |
zmq_pollitem_t items[1]; | |
items[0].socket = sub; | |
items[0].events = ZMQ_POLLIN; | |
unsigned int x = 0; | |
for (x = 0; ; x++) { | |
if (x % 100000 == 0) { | |
printf("send %u\n", x); | |
} | |
// Publish messages. | |
rc = zmq_send(pub, msg[0], 1, ZMQ_SNDMORE); | |
assert(rc >= 0); | |
rc = zmq_send(pub, msg[1], 1, ZMQ_SNDMORE); | |
assert(rc >= 0); | |
rc = zmq_send(pub, msg[2], 1, 0); | |
assert(rc >= 0); | |
// Handle published messages. | |
while (1) { | |
rc = zmq_poll(items, 1, 0); | |
assert(rc >= 0); | |
if (rc == 0) { | |
break; | |
} | |
if (items[0].revents & ZMQ_POLLIN) { | |
int more = 1; | |
size_t more_size = sizeof(more); | |
for (i = 0; more; i++) { | |
rc = zmq_recv(sub, buf, sizeof(buf), 0); | |
assert(rc >= 0); | |
assert(i < sizeof(msg)); | |
if (memcmp(msg[i], buf, rc) != 0) { | |
printf("recv %d: %.*s (%d)\n", i, rc, buf, rc); | |
assert(memcmp(msg[i], buf, rc) == 0); | |
} | |
rc = zmq_getsockopt(sub, ZMQ_RCVMORE, &more, &more_size); | |
assert(rc == 0); | |
} | |
assert(i == 3); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <assert.h> | |
#include <stddef.h> | |
#include <stdio.h> | |
#include <stdint.h> | |
#include <unistd.h> | |
#include <zmq.h> | |
int main (int argc, char *argv[]) | |
{ | |
char buf[128]; | |
int rc; | |
int size; | |
void *ctx = zmq_init(1); | |
void *xpub = zmq_socket(ctx, ZMQ_XPUB); | |
rc = zmq_connect(xpub, "tcp://127.0.0.1:5560"); | |
assert(rc == 0); | |
void *xsub = zmq_socket(ctx, ZMQ_XSUB); | |
rc = zmq_connect(xsub, "tcp://127.0.0.1:5561"); | |
assert(rc == 0); | |
zmq_pollitem_t items[2]; | |
items[0].socket = xpub; | |
items[0].events = ZMQ_POLLIN; | |
items[1].socket = xsub; | |
items[1].events = ZMQ_POLLIN; | |
unsigned int x; | |
for (x = 0; ; x++) { | |
if (x % 100000 == 0) { | |
printf("recv %u\n", x); | |
} | |
rc = zmq_poll(items, 2, -1); | |
assert (rc >= 0); | |
// Pass the subscription upstream through the device. | |
if (items[0].revents & ZMQ_POLLIN) { | |
printf("Handling subscriptions...\n"); | |
rc = zmq_recv(xpub, buf, sizeof(buf), 0); | |
assert(rc >= 0); | |
size = rc; | |
rc = zmq_send(xsub, buf, size, 0); | |
assert(rc >= 0); | |
} | |
// Pass the published messages through the device. | |
if (items[1].revents & ZMQ_POLLIN) { | |
int i = 0; | |
int more; | |
size_t more_size = sizeof(more); | |
do { | |
rc = zmq_recv(xsub, buf, sizeof(buf), 0); | |
assert(rc >= 0); | |
size = rc; | |
rc = zmq_getsockopt(xsub, ZMQ_RCVMORE, &more, &more_size); | |
assert(rc == 0); | |
rc = zmq_send(xpub, buf, size, more ? ZMQ_SNDMORE : 0); | |
assert(rc >= 0); | |
i++; | |
} while (more); | |
if (i != 3) { | |
assert(i > 0); | |
printf("recv %d: %.*s (%d)\n", i, rc, buf, rc); | |
assert(i == 3); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you run one pubsub process and one xpubsub process, everything is fine. If you run two or more xpubsub processes, frames get lost.