Skip to content

Commit 4cb1a81

Browse files
Hello world for Python (rstream) stream client (#404)
1 parent bfe6dd0 commit 4cb1a81

File tree

4 files changed

+99
-0
lines changed

4 files changed

+99
-0
lines changed

python-stream/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Python code for RabbitMQ tutorials
2+
3+
4+
Here you can find Python code examples from [RabbitMQ tutorials](https://www.rabbitmq.com/getstarted.html).
5+
6+
7+
## Requirements
8+
9+
These examples use the [`qweeze/rstream`](https://github.com/qweeze/rstream) client library.
10+
11+
Get it first with the necessary dependencies:
12+
13+
pip3 install -r requirements.txt
14+
15+
## Code
16+
17+
Code examples are executed via `python3`:
18+
19+
[Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-python-stream):
20+
21+
python3 send.py
22+
python3 receive.py
23+
24+
25+
To learn more, see [`rqweeze/rstream`](https://github.com/qweeze/rstream).

python-stream/receive.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import asyncio
2+
import signal
3+
4+
from rstream import (
5+
AMQPMessage,
6+
Consumer,
7+
MessageContext,
8+
ConsumerOffsetSpecification,
9+
OffsetType,
10+
)
11+
12+
STREAM_NAME = "hello-python-stream"
13+
# 5GB
14+
STREAM_RETENTION = 5000000000
15+
16+
17+
async def receive():
18+
consumer = Consumer(host="localhost", username="guest", password="guest")
19+
await consumer.create_stream(
20+
STREAM_NAME, exists_ok=True, arguments={"MaxLengthBytes": STREAM_RETENTION}
21+
)
22+
23+
loop = asyncio.get_event_loop()
24+
loop.add_signal_handler(
25+
signal.SIGINT, lambda: asyncio.create_task(consumer.close())
26+
)
27+
28+
async def on_message(msg: AMQPMessage, message_context: MessageContext):
29+
stream = message_context.consumer.get_stream(message_context.subscriber_name)
30+
print("Got message: {} from stream {}".format(msg, stream))
31+
32+
print("Press control +C to close")
33+
await consumer.start()
34+
await consumer.subscribe(
35+
stream=STREAM_NAME,
36+
callback=on_message,
37+
offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None),
38+
)
39+
await consumer.run()
40+
# give time to the consumer task to close the consumer
41+
await asyncio.sleep(1)
42+
43+
44+
asyncio.run(receive())

python-stream/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
rstream
2+
uamqp
3+
mmh3

python-stream/send.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import asyncio
2+
3+
from rstream import Producer
4+
5+
STREAM_NAME = "hello-python-stream"
6+
# 5GB
7+
STREAM_RETENTION = 5000000000
8+
9+
10+
async def send():
11+
async with Producer(
12+
host="localhost",
13+
username="guest",
14+
password="guest",
15+
) as producer:
16+
await producer.create_stream(
17+
STREAM_NAME, exists_ok=True, arguments={"MaxLengthBytes": STREAM_RETENTION}
18+
)
19+
20+
await producer.send(stream=STREAM_NAME, message=b"Hello, World!")
21+
22+
print(" [x] Hello, World! message sent")
23+
24+
input(" [x] Press Enter to close the producer ...")
25+
26+
27+
asyncio.run(send())

0 commit comments

Comments
 (0)