Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
uses: ./.github/workflows/build-ubuntu.yaml
with:
ref: ${{ github.sha }}
target: "bmqbrkr bmqtool bmqstoragetool all.it"
target: "bmqbrkr bmqtool bmqstoragetool all.it bmqauthnpass bmqauthnfail"
save_build_as_artifacts: true
run_unit_tests: false

Expand Down
36 changes: 27 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ if (NOT DEFINED INSTALL_TARGETS)
set(BMQ_TARGET_PROMETHEUS_NEEDED NO)
set(BMQ_TARGET_IT_NEEDED YES)
set(BMQ_TARGET_FUZZTESTS_NEEDED NO)
set(BMQ_TARGET_AUTHNPASS_NEEDED YES)
set(BMQ_TARGET_AUTHNFAIL_NEEDED YES)
else()
bbproject_check_install_target("bmqbrkr" installBMQBRKR)
bbproject_check_install_target("BMQBRKR_NIGHTLY" installNightly)
Expand All @@ -148,14 +150,18 @@ else()
set(BMQ_TARGET_PROMETHEUS_NEEDED NO)
set(BMQ_TARGET_IT_NEEDED NO)
set(BMQ_TARGET_FUZZTESTS_NEEDED NO)

bbproject_check_install_target("bmq" installBMQ)
bbproject_check_install_target("mqb" installMQB)
bbproject_check_install_target("bmqbrkrcfg" installBMQBRKRCFG)
bbproject_check_install_target("bmqtool" installBMQTOOL)
bbproject_check_install_target("bmqstoragetool" installBMQSTORAGETOOL)
bbproject_check_install_target("prometheus" installPROMETHEUS)
bbproject_check_install_target("fuzztests" installFUZZTESTS)
set(BMQ_TARGET_AUTHNPASS_NEEDED NO)
set(BMQ_TARGET_AUTHNFAIL_NEEDED NO)

bbproject_check_install_target("bmq" installBMQ)
bbproject_check_install_target("mqb" installMQB)
bbproject_check_install_target("bmqbrkrcfg" installBMQBRKRCFG)
bbproject_check_install_target("bmqtool" installBMQTOOL)
bbproject_check_install_target("bmqstoragetool" installBMQSTORAGETOOL)
bbproject_check_install_target("prometheus" installPROMETHEUS)
bbproject_check_install_target("fuzztests" installFUZZTESTS)
bbproject_check_install_target("authnpass" installAUTHNPASS)
bbproject_check_install_target("authnfail" installAUTHNFAIL)

if (installBMQ)
set(BMQ_TARGET_BMQ_NEEDED YES)
Expand Down Expand Up @@ -200,6 +206,18 @@ else()
set(BMQ_TARGET_MQB_NEEDED YES)
set(BMQ_TARGET_FUZZTESTS_NEEDED YES)
endif()

if (installAUTHNPASS)
set(BMQ_TARGET_BMQ_NEEDED YES)
set(BMQ_TARGET_MQB_NEEDED YES)
set(BMQ_TARGET_AUTHNPASS_NEEDED YES)
endif()

if (installAUTHNFAIL)
set(BMQ_TARGET_BMQ_NEEDED YES)
set(BMQ_TARGET_MQB_NEEDED YES)
set(BMQ_TARGET_AUTHNFAIL_NEEDED YES)
endif()
endif()

find_package(Git)
Expand Down Expand Up @@ -326,7 +344,7 @@ if(NOT BMQ_TARGET_BMQBRKR_NEEDED)
return()
endif()

# Install all the headers for mqb + bmq
# Install all the headers for mqb + bmq
install(TARGETS bmqbrkr_plugins
EXPORT BmqbrkrPluginsTargets
FILE_SET HEADERS
Expand Down
2 changes: 1 addition & 1 deletion bin/build-darwin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ CMAKE_OPTIONS=(\

PKG_CONFIG_PATH="${DIR_INSTALL}/lib/pkgconfig:${BREW_PKG_CONFIG_PATH}" \
cmake -B "${DIR_BUILD}/blazingmq" -S "${DIR_ROOT}" "${CMAKE_OPTIONS[@]}"
make -C "${DIR_BUILD}/blazingmq" -j 16
cmake --build "${DIR_BUILD}/blazingmq" --parallel 16 --target bmqbrkr bmqtool all.it bmqauthnpass bmqauthnfail

echo broker is here: "${DIR_BUILD}/blazingmq/src/applications/bmqbrkr/bmqbrkr.tsk"
echo to run the broker: "${DIR_BUILD}/blazingmq/src/applications/bmqbrkr/run"
Expand Down
4 changes: 2 additions & 2 deletions docker/build_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fetch_deps() {
configure() {
PATH="$PATH:$(realpath srcs/bde-tools/bin)"
export PATH
eval "$(bbs_build_env -u opt_64_cpp17)"
eval "$(bbs_build_env -u opt_64_cpp17_pic)"
}

build_bde() {
Expand All @@ -68,7 +68,7 @@ build_ntf() {
--without-usage-examples \
--without-applications \
--without-warnings-as-errors \
--ufid opt_64_cpp17
--ufid opt_64_cpp17_pic
make -j8
make install
popd
Expand Down
3 changes: 2 additions & 1 deletion src/groups/bmq/bmqa/bmqa_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
#include <bmqp_ctrlmsg_messages.h>
#include <bmqp_messageguidgenerator.h>
#include <bmqp_protocol.h>
#include <bmqt_authncredential.h>
#include <bmqt_correlationid.h>
#include <bmqt_queueflags.h>

#include <bmqu_memoutstream.h>

// BDE
Expand All @@ -50,6 +50,7 @@
#include <bsl_cstdio.h>
#include <bsl_cstdlib.h>
#include <bsl_iostream.h>
#include <bsl_memory.h>
#include <bsl_type_traits.h>
#include <bsla_annotations.h>
#include <bslma_default.h>
Expand Down
21 changes: 11 additions & 10 deletions src/groups/bmq/bmqimp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ its components directly, but rather use the accessor public APIs from 'bmqa'.

Component Synopsis
------------------
Component | Provides ...
----------------------------------|-----------------------------------------------------------
`bmqimp_application` | the top level object to manipulate a session with bmqbrkr.
`bmqimp_brokersession` | the implementation of a session with the bmqbrkr.
`bmqimp_event` | a value-semantic type representing an event.
`bmqimp_eventqueue` | a thread safe queue of pooled bmqimp::Event items.
`bmqpimp_eventsstats` | a mechanism to keep track of Events statistics.
`bmqpimp_queue` | a type object to represent information about a queue.
`bmqimp_stat` | utilities for stat manipulation.
`bmqimp_negotiatedchannelfactory` | a channel factory that negotiates with a peer.
Component | Provides ...
-------------------------------------|-----------------------------------------------------------
`bmqimp_application` | the top level object to manipulate a session with bmqbrkr.
`bmqimp_brokersession` | the implementation of a session with the bmqbrkr.
`bmqimp_event` | a value-semantic type representing an event.
`bmqimp_eventqueue` | a thread safe queue of pooled bmqimp::Event items.
`bmqpimp_eventsstats` | a mechanism to keep track of Events statistics.
`bmqpimp_queue` | a type object to represent information about a queue.
`bmqimp_stat` | utilities for stat manipulation.
`bmqimp_authenticatedchannelfactory` | a channel factory that authenticates with a peer.
`bmqimp_negotiatedchannelfactory` | a channel factory that negotiates with a peer.
31 changes: 29 additions & 2 deletions src/groups/bmq/bmqimp/bmqimp_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// BMQ
#include <bmqex_executionpolicy.h>
#include <bmqex_systemexecutor.h>
#include <bmqimp_authenticatedchannelfactory.h>
#include <bmqimp_negotiatedchannelfactory.h>
#include <bmqio_channelutil.h>
#include <bmqio_connectoptions.h>
#include <bmqio_status.h>
Expand Down Expand Up @@ -93,7 +95,7 @@
// has to be of size 1 more than the dump interval
config.defaultHistorySize(
30,
(options.statsDumpInterval().seconds() / 30) + 1);

Check warning on line 98 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 98 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 bmqbrkr bmqtool bmqstoragetool all.it bmqauthnpass bmqauthnfail

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 98 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
}
else {
config.defaultHistorySize(2);
Expand Down Expand Up @@ -215,7 +217,24 @@
BALL_LOG_TRACE << channel->peerUri() << ": ReadCallback got a blob\n"
<< bmqu::BlobStartHexDumper(&readBlob);

d_brokerSession.processPacket(event);
if (event.isAuthenticationEvent()) {
// Application received a broker response to an re-authentication
// request. The callback function `channelStateCallback` should
// only be called for failed cases.
d_authenticatedChannelFactory.processAuthenticationEvent(
event,
bdlf::BindUtil::bindS(&d_allocator,
&Application::channelStateCallback,
this,
channel->peerUri(),
bdlf::PlaceHolders::_1, // event
bdlf::PlaceHolders::_2, // status
bdlf::PlaceHolders::_3), // channel
channel);
}
else {
d_brokerSession.processPacket(event);
}
}
}

Expand Down Expand Up @@ -415,7 +434,7 @@

// Schedule a stats snapshot every seconds, if configured for
if (d_sessionOptions.statsDumpInterval() != bsls::TimeInterval()) {
d_nextStatDump = d_sessionOptions.statsDumpInterval().seconds();

Check warning on line 437 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 437 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 bmqbrkr bmqtool bmqstoragetool all.it bmqauthnpass bmqauthnfail

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 437 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
d_scheduler.scheduleRecurringEvent(
&d_statSnaphotTimerHandle,
bsls::TimeInterval(1.0),
Expand Down Expand Up @@ -444,7 +463,7 @@
if (d_nextStatDump > 0 && --d_nextStatDump == 0) {
// NOTE: This is subject to the scheduler issue where it will try to
// catch-up after falling behind.
d_nextStatDump = d_sessionOptions.statsDumpInterval().seconds();

Check warning on line 466 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 466 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 bmqbrkr bmqtool bmqstoragetool all.it bmqauthnpass bmqauthnfail

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 466 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
printStats(false);
}
}
Expand Down Expand Up @@ -602,8 +621,16 @@
bdlf::PlaceHolders::_2), // handle
allocator),
allocator)
, d_authenticatedChannelFactory(
AuthenticatedChannelFactoryConfig(&d_statChannelFactory,
&d_scheduler,
sessionOptions.authnCredentialCb(),
sessionOptions.connectTimeout(),
d_blobSpPool_sp.get(),
allocator),
allocator)
, d_negotiatedChannelFactory(
NegotiatedChannelFactoryConfig(&d_statChannelFactory,
NegotiatedChannelFactoryConfig(&d_authenticatedChannelFactory,
negotiationMessage,
sessionOptions.connectTimeout(),
d_blobSpPool_sp.get(),
Expand Down Expand Up @@ -679,7 +706,7 @@
if (d_sessionOptions.statsDumpInterval() != bsls::TimeInterval()) {
start.setLevel(1).setIndex(0);
end.setLevel(1).setIndex(
d_sessionOptions.statsDumpInterval().seconds() / 30);

Check warning on line 709 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build Prometheus plugin [ubuntu]

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 709 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / Build [ubuntu] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 bmqbrkr bmqtool bmqstoragetool all.it bmqauthnpass bmqauthnfail

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]

Check warning on line 709 in src/groups/bmq/bmqimp/bmqimp_application.cpp

View workflow job for this annotation

GitHub Actions / UT [c++] / Build [ubuntu] b363301bba8e868a492e64990718dc57fbf74dd7 all.t

conversion from ‘BloombergLP::bsls::Types::Int64’ {aka ‘long long int’} to ‘int’ may change value [-Wconversion]
}
else {
start.setLevel(0).setIndex(0);
Expand Down
47 changes: 23 additions & 24 deletions src/groups/bmq/bmqimp/bmqimp_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

// BMQ

#include <bmqimp_authenticatedchannelfactory.h>
#include <bmqimp_brokersession.h>
#include <bmqimp_eventqueue.h>
#include <bmqimp_negotiatedchannelfactory.h>
Expand Down Expand Up @@ -81,7 +82,7 @@ namespace bmqimp {
class Application {
public:
// PUBLIC TYPES
typedef bmqp::BlobPoolUtil::BlobSpPool BlobSpPool;
typedef bmqp::BlobPoolUtil::BlobSpPool BlobSpPool;
typedef bmqp::BlobPoolUtil::BlobSpPoolSp BlobSpPoolSp;

private:
Expand All @@ -93,38 +94,38 @@ class Application {
BALL_LOG_SET_CLASS_CATEGORY("BMQIMP.APPLICATION");

// DATA

/// Stat context for counting allocators
bmqst::StatContext d_allocatorStatContext;
// Stat context for counting allocators

/// Counting allocator
bmqma::CountingAllocator d_allocator;
// Counting allocator

/// Allocator store to spawn new
/// allocators for sub-components
bmqma::CountingAllocatorStore d_allocators;
// Allocator store to spawn new
// allocators for sub-components

/// Top level stat context for all stats
bmqst::StatContext d_rootStatContext;
// Top level stat context for all stats

/// Top level stat context for channels
bslma::ManagedPtr<bmqst::StatContext> d_channelsStatContext_mp;
// Top level stat context for channels

/// Options to configure this application
bmqt::SessionOptions d_sessionOptions;
// Options to configure this
// application

bmqst::Table d_channelsTable;

bmqst::BasicTableInfoProvider d_channelsTip;

/// Factory for blob buffers
bdlbb::PooledBlobBufferFactory d_blobBufferFactory;
// Factory for blob buffers

/// Shared pointer to the pool of shared pointers to blobs.
BlobSpPoolSp d_blobSpPool_sp;

/// Scheduler
bdlmt::EventScheduler d_scheduler;
// Scheduler

bmqio::NtcChannelFactory d_channelFactory;

Expand All @@ -134,31 +135,28 @@ class Application {

bmqio::StatChannelFactory d_statChannelFactory;

AuthenticatedChannelFactory d_authenticatedChannelFactory;

NegotiatedChannelFactory d_negotiatedChannelFactory;

ChannelFactoryOpHandleMp d_connectHandle_mp;

/// The 'persistent' broker session state
BrokerSession d_brokerSession;
// The 'persistent' broker session
// state

/// Timer Event handle for 'async' start timeout
bdlmt::EventScheduler::EventHandle d_startTimeoutHandle;
// Timer Event handle for 'async' start
// timeout

/// Timer Event handle for statistics snaphot
bdlmt::EventScheduler::RecurringEventHandle d_statSnaphotTimerHandle;
// Timer Event handle for statistics
// snaphot

/// Counter decremented at every stat snapshot,
/// to know when to dump the stats
int d_nextStatDump;
// Counter decremented at every stat
// snapshot, to know when to dump the
// stats

/// HiRes timer value of the last time the snapshot was performed on the
/// Counting Allocators context
bsls::Types::Int64 d_lastAllocatorSnapshot;
// HiRes timer value of the last time
// the snapshot was performed on the
// Counting Allocators context

/// Scheduler handle of the recurring event to monitor channels heartbeats.
bdlmt::EventSchedulerRecurringEventHandle d_heartbeatSchedulerHandle;
Expand Down Expand Up @@ -226,7 +224,8 @@ class Application {
const bsl::shared_ptr<bmqp::HeartbeatMonitor>& monitor);
bsl::shared_ptr<bmqp::HeartbeatMonitor>
createMonitor(const bsl::shared_ptr<bmqio::Channel>& channel);
void startHeartbeat(const bsl::shared_ptr<bmqio::Channel>& channel,
void
startHeartbeat(const bsl::shared_ptr<bmqio::Channel>& channel,
const bsl::shared_ptr<bmqp::HeartbeatMonitor>& monitor);
void stopHeartbeat();

Expand Down
6 changes: 6 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_application.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ static void test1_breathingTest()

// Create a default application
bmqt::SessionOptions options(bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::AuthenticationMessage authenticaionMessage(
bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
bmqtst::TestHelperUtil::allocator());
bmqimp::EventQueue::EventHandlerCallback emptyEventHandler;
Expand Down Expand Up @@ -78,6 +80,8 @@ static void test2_startStopTest()

// Create a default application, make sure it can start/stop
bmqt::SessionOptions options(bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::AuthenticationMessage authenticaionMessage(
bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
bmqtst::TestHelperUtil::allocator());
bmqimp::EventQueue::EventHandlerCallback emptyEventHandler;
Expand Down Expand Up @@ -130,6 +134,8 @@ static void test3_startStopAsyncTest()

// Create a default application, make sure it can start/stop
bmqt::SessionOptions options(bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::AuthenticationMessage authenticaionMessage(
bmqtst::TestHelperUtil::allocator());
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
bmqtst::TestHelperUtil::allocator());
bmqimp::EventQueue::EventHandlerCallback emptyEventHandler;
Expand Down
Loading
Loading