Last active
January 31, 2022 01:17
-
-
Save svyatogor/6356dc1b4bea0033c086f7be2bd77843 to your computer and use it in GitHub Desktop.
AMQP-CPP Boost Asio handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <utility>

#include <amqpcpp/linux_tcp.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/foreach.hpp>

// NOTE: header-scope using-directive kept only so existing includers that
// rely on it keep compiling; new code should qualify names with std::.
using namespace std;
class LibBoostAsioHandler : public virtual AMQP::TcpHandler { | |
private: | |
boost::asio::io_context &_iocontext; | |
protected: | |
struct SocketEvent { | |
shared_ptr<boost::asio::posix::stream_descriptor> socket; | |
int flags; | |
}; | |
class Watcher : public virtual enable_shared_from_this<Watcher> { | |
private: | |
uint16_t _heartbeat_interval{0}; | |
shared_ptr<boost::asio::deadline_timer> _timer; | |
boost::asio::io_context &_iocontext; | |
boost::asio::io_context::strand _strand; | |
map<int, SocketEvent> _sockets; | |
AMQP::TcpConnection *_connection; | |
void poll_read(int fd) { | |
auto it = _sockets.find(fd); | |
if (it == _sockets.end() || !(it->second.flags & AMQP::readable)) { | |
return; | |
} | |
auto handler = [&, self = shared_from_this(), | |
fd](const boost::system::error_code &error) { | |
if (!error) { | |
_connection->process(fd, AMQP::readable); | |
poll_read(fd); | |
} | |
}; | |
it->second.socket->async_wait( | |
boost::asio::posix::stream_descriptor::wait_read, | |
_strand.wrap(handler)); | |
} | |
void poll_write(int fd) { | |
auto it = _sockets.find(fd); | |
if (it == _sockets.end() || !(it->second.flags & AMQP::writable)) { | |
return; | |
} | |
auto handler = [&, self = shared_from_this(), | |
fd](const boost::system::error_code &error) { | |
if (!error) { | |
_connection->process(fd, AMQP::writable); | |
poll_write(fd); | |
} | |
}; | |
it->second.socket->async_wait( | |
boost::asio::posix::stream_descriptor::wait_write, | |
_strand.wrap(handler)); | |
} | |
void start_timer() { | |
if (_heartbeat_interval == 0) | |
return; | |
auto tick = [&, self = shared_from_this()]( | |
const boost::system::error_code &error) { | |
if (!error) { | |
spdlog::debug("AMQP Heartbeat..."); | |
_connection->heartbeat(); | |
start_timer(); | |
} | |
}; | |
_timer->expires_at(_timer->expires_at() + | |
boost::posix_time::seconds(_heartbeat_interval)); | |
_timer->async_wait(_strand.wrap(tick)); | |
} | |
public: | |
Watcher(boost::asio::io_context &io_context, | |
AMQP::TcpConnection *connection) | |
: _strand(io_context), _iocontext(io_context) { | |
_connection = connection; | |
} | |
Watcher(Watcher &&that) = delete; | |
Watcher(const Watcher &that) = delete; | |
~Watcher() {} | |
void stop() { | |
BOOST_FOREACH (auto &e, _sockets) { | |
e.second.socket->cancel(); | |
e.second.socket->release(); | |
} | |
_sockets.clear(); | |
_heartbeat_interval = 0; | |
if (_timer.get() != nullptr) { | |
_timer->cancel(); | |
} | |
} | |
void events(int fd, int monitor_events) { | |
auto it = _sockets.find(fd); | |
if (it == _sockets.end() && !monitor_events) { | |
return; | |
} | |
if (it != _sockets.end() && !monitor_events) { | |
it->second.socket->cancel(); | |
it->second.socket->release(); | |
_sockets.erase(it); | |
return; | |
} | |
shared_ptr<boost::asio::posix::stream_descriptor> socket; | |
int last_flags = 0; | |
if (it == _sockets.end()) { | |
socket = make_shared<boost::asio::posix::stream_descriptor>(_iocontext); | |
socket->assign(fd); | |
socket->non_blocking(true); | |
SocketEvent e = {.socket = socket, .flags = monitor_events}; | |
_sockets[fd] = e; | |
} else { | |
socket = it->second.socket; | |
last_flags = it->second.flags; | |
it->second.flags = monitor_events; | |
} | |
if (monitor_events & AMQP::readable && !(last_flags & AMQP::readable)) { | |
poll_read(fd); | |
} | |
if (monitor_events & AMQP::writable && !(last_flags & AMQP::writable)) { | |
poll_write(fd); | |
} | |
} | |
void heartbeat(int seconds) { | |
_heartbeat_interval = seconds; | |
_timer = make_shared<boost::asio::deadline_timer>( | |
_iocontext, boost::posix_time::seconds(0)); | |
start_timer(); | |
} | |
}; | |
map<AMQP::TcpConnection *, shared_ptr<Watcher>> _watchers; | |
void monitor(AMQP::TcpConnection *const connection, const int fd, | |
const int flags) override { | |
auto iter = _watchers.find(connection); | |
if (iter == _watchers.end()) { | |
const shared_ptr<Watcher> watcher = | |
make_shared<Watcher>(_iocontext, connection); | |
_watchers[connection] = watcher; | |
watcher->events(fd, flags); | |
} else { | |
iter->second->events(fd, flags); | |
} | |
} | |
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, | |
uint16_t interval) override { | |
spdlog::info("AMQP Heartbeat interval set to {}", interval); | |
auto it = _watchers.find(connection); | |
if (it == _watchers.end() || !interval) { | |
return 0; | |
} | |
it->second->heartbeat(interval); | |
return interval; | |
} | |
virtual void onDetached(AMQP::TcpConnection *connection) override { | |
auto iter = _watchers.find(connection); | |
if (iter != _watchers.end()) { | |
iter->second->stop(); | |
_watchers.erase(iter); | |
} | |
} | |
public: | |
LibBoostAsioHandler() = delete; | |
explicit LibBoostAsioHandler(boost::asio::io_context &io_context) | |
: _iocontext(io_context) {} | |
~LibBoostAsioHandler() override = default; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment