From 2a394e71b4d4b1b1c65ce1b72d8c1cd4322b3dad Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 8 Apr 2025 16:38:47 -0700 Subject: [PATCH 1/6] PYTHON-4324 CSOT avoid connection churn when operations timeout --- pymongo/asynchronous/network.py | 5 +-- pymongo/asynchronous/pool.py | 57 +++++++++++++++++++++++++++++---- pymongo/asynchronous/server.py | 2 +- pymongo/message.py | 18 +++++++++-- pymongo/network_layer.py | 28 ++++++++++++---- pymongo/synchronous/network.py | 5 +-- pymongo/synchronous/pool.py | 53 ++++++++++++++++++++++++++---- pymongo/synchronous/server.py | 2 +- 8 files changed, 144 insertions(+), 26 deletions(-) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 1605efe92d..838e1916c6 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -138,8 +138,9 @@ async def command( spec = orig = await client._encrypter.encrypt(dbname, spec, codec_options) # Support CSOT + applied_csot = False if client: - conn.apply_timeout(client, spec) + applied_csot = conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) if use_op_msg: @@ -195,7 +196,7 @@ async def command( reply = None response_doc: _DocumentOut = {"ok": 1} else: - reply = await async_receive_message(conn, request_id) + reply = await async_receive_message(conn, request_id, enable_pending=bool(applied_csot)) conn.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response( codec_options=codec_options, user_fields=user_fields diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a67cc5f3c8..d1cd879cf2 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -34,7 +34,7 @@ ) from bson import DEFAULT_CODEC_OPTIONS -from pymongo import _csot, helpers_shared +from pymongo import _csot, helpers_shared, network_layer from pymongo.asynchronous.client_session import _validate_session_write_concern from pymongo.asynchronous.helpers import _handle_reauth from pymongo.asynchronous.network import command @@ -188,6 +188,41 @@ def __init__( self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + self.pending_response = False + self.pending_bytes = 0 + self.pending_deadline = 0.0 + + def mark_pending(self, nbytes: int) -> None: + """Mark this connection as having a pending response.""" + # TODO: add "if self.enable_pending:" + self.pending_response = True + self.pending_bytes = nbytes + self.pending_deadline = time.monotonic() + 3 # 3 seconds timeout for pending response + + async def complete_pending(self) -> None: + """Complete a pending response.""" + if not self.pending_response: + return + + timeout: Optional[Union[float, int]] + timeout = self.conn.gettimeout + if _csot.get_timeout(): + deadline = min(_csot.get_deadline(), self.pending_deadline) + elif timeout: + deadline = min(time.monotonic() + timeout, self.pending_deadline) + else: + deadline = self.pending_deadline + + if not _IS_SYNC: + # In async the reader task reads the whole message at once. + # TODO: respect deadline + await self.receive_message(None, True) + else: + # In sync we need to track the bytes left for the message. + network_layer.receive_data(self.conn.get_conn, self.pending_byte, deadline) + self.pending_response = False + self.pending_bytes = 0 + self.pending_deadline = 0.0 def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -454,13 +489,17 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: await self._raise_connection_failure(error) - async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + async def receive_message( + self, request_id: Optional[int], enable_pending: bool = False + ) -> Union[_OpReply, _OpMsg]: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. """ try: - return await async_receive_message(self, request_id, self.max_message_size) + return await async_receive_message( + self, request_id, self.max_message_size, enable_pending + ) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: await self._raise_connection_failure(error) @@ -495,7 +534,9 @@ async def write_command( :param msg: bytes, the command message. """ await self.send_message(msg, 0) - reply = await self.receive_message(request_id) + reply = await self.receive_message( + request_id, enable_pending=(_csot.get_timeout() is not None) + ) result = reply.command_response(codec_options) # Raises NotPrimaryError or OperationFailure. @@ -635,7 +676,10 @@ async def _raise_connection_failure(self, error: BaseException) -> NoReturn: reason = None else: reason = ConnectionClosedReason.ERROR - await self.close_conn(reason) + + # Pending connections should be placed back in the pool. + if not self.pending_response: + await self.close_conn(reason) # SSLError from PyOpenSSL inherits directly from Exception. if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) @@ -1076,7 +1120,7 @@ async def checkout( This method should always be used in a with-statement:: - with pool.get_conn() as connection: + with pool.checkout() as connection: connection.send_message(msg) data = connection.receive_message(op_code, request_id) @@ -1388,6 +1432,7 @@ async def _perished(self, conn: AsyncConnection) -> bool: pool, to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ + await conn.complete_pending() idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. if ( diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 0e0d53b96f..2df594774a 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -205,7 +205,7 @@ async def run_operation( reply = await conn.receive_message(None) else: await conn.send_message(data, max_doc_size) - reply = await conn.receive_message(request_id) + reply = await conn.receive_message(request_id, operation.pending_enabled()) # Unpack and check for command errors. if use_cmd: diff --git a/pymongo/message.py b/pymongo/message.py index d51c77a174..ce680cf7ce 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -1569,6 +1569,7 @@ class _Query: "allow_disk_use", "_as_command", "exhaust", + "_pending_enabled", ) # For compatibility with the _GetMore class. @@ -1612,6 +1613,10 @@ def __init__( self.name = "find" self._as_command: Optional[tuple[dict[str, Any], str]] = None self.exhaust = exhaust + self._pending_enabled = False + + def pending_enabled(self): + return self._pending_enabled def reset(self) -> None: self._as_command = None @@ -1673,7 +1678,9 @@ def as_command( conn.send_cluster_time(cmd, self.session, self.client) # type: ignore[arg-type] # Support CSOT if apply_timeout: - conn.apply_timeout(self.client, cmd=cmd) # type: ignore[arg-type] + res = conn.apply_timeout(self.client, cmd=cmd) # type: ignore[arg-type] + if res is not None: + self._pending_enabled = True self._as_command = cmd, self.db return self._as_command @@ -1747,6 +1754,7 @@ class _GetMore: "_as_command", "exhaust", "comment", + "_pending_enabled", ) name = "getMore" @@ -1779,6 +1787,10 @@ def __init__( self._as_command: Optional[tuple[dict[str, Any], str]] = None self.exhaust = exhaust self.comment = comment + self._pending_enabled = False + + def pending_enabled(self): + return self._pending_enabled def reset(self) -> None: self._as_command = None @@ -1822,7 +1834,9 @@ def as_command( conn.send_cluster_time(cmd, self.session, self.client) # type: ignore[arg-type] # Support CSOT if apply_timeout: - conn.apply_timeout(self.client, cmd=None) # type: ignore[arg-type] + res = conn.apply_timeout(self.client, cmd=None) # type: ignore[arg-type] + if res is not None: + self._pending_enabled = True self._as_command = cmd, self.db return self._as_command diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index e287655c61..bbcca86115 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -325,7 +325,9 @@ def wait_for_read(conn: Connection, deadline: Optional[float]) -> None: raise socket.timeout("timed out") -def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> memoryview: +def receive_data( + conn: Connection, length: int, deadline: Optional[float], enable_pending: bool = False +) -> memoryview: buf = bytearray(length) mv = memoryview(buf) bytes_read = 0 @@ -357,12 +359,16 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me if conn.cancel_context.cancelled: raise _OperationCancelled("operation cancelled") from None # We reached the true deadline. + if enable_pending: + conn.mark_pending(length - bytes_read) raise socket.timeout("timed out") from None except socket.timeout: if conn.cancel_context.cancelled: raise _OperationCancelled("operation cancelled") from None if _PYPY: # We reached the true deadline. + if enable_pending: + conn.mark_pending(length - bytes_read) raise continue except OSError as exc: @@ -692,6 +698,7 @@ async def async_receive_message( conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE, + enable_pending: bool = False, ) -> Union[_OpReply, _OpMsg]: """Receive a raw BSON message or raise socket.error.""" timeout: Optional[Union[float, int]] @@ -721,6 +728,8 @@ async def async_receive_message( if pending: await asyncio.wait(pending) if len(done) == 0: + if enable_pending: + conn.mark_pending(1) raise socket.timeout("timed out") if read_task in done: data, op_code = read_task.result() @@ -740,7 +749,10 @@ async def async_receive_message( def receive_message( - conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE + conn: Connection, + request_id: Optional[int], + max_message_size: int = MAX_MESSAGE_SIZE, + enable_pending: bool = False, ) -> Union[_OpReply, _OpMsg]: """Receive a raw BSON message or raise socket.error.""" if _csot.get_timeout(): @@ -752,7 +764,9 @@ def receive_message( else: deadline = None # Ignore the response's request id. - length, _, response_to, op_code = _UNPACK_HEADER(receive_data(conn, 16, deadline)) + length, _, response_to, op_code = _UNPACK_HEADER( + receive_data(conn, 16, deadline, enable_pending) + ) # No request_id for exhaust cursor "getMore". if request_id is not None: if request_id != response_to: @@ -767,10 +781,12 @@ def receive_message( f"message size ({max_message_size!r})" ) if op_code == 2012: - op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(receive_data(conn, 9, deadline)) - data = decompress(receive_data(conn, length - 25, deadline), compressor_id) + op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER( + receive_data(conn, 9, deadline, enable_pending) + ) + data = decompress(receive_data(conn, length - 25, deadline), compressor_id, enable_pending) else: - data = receive_data(conn, length - 16, deadline) + data = receive_data(conn, length - 16, deadline, enable_pending) try: unpack_reply = _UNPACK_REPLY[op_code] diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 9559a5a542..3651e96bab 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -138,8 +138,9 @@ def command( spec = orig = client._encrypter.encrypt(dbname, spec, codec_options) # Support CSOT + applied_csot = False if client: - conn.apply_timeout(client, spec) + applied_csot = conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) if use_op_msg: @@ -195,7 +196,7 @@ def command( reply = None response_doc: _DocumentOut = {"ok": 1} else: - reply = receive_message(conn, request_id) + reply = receive_message(conn, request_id, enable_pending=bool(applied_csot)) conn.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response( codec_options=codec_options, user_fields=user_fields diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 224834af31..28366a7c9e 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -34,7 +34,7 @@ ) from bson import DEFAULT_CODEC_OPTIONS -from pymongo import _csot, helpers_shared +from pymongo import _csot, helpers_shared, network_layer from pymongo.common import ( MAX_BSON_SIZE, MAX_MESSAGE_SIZE, @@ -188,6 +188,41 @@ def __init__( self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + self.pending_response = False + self.pending_bytes = 0 + self.pending_deadline = 0.0 + + def mark_pending(self, nbytes: int) -> None: + """Mark this connection as having a pending response.""" + # TODO: add "if self.enable_pending:" + self.pending_response = True + self.pending_bytes = nbytes + self.pending_deadline = time.monotonic() + 3 # 3 seconds timeout for pending response + + def complete_pending(self) -> None: + """Complete a pending response.""" + if not self.pending_response: + return + + timeout: Optional[Union[float, int]] + timeout = self.conn.gettimeout + if _csot.get_timeout(): + deadline = min(_csot.get_deadline(), self.pending_deadline) + elif timeout: + deadline = min(time.monotonic() + timeout, self.pending_deadline) + else: + deadline = self.pending_deadline + + if not _IS_SYNC: + # In async the reader task reads the whole message at once. + # TODO: respect deadline + self.receive_message(None, True) + else: + # In sync we need to track the bytes left for the message. + network_layer.receive_data(self.conn.get_conn, self.pending_byte, deadline) + self.pending_response = False + self.pending_bytes = 0 + self.pending_deadline = 0.0 def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -454,13 +489,15 @@ def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: self._raise_connection_failure(error) - def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + def receive_message( + self, request_id: Optional[int], enable_pending: bool = False + ) -> Union[_OpReply, _OpMsg]: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. """ try: - return receive_message(self, request_id, self.max_message_size) + return receive_message(self, request_id, self.max_message_size, enable_pending) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: self._raise_connection_failure(error) @@ -495,7 +532,7 @@ def write_command( :param msg: bytes, the command message. """ self.send_message(msg, 0) - reply = self.receive_message(request_id) + reply = self.receive_message(request_id, enable_pending=(_csot.get_timeout() is not None)) result = reply.command_response(codec_options) # Raises NotPrimaryError or OperationFailure. @@ -633,7 +670,10 @@ def _raise_connection_failure(self, error: BaseException) -> NoReturn: reason = None else: reason = ConnectionClosedReason.ERROR - self.close_conn(reason) + + # Pending connections should be placed back in the pool. + if not self.pending_response: + self.close_conn(reason) # SSLError from PyOpenSSL inherits directly from Exception. if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) @@ -1072,7 +1112,7 @@ def checkout( This method should always be used in a with-statement:: - with pool.get_conn() as connection: + with pool.checkout() as connection: connection.send_message(msg) data = connection.receive_message(op_code, request_id) @@ -1384,6 +1424,7 @@ def _perished(self, conn: Connection) -> bool: pool, to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ + conn.complete_pending() idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. if ( diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index c3643ba815..af3bff9c9d 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -205,7 +205,7 @@ def run_operation( reply = conn.receive_message(None) else: conn.send_message(data, max_doc_size) - reply = conn.receive_message(request_id) + reply = conn.receive_message(request_id, operation.pending_enabled()) # Unpack and check for command errors. if use_cmd: From c756d9cd535d36b9ed8fc6c4dee5a3899eda58d5 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 8 Apr 2025 17:08:52 -0700 Subject: [PATCH 2/6] PYTHON-4324 Fix typing bugs and typos --- pymongo/asynchronous/network.py | 5 +++-- pymongo/asynchronous/pool.py | 4 +--- pymongo/message.py | 4 ++-- pymongo/network_layer.py | 2 +- pymongo/synchronous/network.py | 5 +++-- pymongo/synchronous/pool.py | 4 +--- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 8 files changed, 13 insertions(+), 15 deletions(-) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 838e1916c6..3d862d403e 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -140,7 +140,8 @@ async def command( # Support CSOT applied_csot = False if client: - applied_csot = conn.apply_timeout(client, spec) + res = conn.apply_timeout(client, spec) + applied_csot = bool(res) _csot.apply_write_concern(spec, write_concern) if use_op_msg: @@ -196,7 +197,7 @@ async def command( reply = None response_doc: _DocumentOut = {"ok": 1} else: - reply = await async_receive_message(conn, request_id, enable_pending=bool(applied_csot)) + reply = await async_receive_message(conn, request_id, enable_pending=applied_csot) conn.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response( codec_options=codec_options, user_fields=user_fields diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index d1cd879cf2..6ccd33ad4d 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -194,7 +194,6 @@ def __init__( def mark_pending(self, nbytes: int) -> None: """Mark this connection as having a pending response.""" - # TODO: add "if self.enable_pending:" self.pending_response = True self.pending_bytes = nbytes self.pending_deadline = time.monotonic() + 3 # 3 seconds timeout for pending response @@ -204,7 +203,6 @@ async def complete_pending(self) -> None: if not self.pending_response: return - timeout: Optional[Union[float, int]] timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) @@ -219,7 +217,7 @@ async def complete_pending(self) -> None: await self.receive_message(None, True) else: # In sync we need to track the bytes left for the message. - network_layer.receive_data(self.conn.get_conn, self.pending_byte, deadline) + network_layer.receive_data(self.conn, self.pending_bytes, deadline) self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 diff --git a/pymongo/message.py b/pymongo/message.py index ce680cf7ce..d9d1450838 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -1615,7 +1615,7 @@ def __init__( self.exhaust = exhaust self._pending_enabled = False - def pending_enabled(self): + def pending_enabled(self) -> bool: return self._pending_enabled def reset(self) -> None: @@ -1789,7 +1789,7 @@ def __init__( self.comment = comment self._pending_enabled = False - def pending_enabled(self): + def pending_enabled(self) -> bool: return self._pending_enabled def reset(self) -> None: diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index bbcca86115..4494f2d278 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -784,7 +784,7 @@ def receive_message( op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER( receive_data(conn, 9, deadline, enable_pending) ) - data = decompress(receive_data(conn, length - 25, deadline), compressor_id, enable_pending) + data = decompress(receive_data(conn, length - 25, deadline, enable_pending), compressor_id) else: data = receive_data(conn, length - 16, deadline, enable_pending) diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 3651e96bab..b27541e5d8 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -140,7 +140,8 @@ def command( # Support CSOT applied_csot = False if client: - applied_csot = conn.apply_timeout(client, spec) + res = conn.apply_timeout(client, spec) + applied_csot = bool(res) _csot.apply_write_concern(spec, write_concern) if use_op_msg: @@ -196,7 +197,7 @@ def command( reply = None response_doc: _DocumentOut = {"ok": 1} else: - reply = receive_message(conn, request_id, enable_pending=bool(applied_csot)) + reply = receive_message(conn, request_id, enable_pending=applied_csot) conn.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response( codec_options=codec_options, user_fields=user_fields diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 28366a7c9e..3acaeba3a2 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -194,7 +194,6 @@ def __init__( def mark_pending(self, nbytes: int) -> None: """Mark this connection as having a pending response.""" - # TODO: add "if self.enable_pending:" self.pending_response = True self.pending_bytes = nbytes self.pending_deadline = time.monotonic() + 3 # 3 seconds timeout for pending response @@ -204,7 +203,6 @@ def complete_pending(self) -> None: if not self.pending_response: return - timeout: Optional[Union[float, int]] timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) @@ -219,7 +217,7 @@ def complete_pending(self) -> None: self.receive_message(None, True) else: # In sync we need to track the bytes left for the message. - network_layer.receive_data(self.conn.get_conn, self.pending_byte, deadline) + network_layer.receive_data(self.conn, self.pending_bytes, deadline) self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index c9cfca81fc..738a434b9c 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2222,7 +2222,7 @@ async def test_exhaust_getmore_server_error(self): await cursor.next() # Cause a server error on getmore. - async def receive_message(request_id): + async def receive_message(request_id, enable_pending=False): # Discard the actual server response. await AsyncConnection.receive_message(conn, request_id) diff --git a/test/test_client.py b/test/test_client.py index 038ba2241b..94e6ec3f3f 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -2179,7 +2179,7 @@ def test_exhaust_getmore_server_error(self): cursor.next() # Cause a server error on getmore. - def receive_message(request_id): + def receive_message(request_id, enable_pending=False): # Discard the actual server response. Connection.receive_message(conn, request_id) From 2f841a2dba8ebde463f37c71211f11fc3d1ef6e8 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 9 Apr 2025 11:17:08 -0700 Subject: [PATCH 3/6] PYTHON-4324 Fix typing and receive_data call --- pymongo/asynchronous/pool.py | 5 ++--- pymongo/synchronous/pool.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 6ccd33ad4d..14121d248f 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -206,7 +206,7 @@ async def complete_pending(self) -> None: timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) - elif timeout: + elif timeout is not None: deadline = min(time.monotonic() + timeout, self.pending_deadline) else: deadline = self.pending_deadline @@ -216,8 +216,7 @@ async def complete_pending(self) -> None: # TODO: respect deadline await self.receive_message(None, True) else: - # In sync we need to track the bytes left for the message. - network_layer.receive_data(self.conn, self.pending_bytes, deadline) + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 3acaeba3a2..62bf7c3e8a 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -206,7 +206,7 @@ def complete_pending(self) -> None: timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) - elif timeout: + elif timeout is not None: deadline = min(time.monotonic() + timeout, self.pending_deadline) else: deadline = self.pending_deadline @@ -216,8 +216,7 @@ def complete_pending(self) -> None: # TODO: respect deadline self.receive_message(None, True) else: - # In sync we need to track the bytes left for the message. - network_layer.receive_data(self.conn, self.pending_bytes, deadline) + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 From 69bcb4e21bcb63bdb93a894718fdcfc82ee70efd Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 9 Apr 2025 11:21:53 -0700 Subject: [PATCH 4/6] PYTHON-4324 Wrap receive_data errors in pymongo errors --- pymongo/asynchronous/pool.py | 5 ++++- pymongo/synchronous/pool.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 14121d248f..1188906f4a 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -216,7 +216,10 @@ async def complete_pending(self) -> None: # TODO: respect deadline await self.receive_message(None, True) else: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + try: + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + except BaseException as error: + await self._raise_connection_failure(error) self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 62bf7c3e8a..d07fe2d11b 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -216,7 +216,10 @@ def complete_pending(self) -> None: # TODO: respect deadline self.receive_message(None, True) else: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + try: + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + except BaseException as error: + self._raise_connection_failure(error) self.pending_response = False self.pending_bytes = 0 self.pending_deadline = 0.0 From c7a4ba2d199290ee295da7afba45bc7289d0c68b Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 9 Apr 2025 11:54:11 -0700 Subject: [PATCH 5/6] PYTHON-4324 Fix NetworkingInterface.gettimeout --- pymongo/asynchronous/pool.py | 11 ++++++----- pymongo/network_layer.py | 5 +++-- pymongo/synchronous/pool.py | 11 ++++++----- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 1188906f4a..d64d565d6c 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -203,13 +203,14 @@ async def complete_pending(self) -> None: if not self.pending_response: return - timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) - elif timeout is not None: - deadline = min(time.monotonic() + timeout, self.pending_deadline) else: - deadline = self.pending_deadline + timeout = self.conn.gettimeout + if timeout is not None: + deadline = min(time.monotonic() + timeout, self.pending_deadline) + else: + deadline = self.pending_deadline if not _IS_SYNC: # In async the reader task reads the whole message at once. @@ -217,7 +218,7 @@ async def complete_pending(self) -> None: await self.receive_message(None, True) else: try: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[arg-type] except BaseException as error: await self._raise_connection_failure(error) self.pending_response = False diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 4494f2d278..07e87551d7 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -338,7 +338,7 @@ def receive_data( # When the timeout has expired we perform one final non-blocking recv. # This helps avoid spurious timeouts when the response is actually already # buffered on the client. - orig_timeout = conn.conn.gettimeout() + orig_timeout = conn.conn.gettimeout try: while bytes_read < length: try: @@ -444,6 +444,7 @@ class NetworkingInterface(NetworkingInterfaceBase): def __init__(self, conn: Union[socket.socket, _sslConn]): super().__init__(conn) + @property def gettimeout(self) -> float | None: return self.conn.gettimeout() @@ -758,7 +759,7 @@ def receive_message( if _csot.get_timeout(): deadline = _csot.get_deadline() else: - timeout = conn.conn.gettimeout() + timeout = conn.conn.gettimeout if timeout: deadline = time.monotonic() + timeout else: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d07fe2d11b..1ce7eee884 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -203,13 +203,14 @@ def complete_pending(self) -> None: if not self.pending_response: return - timeout = self.conn.gettimeout if _csot.get_timeout(): deadline = min(_csot.get_deadline(), self.pending_deadline) - elif timeout is not None: - deadline = min(time.monotonic() + timeout, self.pending_deadline) else: - deadline = self.pending_deadline + timeout = self.conn.gettimeout + if timeout is not None: + deadline = min(time.monotonic() + timeout, self.pending_deadline) + else: + deadline = self.pending_deadline if not _IS_SYNC: # In async the reader task reads the whole message at once. @@ -217,7 +218,7 @@ def complete_pending(self) -> None: self.receive_message(None, True) else: try: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[call-arg] + network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[arg-type] except BaseException as error: self._raise_connection_failure(error) self.pending_response = False From ea4835f95506f880d065ad092543401f50dc8915 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 9 Apr 2025 11:58:29 -0700 Subject: [PATCH 6/6] PYTHON-4324 Propagate enable_pending when completing a pending read to allow one to span multiple requests --- pymongo/asynchronous/pool.py | 2 +- pymongo/synchronous/pool.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index d64d565d6c..d4d042dd86 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -218,7 +218,7 @@ async def complete_pending(self) -> None: await self.receive_message(None, True) else: try: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[arg-type] + network_layer.receive_data(self, self.pending_bytes, deadline, True) # type:ignore[arg-type] except BaseException as error: await self._raise_connection_failure(error) self.pending_response = False diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 1ce7eee884..19addc6336 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -218,7 +218,7 @@ def complete_pending(self) -> None: self.receive_message(None, True) else: try: - network_layer.receive_data(self, self.pending_bytes, deadline) # type:ignore[arg-type] + network_layer.receive_data(self, self.pending_bytes, deadline, True) # type:ignore[arg-type] except BaseException as error: self._raise_connection_failure(error) self.pending_response = False