Skip to content

Instantly share code, notes, and snippets.

@shardator
Created December 11, 2024 12:52
Show Gist options
  • Save shardator/76cce67599514424d6c820e55fd38912 to your computer and use it in GitHub Desktop.
Save shardator/76cce67599514424d6c820e55fd38912 to your computer and use it in GitHub Desktop.
#include <boost/asio.hpp>
#include <boost/asio/ip/multicast.hpp>
#include <iostream>
#include <string>
#include <functional>
#include <vector>
#include <span> // C++20
#include <stdexcept>
class MulticastClient {
public:
using MessageCallback = std::function<void(std::span<const unsigned char> data)>;
MulticastClient(boost::asio::io_context &io_context,
const std::string &listen_address,
unsigned short listen_port,
MessageCallback on_message,
std::size_t buffer_size = 2048)
: io_context_(io_context),
socket_(io_context_),
on_message_(std::move(on_message)),
buffer_(buffer_size)
{
boost::system::error_code ec;
boost::asio::ip::address local_addr = boost::asio::ip::address::from_string(listen_address, ec);
if (ec) {
throw std::runtime_error("Invalid listen address: " + listen_address + " Error: " + ec.message());
}
boost::asio::ip::udp::endpoint listen_endpoint(local_addr, listen_port);
socket_.open(listen_endpoint.protocol(), ec);
if (ec) {
throw std::runtime_error("Failed to open socket: " + ec.message());
}
socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true), ec);
if (ec) {
throw std::runtime_error("Failed to set reuse_address: " + ec.message());
}
socket_.bind(listen_endpoint, ec);
if (ec) {
throw std::runtime_error("Failed to bind socket: " + ec.message());
}
start_receive();
}
void join_group(const std::string &group_address) {
boost::system::error_code ec;
auto group_addr = boost::asio::ip::address::from_string(group_address, ec);
if (ec) {
throw std::runtime_error("Invalid multicast group address: " + group_address + ". Error: " + ec.message());
}
if (!group_addr.is_multicast()) {
throw std::runtime_error("Address " + group_address + " is not a multicast address.");
}
socket_.set_option(boost::asio::ip::multicast::join_group(group_addr), ec);
if (ec) {
throw std::runtime_error("Failed to join multicast group " + group_address + ": " + ec.message());
}
}
private:
void start_receive() {
socket_.async_receive_from(
boost::asio::buffer(buffer_),
sender_endpoint_,
[this](const boost::system::error_code &ec, std::size_t bytes_received) {
this->handle_receive(ec, bytes_received);
});
}
void handle_receive(const boost::system::error_code &ec, std::size_t bytes_received) {
if (!ec) {
// Create a span from the pre-allocated buffer, no allocations here.
std::span<const unsigned char> data_span(buffer_.data(), bytes_received);
// Invoke the callback.
if (on_message_) {
on_message_(data_span);
}
// Continue receiving
start_receive();
} else {
std::cerr << "Receive error: " << ec.message() << "\n";
// Attempt to continue receiving unless error is fatal.
start_receive();
}
}
boost::asio::io_context &io_context_;
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::endpoint sender_endpoint_;
MessageCallback on_message_;
std::vector<unsigned char> buffer_;
};
// Example usage:
//
// int main() {
// try {
// boost::asio::io_context io_context;
//
// auto callback = [](std::span<const unsigned char> data) {
// // Just print the size of the received data for now.
// std::cout << "Received " << data.size() << " bytes\n";
// // If you need to parse a struct:
// // struct MyStruct { int x; double y; };
// // MyStruct s;
// // std::memcpy(&s, data.data(), sizeof(s)); // Safe way to convert
// };
//
// MulticastClient client(io_context, "0.0.0.0", 30001, callback);
// client.join_group("239.255.0.1");
//
// io_context.run();
// } catch (const std::exception &ex) {
// std::cerr << "Error: " << ex.what() << "\n";
// }
// return 0;
// }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment