Skip to content

Instantly share code, notes, and snippets.

@kervel
Last active November 6, 2023 09:50
Show Gist options
  • Save kervel/75d81b8c34e45a19706c661b90d02548 to your computer and use it in GitHub Desktop.
Save kervel/75d81b8c34e45a19706c661b90d02548 to your computer and use it in GitHub Desktop.
Small ros2 python using pybind11
# will need to be tweaked for your project
make_minimum_required(VERSION 3.5)
project(test_ros_pb11)
# Default to C99
if(NOT CMAKE_C_STANDARD)
set(CMAKE_C_STANDARD 99)
endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
add_subdirectory(pybind11)
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_ros REQUIRED)
find_package(objectdetector_msgs REQUIRED)
find_package(rclcpp REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(PythonLibs 3 REQUIRED)
find_package(PythonInterp 3 REQUIRED)
find_package(python_cmake_module REQUIRED)
_ament_cmake_python_register_environment_hook()
pybind11_add_module(fast_ros_pi src/fast_ros_pi.cpp)
###HACK : ament_target_dependencies cannot declare its deps as private, which is needed for pybind11.. so copypaste
function(ament_target_dependencies_private target)
if(NOT TARGET ${target})
message(FATAL_ERROR "ament_target_dependencies() the first argument must be a valid target name")
endif()
if(${ARGC} GREATER 0)
set(definitions "")
set(include_dirs "")
set(libraries "")
set(link_flags "")
foreach(package_name ${ARGN})
if(NOT ${${package_name}_FOUND})
message(FATAL_ERROR "ament_target_dependencies() the passed package name '${package_name}' was not found before")
endif()
list_append_unique(definitions ${${package_name}_DEFINITIONS})
list_append_unique(include_dirs ${${package_name}_INCLUDE_DIRS})
list(APPEND libraries ${${package_name}_LIBRARIES})
list_append_unique(link_flags ${${package_name}_LINK_FLAGS})
endforeach()
target_compile_definitions(${target}
PUBLIC ${definitions})
ament_include_directories_order(ordered_include_dirs ${include_dirs})
target_include_directories(${target}
PUBLIC ${ordered_include_dirs})
ament_libraries_deduplicate(unique_libraries ${libraries})
target_link_libraries(${target} PRIVATE
${unique_libraries})
foreach(link_flag IN LISTS link_flags)
set_property(TARGET ${target} APPEND_STRING PROPERTY LINK_FLAGS " ${link_flag} ")
endforeach()
endif()
endfunction()
set_target_properties(fast_ros_pi PROPERTIES PREFIX "${PYTHON_MODULE_PREFIX}"
SUFFIX "${PYTHON_MODULE_EXTENSION}")
target_include_directories(fast_ros_pi PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies_private(
fast_ros_pi
"objectdetector_msgs"
"rclcpp"
"sensor_msgs"
pybind11::module
)
# Causes the visibility macros to use dllexport rather than dllimport,
# which is appropriate when building the dll but not consuming it.
target_compile_definitions(fast_ros_pi PRIVATE "FAST_ROS_PI_BUILDING_LIBRARY")
install(
DIRECTORY include/
DESTINATION include
)
install(
TARGETS fast_ros_pi
EXPORT export_${PROJECT_NAME}
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin
)
install(
TARGETS fast_ros_pi
DESTINATION "${PYTHON_INSTALL_DIR}"
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# remove the line when a copyright and license is present in all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# remove the line when this package is a git repo
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_export_include_directories(
include
)
ament_export_interfaces(
export_${PROJECT_NAME}
)
ament_export_libraries(
fast_ros_pi
)
ament_package()
#include "sensor_msgs/msg/image.hpp"
#include "std_msgs/msg/header.hpp"
#include "builtin_interfaces/msg/time.hpp"
#include "objectdetector_msgs/msg/detection_result.hpp"
#include "rclcpp/rclcpp.hpp"
#include "objectdetector_msgs/msg/annotated_object.hpp"
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/functional.h>
#include <pybind11/complex.h>
namespace py = pybind11;
namespace fast_ros_pi
{
template <typename T, typename... Extra> py::class_<T, std::shared_ptr<T>> message_class(py::module module, std::string class_name, const Extra &... extra) {
py::class_<T, std::shared_ptr<T>> msg_class(module, class_name.c_str(), extra...);
std::string ss = "Publisher_" + class_name;
py::class_<rclcpp::Publisher<T>, std::shared_ptr<rclcpp::Publisher<T>>> pub_dr(module,ss.c_str());
pub_dr.def("publish", [](std::shared_ptr<rclcpp::Publisher<T>> pub, std::shared_ptr<T> dr) { pub->publish(dr); });
std::string s = "Subscription_" + class_name;
py::class_<rclcpp::Subscription<T>, std::shared_ptr<rclcpp::Subscription<T>>> subscriber_image(module, s.c_str());
msg_class.def_static("__create_subscription__", []
(rclcpp::Node::SharedPtr n, std::string topic, std::function<void(std::shared_ptr<T>)> callback)
-> std::shared_ptr<rclcpp::Subscription<T>>
{
std::cout << "listening on '" << topic << "'" << std::endl;
return n->create_subscription<T>(topic, [callback](std::shared_ptr<T> im) {
(callback)(im);
});
});
msg_class.def_static("__create_publisher__", [](rclcpp::Node::SharedPtr n, std::string topic) {
return n->create_publisher<T>(topic);
});
return std::move(msg_class);
}
PYBIND11_MODULE(fast_ros_pi, m) {
m.doc() = "pybind11 ros python";
auto time = message_class<builtin_interfaces::msg::Time>(m, "time");
time.def_readwrite("sec", &builtin_interfaces::msg::Time::sec);
time.def_readwrite("nanosec", &builtin_interfaces::msg::Time::nanosec);
auto header = message_class<std_msgs::msg::Header>(m, "Header");
header.def_readwrite("frame_id" , &std_msgs::msg::Header::frame_id);
header.def_readwrite("stamp" , &std_msgs::msg::Header::stamp);
auto image = message_class<sensor_msgs::msg::Image>(m,"Image",py::buffer_protocol());
image.def_readwrite("header", &sensor_msgs::msg::Image::header);
image.def_readwrite("height", &sensor_msgs::msg::Image::height);
image.def_readwrite("width", &sensor_msgs::msg::Image::width);
image.def_readwrite("encoding", &sensor_msgs::msg::Image::encoding);
image.def_readwrite("is_bigendian", &sensor_msgs::msg::Image::is_bigendian);
image.def_readwrite("step", &sensor_msgs::msg::Image::step);
image.def_buffer([](sensor_msgs::msg::Image &im) -> py::buffer_info {
return py::buffer_info(
im.data.data(), /* Pointer to buffer */
sizeof(uint8_t), /* Size of one scalar */
py::format_descriptor<uint8_t>::format(), /* Python struct-style format descriptor */
1, /* Number of dimensions */
{ im.data.size() }, /* Buffer dimensions */
{ sizeof(uint8_t) /* Strides (in bytes) for each index */
});
});
image.def("set_data", [](sensor_msgs::msg::Image &im, py::buffer &buf) {
py::buffer_info info = buf.request();
if (info.format != py::format_descriptor<uint8_t>::format())
throw std::runtime_error("Incompatible format: expected a " + py::format_descriptor<int>::format() + " array!");
if (info.ndim != 1)
throw std::runtime_error("Incompatible buffer dimension (1D array is supported)!");
if (info.strides[0] != sizeof(uint8_t)) {
throw std::runtime_error("Strided arrays not supported yet");
}
im.data = std::vector<uint8_t>(static_cast<int *>(info.ptr), static_cast<int *>(info.ptr) + info.shape[0]);
});
auto point = message_class<geometry_msgs::msg::Point>(m, "Point");
point.def(py::init<>());
point.def_readwrite("x", &geometry_msgs::msg::Point::x);
point.def_readwrite("y", &geometry_msgs::msg::Point::y);
point.def_readwrite("z", &geometry_msgs::msg::Point::z);
auto annot = message_class<objectdetector_msgs::msg::AnnotatedObject>(m, "AnnotatedObject");
annot.def(py::init<>());
annot.def_readwrite("location", &objectdetector_msgs::msg::AnnotatedObject::location);
annot.def_readwrite("classname", &objectdetector_msgs::msg::AnnotatedObject::classname);
annot.def_readwrite("length", &objectdetector_msgs::msg::AnnotatedObject::length);
annot.def_readwrite("attributes", &objectdetector_msgs::msg::AnnotatedObject::attributes);
auto bbox = message_class<objectdetector_msgs::msg::BoundingBox>(m, "BoundingBox");
bbox.def(py::init<>());
bbox.def_readwrite("classname", &objectdetector_msgs::msg::BoundingBox::classname);
bbox.def_readwrite("x", &objectdetector_msgs::msg::BoundingBox::x);
bbox.def_readwrite("y", &objectdetector_msgs::msg::BoundingBox::y);
bbox.def_readwrite("w", &objectdetector_msgs::msg::BoundingBox::w);
bbox.def_readwrite("h", &objectdetector_msgs::msg::BoundingBox::h);
bbox.def_readwrite("confidence", &objectdetector_msgs::msg::BoundingBox::confidence);
bbox.def_readwrite("attributes", &objectdetector_msgs::msg::BoundingBox::attributes);
auto dres = message_class<objectdetector_msgs::msg::DetectionResult>(m, "DetectionResult");
dres.def(py::init<>());
dres.def_readwrite("header", &objectdetector_msgs::msg::DetectionResult::header);
dres.def_readwrite("length", &objectdetector_msgs::msg::DetectionResult::length);
dres.def_readwrite("detections", &objectdetector_msgs::msg::DetectionResult::detections);
m.def("init", [](std::vector<std::string> &args) {
std::vector<const char *> c_strs;
for (auto &a: args) {
c_strs.push_back(a.c_str());
}
rclcpp::init(c_strs.size(),c_strs.data());});
py::class_<rclcpp::Node, rclcpp::Node::SharedPtr> node(m, "Node");
node.def(py::init([](std::string nodename) {return std::make_shared<rclcpp::Node>(nodename);}));
node.def("create_subscription", []
(rclcpp::Node::SharedPtr n, py::object the_type, std::string topic, py::function callback) {
auto f = the_type.attr("__create_subscription__");
return f(n, topic, callback);
});
node.def("create_publisher", []
(rclcpp::Node::SharedPtr n, py::object the_type, std::string topic) {
auto f = the_type.attr("__create_publisher__");
return f(n, topic);
});
node.def("log", [](rclcpp::Node::SharedPtr n, std::string logmsg) {
RCLCPP_WARN(n->get_logger(), logmsg.c_str());
});
m.def("spin", [](rclcpp::Node::SharedPtr node) { rclcpp::spin(node);});
m.def("shutdown", []() { rclcpp::shutdown();});
}
}// namespace fast_ros_pi
@chrissunny94
Copy link

Thank you

@chrissunny94
Copy link

ament_target_dependencies_private( floam_python "rclcpp" "sensor_msgs" "rosbag2_cpp" "pcl_conversions" "tf2" "tf2_ros" "tf2_geometry_msgs" pybind11::module )

Despite my ament_target_dependencies_private looking like the above .

fatal error: tf2_geometry_msgs/tf2_geometry_msgs.hpp: No such file or directory
   48 | #include <tf2_geometry_msgs/tf2_geometry_msgs.hpp>

@chrissunny94
Copy link

chrissunny94 commented Sep 25, 2023

FIXED

ament_target_dependencies_private(
  ${PROJECT_NAME}_python
  "objectdetector_msgs"
  "rclcpp"
  "sensor_msgs"
  "pcl_conversions"
  "nav_msgs"
  "tf2"
  "tf2_geometry_msgs"
  "tf2_ros"
  "rosbag2_cpp"
  "pcl_ros"
  pybind11::module
)

pcl_ros was missing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment