Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/integration-tests/test_alarms.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def test_no_alarms_if_disabled(cluster: Cluster, domain_urls: tc.DomainUrls):


@tweak.domain.max_idle_time(1)
def test_broadcast_no_alarms(cluster: Cluster, domain_urls: tc.DomainUrls): # pylint: disable=unused-argument
def test_broadcast_no_alarms(
cluster: Cluster, domain_urls: tc.DomainUrls
): # pylint: disable=unused-argument
"""
Test no broker ALARMS in broadcast mode.
"""
Expand Down Expand Up @@ -162,7 +164,7 @@ def test_priority_alarm_when_consumer_dropped(
cluster: Cluster, domain_urls: tc.DomainUrls
):
"""
Test that alarm is triggered when consumer droppped the connection in priority mode.
Test that alarm is triggered when consumer dropped the connection in priority mode.
"""
uri_priority = domain_urls.uri_priority
leader = cluster.last_known_leader
Expand Down
8 changes: 7 additions & 1 deletion src/integration-tests/test_app_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, List, Optional

import blazingmq.dev.it.testconstants as tc

from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import
Expand All @@ -25,6 +27,7 @@
tweak,
virtual_cluster_config,
)
from blazingmq.dev.it.process.broker import Broker
from blazingmq.dev.it.process.client import Client


Expand All @@ -34,7 +37,10 @@ class TestAppSubscriptions:
(apps)
"""

def _start_client(self, broker, uri, name, subscriptions=[]):
@staticmethod
def _start_client(
broker: Broker, uri: str, name: str, subscriptions: Optional[List[Any]] = None
) -> Client:
consumer = broker.create_client(name)
assert (
consumer.open(
Expand Down
41 changes: 22 additions & 19 deletions src/integration-tests/test_cluster_node_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,13 @@
order,
multi_node,
)
from blazingmq.dev.it.common import BMQTestError, BMQTST_ASSERT
from blazingmq.dev.it.process.client import Client
from blazingmq.dev.it.util import wait_until

pytestmark = order(6)


class BMQITError(RuntimeError):
"""
BMQ IT error.
"""


class TestClusterNodeShutdown:
"""
This suite of test cases exercises node shutdown and subsequent failover
Expand All @@ -50,10 +45,12 @@ class TestClusterNodeShutdown:
@staticmethod
def open_or_raise(client: Client, uri: str, flags: List[str]):
rc = client.open(uri, flags=flags, succeed=True)
if rc != Client.e_SUCCESS:
raise BMQITError(
f"Failed to open a queue: client = {client}, uri = {uri}, rc = {rc}"
)
BMQTST_ASSERT(rc == Client.e_SUCCESS,
"Failed to open a queue",
client=client,
uri=uri,
flags=flags,
rc=rc)

def setup_cluster(self, cluster: Cluster, domain_urls: tc.DomainUrls):
du = domain_urls
Expand Down Expand Up @@ -128,7 +125,7 @@ def release_recovery_if_state_restored(line: str) -> None:
_ = recovery.get(timeout=120)
except queue.Empty as ex:
# No recovery log observed
raise BMQITError("State is not restored") from ex
raise BMQTestError("State is not restored") from ex
sleep(5)

self._verify_all_queues_operational(domain_urls) # After recovery
Expand All @@ -149,14 +146,20 @@ def check_received_one_of(consumer: Client, uri_group: str, *expected):
)
msgs = consumer.list(uri_group, block=True)
self.history.append((str(consumer.name), uri_group, msgs))
if len(msgs) != 1:
raise BMQITError(
f"Expected 1 message, got: {len(msgs)} msgs.\nMessages history:\n{self.history}"
)
if msgs[0].payload not in expected:
raise BMQITError(
f"Unexpected message payload: {msgs[0].payload} (expected: {expected})"
)
BMQTST_ASSERT(len(msgs) == 1,
"Expected exactly 1 message",
client=consumer,
uri=uri_group,
observed_messages=msgs,
history=self.history)
BMQTST_ASSERT(msgs[0].payload in expected,
"Unexpected message payload",
client=consumer,
uri=uri_group,
observed_messages=msgs,
history=self.history,
expected_any_of=expected
)
consumer.confirm(uri_group, "*", succeed=True)

def check_both_received_one_of(*expected):
Expand Down
6 changes: 3 additions & 3 deletions src/integration-tests/test_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
def test_early_assign(multi_node: Cluster, domain_urls: tc.DomainUrls):
"""
Early open a queue on a soon-to-be leader. Legacy leader when it becomes
ACTIVE, starts assigning queues _before_ it assignes partitions. Then,
the leader observes `onQueueAssigned` event _before_ it becomes ACTVIE
ACTIVE, starts assigning queues _before_ it assigns partitions. Then,
the leader observes `onQueueAssigned` event _before_ it becomes ACTIVE
primary. If that event logic erroneously decides that the self is replica,
the soon-to-be primary does not write QueueCreationRecord. The primary
still writes any posted message record though. That leads to either assert
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_early_assign(multi_node: Cluster, domain_urls: tc.DomainUrls):
def test_replica_late_join(multi_node: Cluster, domain_urls: tc.DomainUrls):
"""
In a steady-state cluster where only one replica node is down, with live
messages flowing, the replica node rejoins and attemps to heal itself via
messages flowing, the replica node rejoins and attempts to heal itself via
the primary. The concern is that the primary could send live data to the
replica, and then re-send that data as recovery data chunks; we would like
to make sure our deduplication logic is working correctly to handle this
Expand Down
16 changes: 10 additions & 6 deletions src/integration-tests/test_strong_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
multi_node,
tweak,
)
from blazingmq.dev.it.common import BMQTestError, BMQTST_ASSERT
from blazingmq.dev.it.util import wait_until
from blazingmq.schemas import mqbconf

Expand Down Expand Up @@ -105,7 +106,7 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
lambda: len(self.consumer.list(uri, block=True)) == 1, 2
)
# the WC queue was last sending data and the SC one was first which
# means it is sufficent to wait on WC before checking SC
# means it is sufficient to wait on WC before checking SC

if has_timeout:
# expect NACK
Expand All @@ -115,7 +116,7 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
else:
# make sure there are neither SC ACK(s) nor message(s)
assert not self.producer.outputs_regex(
f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", 1
f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", timeout=1
)

for uri in [
Expand All @@ -128,15 +129,18 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
if not has_timeout:
self.consumer.wait_push_event(timeout=120)
# make sure there are SC ACK(s) and message(s)
assert self.producer.outputs_regex(f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", 25)
BMQTST_ASSERT(self.producer.outputs_regex(f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", timeout=25),
client=self.producer, uri=tc.URI_FANOUT_SC)
for uri in [
tc.URI_FANOUT_SC_FOO,
tc.URI_FANOUT_SC_BAR,
tc.URI_FANOUT_SC_BAZ,
]:
assert wait_until(
lambda: len(self.consumer.list(uri, block=True)) == 1, 2
)
BMQTST_ASSERT(wait_until(lambda: len(self.consumer.list(uri, block=True)) == 1, timeout=2),
"Consumer expected exactly 1 message",
client=self.consumer,
uri=uri
)

def test_suspend_post_resume(
self,
Expand Down
2 changes: 1 addition & 1 deletion src/python/blazingmq/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@


class BMQError(RuntimeError):
"Base class for all exceptions raised by BMQ."
"""Base class for all exceptions raised by BlazingMQ."""
59 changes: 59 additions & 0 deletions src/python/blazingmq/dev/it/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2025 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
blazingmq.dev.it.common

PURPOSE: Provide common types and functions for ITs.
"""

import traceback
from typing import Any, Dict, Optional


from blazingmq.core import BMQError


class BMQTestError(BMQError):
"""BlazingMQ test failure exception."""


def _context_to_str(context: Dict[str, Any]) -> str:
context_str = "\n".join(f" {key} = {val}" for key, val in context.items())
if len(context_str) == 0:
context_str = "EMPTY"
return context_str


def BMQTST_ASSERT(condition, message: Optional[str] = None, **kwargs) -> None:
if condition:
return

tb = traceback.StackSummary.extract(
frame_gen=traceback.walk_stack(None), capture_locals=False
)
if isinstance(tb, list) and 2 <= len(tb):
# [0] -> this function
# [1] -> caller function
fail_line = f"{tb[1].line} ({tb[1].filename}:{tb[1].lineno})"
else:
# Should never happen, don't want to use a nested assert to check it
fail_line = "UNDEFINED"

context_str = _context_to_str(kwargs)

raise BMQTestError(
f"{message or 'Failed condition'}\nFailure at '{fail_line}', context:\n{context_str}"
)
39 changes: 26 additions & 13 deletions src/python/blazingmq/dev/it/process/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
if TYPE_CHECKING:
from blazingmq.dev.it.cluster import Cluster

from blazingmq.dev.it.common import BMQTST_ASSERT
from blazingmq.dev.it.process import proc
import blazingmq.dev.it.process.bmqproc
import blazingmq.dev.it.testconstants as tc
Expand Down Expand Up @@ -103,7 +104,7 @@ def pid(self):
IMPLEMENTATION NOTE: The broker is started via a script that sets up
the environment and creates directories for storage and logs, then runs
the broker proper via the 'exec' shell command. Depending on the shell
in use, a new process is created, or not. Thus 'pid' is overridden to
in use, a new process is created, or not. Thus, 'pid' is overridden to
make methods like 'kill' and 'stack_trace' use the correct process id.
"""
return self._pid
Expand All @@ -113,10 +114,14 @@ def wait_until_started(self):
Wait until the broker has started.
"""
with internal_use(self):
if not self.outputs_substr(
"BMQbrkr started successfully", timeout=START_TIMEOUT
):
raise RuntimeError(f"Failed to start broker on {self.name}: timeout")
BMQTST_ASSERT(
self.outputs_substr(
"BMQbrkr started successfully", timeout=START_TIMEOUT
),
"Failed to start broker: timeout",
name=self.name,
cwd=self._cwd,
)

with (self._cwd / "bmqbrkr.pid").open("r") as file:
self._pid = int(file.read())
Expand Down Expand Up @@ -210,17 +215,25 @@ def wait_status(

if wait_leader:
leader_name = matches.pop(0)
if leader_name is None:
error = f"[broker {self.name}]: no active leader"
self._logger.error(error)
raise RuntimeError(error)
BMQTST_ASSERT(
leader_name is not None,
"No active leader",
name=self.name,
cwd=self._cwd,
)

self.last_known_leader = self.cluster.process(leader_name[1])
self._logger.log(self._log_level, "leader is %s", self.last_known_leader)

if wait_ready and matches.pop(0) is None:
error = f"[broker {self.name}]: cluster not ready"
self._logger.error(error)
raise RuntimeError(error)
if wait_ready:
cluster_name = matches.pop(0)
BMQTST_ASSERT(
cluster_name is not None,
"Cluster is not ready",
cluster=cluster,
name=self.name,
cwd=self._cwd,
)

def dump_queue_internals(self, domain, queue):
"""
Expand Down
8 changes: 7 additions & 1 deletion src/python/blazingmq/dev/it/process/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ def __init__(
if dump_messages:
options.append("-d")

self._endpoint: str = f"tcp://{broker[0]}:{broker[1]}"

super().__init__(
name,
[
str(tool_path),
"-b",
f"tcp://{broker[0]}:{broker[1]}",
self._endpoint,
f'--logFormat="{bmqproc.PROC_LOG_FORMAT}"',
]
+ options,
Expand All @@ -126,6 +128,10 @@ def __init__(
**kwargs,
)

def __repr__(self):
"""Provide a string representation of this object for debug."""
return f"Client(name='{self.name}', endpoint='{self._endpoint}')"

###########################################################################
# Public API

Expand Down
Loading