Skip to content

Commit 4e8f18b

Browse files
authored
Merge pull request #213 from ACvanWyk/master
Notify the completion handler through the handler's associated executor
2 parents 0e8c402 + aaa47f5 commit 4e8f18b

File tree

3 files changed

+178
-6
lines changed

3 files changed

+178
-6
lines changed

azmq/detail/receive_op.hpp

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,19 @@
1414
#include "socket_ops.hpp"
1515
#include "reactor_op.hpp"
1616

17+
#include <boost/version.hpp>
1718
#include <boost/asio/io_service.hpp>
19+
#include <boost/asio/dispatch.hpp>
20+
#include <boost/asio/executor_work_guard.hpp>
21+
#if BOOST_VERSION >= 107900
22+
#include <boost/asio/recycling_allocator.hpp>
23+
#include <boost/asio/bind_allocator.hpp>
24+
#endif
1825

1926
#include <zmq.h>
2027

2128
#include <iterator>
29+
#include <type_traits>
2230

2331
namespace azmq {
2432
namespace detail {
@@ -31,6 +39,23 @@ class receive_buffer_op_base : public reactor_op {
3139
{ }
3240

3341
virtual bool do_perform(socket_type& socket) override {
42+
return do_perform_impl(socket);
43+
}
44+
45+
private:
46+
template<typename Buff = MutableBufferSequence>
47+
typename std::enable_if<std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
48+
{
49+
ec_ = boost::system::error_code();
50+
bytes_transferred_ += socket_ops::receive(const_cast<azmq::message&>(buffers_), socket, flags_ | ZMQ_DONTWAIT, ec_);
51+
if (ec_)
52+
return !try_again();
53+
return true;
54+
}
55+
56+
template<typename Buff = MutableBufferSequence>
57+
typename std::enable_if<!std::is_same<Buff, azmq::message>::value, bool>::type do_perform_impl(socket_type& socket)
58+
{
3459
ec_ = boost::system::error_code();
3560
bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
3661
if (ec_)
@@ -44,7 +69,7 @@ class receive_buffer_op_base : public reactor_op {
4469
}
4570

4671
private:
47-
MutableBufferSequence buffers_;
72+
typename std::conditional<std::is_same<MutableBufferSequence, azmq::message>::value, MutableBufferSequence const&, MutableBufferSequence>::type buffers_;
4873
flags_type flags_;
4974
};
5075

@@ -57,14 +82,30 @@ class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
5782
socket_ops::flags_type flags)
5883
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
5984
, handler_(std::move(handler))
85+
, work_guard(boost::asio::make_work_guard(handler_))
6086
{ }
6187

6288
virtual void do_complete() override {
63-
handler_(this->ec_, this->bytes_transferred_);
89+
#if BOOST_VERSION >= 107900
90+
auto alloc = boost::asio::get_associated_allocator(
91+
handler_, boost::asio::recycling_allocator<void>());
92+
#endif
93+
boost::asio::dispatch(work_guard.get_executor(),
94+
#if BOOST_VERSION >= 107900
95+
boost::asio::bind_allocator(alloc,
96+
#endif
97+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
98+
handler_(ec_, bytes_transferred_);
99+
})
100+
#if BOOST_VERSION >= 107900
101+
)
102+
#endif
103+
;
64104
}
65105

66106
private:
67107
Handler handler_;
108+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
68109
};
69110

70111
template<typename MutableBufferSequence,
@@ -76,14 +117,30 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen
76117
socket_ops::flags_type flags)
77118
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
78119
, handler_(std::move(handler))
120+
, work_guard(boost::asio::make_work_guard(handler_))
79121
{ }
80122

81123
virtual void do_complete() override {
82-
handler_(this->ec_, std::make_pair(this->bytes_transferred_, this->more()));
124+
#if BOOST_VERSION >= 107900
125+
auto alloc = boost::asio::get_associated_allocator(
126+
handler_, boost::asio::recycling_allocator<void>());
127+
#endif
128+
boost::asio::dispatch(work_guard.get_executor(),
129+
#if BOOST_VERSION >= 107900
130+
boost::asio::bind_allocator(alloc,
131+
#endif
132+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_, more = this->more()]() mutable {
133+
handler_(ec_, std::make_pair(bytes_transferred_, more));
134+
})
135+
#if BOOST_VERSION >= 107900
136+
)
137+
#endif
138+
;
83139
}
84140

85141
private:
86142
Handler handler_;
143+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
87144
};
88145

89146
class receive_op_base : public reactor_op {
@@ -112,14 +169,30 @@ class receive_op : public receive_op_base {
112169
socket_ops::flags_type flags)
113170
: receive_op_base(flags)
114171
, handler_(std::move(handler))
172+
, work_guard(boost::asio::make_work_guard(handler_))
115173
{ }
116174

117175
virtual void do_complete() override {
118-
handler_(ec_, msg_, bytes_transferred_);
176+
#if BOOST_VERSION >= 107900
177+
auto alloc = boost::asio::get_associated_allocator(
178+
handler_, boost::asio::recycling_allocator<void>());
179+
#endif
180+
boost::asio::dispatch(work_guard.get_executor(),
181+
#if BOOST_VERSION >= 107900
182+
boost::asio::bind_allocator(alloc,
183+
#endif
184+
[ec_ = this->ec_, handler_ = std::move(handler_), msg_ = std::move(msg_), bytes_transferred_ = this->bytes_transferred_]() mutable {
185+
handler_(ec_, msg_, bytes_transferred_);
186+
})
187+
#if BOOST_VERSION >= 107900
188+
)
189+
#endif
190+
;
119191
}
120192

121193
private:
122194
Handler handler_;
195+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
123196
};
124197
} // namespace detail
125198
} // namespace azmq

azmq/detail/send_op.hpp

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,14 @@
1313
#include "socket_ops.hpp"
1414
#include "reactor_op.hpp"
1515

16+
#include <boost/version.hpp>
1617
#include <boost/asio/io_service.hpp>
18+
#include <boost/asio/dispatch.hpp>
19+
#include <boost/asio/executor_work_guard.hpp>
20+
#if BOOST_VERSION >= 107900
21+
#include <boost/asio/recycling_allocator.hpp>
22+
#include <boost/asio/bind_allocator.hpp>
23+
#endif
1724

1825
#include <zmq.h>
1926
#include <iterator>
@@ -52,14 +59,30 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
5259
reactor_op::flags_type flags)
5360
: send_buffer_op_base<ConstBufferSequence>(buffers, flags)
5461
, handler_(std::move(handler))
62+
, work_guard(boost::asio::make_work_guard(handler_))
5563
{ }
5664

5765
virtual void do_complete() override {
58-
handler_(this->ec_, this->bytes_transferred_);
66+
#if BOOST_VERSION >= 107900
67+
auto alloc = boost::asio::get_associated_allocator(
68+
handler_, boost::asio::recycling_allocator<void>());
69+
#endif
70+
boost::asio::dispatch(work_guard.get_executor(),
71+
#if BOOST_VERSION >= 107900
72+
boost::asio::bind_allocator(alloc,
73+
#endif
74+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
75+
handler_(ec_, bytes_transferred_);
76+
})
77+
#if BOOST_VERSION >= 107900
78+
)
79+
#endif
80+
;
5981
}
6082

6183
private:
6284
Handler handler_;
85+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
6386
};
6487

6588
class send_op_base : public reactor_op {
@@ -90,14 +113,31 @@ class send_op : public send_op_base {
90113
flags_type flags)
91114
: send_op_base(std::move(msg), flags)
92115
, handler_(std::move(handler))
116+
, work_guard(boost::asio::make_work_guard(handler_))
93117
{ }
94118

95119
virtual void do_complete() override {
96-
handler_(ec_, bytes_transferred_);
120+
#if BOOST_VERSION >= 107900
121+
auto alloc = boost::asio::get_associated_allocator(
122+
handler_, boost::asio::recycling_allocator<void>());
123+
#endif
124+
boost::asio::dispatch(work_guard.get_executor(),
125+
#if BOOST_VERSION >= 107900
126+
boost::asio::bind_allocator(alloc,
127+
#endif
128+
[ec_ = this->ec_, handler_ = std::move(handler_), bytes_transferred_ = this->bytes_transferred_]() mutable {
129+
handler_(ec_, bytes_transferred_);
130+
})
131+
#if BOOST_VERSION >= 107900
132+
)
133+
#endif
134+
;
135+
97136
}
98137

99138
private:
100139
Handler handler_;
140+
boost::asio::executor_work_guard<typename boost::asio::associated_executor<Handler>::type> work_guard;
101141
};
102142

103143
} // namespace detail

test/socket/main.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <azmq/socket.hpp>
1010
#include <azmq/util/scope_guard.hpp>
1111

12+
#include <boost/current_function.hpp>
1213
#include <boost/utility/string_ref.hpp>
1314
#include <boost/algorithm/string/split.hpp>
1415
#include <boost/algorithm/string/classification.hpp>
@@ -19,6 +20,9 @@
1920
#include <boost/asio/use_future.hpp>
2021
#include <boost/asio/spawn.hpp>
2122
#include <boost/optional.hpp>
23+
#if BOOST_VERSION >= 107400
24+
#include <boost/asio/any_io_executor.hpp>
25+
#endif
2226
#endif
2327

2428
#include <array>
@@ -880,19 +884,74 @@ TEST_CASE("Async Operation Send/Receive single message, stackful coroutine, one
880884
auto frame1 = azmq::message{};
881885
auto const btb1 = azmq::async_receive(sb, frame1, yield);
882886
REQUIRE(btb1 == 5);
887+
REQUIRE(frame1.more());
883888

884889
auto frame2 = azmq::message{};
885890
auto const btb2 = azmq::async_receive(sb, frame2, yield);
886891
REQUIRE(btb2 == 2);
892+
REQUIRE(frame2.more());
893+
REQUIRE(message_ref(snd_bufs.at(0)) == message_ref(frame2));
887894

888895
auto frame3 = azmq::message{};
889896
auto const btb3 = azmq::async_receive(sb, frame3, yield);
890897
REQUIRE(btb3 == 2);
898+
REQUIRE(!frame3.more());
899+
REQUIRE(message_ref(snd_bufs.at(1)) == message_ref(frame3));
891900

892901
});
893902

894903
ios.run();
895904
}
896905

906+
907+
TEST_CASE("Async Operation Send/Receive single message, check thread safety", "[socket_ops]") {
908+
boost::asio::io_service ios;
909+
#if BOOST_VERSION >= 107400
910+
boost::asio::strand<boost::asio::any_io_executor> strand{ios.get_executor()};
911+
#else
912+
boost::asio::strand<boost::asio::executor> strand{ios.get_executor()};
913+
#endif
914+
915+
azmq::socket sb(ios, ZMQ_ROUTER);
916+
sb.bind(subj(BOOST_CURRENT_FUNCTION));
917+
918+
azmq::socket sc(ios, ZMQ_DEALER);
919+
sc.connect(subj(BOOST_CURRENT_FUNCTION));
920+
921+
//send coroutine task
922+
boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
923+
REQUIRE(strand.running_in_this_thread());
924+
boost::system::error_code ecc;
925+
auto const btc = azmq::async_send(sc, snd_bufs, yield[ecc]);
926+
REQUIRE(strand.running_in_this_thread());
927+
REQUIRE(!ecc);
928+
REQUIRE(btc == 4);
929+
});
930+
931+
//receive coroutine task
932+
boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
933+
std::array<char, 5> ident;
934+
std::array<char, 2> a;
935+
std::array<char, 2> b;
936+
937+
std::array<boost::asio::mutable_buffer, 3> rcv_bufs = { {boost::asio::buffer(ident),
938+
boost::asio::buffer(a),
939+
boost::asio::buffer(b)}};
940+
941+
boost::system::error_code ecc;
942+
943+
REQUIRE(strand.running_in_this_thread());
944+
auto const btb = azmq::async_receive(sb, rcv_bufs, yield[ecc]);
945+
REQUIRE(strand.running_in_this_thread());
946+
REQUIRE(!ecc);
947+
REQUIRE(btb == 9);
948+
949+
REQUIRE(message_ref(snd_bufs.at(0)) == boost::string_ref(a.data(), 2));
950+
REQUIRE(message_ref(snd_bufs.at(1)) == boost::string_ref(b.data(), 2));
951+
});
952+
953+
ios.run();
954+
}
955+
897956
#endif // BOOST_VERSION >= 107000
898957

0 commit comments

Comments
 (0)