Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 15 additions & 29 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
static_cast<unsigned int>(appData->length()),
translation,
putHeader.compressionAlgorithmType(),
!d_haveStrongConsistency,
false,
d_haveStrongConsistency,
doAck ? source : 0,
putHeader.crc32c(),
timePoint); // Arrival Timepoint
Expand All @@ -488,24 +489,17 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,

// Send acknowledgement if post failed or if ack was requested (both could
// be true as well).
if (res != mqbi::StorageResult::e_SUCCESS || haveReceipt) {
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta =
bmqsys::Time::highResolutionTimer() - timePoint;
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
timeDelta);
if (res != mqbi::StorageResult::e_SUCCESS || doAck) {
bmqp::AckMessage ackMessage;
ackMessage
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
mqbi::StorageResult::toAckResult(res)))
.setMessageGUID(putHeader.messageGUID());
// CorrelationId & QueueId are left unset as those fields will
// be filled downstream.

source->onAckMessage(ackMessage);
}
if (res != mqbi::StorageResult::e_SUCCESS || haveReceipt ||
!d_haveStrongConsistency) {
bmqp::AckMessage ackMessage;
ackMessage
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
mqbi::StorageResult::toAckResult(res)))
.setMessageGUID(putHeader.messageGUID());
// CorrelationId & QueueId are left unset as those fields will
// be filled downstream.

source->onAckMessage(ackMessage);
}

if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(res ==
Expand Down Expand Up @@ -551,17 +545,9 @@ void LocalQueue::onPushMessage(
"onPushMessage should not be called on LocalQueue");
}

void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* qH,
const bsls::Types::Int64& arrivalTimepoint)
void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* qH)
{
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() -
arrivalTimepoint;

d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(timeDelta);

if (d_state_p->handleCatalog().hasHandle(qH)) {
// Send acknowledgement
bmqp::AckMessage ackMessage;
Expand Down
7 changes: 2 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,10 @@ class LocalQueue BSLS_CPP11_FINAL {

/// Invoked by the Data Store when it receives quorum Receipts for the
/// specified `msgGUID`. Send ACK to the specified `qH` if it is
/// present in the queue handle catalog. Update ACK time stats using
/// the specified `arrivalTimepoint`.
/// present in the queue handle catalog.
///
/// THREAD: This method is called from the Storage dispatcher thread.
void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* qH,
const bsls::Types::Int64& arrivalTimepoint);
void onReceipt(const bmqt::MessageGUID& msgGUID, mqbi::QueueHandle* qH);

/// Invoked by the Data Store when it removes (times out waiting for
/// quorum Receipts for) a message with the specified `msgGUID`. Send
Expand Down
7 changes: 3 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,13 +565,12 @@ void Queue::onOpenUpstream(bsls::Types::Uint64 genCount,
}
}

void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle,
const bsls::Types::Int64& arrivalTimepoint)
void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle)
{
BSLS_ASSERT_SAFE(d_localQueue_mp);

d_localQueue_mp->onReceipt(msgGUID, queueHandle, arrivalTimepoint);
d_localQueue_mp->onReceipt(msgGUID, queueHandle);
}

void Queue::onRemoval(const bmqt::MessageGUID& msgGUID,
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,8 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
/// the specified `arrivalTimepoint`.
///
/// THREAD: This method is called from the Storage dispatcher thread.
void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle,
const bsls::Types::Int64& arrivalTimepoint)
BSLS_KEYWORD_OVERRIDE;
void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle) BSLS_KEYWORD_OVERRIDE;

/// Invoked by the Data Store when it removes (times out waiting for
/// quorum Receipts for) a message with the specified `msgGUID`. Send
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbi/mqbi_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ class Cluster : public DispatcherClient {
/// used by this cluster.
virtual RequestManagerType& requestManager() = 0;

// Return a reference offering a modifiable access to the multi request
// manager used by this cluster.
/// Return a reference offering a modifiable access to the multi request
/// manager used by this cluster.
virtual MultiRequestManagerType& multiRequestManager() = 0;

/// Send the specified `request` with the specified `timeout` to the
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbi/mqbi_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -869,13 +869,11 @@ class Queue : public DispatcherClient {

/// Invoked by the Data Store when it receives quorum Receipts for the
/// specified `msgGUID`. Send ACK to the specified `queueHandle` if it
/// is present in the queue handle catalog. Update AVK time stats using
/// the specified `arrivalTimepoint`.
/// is present in the queue handle catalog..
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle,
const bsls::Types::Int64& arrivalTimepoint) = 0;
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* queueHandle) = 0;

/// Invoked by the Data Store when it removes (times out waiting for
/// quorum Receipts for) a message with the specified `msgGUID`. Send
Expand Down
20 changes: 16 additions & 4 deletions src/groups/mqb/mqbi/mqbi_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ class StorageMessageAttributes {

bool d_hasReceipt;

bool d_strongConsistency;

mqbi::QueueHandle* d_queueHandle;

unsigned int d_crc32c;
Expand Down Expand Up @@ -214,10 +216,11 @@ class StorageMessageAttributes {
unsigned int refCount,
const bmqp::MessagePropertiesInfo& messagePropertiesInfo,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
bool hasReceipt = true,
mqbi::QueueHandle* queueHandle = 0,
unsigned int crc32c = 0,
bsls::Types::Int64 arrivalTimepoint = 0);
bool hasReceipt = true,
bool strongConsistency = true,
mqbi::QueueHandle* queueHandle = 0,
unsigned int crc32c = 0,
bsls::Types::Int64 arrivalTimepoint = 0);

// MANIPULATORS
StorageMessageAttributes& setArrivalTimestamp(bsls::Types::Uint64 value);
Expand All @@ -243,6 +246,7 @@ class StorageMessageAttributes {
unsigned int appDataLen() const;
const bmqp::MessagePropertiesInfo& messagePropertiesInfo() const;
bool hasReceipt() const;
bool strongConsistency() const;
mqbi::QueueHandle* queueHandle() const;

/// Return the CRC32-C associated with this object.
Expand Down Expand Up @@ -788,6 +792,7 @@ inline StorageMessageAttributes::StorageMessageAttributes()
, d_appDataLen(0)
, d_messagePropertiesInfo()
, d_hasReceipt(true)
, d_strongConsistency(true)
, d_queueHandle(0)
, d_crc32c(0)
, d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE)
Expand All @@ -801,6 +806,7 @@ inline StorageMessageAttributes::StorageMessageAttributes(
const bmqp::MessagePropertiesInfo& messagePropertiesInfo,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
bool hasReceipt,
bool strongConsistency,
mqbi::QueueHandle* queueHandle,
unsigned int crc32c,
bsls::Types::Int64 arrivalTimepoint)
Expand All @@ -810,6 +816,7 @@ inline StorageMessageAttributes::StorageMessageAttributes(
, d_appDataLen(appDataLen)
, d_messagePropertiesInfo(messagePropertiesInfo)
, d_hasReceipt(hasReceipt)
, d_strongConsistency(strongConsistency)
, d_queueHandle(queueHandle)
, d_crc32c(crc32c)
, d_compressionAlgorithmType(compressionAlgorithmType)
Expand Down Expand Up @@ -921,6 +928,11 @@ inline bool StorageMessageAttributes::hasReceipt() const
return d_hasReceipt;
}

inline bool StorageMessageAttributes::strongConsistency() const
{
return d_strongConsistency;
}

inline mqbi::QueueHandle* StorageMessageAttributes::queueHandle() const
{
return d_queueHandle;
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbmock/mqbmock_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ void Queue::onOpenFailure(BSLA_UNUSED unsigned int subQueueId)
}

void Queue::onReceipt(BSLA_UNUSED const bmqt::MessageGUID& msgGUID,
BSLA_UNUSED mqbi::QueueHandle* qH,
BSLA_UNUSED const bsls::Types::Int64& arrivalTimepoint)
BSLA_UNUSED mqbi::QueueHandle* qH)
{
// NOTHING
}
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbmock/mqbmock_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,8 @@ class Queue : public mqbi::Queue {
/// Invoked by the Data Store when it receives quorum Receipts.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* qH,
const bsls::Types::Int64& arrivalTimepoint)
BSLS_KEYWORD_OVERRIDE;
void onReceipt(const bmqt::MessageGUID& msgGUID,
mqbi::QueueHandle* qH) BSLS_KEYWORD_OVERRIDE;

/// Invoked by the Data Store when it removes (times out waiting for
/// quorum Receipts for) a message with the specified `msgGUID`. Send
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
queue()
->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(

msgSize);

d_isEmpty.storeRelaxed(0);
Expand Down
Loading
Loading