Skip to content

Commit 5b45296

Browse files
authored
feat: Added support for faststream >= 0.6.0 (#205)
1 parent 27e8246 commit 5b45296

File tree

4 files changed

+145
-53
lines changed

4 files changed

+145
-53
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ jobs:
3333
- "3.12"
3434
- "3.13"
3535
- "3.14"
36+
faststream-version:
37+
- "<0.6.0"
38+
- ">=0.6.0"
3639
steps:
3740
- uses: actions/checkout@v5
3841
- uses: extractions/setup-just@v3
@@ -41,7 +44,7 @@ jobs:
4144
cache-dependency-glob: "**/pyproject.toml"
4245
- run: uv python install ${{ matrix.python-version }}
4346
- run: just install
44-
- run: just test . --cov=. --cov-report xml
47+
- run: uv run --with "faststream${{ matrix.faststream-version }}" pytest . --cov=. --cov-report xml
4548
- uses: codecov/[email protected]
4649
env:
4750
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fastapi = [
2424
"fastapi",
2525
]
2626
faststream = [
27-
"faststream<0.6.0"
27+
"faststream"
2828
]
2929

3030
[project.urls]

tests/integrations/faststream/test_faststream_di_pass_message.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
import typing
22

33
from faststream import BaseMiddleware, Context, Depends
4-
from faststream.broker.message import StreamMessage
54
from faststream.nats import NatsBroker, TestNatsBroker
65
from faststream.nats.message import NatsMessage
6+
from packaging.version import Version
77

88
from that_depends import BaseContainer, container_context, fetch_context_item, providers
9+
from that_depends.integrations.faststream import _FASTSTREAM_VERSION
10+
11+
12+
if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover
13+
from faststream.message import StreamMessage
14+
else: # pragma: no cover
15+
from faststream.broker.message import StreamMessage # type: ignore[import-not-found, no-redef]
916

1017

1118
class ContextMiddleware(BaseMiddleware):
@@ -18,7 +25,11 @@ async def consume_scope(
1825
return await call_next(msg)
1926

2027

21-
broker = NatsBroker(middlewares=(ContextMiddleware,), validate=False)
28+
if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover
29+
broker = NatsBroker(middlewares=(ContextMiddleware,))
30+
31+
else: # pragma: no cover
32+
broker = NatsBroker(middlewares=(ContextMiddleware,), validate=False) # type: ignore[call-arg]
2233

2334
TEST_SUBJECT = "test"
2435

Lines changed: 127 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,136 @@
11
import typing
2+
from importlib.metadata import version
23
from types import TracebackType
3-
from typing import Any, Optional
4+
from typing import Any, Final, Optional
45

5-
from faststream import BaseMiddleware
6-
from typing_extensions import override
6+
from packaging.version import Version
7+
from typing_extensions import deprecated, override
78

89
from that_depends import container_context
910
from that_depends.providers.context_resources import ContextScope, SupportsContext
1011
from that_depends.utils import UNSET, Unset, is_set
1112

1213

13-
class DIContextMiddleware(BaseMiddleware):
14-
"""Initializes the container context for faststream brokers."""
15-
16-
def __init__(
17-
self,
18-
*context_items: SupportsContext[Any],
19-
global_context: dict[str, Any] | Unset = UNSET,
20-
scope: ContextScope | Unset = UNSET,
21-
) -> None:
22-
"""Initialize the container context middleware.
23-
24-
Args:
25-
*context_items (SupportsContext[Any]): Context items to initialize.
26-
global_context (dict[str, Any] | Unset): Global context to initialize the container.
27-
scope (ContextScope | Unset): Context scope to initialize the container.
28-
29-
"""
30-
super().__init__()
31-
self._context: container_context | None = None
32-
self._context_items = set(context_items)
33-
self._global_context = global_context
34-
self._scope = scope
35-
36-
@override
37-
async def on_receive(self) -> None:
38-
self._context = container_context(
39-
*self._context_items,
40-
scope=self._scope if is_set(self._scope) else None,
41-
global_context=self._global_context if is_set(self._global_context) else None,
42-
)
43-
await self._context.__aenter__()
44-
45-
@override
46-
async def after_processed(
47-
self,
48-
exc_type: type[BaseException] | None = None,
49-
exc_val: BaseException | None = None,
50-
exc_tb: Optional["TracebackType"] = None,
51-
) -> bool | None:
52-
if self._context is not None:
53-
await self._context.__aexit__(exc_type, exc_val, exc_tb)
54-
return None
55-
56-
def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware": # noqa: ARG002, ANN401
57-
"""Create an instance of DIContextMiddleware."""
58-
return DIContextMiddleware(*self._context_items, scope=self._scope, global_context=self._global_context)
14+
_FASTSTREAM_MODULE_NAME: Final[str] = "faststream"
15+
_FASTSTREAM_VERSION: Final[str] = version(_FASTSTREAM_MODULE_NAME)
16+
if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover
17+
from faststream import BaseMiddleware, ContextRepo
18+
from faststream._internal.types import AnyMsg
19+
20+
class DIContextMiddleware(BaseMiddleware):
21+
"""Initializes the container context for faststream brokers."""
22+
23+
def __init__(
24+
self,
25+
*context_items: SupportsContext[Any],
26+
msg: AnyMsg | None = None,
27+
context: Optional["ContextRepo"] = None,
28+
global_context: dict[str, Any] | Unset = UNSET,
29+
scope: ContextScope | Unset = UNSET,
30+
) -> None:
31+
"""Initialize the container context middleware.
32+
33+
Args:
34+
*context_items (SupportsContext[Any]): Context items to initialize.
35+
msg (Any): Message object.
36+
context (ContextRepo): Context repository.
37+
global_context (dict[str, Any] | Unset): Global context to initialize the container.
38+
scope (ContextScope | Unset): Context scope to initialize the container.
39+
40+
"""
41+
super().__init__(msg, context=context) # type: ignore[arg-type]
42+
self._context: container_context | None = None
43+
self._context_items = set(context_items)
44+
self._global_context = global_context
45+
self._scope = scope
46+
47+
@override
48+
async def on_receive(self) -> None:
49+
self._context = container_context(
50+
*self._context_items,
51+
scope=self._scope if is_set(self._scope) else None,
52+
global_context=self._global_context if is_set(self._global_context) else None,
53+
)
54+
await self._context.__aenter__()
55+
56+
@override
57+
async def after_processed(
58+
self,
59+
exc_type: type[BaseException] | None = None,
60+
exc_val: BaseException | None = None,
61+
exc_tb: Optional["TracebackType"] = None,
62+
) -> bool | None:
63+
if self._context is not None:
64+
await self._context.__aexit__(exc_type, exc_val, exc_tb)
65+
return None
66+
67+
def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # noqa: ANN401
68+
"""Create an instance of DIContextMiddleware.
69+
70+
Args:
71+
msg (Any): Message object.
72+
**kwargs: Additional keyword arguments.
73+
74+
Returns:
75+
DIContextMiddleware: A new instance of DIContextMiddleware.
76+
77+
"""
78+
context = kwargs.get("context")
79+
80+
return DIContextMiddleware(
81+
*self._context_items,
82+
msg=msg,
83+
context=context,
84+
scope=self._scope,
85+
global_context=self._global_context,
86+
)
87+
else: # pragma: no cover
88+
from faststream import BaseMiddleware
89+
90+
@deprecated("Will be removed with faststream v1")
91+
class DIContextMiddleware(BaseMiddleware): # type: ignore[no-redef]
92+
"""Initializes the container context for faststream brokers."""
93+
94+
def __init__(
95+
self,
96+
*context_items: SupportsContext[Any],
97+
global_context: dict[str, Any] | Unset = UNSET,
98+
scope: ContextScope | Unset = UNSET,
99+
) -> None:
100+
"""Initialize the container context middleware.
101+
102+
Args:
103+
*context_items (SupportsContext[Any]): Context items to initialize.
104+
global_context (dict[str, Any] | Unset): Global context to initialize the container.
105+
scope (ContextScope | Unset): Context scope to initialize the container.
106+
107+
"""
108+
super().__init__() # type: ignore[call-arg]
109+
self._context: container_context | None = None
110+
self._context_items = set(context_items)
111+
self._global_context = global_context
112+
self._scope = scope
113+
114+
@override
115+
async def on_receive(self) -> None:
116+
self._context = container_context(
117+
*self._context_items,
118+
scope=self._scope if is_set(self._scope) else None,
119+
global_context=self._global_context if is_set(self._global_context) else None,
120+
)
121+
await self._context.__aenter__()
122+
123+
@override
124+
async def after_processed(
125+
self,
126+
exc_type: type[BaseException] | None = None,
127+
exc_val: BaseException | None = None,
128+
exc_tb: Optional["TracebackType"] = None,
129+
) -> bool | None:
130+
if self._context is not None:
131+
await self._context.__aexit__(exc_type, exc_val, exc_tb)
132+
return None
133+
134+
def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware": # noqa: ARG002, ANN401
135+
"""Create an instance of DIContextMiddleware."""
136+
return DIContextMiddleware(*self._context_items, scope=self._scope, global_context=self._global_context)

0 commit comments

Comments
 (0)