Created
September 24, 2014 06:03
-
-
Save sqbing/076c8163e6d8ef7d4175 to your computer and use it in GitHub Desktop.
zmq example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <boost/program_options.hpp> | |
#include <boost/property_tree/ptree.hpp> | |
#include <boost/property_tree/json_parser.hpp> | |
#include <boost/any.hpp> | |
#include <inttypes.h> | |
#include <signal.h> | |
#ifdef __cplusplus | |
extern "C"{ | |
#endif | |
#include <uuid/uuid.h> | |
#ifdef __cplusplus | |
} | |
#endif | |
#include "zmq.hpp" | |
#define GRID_MESSAGE_PREFIX 0x22 | |
#ifndef uchar | |
#define uchar uint8_t | |
#endif | |
#ifndef DELIF | |
#define DELIF(a) do{if(a){delete (a); (a) = NULL;}}while(0) | |
#endif | |
static int router_port = 4150; | |
static const char *router_port_str = "4150"; | |
static int publish_port = 4151; | |
static const char *publish_port_str = "4151"; | |
static time_t current_time = time(0); | |
enum { | |
ROLE_MASTER, | |
ROLE_WORKER | |
}; | |
enum { | |
MSG_TYPE_HANDSHAKE = 0, | |
MSG_TYPE_HANDSHAKE_OK, | |
MSG_TYPE_HANDSHAKE_FAIL, | |
MSG_TYPE_HEARTBEAT, | |
MSG_TYPE_EXIT | |
}; | |
const char *MessageTypeString[] = { | |
"MESSAGE_TYPE_HANDSHAKE", | |
"MESSAGE_TYPE_HANDSHAKE_OK", | |
"MESSAGE_TYPE_HANDSHAKE_FAIL", | |
"MESSAGE_TYPE_HEARTBEAT", | |
"MESSAGE_TYPE_EXIT" | |
}; | |
int process_role = ROLE_MASTER; | |
std::string local_ip = ""; | |
std::string remote_ip = ""; | |
std::string remote_router_port_str = ""; | |
typedef struct GridContext GridContext; | |
typedef struct GridInstanceContext GridInstanceContext; | |
typedef struct GridMessageContext{ | |
zmq::socket_t *_sock; | |
boost::any _data; | |
}GridMessageContext; | |
struct GridInstanceContext{ | |
GridInstanceContext():_last_heartbeat_time(0){ | |
} | |
~GridInstanceContext(){ | |
CloseAllSockets(); | |
} | |
void CloseAllSockets(){ | |
std::cout<<"info: closing all sockets for "<<_id<<std::endl; | |
if(_router_sock){ | |
_router_sock->close(); | |
_router_sock.reset(); | |
} | |
if(_publish_sock){ | |
_publish_sock->close(); | |
_publish_sock.reset(); | |
} | |
if(_subscrib_sock){ | |
_subscrib_sock->close(); | |
_subscrib_sock.reset(); | |
} | |
if(_request_sock){ | |
_request_sock->close(); | |
_request_sock.reset(); | |
} | |
} | |
time_t _last_heartbeat_time; | |
std::string _id; | |
std::string _ip; | |
std::string _router_port; | |
std::string _publish_port; | |
boost::shared_ptr<zmq::socket_t> _router_sock; | |
boost::shared_ptr<zmq::socket_t> _publish_sock; | |
boost::shared_ptr<zmq::socket_t> _subscrib_sock; | |
boost::shared_ptr<zmq::socket_t> _request_sock; | |
void add_sub(std::string &address){ | |
_subscrib_sock->connect(address.c_str()); | |
} | |
void del_sub(std::string &address){ | |
_subscrib_sock->disconnect(address.c_str()); | |
} | |
void get_jsonified_info(std::string &info){ | |
boost::property_tree::ptree pt_root; | |
pt_root.put("id", _id); | |
pt_root.put("ip", _ip); | |
pt_root.put("publish_port", _publish_port); | |
pt_root.put("router_port", _router_port); | |
std::stringstream ss; | |
boost::property_tree::write_json(ss, pt_root); | |
info = ss.str(); | |
} | |
}; | |
typedef struct PollinContext{ | |
boost::any _data; | |
zmq::socket_t *_sock; | |
boost::function<void(GridMessageContext &)> _on_read; | |
}PollinContext; | |
struct GridContext{ | |
GridContext():_sock_ctx(1){ | |
std::cout<<"info: new GridContext"<<std::endl; | |
} | |
~GridContext(){ | |
_all_instances.clear(); | |
_pollins_vector.clear(); | |
_ctx.CloseAllSockets(); | |
std::cout<<"info: destroying zmq context"<<std::endl; | |
_sock_ctx.close(); | |
} | |
// 本实例信息 | |
GridInstanceContext _ctx; | |
// pollin句柄列表 | |
std::vector<boost::shared_ptr<PollinContext> > | |
_pollins_vector; | |
// 实例列表 | |
std::map<std::string, boost::shared_ptr<GridInstanceContext> > | |
_all_instances; | |
zmq::context_t _sock_ctx; | |
// 增加pollin句柄 | |
void add_pollin(boost::shared_ptr<PollinContext> pollin_ctx){ | |
_pollins_vector.insert(_pollins_vector.begin(), pollin_ctx); | |
} | |
// 删除pollin句柄 | |
void del_pollin(boost::shared_ptr<PollinContext> pollin_ctx){ | |
for(int i = 0; i < _pollins_vector.size(); i++){ | |
if(_pollins_vector[i]->_sock == pollin_ctx->_sock){ | |
_pollins_vector.erase(_pollins_vector.begin()+i); | |
} | |
} | |
} | |
void poll(){ | |
current_time = time(0); | |
if(_pollins_vector.empty()){ | |
return ; | |
} | |
zmq::pollitem_t *iterms = NULL, *piterm = NULL; | |
std::vector<boost::shared_ptr<PollinContext> > temp_vector(_pollins_vector); | |
// 注意释放! | |
try{ | |
iterms = new zmq::pollitem_t[temp_vector.size()]; | |
}catch(...){ | |
std::cout<<"alloc poll iterms failed"<<std::endl; | |
return ; | |
} | |
for(int i = 0; i < temp_vector.size(); i++){ | |
piterm = &iterms[i]; | |
boost::shared_ptr<PollinContext> pollin_ctx = temp_vector[i]; | |
piterm->socket = *pollin_ctx->_sock; | |
piterm->fd = 0; | |
piterm->events = ZMQ_POLLIN; | |
piterm->revents = 0; | |
} | |
try{ | |
int poll_ret = zmq::poll(iterms, temp_vector.size(), 1000); | |
std::cout<<"info: poll return "<<poll_ret<<std::endl; | |
if(poll_ret == 0){ | |
if(iterms){ | |
delete[] iterms; | |
iterms = NULL; | |
} | |
return ; | |
} | |
for(int i = 0; i < temp_vector.size(); i++){ | |
boost::shared_ptr<PollinContext> pollin_ctx = temp_vector[i]; | |
piterm = &iterms[i]; | |
if(piterm->revents & ZMQ_POLLIN){ | |
std::cout<<"info: dispatching message from socket "<<pollin_ctx->_sock<<std::endl; | |
// 分发消息 | |
GridMessageContext msg_ctx; | |
msg_ctx._sock = pollin_ctx->_sock; | |
msg_ctx._data = pollin_ctx->_data; | |
pollin_ctx->_on_read(msg_ctx); | |
} | |
} | |
}catch(std::exception &e){ | |
std::cout<<"failed to poll sockets"<<std::endl; | |
std::cout<<e.what()<<std::endl; | |
if(iterms){ | |
delete[] iterms; | |
iterms = NULL; | |
} | |
return ; | |
} | |
if(iterms){ | |
delete[] iterms; | |
iterms = NULL; | |
} | |
} | |
}; | |
static GridContext grid_ctx; | |
typedef struct HandshakeContext{ | |
zmq::socket_t *_sock; | |
char *_address; | |
size_t _address_len; | |
}HandshakeContext; | |
static int exit_process = 0; | |
void int_handler(int signal){ | |
exit_process = 1; | |
} | |
void make_tcp_address(std::string &ip, std::string &port, std::string &out){ | |
out = "tcp://"; | |
out += ip; | |
out += ":"; | |
out += port; | |
} | |
/** | |
* @brief 发送握手消息 | |
* | |
* @param handshake_ctx | |
*/ | |
int send_handshake_init(HandshakeContext &handshake_ctx){ | |
std::string content; | |
grid_ctx._ctx.get_jsonified_info(content); | |
uint32_t content_len = content.size()+1; | |
zmq::message_t msg(6+content_len); | |
uchar *data = (uchar *)msg.data(); | |
data[0] = GRID_MESSAGE_PREFIX; | |
data[1] = MSG_TYPE_HANDSHAKE; | |
data[2] = (uchar)(content_len >> 24); | |
data[3] = (uchar)(content_len >> 16); | |
data[4] = (uchar)(content_len >> 8); | |
data[5] = (uchar)(content_len); | |
memcpy(data+6, content.c_str(), content_len); | |
try{ | |
handshake_ctx._sock->send(msg); | |
}catch(std::exception &e){ | |
std::cout<<"error: sending handshake failed "<<e.what()<<std::endl; | |
return -1; | |
} | |
return 0; | |
} | |
/** | |
* @brief 发送握手成功消息 | |
* | |
* @param handshake_ctx | |
*/ | |
int send_handshake_ok(HandshakeContext &handshake_ctx){ | |
zmq::message_t msg1(handshake_ctx._address_len); | |
memcpy(msg1.data(), handshake_ctx._address, handshake_ctx._address_len); | |
zmq::message_t msg2; | |
std::string content; | |
grid_ctx._ctx.get_jsonified_info(content); | |
uint32_t content_len = content.size()+1; | |
zmq::message_t msg3(6+content_len); | |
uchar *data = (uchar *)msg3.data(); | |
data[0] = GRID_MESSAGE_PREFIX; | |
data[1] = MSG_TYPE_HANDSHAKE_OK; | |
data[2] = (uchar)(content_len >> 24); | |
data[3] = (uchar)(content_len >> 16); | |
data[4] = (uchar)(content_len >> 8); | |
data[5] = (uchar)(content_len); | |
memcpy(data+6, content.c_str(), content_len); | |
try{ | |
handshake_ctx._sock->send(msg1, ZMQ_SNDMORE); | |
handshake_ctx._sock->send(msg2, ZMQ_SNDMORE); | |
handshake_ctx._sock->send(msg3); | |
}catch(std::exception &e){ | |
std::cout<<"error: sending handshake_ok failed "<<e.what()<<std::endl; | |
return -1; | |
} | |
return 0; | |
} | |
/** | |
* @brief 发送握手失败消息 | |
* | |
* @param handshake_ctx | |
*/ | |
int send_handshake_fail(HandshakeContext &handshake_ctx){ | |
zmq::message_t msg1(handshake_ctx._address_len); | |
memcpy(msg1.data(), handshake_ctx._address, handshake_ctx._address_len); | |
zmq::message_t msg2; | |
std::string content; | |
grid_ctx._ctx.get_jsonified_info(content); | |
uint32_t content_len = content.size()+1; | |
zmq::message_t msg3(6+content_len); | |
uchar *data = (uchar *)msg3.data(); | |
data[0] = GRID_MESSAGE_PREFIX; | |
data[1] = MSG_TYPE_HANDSHAKE_FAIL; | |
data[2] = (uchar)(content_len >> 24); | |
data[3] = (uchar)(content_len >> 16); | |
data[4] = (uchar)(content_len >> 8); | |
data[5] = (uchar)(content_len); | |
memcpy(data+6, content.c_str(), content_len); | |
try{ | |
handshake_ctx._sock->send(msg1, ZMQ_SNDMORE); | |
handshake_ctx._sock->send(msg2, ZMQ_SNDMORE); | |
handshake_ctx._sock->send(msg3); | |
}catch(std::exception &e){ | |
std::cout<<"error: sending handshake_ok failed "<<e.what()<<std::endl; | |
return -1; | |
} | |
return 0; | |
} | |
void handle_handshake_response(GridMessageContext &msg_ctx){ | |
std::cout<<"info: handle_handshake_response()"<<std::endl; | |
// 处理handshake_ok和handshake_fail | |
if(msg_ctx._data.empty() | |
|| msg_ctx._data.type() != typeid(HandshakeContext *)){ | |
std::cout<<"error: message context invalid"<<std::endl; | |
return ; | |
} | |
HandshakeContext *handshake_ctx = | |
boost::any_cast<HandshakeContext *>(msg_ctx._data); | |
try{ | |
// 删除pollin信息 | |
boost::shared_ptr<PollinContext> pollin_ctx(new PollinContext); | |
pollin_ctx->_sock = msg_ctx._sock; | |
grid_ctx.del_pollin(pollin_ctx); | |
std::cout<<"info: pollin deleted "<<pollin_ctx->_sock<<std::endl; | |
}catch(...){ | |
std::cout<<"error: failed to delete pollin"<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
// 接收消息 | |
zmq::message_t msg; | |
zmq::socket_t *sock = msg_ctx._sock; | |
try{ | |
sock->recv(&msg); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to recv handshake message "<<e.what()<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
// 解析消息 | |
uchar *data = (uchar *)msg.data(); | |
uchar prefix = data[0]; | |
uchar type = data[1]; | |
if(prefix != GRID_MESSAGE_PREFIX | |
|| (type != MSG_TYPE_HANDSHAKE_OK | |
&& type != MSG_TYPE_HANDSHAKE_FAIL)){ | |
std::cout<<"error: failed to parse handshake message, format invalid"; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
// 解析instance信息 | |
uint32_t content_len = (data[2] << 24) | |
| (data[3] << 16) | |
| (data[4] << 8) | |
| (data[5]); | |
std::string content; | |
try{ | |
char *content_wrapper = new char[content_len+1]; | |
memset(content_wrapper, 0, content_len+1); | |
memcpy(content_wrapper, data+6, content_len); | |
content = content_wrapper; | |
delete[] content_wrapper; | |
}catch(...){ | |
std::cout<<"error: failed to alloc content wrapper"<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
std::stringstream ss(content); | |
boost::property_tree::ptree pt_root; | |
try{ | |
boost::property_tree::read_json(ss, pt_root); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to parse content "<<e.what()<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
boost::shared_ptr<GridInstanceContext> instance_ctx; | |
try{ | |
instance_ctx.reset(new GridInstanceContext); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to alloc grid instance context "<<e.what()<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
try{ | |
instance_ctx->_id = pt_root.get<std::string>("id"); | |
instance_ctx->_ip = pt_root.get<std::string>("ip"); | |
instance_ctx->_publish_port = pt_root.get<std::string>("publish_port"); | |
instance_ctx->_router_port = pt_root.get<std::string>("router_port"); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to parse content "<<e.what()<<std::endl; | |
DELIF(handshake_ctx); | |
return ; | |
} | |
std::cout<<"info: ["<<MessageTypeString[type]<<"]" | |
<<" id: "<<instance_ctx->_id | |
<<" ip: "<<instance_ctx->_ip | |
<<" publish port: "<<instance_ctx->_publish_port | |
<<" router port: "<<instance_ctx->_router_port<<std::endl; | |
grid_ctx._all_instances[instance_ctx->_id] = instance_ctx; | |
// 删除HandshakeContext | |
DELIF(handshake_ctx); | |
// subcrib对方publish接口 | |
std::string remote_publish_address; | |
make_tcp_address(instance_ctx->_ip, instance_ctx->_publish_port, remote_publish_address); | |
try{ | |
grid_ctx._ctx._subscrib_sock->connect(remote_publish_address.c_str()); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to connect to remote publish address ["<<remote_publish_address<<"] " | |
<<e.what()<<std::endl; | |
return ; | |
} | |
std::cout<<"info: succeeded to subscrib "<<remote_publish_address<<std::endl; | |
return ; | |
} | |
void handle_handshake_request(GridMessageContext &msg_ctx){ | |
std::cout<<"info: handle_handshake_request()"<<std::endl; | |
// 处理handshake消息 | |
HandshakeContext handshake_ctx; | |
zmq::message_t msg1, msg2, msg3; | |
try{ | |
msg_ctx._sock->recv(&msg1);// address | |
msg_ctx._sock->recv(&msg2);// empty | |
msg_ctx._sock->recv(&msg3);// content | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to recv handshake message "<<e.what()<<std::endl; | |
send_handshake_fail(handshake_ctx); | |
return; | |
} | |
handshake_ctx._address = static_cast<char *>(msg1.data()); | |
handshake_ctx._address_len = msg1.size(); | |
handshake_ctx._sock = msg_ctx._sock; | |
uchar *data = static_cast<uchar *>(msg3.data()); | |
if(data[0] != GRID_MESSAGE_PREFIX | |
|| data[1] != MSG_TYPE_HANDSHAKE){ | |
std::cout<<"error: invalid message"; | |
send_handshake_fail(handshake_ctx); | |
return ; | |
} | |
uint32_t content_len = (data[2] << 24) | |
| (data[3] << 16) | |
| (data[4] << 8) | |
| (data[5]); | |
std::cout<<"info: content length is "<<content_len<<std::endl; | |
uchar *content = data+6; | |
boost::property_tree::ptree pt_root; | |
uchar *content_wrapper = NULL; | |
try{ | |
content_wrapper = new uchar[content_len+1]; | |
memset(content_wrapper, 0, content_len+1); | |
memcpy(content_wrapper, content, content_len); | |
std::string content_str = (char *)content_wrapper; | |
std::cout<<"info: content ["<<content_str<<"]"<<std::endl; | |
std::stringstream ss(content_str); | |
boost::property_tree::read_json(ss, pt_root); | |
}catch(std::exception &e){ | |
if(content_wrapper){ | |
delete[] content_wrapper; | |
content_wrapper = NULL; | |
} | |
std::cout<<"error: "<<e.what()<<std::endl; | |
send_handshake_fail(handshake_ctx); | |
return ; | |
} | |
if(content_wrapper){ | |
delete[] content_wrapper; | |
content_wrapper = NULL; | |
} | |
boost::shared_ptr<GridInstanceContext> instance_ctx; | |
try{ | |
instance_ctx.reset(new GridInstanceContext); | |
}catch(...){ | |
send_handshake_fail(handshake_ctx); | |
return ; | |
} | |
instance_ctx->_ip = pt_root.get<std::string>("ip"); | |
instance_ctx->_id = pt_root.get<std::string>("id"); | |
instance_ctx->_publish_port = pt_root.get<std::string>("publish_port"); | |
instance_ctx->_router_port = pt_root.get<std::string>("router_port"); | |
std::cout<<"info: id "<<instance_ctx->_id<<std::endl; | |
std::cout<<"info: ip "<<instance_ctx->_ip<<std::endl; | |
std::cout<<"info: publish port "<<instance_ctx->_publish_port<<std::endl; | |
std::cout<<"info: router port "<<instance_ctx->_router_port<<std::endl; | |
// 在实例列表中检查此实例是否已经存在 | |
std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter; | |
iter = grid_ctx._all_instances.find(instance_ctx->_id); | |
if(iter != grid_ctx._all_instances.end()){ | |
boost::shared_ptr<GridInstanceContext> old_instance_ctx = iter->second; | |
// 出现同ID实例 | |
// 若同ID实例的IP也相同,则更新已有实例的端口 | |
if(old_instance_ctx->_ip.compare(instance_ctx->_ip) == 0){ | |
if(old_instance_ctx->_publish_port.compare(instance_ctx->_publish_port)){ | |
// 端口不同,则重新订阅新端口 | |
std::string remote_publish_address; | |
make_tcp_address(old_instance_ctx->_ip, old_instance_ctx->_publish_port, remote_publish_address); | |
try{ | |
grid_ctx._ctx._subscrib_sock->disconnect(remote_publish_address.c_str()); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to disconnect old publish address ["<<remote_publish_address<<"] " | |
<<e.what()<<std::endl; | |
} | |
old_instance_ctx->_publish_port = instance_ctx->_publish_port; | |
old_instance_ctx->_router_port = instance_ctx->_router_port; | |
} | |
} | |
// 若同ID实例的IP不同,则返回握手失败 | |
else{ | |
send_handshake_fail(handshake_ctx); | |
return ; | |
} | |
} | |
grid_ctx._all_instances[instance_ctx->_id] = instance_ctx; | |
boost::property_tree::ptree pt_ret; | |
pt_ret.put("id", grid_ctx._ctx._id); | |
pt_ret.put("ip", grid_ctx._ctx._ip); | |
pt_ret.put("router_port", grid_ctx._ctx._router_port); | |
pt_ret.put("publish_port", grid_ctx._ctx._publish_port); | |
// subscrib该实例的publish接口 | |
std::string remote_publish_address; | |
make_tcp_address(instance_ctx->_ip, instance_ctx->_publish_port, remote_publish_address); | |
try{ | |
grid_ctx._ctx._subscrib_sock->connect(remote_publish_address.c_str()); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to connect to publish address ["<<remote_publish_address<<"] "<<e.what()<<std::endl; | |
send_handshake_fail(handshake_ctx); | |
return ; | |
} | |
std::cout<<"info: succeeded to subscrib "<<remote_publish_address<<std::endl; | |
send_handshake_ok(handshake_ctx); | |
return ; | |
} | |
void handle_instance_exit(GridMessageContext &msg_ctx){ | |
zmq::message_t *msg = boost::any_cast<zmq::message_t*>(msg_ctx._data); | |
uchar *data = (uchar *)msg->data(); | |
data += 2; | |
uuid_t u; | |
memcpy(u, data, 16); | |
char us[37]; | |
uuid_unparse_lower(u, us); | |
us[36] = '\0'; | |
std::string id = us; | |
std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter = grid_ctx._all_instances.begin(); | |
iter = grid_ctx._all_instances.find(id); | |
if(iter == grid_ctx._all_instances.end()){ | |
std::cout<<"error: failed to handle ["<<MessageTypeString[MSG_TYPE_EXIT]<<"] message, instance ["<<id<<"] not found"<<std::endl; | |
return ; | |
} | |
std::cout<<"info: remove instance "<<iter->second->_id<<" from instance list"<<std::endl; | |
grid_ctx._all_instances.erase(iter); | |
return ; | |
} | |
void handle_heartbeat(GridMessageContext &msg_ctx){ | |
zmq::message_t *msg = boost::any_cast<zmq::message_t*>(msg_ctx._data); | |
uchar *data = (uchar *)msg->data(); | |
data += 2; | |
uuid_t u; | |
memcpy(u, data, 16); | |
char us[37]; | |
uuid_unparse_lower(u, us); | |
us[36] = '\0'; | |
std::string id = us; | |
std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter = grid_ctx._all_instances.begin(); | |
iter = grid_ctx._all_instances.find(id); | |
if(iter == grid_ctx._all_instances.end()){ | |
std::cout<<"error: failed to handle heartbeat message, instance ["<<id<<"] not found"<<std::endl; | |
return ; | |
} | |
iter->second->_last_heartbeat_time = current_time; | |
std::cout<<"info: updated heartbeat for ["<<id<<"]"<<std::endl; | |
return ; | |
} | |
void handle_subscrib_message(GridMessageContext &msg_ctx){ | |
std::cout<<"info: handle_subscrib_message()"<<std::endl; | |
zmq::message_t msg; | |
zmq::socket_t *sock = msg_ctx._sock; | |
try{ | |
sock->recv(&msg); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to recv from socket "<<e.what()<<std::endl; | |
return ; | |
} | |
if(msg.size() < 2){ | |
std::cout<<"error: failed to recv from socket, message too short"<<std::endl; | |
return ; | |
} | |
uchar *data = (uchar *)msg.data(); | |
uchar prefix = data[0]; | |
uchar type = data[1]; | |
if(prefix != GRID_MESSAGE_PREFIX){ | |
std::cout<<"error: failed to recv from socket, message prefix not found"<<std::endl; | |
return ; | |
} | |
switch(type){ | |
case MSG_TYPE_HEARTBEAT: | |
msg_ctx._data = &msg; | |
handle_heartbeat(msg_ctx); | |
break; | |
case MSG_TYPE_EXIT: | |
msg_ctx._data = &msg; | |
handle_instance_exit(msg_ctx); | |
break; | |
default: | |
std::cout<<"error: failed to recv from socket, message type ["<<type<<"] handler incompleted"<<std::endl; | |
break; | |
} | |
return ; | |
} | |
int init(){ | |
// 生成uuid | |
uuid_t u; | |
char us[37]; | |
uuid_generate(u); | |
memset(us, 0, 37); | |
uuid_unparse_lower(u, us); | |
GridInstanceContext &instance_ctx = grid_ctx._ctx; | |
instance_ctx._id = us; | |
instance_ctx._ip = local_ip; | |
instance_ctx._router_port = "4150"; | |
instance_ctx._publish_port = "4151"; | |
// 创建sockets | |
try{ | |
instance_ctx._router_sock = | |
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_ROUTER)); | |
instance_ctx._publish_sock = | |
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_PUB)); | |
instance_ctx._subscrib_sock = | |
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_SUB)); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to alloc socket"<<std::endl; | |
std::cout<<e.what()<<std::endl; | |
return -1; | |
} | |
uchar prefix = GRID_MESSAGE_PREFIX; | |
instance_ctx._subscrib_sock->setsockopt(ZMQ_SUBSCRIBE, &prefix, 1); | |
std::string publish_address = "tcp://", | |
router_address = "tcp://"; | |
publish_address += instance_ctx._ip; | |
router_address += instance_ctx._ip; | |
publish_address += ":"; | |
router_address += ":"; | |
publish_address += instance_ctx._publish_port; | |
router_address += instance_ctx._router_port; | |
// 绑定router和publish句柄 | |
try{ | |
instance_ctx._router_sock->bind(router_address.c_str()); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to bind router socket"<<std::endl; | |
std::cout<<e.what()<<std::endl; | |
return -1; | |
} | |
std::cout<<"info: bind router at "<<router_address<<std::endl; | |
try{ | |
instance_ctx._publish_sock->bind(publish_address.c_str()); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to bing publish socket"<<std::endl; | |
std::cout<<e.what()<<std::endl; | |
return -1; | |
} | |
std::cout<<"info: bind publish at "<<publish_address<<std::endl; | |
// 添加router和subscrib到pollin句柄列表 | |
boost::shared_ptr<PollinContext> router_pollin_ctx; | |
try{ | |
router_pollin_ctx.reset(new PollinContext); | |
}catch(...){ | |
return -1; | |
} | |
router_pollin_ctx->_sock = instance_ctx._router_sock.get(); | |
router_pollin_ctx->_on_read = handle_handshake_request; | |
grid_ctx.add_pollin(router_pollin_ctx); | |
std::cout<<"info: router socket pollin added "<<router_pollin_ctx->_sock<<std::endl; | |
boost::shared_ptr<PollinContext> subscrib_pollin_ctx; | |
try{ | |
subscrib_pollin_ctx.reset(new PollinContext); | |
}catch(...){ | |
return -1; | |
} | |
subscrib_pollin_ctx->_sock = instance_ctx._subscrib_sock.get(); | |
subscrib_pollin_ctx->_on_read = handle_subscrib_message; | |
grid_ctx.add_pollin(subscrib_pollin_ctx); | |
std::cout<<"info: subscrib socket pollin added "<<subscrib_pollin_ctx->_sock<<std::endl; | |
return 0; | |
} | |
int do_handshake(std::string remote_address){ | |
HandshakeContext *handshake_ctx = NULL; | |
try{ | |
handshake_ctx = new HandshakeContext; | |
grid_ctx._ctx._request_sock.reset(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_REQ)); | |
int liner = 1; | |
grid_ctx._ctx._request_sock->setsockopt(ZMQ_LINGER, &liner, sizeof(liner)); | |
handshake_ctx->_sock = grid_ctx._ctx._request_sock.get(); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to alloc handshake context "<<e.what()<<std::endl; | |
return -1; | |
} | |
boost::shared_ptr<PollinContext> pollin_ctx; | |
try{ | |
pollin_ctx.reset(new PollinContext); | |
}catch(...){ | |
DELIF(handshake_ctx->_sock); | |
DELIF(handshake_ctx); | |
return -1; | |
} | |
// 监听REQ接口 | |
pollin_ctx->_sock = handshake_ctx->_sock; | |
pollin_ctx->_data = handshake_ctx; | |
pollin_ctx->_on_read = handle_handshake_response; | |
grid_ctx.add_pollin(pollin_ctx); | |
handshake_ctx->_sock->connect(remote_address.c_str()); | |
// 发送握手消息 | |
if(send_handshake_init(*handshake_ctx)){ | |
grid_ctx.del_pollin(pollin_ctx); | |
std::cout<<"info: pollin deleted "<<pollin_ctx->_sock<<std::endl; | |
DELIF(handshake_ctx->_sock); | |
DELIF(handshake_ctx); | |
return -1; | |
} | |
std::cout<<"info: handshake message sent to "<<remote_address<<std::endl; | |
return 0; | |
} | |
void broadcast_instance_exit(){ | |
zmq::message_t msg(18); | |
uchar *data = (uchar *)msg.data(); | |
data[0] = GRID_MESSAGE_PREFIX; | |
data[1] = MSG_TYPE_EXIT; | |
uuid_t u; | |
char us[37]; | |
memcpy(us, grid_ctx._ctx._id.c_str(), 36); | |
us[36] = '\0'; | |
uuid_parse(us, u); | |
memcpy(data+2, u, 16); | |
try{ | |
grid_ctx._ctx._publish_sock->send(msg); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to broadcast exit "<<e.what()<<std::endl; | |
} | |
std::cout<<"info: "<<MessageTypeString[MSG_TYPE_EXIT]<<" broadcasted"<<std::endl; | |
} | |
void broadcast_heartbeat(){ | |
zmq::message_t msg(18); | |
uchar *data = (uchar *)msg.data(); | |
data[0] = GRID_MESSAGE_PREFIX; | |
data[1] = MSG_TYPE_HEARTBEAT; | |
uuid_t u; | |
char us[37]; | |
memcpy(us, grid_ctx._ctx._id.c_str(), 36); | |
us[36] = '\0'; | |
uuid_parse(us, u); | |
memcpy(data+2, u, 16); | |
try{ | |
grid_ctx._ctx._publish_sock->send(msg); | |
}catch(std::exception &e){ | |
std::cout<<"error: failed to broadcast heartbeat "<<e.what()<<std::endl; | |
} | |
std::cout<<"info: heartbeat broadcasted"<<std::endl; | |
} | |
int main(int argc, char *argv[]) | |
{ | |
// 解析命令行参数 | |
namespace po = boost::program_options; | |
po::options_description desc("Allowed options"); | |
desc.add_options() | |
("help,h", "produce help options") | |
("role", po::value<std::string>(), "set role, first or second") | |
("local_ip", po::value<std::string>(), "local ip address to bind") | |
("remote_ip", po::value<std::string>(), "remote ip address") | |
("remote_router_port", po::value<std::string>(), "remote router port"); | |
po::variables_map vm; | |
//po::store(po::parse_command_line(argc, argv, desc), vm); | |
po::store(po::command_line_parser(argc, argv).options(desc).allow_unregistered().run(), vm); | |
po::notify(vm); | |
if(vm.count("help")){ | |
std::cout<<desc<<std::endl; | |
return 1; | |
} | |
if(vm.count("role")){ | |
std::string role = vm["role"].as<std::string>(); | |
if(role.compare("master") == 0){ | |
process_role = ROLE_MASTER; | |
std::cout<<"info: role set to "<<vm["role"].as<std::string>()<<std::endl; | |
} | |
else if(role.compare("worker") == 0){ | |
process_role = ROLE_WORKER; | |
std::cout<<"info: role set to "<<vm["role"].as<std::string>()<<std::endl; | |
} | |
else { | |
std::cout<<"info: role ["<<vm["role"].as<std::string>()<<"] invalid"<<std::endl; | |
} | |
} | |
else { | |
std::cout<<"Role need to be set."<<std::endl; | |
std::cout<<desc<<std::endl; | |
return 1; | |
} | |
if(vm.count("local_ip")){ | |
local_ip = vm["local_ip"].as<std::string>(); | |
} | |
else { | |
std::cout<<"Local ip need to be set."<<std::endl; | |
std::cout<<desc<<std::endl; | |
return 1; | |
} | |
if(vm.count("remote_ip")){ | |
remote_ip = vm["remote_ip"].as<std::string>(); | |
} | |
else if(process_role == ROLE_WORKER){ | |
std::cout<<"Remote ip need to be set."<<std::endl; | |
std::cout<<desc<<std::endl; | |
return 1; | |
} | |
if(vm.count("remote_router_port")){ | |
remote_router_port_str = vm["remote_router_port"].as<std::string>(); | |
} | |
else if(process_role == ROLE_WORKER){ | |
std::cout<<"Remote router port need to be set."<<std::endl; | |
std::cout<<desc<<std::endl; | |
return 1; | |
} | |
if(init()){ | |
return -1; | |
} | |
if(process_role == ROLE_WORKER){ | |
// 连接master的router接口,并发送握手消息 | |
//创建worker request接口 | |
std::string sock_router_address = "tcp://"; | |
make_tcp_address(remote_ip, remote_router_port_str, sock_router_address); | |
if(do_handshake(sock_router_address)){ | |
std::cout<<"error: failed to do handshake with "<<sock_router_address<<std::endl; | |
return -1; | |
} | |
} | |
signal(SIGINT, int_handler); | |
while(!exit_process){ | |
grid_ctx.poll(); | |
if(current_time - grid_ctx._ctx._last_heartbeat_time > 5){ | |
broadcast_heartbeat(); | |
grid_ctx._ctx._last_heartbeat_time = current_time; | |
} | |
sleep(1); | |
} | |
broadcast_instance_exit(); | |
sleep(1); | |
std::cout<<"Process exit"<<std::endl; | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment