Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 27 additions & 48 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,16 @@ struct BuildAckOverflowFunctor {
// -------------------------

ClientSessionState::ClientSessionState(
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator)
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_channelBufferQueue(allocator)
, d_unackedMessageInfos(d_allocator_p)
, d_dispatcherClientData()
, d_statContext_mp(clientStatContext)
, d_statContext_sp(clientStatContext)
, d_bufferFactory_p(bufferFactory)
, d_blobSpPool_p(blobSpPool)
, d_schemaEventBuilder(blobSpPool, encodingType, allocator)
Expand Down Expand Up @@ -673,7 +673,7 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
queueStats = invalidQueueStats();
}
else {
queueStats = subQueueCiter->value().d_stats.get();
queueStats = subQueueCiter->value().d_stats_sp.get();
}
}

Expand Down Expand Up @@ -1916,23 +1916,10 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
return false; // RETURN
}

StreamsMap::iterator subQueueIt =
queueIt->second.d_subQueueInfosMap.findBySubIdSafe(queueId.subId());
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
subQueueIt == queueIt->second.d_subQueueInfosMap.end())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update invalid queue stats
invalidQueueStats()->onEvent(
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1);
}

*errorStream << "for an unknown subQueueId";

return false; // RETURN
}
// Do not lookup 'queueId.subId()'.
// 'QueueHandle::confirmMessageDispatched' does the check.
// Note, that it does not update stats (on "bmq://invalid/queue").
// It does log warnings.

*queueHandle = queueIt->second.d_handle_p;
BSLS_ASSERT_SAFE(queueHandle);
Expand Down Expand Up @@ -1978,13 +1965,6 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
return false; // RETURN
}

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update stats for the queue (or subStream of the queue)
subQueueIt->value().d_stats->onEvent(
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1);
}

return true;
}

Expand Down Expand Up @@ -2126,7 +2106,7 @@ void ClientSession::onPushEvent(const mqbi::DispatcherPushEvent& event)
blob->length());
}
else {
subQueueCiter->value().d_stats->onEvent(
subQueueCiter->value().onEvent(
mqbstat::QueueStatsClient::EventType::e_PUSH,
blob->length());
}
Expand Down Expand Up @@ -2277,9 +2257,8 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
BSLS_ASSERT_SAFE(queueStatePtr && subQueueInfoPtr);
BSLS_ASSERT_SAFE(queueStatePtr->d_handle_p);

subQueueInfoPtr->d_stats->onEvent(
mqbstat::QueueStatsClient::EventType::e_PUT,
appDataSp->length());
subQueueInfoPtr->onEvent(mqbstat::QueueStatsClient::EventType::e_PUT,
appDataSp->length());

const bool isAtMostOnce =
queueStatePtr->d_handle_p->queue()->isAtMostOnce();
Expand Down Expand Up @@ -2446,7 +2425,7 @@ mqbstat::QueueStatsClient* ClientSession::invalidQueueStats()
d_state.d_invalidQueueStats.makeValue();
d_state.d_invalidQueueStats.value().initialize(
"bmq://invalid/queue",
d_state.d_statContext_mp.get(),
d_state.d_statContext_sp.get(),
d_state.d_allocator_p);
// TBD: The queue uri should be '** INVALID QUEUE **', but that can
// only be done once the stats UI panel has been updated to
Expand Down Expand Up @@ -2649,17 +2628,17 @@ bool ClientSession::validatePutMessage(QueueState** queueState,

// CREATORS
ClientSession::ClientSession(
const bsl::shared_ptr<bmqio::Channel>& channel,
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
const bsl::string& sessionDescription,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
const bsl::shared_ptr<bmqio::Channel>& channel,
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
const bsl::string& sessionDescription,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
: d_self(this) // use default allocator
, d_operationState(e_RUNNING)
, d_isDisconnecting(false)
Expand All @@ -2680,7 +2659,7 @@ ClientSession::ClientSession(
allocator)
, d_queueSessionManager(this,
*d_clientIdentity_p,
d_state.d_statContext_mp.get(),
d_state.d_statContext_sp,
domainFactory,
allocator)
, d_clusterCatalog_p(clusterCatalog)
Expand Down
24 changes: 11 additions & 13 deletions src/groups/mqb/mqba/mqba_clientsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ struct ClientSessionState {
typedef bsl::pair<UnackedMessageInfoMap::iterator, bool>
UnackedMessageInfoMapInsertRc;

typedef bslma::ManagedPtr<bmqst::StatContext> StatContextMp;

public:
// PUBLIC DATA

Expand All @@ -173,7 +171,7 @@ struct ClientSessionState {

/// Stat context dedicated to this domain, to use as the parent stat
/// context for any queue in this domain.
StatContextMp d_statContext_mp;
const bsl::shared_ptr<bmqst::StatContext> d_statContext_sp;

/// Blob buffer factory to use.
///
Expand Down Expand Up @@ -225,11 +223,11 @@ struct ClientSessionState {
/// builder will use. Memory allocations are performed using the
/// specified `allocator`.
ClientSessionState(
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator);
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bmqp::EncodingType::Enum encodingType,
bslma::Allocator* allocator);
};

// ===================
Expand Down Expand Up @@ -632,11 +630,11 @@ class ClientSession : public mqbnet::Session,
mqbi::Dispatcher* dispatcher,
mqbblp::ClusterCatalog* clusterCatalog,
mqbi::DomainFactory* domainFactory,
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator);
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
ClientSessionState::BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator);

/// Destructor
~ClientSession() BSLS_KEYWORD_OVERRIDE;
Expand Down
42 changes: 24 additions & 18 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ class MyMockDomain : public mqbmock::Domain {
/// calls the specified `callback` with a new queue handle created
/// using the specified `handleParameters`. The specified `uri` and
/// `clientContext` are ignored.
void openQueue(BSLA_UNUSED const bmqt::Uri& uri,
void openQueue(const bmqt::Uri& uri,
const bsl::shared_ptr<mqbi::QueueHandleRequesterContext>&
clientContext,
const bmqp_ctrlmsg::QueueHandleParameters& handleParameters,
Expand All @@ -605,8 +605,15 @@ class MyMockDomain : public mqbmock::Domain {
handleParameters,
d_allocator_p);

OpenQueueConfirmationCookie confirmationCookie;
confirmationCookie.createInplace(d_allocator_p, d_queueHandle.get());
mqbi::OpenQueueConfirmationCookieSp confirmationCookie;
confirmationCookie.createInplace(d_allocator_p);
confirmationCookie->d_handle = d_queueHandle.get();

confirmationCookie->d_stats_sp.createInplace(d_allocator_p);
confirmationCookie->d_stats_sp->initialize(
uri,
clientContext->statContext().get(),
d_allocator_p);

bmqp_ctrlmsg::Status status(d_allocator_p);
status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
Expand Down Expand Up @@ -648,18 +655,18 @@ class TestBench {

public:
// DATA
bdlbb::PooledBlobBufferFactory d_bufferFactory;
BlobSpPool d_blobSpPool;
bdlbb::PooledBlobBufferFactory d_bufferFactory;
BlobSpPool d_blobSpPool;
bsl::shared_ptr<bmqio::TestChannel> d_channel;
mqbmock::Cluster d_cluster;
mqbmock::Dispatcher d_mockDispatcher;
MyMockDomain d_domain;
mqbmock::DomainFactory d_mockDomainFactory;
bslma::ManagedPtr<bmqst::StatContext> d_clientStatContext_mp;
bdlmt::EventScheduler d_scheduler;
TestClock d_testClock;
mqba::ClientSession d_cs;
bslma::Allocator* d_allocator_p;
mqbmock::Cluster d_cluster;
mqbmock::Dispatcher d_mockDispatcher;
MyMockDomain d_domain;
mqbmock::DomainFactory d_mockDomainFactory;
const bsl::shared_ptr<bmqst::StatContext> d_clientStatContext_sp;
bdlmt::EventScheduler d_scheduler;
TestClock d_testClock;
mqba::ClientSession d_cs;
bslma::Allocator* d_allocator_p;

static const int k_PAYLOAD_LENGTH = 36;

Expand All @@ -682,9 +689,8 @@ class TestBench {
, d_mockDispatcher(allocator)
, d_domain(&d_mockDispatcher, &d_cluster, atMostOnce, allocator)
, d_mockDomainFactory(d_domain, allocator)
, d_clientStatContext_mp(
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator)
.managedPtr())
, d_clientStatContext_sp(
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator))
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, allocator)
, d_testClock(d_scheduler)
, d_cs(d_channel,
Expand All @@ -693,7 +699,7 @@ class TestBench {
setInDispatcherThread(&d_mockDispatcher),
0, // ClusterCatalog
&d_mockDomainFactory,
d_clientStatContext_mp,
d_clientStatContext_sp,
&d_blobSpPool,
&d_bufferFactory,
&d_scheduler,
Expand Down
Loading
Loading