Unable to consume from streams in Scala or Python #14010
-
Describe the bug
We have tried using RabbitMQ streams to produce and consume, both in Scala and Python-based pods.

Reproduction steps
and the consumer:
The message received block is never reached.

Expected behavior
We would expect messages to be picked up from the stream. This did not work in Scala either...

Additional context
No response
Replies: 4 comments 6 replies
-
@thoefkens you haven't provided any evidence of a bug in RabbitMQ, so this is discussion material. Your code uses a combination of asyncio, threads and signal handlers. Start with this Python tutorial, which happens to use the same client library. Also, start with just one standalone node, not a cluster on Kubernetes or whatever. Get that to work first, then add complexity as you go. We won't troubleshoot other people's K8S deployments or guess what's in your logs.
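To make the "one event loop, no threads, no signal handlers" shape concrete, here is a minimal sketch that uses only the standard library. The `asyncio.Queue` is a hypothetical stand-in for a stream subscription, and `consume` and `main` are illustrative names, not part of any client library.

```python
import asyncio


async def consume(queue: asyncio.Queue, received: list) -> None:
    """Handle items until cancelled; the queue stands in for a stream subscription."""
    while True:
        msg = await queue.get()
        received.append(msg)
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    # The consumer is a task on the same event loop, not a separate thread.
    task = asyncio.create_task(consume(queue, received))
    for i in range(3):
        await queue.put(f"message {i}")
    await queue.join()  # wait until every queued item has been handled
    task.cancel()  # shutdown is plain task cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received


received = asyncio.run(main())
print(received)
```

Once a skeleton like this behaves as expected, the queue can be swapped for the real client's subscribe call and the rest of the structure stays the same.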
-
@thoefkens, following what @michaelklishin said, I would start with something easier. Easy way to test:
then
When this ^^^ works, I would add a cluster configuration.
Note: In case of a cluster configuration with a proxy / load-balancer, you need to use load balancer mode.
-
@thoefkens are you aware of this blog post on the nuances of connections using the RabbitMQ Stream Protocol? It's essential to know if you intend to deploy your apps with a load balancer.
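As a rough illustration of why a load balancer changes things: a stream client generally wants to reach a specific node (for example the stream leader when publishing), but a load balancer decides which node each new connection lands on. One way to cope, and roughly what a load balancer mode is for, is to keep reconnecting through the load balancer until the desired node is reached. The sketch below is a pure simulation under that assumption; `Node`, `FakeLoadBalancer` and `connect_to` are hypothetical names and do not come from any client library.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    host: str
    port: int


class FakeLoadBalancer:
    """Hypothetical stand-in for a TCP load balancer that round-robins over nodes."""

    def __init__(self, nodes):
        self._cycle = itertools.cycle(nodes)

    def connect(self) -> Node:
        # A real connection would reveal which node it reached;
        # the simulation simply returns that node.
        return next(self._cycle)


def connect_to(target: Node, lb: FakeLoadBalancer, max_attempts: int = 10):
    """Reconnect through the load balancer until the desired node is reached."""
    for attempt in range(1, max_attempts + 1):
        reached = lb.connect()
        if reached == target:
            return reached, attempt
        # Wrong node: a real client would close this connection and retry.
    raise ConnectionError(f"did not reach {target} in {max_attempts} attempts")


nodes = [Node("node0", 5552), Node("node1", 5552), Node("node2", 5552)]
lb = FakeLoadBalancer(nodes)
leader = nodes[2]  # pretend a metadata lookup named node2 as the leader
reached, attempts = connect_to(leader, lb)
print(f"reached {reached.host} after {attempts} attempt(s)")
```

With the round-robin stand-in above, the third attempt lands on `node2`. Without such a retry step, a client behind a load balancer may try to connect directly to a node address that is not reachable from outside the cluster.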
-
@thoefkens I tested your setup
Cluster from https://github.com/rabbitmq/rabbitmq-stream-go-client/tree/main/compose:
Three cluster nodes:
Run your script (with LB mode and the port changed to 5553):

import asyncio
import os
import json
import logging

from rstream import (
    AMQPMessage,
    Consumer,
    ConsumerOffsetSpecification,
    MessageContext,
    OffsetType,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_PORT = int(os.getenv('RABBITMQ_PORT', '5553'))
RABBITMQ_USERNAME = os.getenv('RABBITMQ_USERNAME', 'guest')
RABBITMQ_PASSWORD = os.getenv('RABBITMQ_PASSWORD', 'guest')
STREAM_NAME = os.getenv('STREAM_NAME', 'hello-python-stream')
STREAM_RETENTION = 5000000000  # 5GB


async def receive():
    try:
        logger.info(f"Attempting to connect to {RABBITMQ_HOST}...")
        # Simplified connection - exactly like working receive.py
        async with Consumer(
            host=RABBITMQ_HOST,
            port=RABBITMQ_PORT,
            username=RABBITMQ_USERNAME,
            password=RABBITMQ_PASSWORD,
            load_balancer_mode=True,
        ) as consumer:
            # Create stream if it doesn't exist - exactly like receive.py
            logger.info(f"Creating/verifying stream: {STREAM_NAME}")
            await consumer.create_stream(
                STREAM_NAME, exists_ok=True, arguments={"max-length-bytes": STREAM_RETENTION}
            )

            # Message callback - simplified like working receive.py
            async def on_message(msg: AMQPMessage, message_context: MessageContext):
                try:
                    stream = message_context.consumer.get_stream(message_context.subscriber_name)
                    # Print like working receive.py - simple and direct
                    print("Got message: {} from stream: {}".format(msg, stream))
                except Exception as e:
                    logger.error(f"Error processing message: {e}")

            logger.info(f"Successfully connected to {RABBITMQ_HOST}")
            logger.info("Starting consumer...")
            await consumer.start()
            logger.info(f"Subscribing to stream '{STREAM_NAME}' with FIRST offset...")
            await consumer.subscribe(
                stream=STREAM_NAME,
                callback=on_message,
                offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None),
            )
            logger.info("Consumer subscribed successfully - waiting for messages...")
            logger.info("Press control + C to close")
            try:
                await consumer.run()
            except (KeyboardInterrupt, asyncio.CancelledError):
                print("Closing Consumer...")
                return
    except Exception as e:
        logger.error(f"Failed to connect to {RABBITMQ_HOST}: {e}")


if __name__ == '__main__':
    logger.info("Starting Plot Consumer - Simplified like working receive.py")
    logger.info(f"RabbitMQ Host: {RABBITMQ_HOST}")
    logger.info(f"RabbitMQ Port: {RABBITMQ_PORT}")
    logger.info(f"Stream Name: {STREAM_NAME}")
    logger.info(f"Username: {RABBITMQ_USERNAME}")
    logger.info("Using FIRST offset - will read all messages from the beginning of the stream")
    with asyncio.Runner() as runner:
        runner.run(receive())

then the sender:

import asyncio
import time

from rstream import Producer, ConfirmationStatus, AMQPMessage

STREAM_NAME = "hello-python-stream"
# 5GB
STREAM_RETENTION = 5000000000


async def _on_publish_confirm_client(confirmation: ConfirmationStatus) -> None:
    if confirmation.is_confirmed:
        print("message id: {} is confirmed".format(confirmation.message_id))
    else:
        print(
            "message id: {} not confirmed. Response code {}".format(
                confirmation.message_id, confirmation.response_code
            )
        )


async def send():
    async with Producer(
        host="localhost",
        username="guest",
        password="guest",
        port=5553,
        load_balancer_mode=True,
    ) as producer:
        for i in range(1000):
            amqp_message = AMQPMessage(
                body=bytes("My AMQP message: {}".format(i), "utf-8"),
            )
            # send is asynchronous
            await producer.send(stream=STREAM_NAME, message=amqp_message, on_publish_confirm=_on_publish_confirm_client)
            await asyncio.sleep(1)
            print("sent message: {}".format(amqp_message.body))
        print(" [x] Hello, World! message sent")
        input(" [x] Press Enter to close the producer ...")


asyncio.run(send())

I did not try in k8s. This test should be enough.