You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This document analyzes the FIXME issue in the muxer_multistream.py module:
Issue: Missing negotiate timeout in MuxerMultistream File:libp2p/stream_muxer/muxer_multistream.py:34 FIXME Comment:# FIXME: add negotiate timeout to MuxerMultistream
Issue Analysis
Current Status
Current Implementation
The MuxerMultistream class has a constant defined but no timeout implementation:
# FIXME: add negotiate timeout to `MuxerMultistream`DEFAULT_NEGOTIATE_TIMEOUT=60classMuxerMultistream:
""" MuxerMultistream is a multistream stream muxed transport multiplexer. go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.transports: "OrderedDict[TProtocol, TMuxerClass]"multiselect: Multiselectmultiselect_client: MultiselectClientdef__init__(self, muxer_transports_by_protocol: TMuxerOptions) ->None:
self.transports=OrderedDict()
self.multiselect=Multiselect()
self.multistream_client=MultiselectClient()
forprotocol, transportinmuxer_transports_by_protocol.items():
self.add_transport(protocol, transport)
asyncdefselect_transport(self, conn: IRawConnection) ->TMuxerClass:
""" Select a transport that both us and the node on the other end of conn support and agree on. :param conn: conn to choose a transport over :return: selected muxer transport """protocol: TProtocolcommunicator=MultiselectCommunicator(conn)
ifconn.is_initiator:
protocol=awaitself.multiselect_client.select_one_of(
tuple(self.transports.keys()), communicator
)
else:
protocol, _=awaitself.multiselect.negotiate(communicator)
returnself.transports[protocol]
asyncdefnew_conn(self, conn: ISecureConn, peer_id: ID) ->IMuxedConn:
communicator=MultiselectCommunicator(conn)
protocol=awaitself.multistream_client.select_one_of(
tuple(self.transports.keys()), communicator
)
transport_class=self.transports[protocol]
ifprotocol==PROTOCOL_ID:
asyncwithtrio.open_nursery():
asyncdefon_close() ->None:
passreturnYamux(
conn, peer_id, is_initiator=conn.is_initiator, on_close=on_close
)
returntransport_class(conn, peer_id)
The Problem
No Timeout Protection: Negotiation operations can hang indefinitely
Resource Leaks: Connections may remain open during failed negotiations
User Experience: Applications may hang waiting for protocol negotiation
Network Resilience: No protection against slow or unresponsive peers
Inconsistent Behavior: Some operations have timeouts, others don't
What Must Be Done
Required Changes
Add Timeout Parameter: Allow configurable timeout for negotiation
Implement Timeout Logic: Use trio's timeout mechanisms
Error Handling: Provide appropriate exceptions for timeout scenarios
Backward Compatibility: Maintain existing API while adding timeout support
Integration: Ensure timeout works with both client and server negotiation
Implementation Options
Option A: Simple Timeout with Constructor Parameter
classMuxerMultistream:
""" MuxerMultistream is a multistream stream muxed transport multiplexer. go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.transports: "OrderedDict[TProtocol, TMuxerClass]"multiselect: Multiselectmultiselect_client: MultiselectClientnegotiate_timeout: floatdef__init__(
self,
muxer_transports_by_protocol: TMuxerOptions,
negotiate_timeout: float=DEFAULT_NEGOTIATE_TIMEOUT
) ->None:
self.transports=OrderedDict()
self.multiselect=Multiselect()
self.multistream_client=MultiselectClient()
self.negotiate_timeout=negotiate_timeoutforprotocol, transportinmuxer_transports_by_protocol.items():
self.add_transport(protocol, transport)
asyncdefselect_transport(self, conn: IRawConnection) ->TMuxerClass:
""" Select a transport that both us and the node on the other end of conn support and agree on. :param conn: conn to choose a transport over :return: selected muxer transport :raises MuxerUpgradeFailure: when negotiation times out or fails """protocol: TProtocolcommunicator=MultiselectCommunicator(conn)
try:
withtrio.move_on_after(self.negotiate_timeout):
ifconn.is_initiator:
protocol=awaitself.multiselect_client.select_one_of(
tuple(self.transports.keys()), communicator
)
else:
protocol, _=awaitself.multiselect.negotiate(communicator)
returnself.transports[protocol]
excepttrio.TooSlowError:
raiseMuxerUpgradeFailure(
f"Protocol negotiation timed out after {self.negotiate_timeout} seconds"
)
asyncdefnew_conn(self, conn: ISecureConn, peer_id: ID) ->IMuxedConn:
""" Create a new multiplexed connection with timeout protection. :param conn: secure connection to multiplex :param peer_id: peer ID for the connection :return: multiplexed connection :raises MuxerUpgradeFailure: when negotiation times out or fails """communicator=MultiselectCommunicator(conn)
try:
withtrio.move_on_after(self.negotiate_timeout):
protocol=awaitself.multistream_client.select_one_of(
tuple(self.transports.keys()), communicator
)
transport_class=self.transports[protocol]
ifprotocol==PROTOCOL_ID:
asyncwithtrio.open_nursery():
asyncdefon_close() ->None:
passreturnYamux(
conn, peer_id, is_initiator=conn.is_initiator, on_close=on_close
)
returntransport_class(conn, peer_id)
excepttrio.TooSlowError:
raiseMuxerUpgradeFailure(
f"Protocol negotiation timed out after {self.negotiate_timeout} seconds"
)
Option B: Advanced Timeout with Retry Logic
fromdataclassesimportdataclassfromtypingimportOptional@dataclassclassNegotiateOptions:
timeout: float=DEFAULT_NEGOTIATE_TIMEOUTretries: int=0retry_delay: float=0.1exponential_backoff: bool=TrueclassMuxerMultistream:
""" MuxerMultistream is a multistream stream muxed transport multiplexer. go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go """# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.transports: "OrderedDict[TProtocol, TMuxerClass]"multiselect: Multiselectmultiselect_client: MultiselectClientnegotiate_options: NegotiateOptionsdef__init__(
self,
muxer_transports_by_protocol: TMuxerOptions,
negotiate_options: Optional[NegotiateOptions] =None
) ->None:
self.transports=OrderedDict()
self.multiselect=Multiselect()
self.multistream_client=MultiselectClient()
self.negotiate_options=negotiate_optionsorNegotiateOptions()
forprotocol, transportinmuxer_transports_by_protocol.items():
self.add_transport(protocol, transport)
asyncdef_negotiate_with_retry(
self,
is_initiator: bool,
communicator: MultiselectCommunicator
) ->TProtocol:
"""Perform negotiation with retry logic and timeout."""last_error=Noneforattemptinrange(self.negotiate_options.retries+1):
try:
withtrio.move_on_after(self.negotiate_options.timeout):
ifis_initiator:
returnawaitself.multiselect_client.select_one_of(
tuple(self.transports.keys()), communicator
)
else:
protocol, _=awaitself.multiselect.negotiate(communicator)
returnprotocolexcepttrio.TooSlowErrorase:
last_error=MuxerUpgradeFailure(
f"Protocol negotiation timed out after {self.negotiate_options.timeout} seconds"
)
exceptExceptionase:
last_error=MuxerUpgradeFailure(f"Protocol negotiation failed: {str(e)}")
# If this was the last attempt, raise the errorifattempt==self.negotiate_options.retries:
raiselast_error# Calculate delay for next retryifself.negotiate_options.exponential_backoff:
delay=self.negotiate_options.retry_delay* (2**attempt)
else:
delay=self.negotiate_options.retry_delayawaittrio.sleep(delay)
# This should never be reached, but satisfies the type checkerraiseMuxerUpgradeFailure("Unexpected end of negotiation retry loop")
asyncdefselect_transport(self, conn: IRawConnection) ->TMuxerClass:
""" Select a transport that both us and the node on the other end of conn support and agree on. :param conn: conn to choose a transport over :return: selected muxer transport :raises MuxerUpgradeFailure: when negotiation times out or fails """communicator=MultiselectCommunicator(conn)
protocol=awaitself._negotiate_with_retry(conn.is_initiator, communicator)
returnself.transports[protocol]
asyncdefnew_conn(self, conn: ISecureConn, peer_id: ID) ->IMuxedConn:
""" Create a new multiplexed connection with timeout protection. :param conn: secure connection to multiplex :param peer_id: peer ID for the connection :return: multiplexed connection :raises MuxerUpgradeFailure: when negotiation times out or fails """communicator=MultiselectCommunicator(conn)
protocol=awaitself._negotiate_with_retry(conn.is_initiator, communicator)
transport_class=self.transports[protocol]
ifprotocol==PROTOCOL_ID:
asyncwithtrio.open_nursery():
asyncdefon_close() ->None:
passreturnYamux(
conn, peer_id, is_initiator=conn.is_initiator, on_close=on_close
)
returntransport_class(conn, peer_id)
Exception Handling
Required Exception Class
classMuxerUpgradeFailure(Exception):
"""Raised when muxer upgrade fails due to timeout or negotiation error."""pass
Integration with Existing Exceptions
The timeout should integrate with existing exception handling in the transport upgrader:
# In libp2p/transport/upgrader.pytry:
muxed_conn=awaitself.muxer_multistream.new_conn(secured_conn, peer_id)
exceptMuxerUpgradeFailureaserror:
logger.debug("fail to upgrade mux for peer %s", peer_id)
awaitsecured_conn.close()
raiseMuxerUpgradeFailure(
f"fail to upgrade mux for peer {peer_id}"
) fromerror
Configuration Options
Global Configuration
# In libp2p/stream_muxer/__init__.py or similarDEFAULT_NEGOTIATE_TIMEOUT=60.0# secondsDEFAULT_NEGOTIATE_RETRIES=0DEFAULT_NEGOTIATE_RETRY_DELAY=0.1
Per-Instance Configuration
# Example usagemuxer_options= {
"/mplex/6.7.0": Mplex,
"/yamux/1.0.0": Yamux,
}
# With timeout configurationmuxer_multistream=MuxerMultistream(
muxer_options,
negotiate_timeout=30.0# 30 second timeout
)
# Or with advanced optionsnegotiate_options=NegotiateOptions(
timeout=30.0,
retries=2,
retry_delay=0.5,
exponential_backoff=True
)
muxer_multistream=MuxerMultistream(muxer_options, negotiate_options)
Impact Analysis
Positive Impacts
Reliability: Prevents hanging connections during negotiation
Resource Management: Ensures connections are properly closed on timeout
User Experience: Provides predictable behavior with timeouts
Network Resilience: Handles slow or unresponsive peers gracefully
Debugging: Clear error messages for timeout scenarios
Potential Drawbacks
API Changes: May require updates to existing code
Performance: Minimal overhead from timeout checking
Configuration Complexity: Additional parameters to manage
Backward Compatibility: Need to ensure existing code continues to work
Compatibility Considerations
Existing Code: Should work without changes using default timeout
Error Handling: Integrate with existing exception hierarchy
Configuration: Provide sensible defaults
Testing Requirements
Unit Tests
@pytest.mark.trioasyncdeftest_negotiate_timeout():
"""Test that negotiation times out correctly."""muxer=MuxerMultistream({}, negotiate_timeout=0.1)
# Create a slow communicator that never respondsclassSlowCommunicator:
asyncdefread(self):
awaittrio.sleep(1.0) # Longer than timeoutreturn"test"asyncdefwrite(self, data):
passcommunicator=SlowCommunicator()
withpytest.raises(MuxerUpgradeFailure, match="timed out"):
awaitmuxer.select_transport(mock_connection)
@pytest.mark.trioasyncdeftest_negotiate_success_with_timeout():
"""Test that negotiation succeeds within timeout."""muxer=MuxerMultistream({"/test/1.0.0": MockTransport}, negotiate_timeout=1.0)
# Test successful negotiationresult=awaitmuxer.select_transport(mock_connection)
assertresult==MockTransport@pytest.mark.trioasyncdeftest_negotiate_retry_logic():
"""Test retry logic with exponential backoff."""options=NegotiateOptions(timeout=0.1, retries=2, retry_delay=0.1)
muxer=MuxerMultistream({}, negotiate_options=options)
# Test that retries are attemptedwithpytest.raises(MuxerUpgradeFailure):
awaitmuxer.select_transport(mock_connection)
Integration Tests
@pytest.mark.trioasyncdeftest_muxer_timeout_integration():
"""Test timeout behavior in full connection flow."""# Create hosts with different timeout settingshost_fast=new_host(muxer_timeout=0.1)
host_slow=new_host(muxer_timeout=5.0)
# Test connection behavior with timeouts# ... implementation details
Performance Tests
@pytest.mark.trioasyncdeftest_timeout_performance():
"""Test that timeout adds minimal overhead."""importtimemuxer=MuxerMultistream({}, negotiate_timeout=60.0)
start_time=time.time()
# Perform negotiationend_time=time.time()
# Verify overhead is minimalassert (end_time-start_time) <0.1# Less than 100ms overhead
Implementation Strategy
Phase 1: Basic Timeout Implementation
Add timeout parameter to constructor
Implement basic timeout logic using trio.move_on_after
Add MuxerUpgradeFailure exception
Update select_transport and new_conn methods
Phase 2: Enhanced Features
Add retry logic with exponential backoff
Implement configurable timeout options
Add logging for timeout scenarios
Create comprehensive test suite
Phase 3: Integration and Documentation
Update transport upgrader integration
Add configuration examples
Update documentation
Performance testing and optimization
Phase 4: Advanced Features (Optional)
Context-based timeout management
Per-protocol timeout configuration
Timeout metrics and monitoring
Advanced retry strategies
Files That Need Updates
Core Implementation
libp2p/stream_muxer/muxer_multistream.py - Main implementation
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
TODO Analysis: MuxerMultistream Negotiate Timeout
Overview
This document analyzes the FIXME issue in the muxer_multistream.py module:
Issue: Missing negotiate timeout in
MuxerMultistream
File:
libp2p/stream_muxer/muxer_multistream.py:34
FIXME Comment:
# FIXME: add negotiate timeout to MuxerMultistream
Issue Analysis
Current Status
Current Implementation
The
MuxerMultistream
class has a constant defined but no timeout implementation:The Problem
What Must Be Done
Required Changes
Implementation Options
Option A: Simple Timeout with Constructor Parameter
Option B: Advanced Timeout with Retry Logic
Exception Handling
Required Exception Class
Integration with Existing Exceptions
The timeout should integrate with existing exception handling in the transport upgrader:
Configuration Options
Global Configuration
Per-Instance Configuration
Impact Analysis
Positive Impacts
Potential Drawbacks
Compatibility Considerations
Testing Requirements
Unit Tests
Integration Tests
Performance Tests
Implementation Strategy
Phase 1: Basic Timeout Implementation
trio.move_on_after
MuxerUpgradeFailure
exceptionselect_transport
andnew_conn
methodsPhase 2: Enhanced Features
Phase 3: Integration and Documentation
Phase 4: Advanced Features (Optional)
Files That Need Updates
Core Implementation
libp2p/stream_muxer/muxer_multistream.py
- Main implementationlibp2p/transport/exceptions.py
- AddMuxerUpgradeFailure
exceptionIntegration Points
libp2p/transport/upgrader.py
- Handle new exception typelibp2p/host/basic_host.py
- Pass timeout configurationlibp2p/__init__.py
- Export new exception and configurationTests
tests/core/stream_muxer/test_muxer_multistream.py
- Unit teststests/core/transport/test_upgrader.py
- Integration teststests/core/host/test_basic_host.py
- Host integration testsDocumentation
docs/stream_muxer.rst
- Update documentationexamples/
- Add timeout configuration examplesSummary and Recommendations
Recommended Approach: Option A (Simple Timeout)
Rationale:
Implementation Priority: High
Reasons:
Configuration Recommendations
Testing Strategy
Documentation Needs
Beta Was this translation helpful? Give feedback.
All reactions