How can I perform a long task #5154
-
We just recently upgraded from RabbitMQ 3.3.5 to 3.8.27 It is found that some long-running tasks (about 1 hour) will trigger message requeue So I adjusted consumer_timeout to 24 hours But the message will still trigger redelivery in about 3 minutes. Why is this I tried to adjust the heartbeat to 0, 3600 and found that it still didn't work Thank you every one ! Producer
Consumer
|
Beta Was this translation helpful? Give feedback.
Replies: 4 comments 5 replies
-
We cannot suggest much without any server logs. A traffic capture would tell you exactly what both sides send. If you perform a blocking operation in the consumer handler, with some clients this may block all outgoing I/O and result in missed heartbeats detected by RabbitMQ. The default heartbeat timeout is 60 seconds and not three minutes, though. This is a Pika-specific question. |
Beta Was this translation helpful? Give feedback.
-
Also, if a consumer delivery hits a timeout, there will be a specific log message. The default is 30 minutes, not 3. You can inspect your node's effective configuration as it may actually be different from what you think it is. |
Beta Was this translation helpful? Give feedback.
-
Finally, RabbitMQ 3.8 is about to go completely unsupported. While Consider upgrading to 3.10. |
Beta Was this translation helpful? Give feedback.
-
It is documented here: Here the example: import pika
import time
from datetime import datetime
class Consumer:
def __init__(self):
params = pika.ConnectionParameters(heartbeat=600, blocked_connection_timeout=300)
connection = pika.BlockingConnection(params)
self._channel = connection.channel()
def start(self):
start = datetime.now()
def on_message_callback(channel, method, properties, body):
print(body)
for i in range(3600):
print("seconds: {} id: {}".format((datetime.now()-start).total_seconds(), i) )
time.sleep(1)
print('回复 ack')
self._channel.basic_ack(method.delivery_tag)
self._channel.basic_qos(prefetch_count=50)
self._channel.basic_consume('queue1', on_message_callback)
self._channel.start_consuming()
if __name__ == '__main__':
Consumer().start() I left it up and running for a while:
The connection was still up and running:
|
Beta Was this translation helpful? Give feedback.
It is documented here:
https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html
Here the example:
https://github.com/pika/pika/blob/main/examples/heartbeat_and_blocked_timeouts.py