Skip to content

Commit c00546e

Browse files
authored
Fix: PublisherZMQ::flush is called after the publisher has been destructed (BehaviorTree#426)
* fix: PublisherZMQ::flush is called after the publisher has been destructed * style: Adjust code formatting of ~PublisherZMQ * chore: Install zmq-dev in ubuntu pipeline and exclude gtest_logger_zmq.cpp when zmq is not found. * chore: Define WIN32_LEAN_AND_MEAN to avoid ambiguity between tinyxml and msxml
1 parent d84a5c0 commit c00546e

File tree

5 files changed

+21
-14
lines changed

5 files changed

+21
-14
lines changed

.github/workflows/cmake.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
- uses: actions/checkout@v2
2222

2323
- name: Install Dependencies (Linux)
24-
run: sudo apt-get install libboost-dev
24+
run: sudo apt-get install libboost-dev libzmq3-dev
2525
if: matrix.os == 'ubuntu-latest'
2626

2727
- name: Create Build Environment

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ set(CMAKE_CXX_STANDARD 14)
1010
set(CMAKE_CXX_STANDARD_REQUIRED ON)
1111

1212
if(MSVC)
13-
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
13+
add_definitions(-D_CRT_SECURE_NO_WARNINGS -DWIN32_LEAN_AND_MEAN)
1414
else()
1515
add_definitions(-Wpedantic)
1616
endif()

include/behaviortree_cpp_v3/loggers/bt_zmq_publisher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class PublisherZMQ : public StatusChangeLogger
3737
TimePoint deadline_;
3838
std::mutex mutex_;
3939
std::atomic_bool send_pending_;
40-
40+
std::condition_variable send_condition_variable_;
4141
std::future<void> send_future_;
4242

4343
struct Pimpl;

src/loggers/bt_zmq_publisher.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,13 @@ PublisherZMQ::~PublisherZMQ()
9090
{
9191
thread_.join();
9292
}
93-
flush();
94-
zmq_->context.shutdown();
95-
if (send_future_.valid())
93+
if (send_pending_)
9694
{
97-
send_future_.wait();
95+
send_condition_variable_.notify_all();
96+
send_future_.get();
9897
}
98+
flush();
99+
zmq_->context.shutdown();
99100
delete zmq_;
100101
ref_count = false;
101102
}
@@ -116,21 +117,24 @@ void PublisherZMQ::createStatusBuffer()
116117
void PublisherZMQ::callback(Duration timestamp, const TreeNode& node,
117118
NodeStatus prev_status, NodeStatus status)
118119
{
119-
using namespace std::chrono;
120-
121120
SerializedTransition transition =
122121
SerializeTransition(node.UID(), timestamp, prev_status, status);
123122
{
124123
std::unique_lock<std::mutex> lock(mutex_);
125124
transition_buffer_.push_back(transition);
126125
}
127126

128-
if (!send_pending_)
127+
if (!send_pending_.exchange(true))
129128
{
130-
send_pending_ = true;
131129
send_future_ = std::async(std::launch::async, [this]() {
132-
std::this_thread::sleep_for(min_time_between_msgs_);
133-
flush();
130+
std::unique_lock<std::mutex> lock(mutex_);
131+
const bool is_server_inactive = send_condition_variable_.wait_for(
132+
lock, min_time_between_msgs_, [this]() { return !active_server_; });
133+
lock.unlock();
134+
if (!is_server_inactive)
135+
{
136+
flush();
137+
}
134138
});
135139
}
136140
}

tests/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ set(BT_TESTS
1919
gtest_subtree.cpp
2020
gtest_switch.cpp
2121
gtest_wakeup.cpp
22-
gtest_logger_zmq.cpp
2322
)
2423

24+
if( ZMQ_FOUND )
25+
LIST( APPEND BT_TESTS gtest_logger_zmq.cpp)
26+
endif()
27+
2528
if( BT_COROUTINES )
2629
LIST( APPEND BT_TESTS gtest_coroutines.cpp)
2730
endif()

0 commit comments

Comments
 (0)