|
2 | 2 | PyCyphal -- Python implementation of the Cyphal v1.1 session layer. |
3 | 3 |
|
4 | 4 | All public entities of the library are directly exposed at the top level, here; |
5 | | -implementations of the interfaces shown here are hidden and private. |
| 5 | +implementations of the public interfaces are hidden and private. |
6 | 6 | One exception applies to transport-specific direct submodules like ``pycyphal.udp``, ``pycyphal.can`` -- |
7 | 7 | the user should import only the needed transports manually. |
8 | 8 |
|
|
11 | 11 |
|
12 | 12 | from __future__ import annotations |
13 | 13 |
|
14 | | -from abc import ABC, abstractmethod |
15 | | -from dataclasses import dataclass |
16 | | -import random |
17 | | -import socket |
18 | | - |
19 | | -# We do not import anything from implementation modules here do there's no risk of circular dependencies. |
20 | | -from ._common import DeliveryError as DeliveryError |
21 | | -from ._common import Error as Error |
22 | | -from ._common import Instant as Instant |
23 | | -from ._common import LivenessError as LivenessError |
24 | | -from ._common import NackError as NackError |
25 | | -from ._common import SendError as SendError |
26 | | -from ._common import Priority as Priority |
27 | | -from ._common import Closable as Closable |
28 | | -from ._common import SUBJECT_ID_PINNED_MAX as SUBJECT_ID_PINNED_MAX |
| 14 | +from ._api import * |
29 | 15 | from ._transport import Transport as Transport |
30 | 16 | from ._transport import TransportArrival as TransportArrival |
31 | 17 | from ._transport import SubjectWriter as SubjectWriter |
32 | 18 |
|
33 | 19 | __version__ = "2.0.0.dev0" |
34 | | - |
35 | | - |
36 | | -class Topic(ABC): |
37 | | - """ |
38 | | - Topics are managed automatically by the library, created and destroyed as necessary. |
39 | | - This is just a compact view to expose some auxiliary information. |
40 | | - """ |
41 | | - |
42 | | - @property |
43 | | - @abstractmethod |
44 | | - def hash(self) -> int: |
45 | | - raise NotImplementedError |
46 | | - |
47 | | - @property |
48 | | - @abstractmethod |
49 | | - def name(self) -> str: |
50 | | - raise NotImplementedError |
51 | | - |
52 | | - @abstractmethod |
53 | | - def match(self, pattern: str) -> list[tuple[str, int]] | None: |
54 | | - """ |
55 | | - If the pattern matches the topic name, returns the name segment substitutions needed to achieve the match. |
56 | | - None if there is no match. Empty list for verbatim subscribers (match only one topic), where pattern==name. |
57 | | - Each substitution is the segment and the index of the substitution character in the pattern. |
58 | | - """ |
59 | | - raise NotImplementedError |
60 | | - |
61 | | - |
62 | | -@dataclass(frozen=True) |
63 | | -class Response: |
64 | | - """ |
65 | | - Represents a response to a published message from the specified remote node. |
66 | | - Seqno is managed by the remote, incrementing by one with each response starting from zero. |
67 | | - """ |
68 | | - |
69 | | - timestamp: Instant |
70 | | - remote_id: int |
71 | | - seqno: int |
72 | | - message: bytes |
73 | | - |
74 | | - |
75 | | -class ResponseStream(Closable, ABC): |
76 | | - """ |
77 | | - Represents the expectation of response arrivals, used for one-off RPC and streaming. |
78 | | - Generates multiple async results, one per received response or per error. |
79 | | - Async iterator will continue to yield new messages until close()d, even after delivery/liveness errors. |
80 | | - """ |
81 | | - |
82 | | - def __aiter__(self) -> ResponseStream: |
83 | | - return self |
84 | | - |
85 | | - async def __anext__(self) -> Response: |
86 | | - """ |
87 | | - Blocks until response is received. |
88 | | - Raises DeliveryError if request could not be delivered by the deadline, LivenessError on response timeout, |
89 | | - SendError if failed to send the response before the deadline. |
90 | | - """ |
91 | | - raise NotImplementedError |
92 | | - |
93 | | - |
94 | | -class Publisher(Closable, ABC): |
95 | | - """ |
96 | | - Represents the intent to send messages on a topic. |
97 | | - This is callable; an invocation triggers publication. |
98 | | - For publications that expect a response, use the ``request`` method. |
99 | | - """ |
100 | | - |
101 | | - @property |
102 | | - @abstractmethod |
103 | | - def topic(self) -> Topic: |
104 | | - raise NotImplementedError |
105 | | - |
106 | | - @property |
107 | | - @abstractmethod |
108 | | - def priority(self) -> Priority: |
109 | | - raise NotImplementedError |
110 | | - |
111 | | - @priority.setter |
112 | | - @abstractmethod |
113 | | - def priority(self, priority: Priority) -> None: |
114 | | - raise NotImplementedError |
115 | | - |
116 | | - @property |
117 | | - @abstractmethod |
118 | | - def ack_timeout(self) -> float: |
119 | | - """ |
120 | | - The ack timeout controls the baseline retransmission delay. |
121 | | - The number of attempts is controlled by the deadline specified at publish time. |
122 | | - """ |
123 | | - raise NotImplementedError |
124 | | - |
125 | | - @ack_timeout.setter |
126 | | - @abstractmethod |
127 | | - def ack_timeout(self, duration: float) -> None: |
128 | | - raise NotImplementedError |
129 | | - |
130 | | - @abstractmethod |
131 | | - async def __call__(self, deadline: Instant, message: memoryview | bytes, *, reliable: bool = False) -> None: |
132 | | - """ |
133 | | - Blocks at most until the deadline. Raises SendError if couldn't be sent before the deadline. |
134 | | - If reliable, DeliveryError will be raised unless acked by the remote subscribers before deadline. |
135 | | - """ |
136 | | - raise NotImplementedError |
137 | | - |
138 | | - @abstractmethod |
139 | | - async def request( |
140 | | - self, delivery_deadline: Instant, response_timeout: float, message: memoryview | bytes |
141 | | - ) -> ResponseStream: |
142 | | - """ |
143 | | - Publish a message and expect responses. See ResponseStream for details. |
144 | | - """ |
145 | | - raise NotImplementedError |
146 | | - |
147 | | - |
148 | | -class Breadcrumb(ABC): |
149 | | - """ |
150 | | - The breadcrumb can be used, optionally, to send responses RPC-style or streamed back to a message publisher. |
151 | | - Instances can be retained after message reception for as long as necessary. |
152 | | - One instance is shared across all subscribers receiving the same message, ensuring contiguous seqno across |
153 | | - all responses. |
154 | | -
|
155 | | - Responses are always sent at the same priority as that of the request. |
156 | | - Internally, the library tracks the seqno that starts at zero and is incremented with every response. |
157 | | -
|
158 | | - The set of (remote-ID, topic hash, message tag) forms a globally unique stream identification triplet, |
159 | | - which can be hashed down to a single number for convenience. |
160 | | - """ |
161 | | - |
162 | | - @property |
163 | | - @abstractmethod |
164 | | - def remote_id(self) -> int: |
165 | | - raise NotImplementedError |
166 | | - |
167 | | - @property |
168 | | - @abstractmethod |
169 | | - def topic(self) -> Topic: |
170 | | - raise NotImplementedError |
171 | | - |
172 | | - @property |
173 | | - @abstractmethod |
174 | | - def tag(self) -> int: |
175 | | - raise NotImplementedError |
176 | | - |
177 | | - @abstractmethod |
178 | | - async def __call__(self, deadline: Instant, message: memoryview | bytes, *, reliable: bool = False) -> None: |
179 | | - """ |
180 | | - Invoke multiple times to stream multiple responses. |
181 | | - Blocks at most until the deadline. Raises SendError if couldn't be sent before the deadline. |
182 | | - If reliable: |
183 | | - - DeliveryError will be raised unless acked by the remote subscribers before deadline. |
184 | | - - NackError will be raised if the remote is no longer accepting responses. |
185 | | - """ |
186 | | - raise NotImplementedError |
187 | | - |
188 | | - |
189 | | -@dataclass(frozen=True) |
190 | | -class Arrival: |
191 | | - """ |
192 | | - Represents a message received from a topic. |
193 | | - The breadcrumb allows sending responses back to the publisher, thus enabling RPC and streaming. |
194 | | - """ |
195 | | - |
196 | | - timestamp: Instant |
197 | | - breadcrumb: Breadcrumb |
198 | | - message: bytes |
199 | | - |
200 | | - |
201 | | -class Subscriber(Closable, ABC): |
202 | | - @property |
203 | | - @abstractmethod |
204 | | - def pattern(self) -> str: |
205 | | - raise NotImplementedError |
206 | | - |
207 | | - @property |
208 | | - @abstractmethod |
209 | | - def verbatim(self) -> bool: |
210 | | - """ |
211 | | - True if the pattern does not contain substitution segments named `*` and `>`. |
212 | | - """ |
213 | | - raise NotImplementedError |
214 | | - |
215 | | - @property |
216 | | - @abstractmethod |
217 | | - def timeout(self) -> float: |
218 | | - """ |
219 | | - By default, the timeout is infinite, meaning that LivenessError will never be returned. |
220 | | - The user can override this as needed. Setting a non-finite timeout disables this feature. |
221 | | - """ |
222 | | - raise NotImplementedError |
223 | | - |
224 | | - @timeout.setter |
225 | | - @abstractmethod |
226 | | - def timeout(self, duration: float) -> None: |
227 | | - raise NotImplementedError |
228 | | - |
229 | | - @abstractmethod |
230 | | - def substitutions(self, topic: Topic) -> list[tuple[str, int]] | None: |
231 | | - """ |
232 | | - Pattern name segment substitutions needed to match the name of this subscriber to the name of the |
233 | | - specified topic. None if no match. Empty list for verbatim subscribers (match only one topic). |
234 | | - """ |
235 | | - raise NotImplementedError |
236 | | - |
237 | | - def __aiter__(self) -> Subscriber: |
238 | | - return self |
239 | | - |
240 | | - @abstractmethod |
241 | | - async def __anext__(self) -> Arrival: |
242 | | - """ |
243 | | - Raises LivenessError if messages cease arriving for longer than the timeout, unless timeout is non-finite. |
244 | | - """ |
245 | | - raise NotImplementedError |
246 | | - |
247 | | - |
248 | | -class Node(ABC): |
249 | | - """ |
250 | | - The top-level entity that represents a node in the network. |
251 | | - """ |
252 | | - |
253 | | - @property |
254 | | - @abstractmethod |
255 | | - def home(self) -> str: |
256 | | - raise NotImplementedError |
257 | | - |
258 | | - @property |
259 | | - @abstractmethod |
260 | | - def namespace(self) -> str: |
261 | | - raise NotImplementedError |
262 | | - |
263 | | - @abstractmethod |
264 | | - def advertise(self, name: str) -> Publisher: |
265 | | - """ |
266 | | - Begin publishing on a topic; this also includes sending RPC requests. |
267 | | - """ |
268 | | - raise NotImplementedError |
269 | | - |
270 | | - @abstractmethod |
271 | | - def subscribe(self, name: str, *, reordering_window: float | None = None) -> Subscriber: |
272 | | - """ |
273 | | - Receive messages from a single topic or multiple, if ``name`` is a pattern. |
274 | | - If the reordering window is set, ordered subscription is used that guarantees monotonically |
275 | | - increasing message tags, otherwise messages arrive ASAP. |
276 | | - """ |
277 | | - raise NotImplementedError |
278 | | - |
279 | | - |
280 | | -def new(transport: Transport, *, home: str = "", namespace: str = "") -> Node: |
281 | | - """ |
282 | | - Construct a new node using the specified transport. This is the main entry point of the library. |
283 | | -
|
284 | | - The transport is constructed using one of the stock transport implementations like ``pycyphal.udp``, |
285 | | - depending on the needs of the application, or it could be custom. |
286 | | -
|
287 | | - Every node needs a unique nonempty home. If the home string is not provided, a random home will be generated. |
288 | | - If home ends with a `/`, a unique string will be automatically appended to generate a prefixed unique home; |
289 | | - e.g., `my_node_123` stays as-is; `my_node_123/` becomes like `my_node_123/123xyz`, |
290 | | - an empty string becomes a random string. |
291 | | -
|
292 | | - If namespace is not set, it is read from the CYPHAL_NAMESPACE environment variable if available, |
293 | | - otherwise it remains empty. |
294 | | - """ |
295 | | - from ._hash import rapidhash |
296 | | - from ._node import NodeImpl |
297 | | - |
298 | | - # Add random suffix if requested or generate pure random home. Leading/trailing separators will be normalized away. |
299 | | - home = home.strip() or "/" |
300 | | - if home.endswith("/"): |
301 | | - # A simple hex identifier where, for observability/diagnostic purposes, the first 20 bits (5 hex digits) are |
302 | | - # host-specific (same for all nodes running on this host), and the remaining 44 bits (11 digits) are random. |
303 | | - # The protocol doesn't care about this structure, it is just an optional default convention. |
304 | | - # noinspection PyTypeChecker |
305 | | - host = rapidhash(socket.gethostname().encode()) & 0xFFFFF |
306 | | - rand = random.getrandbits(44) |
307 | | - home += f"{host:05x}{rand:011x}" |
308 | | - |
309 | | - return NodeImpl(transport, home=home, namespace=namespace) |
0 commit comments