Developer notes for implemented features #739
Replies: 4 comments
-
**#696 fix: added negotiate timeout to Muxer/Multistream**

**Resolved Problem**

The core issue was that during multistream-select protocol negotiations, read operations like the ones in `handshake` could hang indefinitely if remote peers never responded or stalled without closing the stream:

```python
async def handshake(self, communicator: IMultiselectCommunicator) -> None:
    """
    Perform handshake to agree on multiselect protocol.

    :param communicator: communicator to use
    :raise MultiselectError: raised when handshake failed
    """
    try:
        await communicator.write(MULTISELECT_PROTOCOL_ID)
    except MultiselectCommunicatorError as error:
        raise MultiselectError() from error

    try:
        handshake_contents = await communicator.read()
    except MultiselectCommunicatorError as error:
        raise MultiselectError() from error

    if not is_valid_handshake(handshake_contents):
        raise MultiselectError(
            "multiselect protocol ID mismatch: "
            f"received handshake_contents={handshake_contents}"
        )
```

A hung read could stall the entire connection negotiation pipeline, blocking the event loop and delaying peer discovery. The problem occurred in three critical functions that read from partially established p2p connections.
**Implementation Approach**

Two architectural approaches were considered:

- Option 1: add timeout wrappers at the two call sites.
- Option 2: implement timeout logic directly inside the read functions themselves.

Decision: chose Option 2 for better encapsulation and consistency. This guarantees timeout behavior regardless of where these functions are called, and avoids code duplication.

**Technical Implementation**

Core timeout implementation, using `trio.fail_after`:

```python
try:
    with trio.fail_after(DEFAULT_NEGOTIATE_TIMEOUT):  # 5 seconds
        response = await communicator.read()
except trio.TooSlowError:
    raise MultiselectClientError("protocol selection response timed out")
except MultiselectCommunicatorError as error:
    raise MultiselectClientError() from error
```

**Modified Functions**
**Configuration Management**

Made the negotiate timeout configurable via the `DEFAULT_NEGOTIATE_TIMEOUT` constant (5 seconds by default) rather than hard-coding it at each read site.
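The PR implements this with `trio.fail_after`; the same configurable-timeout shape can be sketched with stdlib `asyncio` so it runs without dependencies. All names below (`read_with_timeout`, `NegotiateTimeoutError`, `prompt_read`) are illustrative stand-ins, not py-libp2p's API:

```python
import asyncio

DEFAULT_NEGOTIATE_TIMEOUT = 5  # seconds, matching the PR's default

class NegotiateTimeoutError(Exception):
    """Illustrative stand-in for MultiselectError / MultiselectClientError."""

async def read_with_timeout(read, timeout: float = DEFAULT_NEGOTIATE_TIMEOUT) -> str:
    """Wrap a negotiation read so a stalled peer fails fast instead of hanging."""
    try:
        return await asyncio.wait_for(read(), timeout)
    except asyncio.TimeoutError:
        raise NegotiateTimeoutError("protocol selection response timed out")

async def main() -> str:
    async def prompt_read() -> str:
        # A well-behaved peer answers immediately.
        return "/multistream/1.0.0"

    # Callers can override the module-level default per call site.
    return await read_with_timeout(prompt_read, timeout=0.5)

print(asyncio.run(main()))  # /multistream/1.0.0
```

Defaulting the keyword argument to a module-level constant keeps the common path zero-config while still letting individual callers tighten or loosen the deadline.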
**Testing Strategy**

Challenge: simulating timeout conditions.

Solution: a mock-communicator approach whose operations never complete:

```python
class DummyMultiselectCommunicator(IMultiselectCommunicator):
    async def write(self, msg_str: str) -> None:
        """Goes into infinite loop when .write is called"""
        await trio.sleep_forever()

    async def read(self) -> str:
        """Returns a dummy read"""
        return "dummy_read"
```

**Test Coverage**

Created comprehensive tests for all three modified functions.
Each test verifies that the appropriate timeout exception is raised within the expected timeframe.

**Results**

This implementation resolves the negotiate timeout issue: a stalled or unresponsive peer can no longer hang protocol negotiation indefinitely.
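The test idea can be sketched in a self-contained form: pair a communicator that never answers with a timeout-wrapped read and assert the failure path. The real tests use trio and `DummyMultiselectCommunicator`; this version uses stdlib `asyncio` and a deliberately short timeout so it finishes quickly, and `StalledCommunicator` is a hypothetical name:

```python
import asyncio

class StalledCommunicator:
    """Like DummyMultiselectCommunicator's write(): an operation that never completes."""

    async def read(self) -> str:
        await asyncio.Event().wait()  # no one ever sets the event
        return "unreachable"

async def main() -> str:
    comm = StalledCommunicator()
    try:
        # The real code waits DEFAULT_NEGOTIATE_TIMEOUT (5 s); a short
        # timeout keeps the test fast.
        await asyncio.wait_for(comm.read(), timeout=0.05)
        return "no timeout"
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```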
-
**#708 TODO: Adding Concurrency Cap to Identify Push Handling**

Implements a concurrency cap for identify push handling.

**Problem**

The core issue was that `push_identify_to_peers` used an unbounded trio nursery:

```python
async def push_identify_to_peers(
    host: IHost,
    peer_ids: set[ID] | None = None,
    observed_multiaddr: Multiaddr | None = None,
) -> None:
    """
    Push an identify message to multiple peers in parallel.

    If peer_ids is None, push to all connected peers.
    """
    if peer_ids is None:
        # Get all connected peers
        peer_ids = set(host.get_peerstore().peer_ids())

    async with trio.open_nursery() as nursery:
        for peer_id in peer_ids:
            nursery.start_soon(
                push_identify_to_peer, host, peer_id, observed_multiaddr
            )
```

If a node was connected to 500 or 1000 peers, it would attempt to push identify information to all of them simultaneously, with no throttling mechanism. This unbounded concurrency caused several critical problems at scale.
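The fan-out above can be reproduced with a dependency-free sketch (stdlib `asyncio` instead of the project's trio; all names illustrative). With 500 simulated peers, every push ends up in flight at once:

```python
import asyncio

async def main() -> int:
    counter = {"current": 0, "max": 0}

    async def push(peer_id: int) -> None:
        # Single-threaded event loop: plain increments are race-free here.
        counter["current"] += 1
        counter["max"] = max(counter["max"], counter["current"])
        await asyncio.sleep(0.1)  # stand-in for a network round-trip
        counter["current"] -= 1

    # One task per "peer", launched with no throttling, like the nursery above.
    await asyncio.gather(*(push(i) for i in range(500)))
    return counter["max"]

print(asyncio.run(main()))  # 500: every push runs concurrently
```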
The lack of throttling meant the system would "fire everything off as fast as it can," creating potential network congestion and overwhelming both local resources and remote peers.

**Implementation Approach**

The solution implements a semaphore-based throttling mechanism using `trio.Semaphore`.

**Strategy**

```python
CONCURRENCY_LIMIT = 10  # Default limit
semaphore = trio.Semaphore(CONCURRENCY_LIMIT)

async def limited_push(peer_id: ID) -> None:
    async with semaphore:
        await push_identify_to_peer(host, peer_id, observed_multiaddr)
```

This ensures that only 10 identify pushes run concurrently at any given time while the rest wait their turn.

**Technical Details**
An instrumented variant threads an optional counter and lock through `limited_push` so tests can observe the peak number of concurrent pushes:

```python
async def limited_push(peer_id: ID) -> None:
    async with semaphore:
        # Track current/peak concurrency when instrumentation is supplied.
        if counter is not None and lock is not None:
            async with lock:
                counter["current"] += 1
                counter["max"] = max(counter["max"], counter["current"])
        try:
            await push_identify_to_peer(host, peer_id, observed_multiaddr)
        finally:
            if counter is not None and lock is not None:
                async with lock:
                    counter["current"] -= 1
```
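The cap can be verified with the same peak-concurrency instrumentation in a dependency-free sketch (stdlib `asyncio` rather than trio; because the asyncio loop is single-threaded, a bare dict stands in for the counter-plus-lock pair above):

```python
import asyncio

CONCURRENCY_LIMIT = 10

async def main() -> int:
    semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
    counter = {"current": 0, "max": 0}

    async def limited_push(peer_id: int) -> None:
        async with semaphore:
            counter["current"] += 1
            counter["max"] = max(counter["max"], counter["current"])
            try:
                await asyncio.sleep(0.01)  # stand-in for push_identify_to_peer
            finally:
                counter["current"] -= 1

    # 100 simulated peers contend for 10 semaphore slots.
    await asyncio.gather(*(limited_push(i) for i in range(100)))
    return counter["max"]

print(asyncio.run(main()))  # 10: never more than CONCURRENCY_LIMIT in flight
```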
**Testing Strategy**

**Results**

**Immediate Benefits**

**Long-term Advantages**
-
**#648 Matching py-libp2p <-> go-libp2p PeerStore Implementation**

Resolved issue #251.

**Problem**

Before this PR, py-libp2p shipped only a skeletal PeerStore.

**Implementation Details**

**Key Design Decisions**
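For a flavor of what matching go-libp2p involves: go-libp2p's peerstore keeps addresses with per-entry TTLs that expire automatically. A minimal illustration of that TTL idea, using a toy class (`AddrBookSketch` and its methods are invented for this sketch, not py-libp2p's actual classes):

```python
import time

class AddrBookSketch:
    """Toy TTL-based address book; illustrative only."""

    def __init__(self) -> None:
        # peer_id -> {multiaddr string: expiry timestamp}
        self._addrs: dict[str, dict[str, float]] = {}

    def add_addr(self, peer_id: str, addr: str, ttl: float) -> None:
        """Record an address that stays valid for `ttl` seconds."""
        self._addrs.setdefault(peer_id, {})[addr] = time.monotonic() + ttl

    def addrs(self, peer_id: str) -> list[str]:
        """Return live addresses, dropping expired ones lazily on read."""
        now = time.monotonic()
        live = {
            a: exp
            for a, exp in self._addrs.get(peer_id, {}).items()
            if exp > now
        }
        self._addrs[peer_id] = live
        return list(live)

book = AddrBookSketch()
book.add_addr("QmPeer", "/ip4/127.0.0.1/tcp/4001", ttl=60.0)
print(book.addrs("QmPeer"))  # ['/ip4/127.0.0.1/tcp/4001']
```

Lazy expiry on read keeps writes O(1) and avoids a background sweeper; a production store may also garbage-collect periodically.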
-
**#753 Certified-Addr-Book interface for Peer-Store**

This post describes how signed-peer-record transfer is integrated into the Identify protocol.

In this PR, an optional certified-addr-book interface is added to the peer store. The signed peer record is attached while creating the identify message to be sent, and on the receiving side the record is consumed into the peer store.

The peer-record transfer can be tried out with the identify-demo example.

This is how the peer-record transfer has been integrated with the Identify protocol.
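The "certified" part boils down to sign-then-verify: a peer's addresses are only trusted if the attached signature checks out. Real signed peer records use the peer's public/private key pair and libp2p's envelope format; this toy uses stdlib HMAC (a symmetric stand-in, with invented function names) purely to show the shape of the check:

```python
import hashlib
import hmac

# Symmetric stand-in for the peer's signing key; real envelopes use
# public-key signatures so anyone can verify.
SECRET = b"peer-signing-key-stand-in"

def sign_record(peer_id: str, addrs: list[str]) -> tuple[bytes, bytes]:
    """Serialize a toy peer record and produce its signature."""
    payload = "|".join([peer_id, *addrs]).encode()
    return payload, hmac.new(SECRET, payload, hashlib.sha256).digest()

def verify_record(payload: bytes, sig: bytes) -> bool:
    """Only records whose signature verifies should enter the addr book."""
    expected = hmac.new(SECRET, payload, hashlib.sha256).digest()
    return hmac.compare_digest(sig, expected)

payload, sig = sign_record("QmPeer", ["/ip4/127.0.0.1/tcp/4001"])
print(verify_record(payload, sig))         # True
print(verify_record(payload + b"!", sig))  # False: tampered record rejected
```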
-
🛠️ Developer Notes for Implemented Features
This thread serves as a shared space for py-libp2p contributors to leave notes, insights, and context about the issues, features, and fixes they work on.
Whether it's:
A workaround for an unexpected bug 🐛
A design decision you debated 💭
A test setup that helped debug a tricky scenario 🧪
Or just the steps you followed to resolve a TODO item ✅
Please document it here. Think of this as a living journal of the dev experience — your notes might save someone else hours of head-scratching.
📚 Why this matters
It helps newcomers learn how to approach issues in this codebase
It builds a transparent understanding of why certain changes were made
It captures tribal knowledge that usually gets lost in commit messages
📝 How to contribute
When you finish a PR or solve an issue:
Briefly summarize the problem and your approach
Mention any gotchas or alternative paths you explored
Link to the relevant PR/issue