Skip to content

Commit 93169cc

Browse files
jxsdariusc93mergify[bot]
authored
fix(gossipsub): Attempt to publish to at least mesh_n peers (#5578)
## Description With flood published disabled we've noticed that it can be the case that we have connected peers on topics but these peers are not in our mesh (perhaps due to their own mesh requirements). Currently, we fail to publish the message if there are no peers in our mesh. This PR adjusts this logic to always attempt to publish to at least mesh_n peers. If we have peers that are subscribed to a topic, we will now attempt to publish messages to them (provided they have the required score). This PR also simplies the peer and respective topics by moving the topic list each peer has subscribed to `PeerConnections` and removing both `peer_topics` and `topic_peers` from the main `Behaviour`. Per commit review is suggested. --------- Co-authored-by: Darius Clark <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent f0589c8 commit 93169cc

File tree

8 files changed

+242
-288
lines changed

8 files changed

+242
-288
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" }
8383
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
8484
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
8585
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
86-
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
86+
libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
8787
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
8888
libp2p-identity = { version = "0.2.9" }
8989
libp2p-kad = { version = "0.47.0", path = "protocols/kad" }

protocols/gossipsub/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.47.1
2+
3+
- Attempt to publish to at least mesh_n peers when flood publish is disabled.
4+
See [PR 5578](https://github.com/libp2p/rust-libp2p/pull/5578).
5+
16
## 0.47.0
27

38
<!-- Update to libp2p-swarm v0.45.0 -->

protocols/gossipsub/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
33
edition = "2021"
44
rust-version = { workspace = true }
55
description = "Gossipsub protocol for libp2p"
6-
version = "0.47.0"
6+
version = "0.47.1"
77
authors = ["Age Manning <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"

protocols/gossipsub/src/behaviour.rs

Lines changed: 167 additions & 232 deletions
Large diffs are not rendered by default.

protocols/gossipsub/src/behaviour/tests.rs

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,9 @@ fn test_unsubscribe() {
440440

441441
for topic_hash in &topic_hashes {
442442
assert!(
443-
gs.topic_peers.contains_key(topic_hash),
443+
gs.connected_peers
444+
.values()
445+
.any(|p| p.topics.contains(topic_hash)),
444446
"Topic_peers contain a topic entry"
445447
);
446448
assert!(
@@ -629,8 +631,11 @@ fn test_publish_without_flood_publishing() {
629631

630632
// all peers should be subscribed to the topic
631633
assert_eq!(
632-
gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()),
633-
Some(20),
634+
gs.connected_peers
635+
.values()
636+
.filter(|p| p.topics.contains(&topic_hashes[0]))
637+
.count(),
638+
20,
634639
"Peers should be subscribed to the topic"
635640
);
636641

@@ -669,8 +674,8 @@ fn test_publish_without_flood_publishing() {
669674
let config: Config = Config::default();
670675
assert_eq!(
671676
publishes.len(),
672-
config.mesh_n_low(),
673-
"Should send a publish message to all known peers"
677+
config.mesh_n(),
678+
"Should send a publish message to at least mesh_n peers"
674679
);
675680

676681
assert!(
@@ -809,9 +814,9 @@ fn test_inject_connected() {
809814

810815
// should add the new peers to `peer_topics` with an empty vec as a gossipsub node
811816
for peer in peers {
812-
let known_topics = gs.peer_topics.get(&peer).unwrap();
817+
let peer = gs.connected_peers.get(&peer).unwrap();
813818
assert!(
814-
known_topics == &topic_hashes.iter().cloned().collect(),
819+
peer.topics == topic_hashes.iter().cloned().collect(),
815820
"The topics for each node should all topics"
816821
);
817822
}
@@ -860,24 +865,39 @@ fn test_handle_received_subscriptions() {
860865

861866
// verify the result
862867

863-
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
868+
let peer = gs.connected_peers.get(&peers[0]).unwrap();
864869
assert!(
865-
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
870+
peer.topics
871+
== topic_hashes
872+
.iter()
873+
.take(3)
874+
.cloned()
875+
.collect::<BTreeSet<_>>(),
866876
"First peer should be subscribed to three topics"
867877
);
868-
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
878+
let peer1 = gs.connected_peers.get(&peers[1]).unwrap();
869879
assert!(
870-
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
880+
peer1.topics
881+
== topic_hashes
882+
.iter()
883+
.take(3)
884+
.cloned()
885+
.collect::<BTreeSet<_>>(),
871886
"Second peer should be subscribed to three topics"
872887
);
873888

874889
assert!(
875-
!gs.peer_topics.contains_key(&unknown_peer),
890+
!gs.connected_peers.contains_key(&unknown_peer),
876891
"Unknown peer should not have been added"
877892
);
878893

879894
for topic_hash in topic_hashes[..3].iter() {
880-
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
895+
let topic_peers = gs
896+
.connected_peers
897+
.iter()
898+
.filter(|(_, p)| p.topics.contains(topic_hash))
899+
.map(|(peer_id, _)| *peer_id)
900+
.collect::<BTreeSet<PeerId>>();
881901
assert!(
882902
topic_peers == peers[..2].iter().cloned().collect(),
883903
"Two peers should be added to the first three topics"
@@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() {
894914
&peers[0],
895915
);
896916

897-
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
898-
assert!(
899-
peer_topics == topic_hashes[1..3].iter().cloned().collect(),
917+
let peer = gs.connected_peers.get(&peers[0]).unwrap().clone();
918+
assert_eq!(
919+
peer.topics,
920+
topic_hashes[1..3].iter().cloned().collect::<BTreeSet<_>>(),
900921
"Peer should be subscribed to two topics"
901922
);
902923

903-
let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
924+
// only gossipsub at the moment
925+
let topic_peers = gs
926+
.connected_peers
927+
.iter()
928+
.filter(|(_, p)| p.topics.contains(&topic_hashes[0]))
929+
.map(|(peer_id, _)| *peer_id)
930+
.collect::<BTreeSet<PeerId>>();
931+
904932
assert!(
905933
topic_peers == peers[1..2].iter().cloned().collect(),
906934
"Only the second peers should be in the first topic"
@@ -924,9 +952,8 @@ fn test_get_random_peers() {
924952
for _ in 0..20 {
925953
peers.push(PeerId::random())
926954
}
927-
928-
gs.topic_peers
929-
.insert(topic_hash.clone(), peers.iter().cloned().collect());
955+
let mut topics = BTreeSet::new();
956+
topics.insert(topic_hash.clone());
930957

931958
gs.connected_peers = peers
932959
.iter()
@@ -936,52 +963,32 @@ fn test_get_random_peers() {
936963
PeerConnections {
937964
kind: PeerKind::Gossipsubv1_1,
938965
connections: vec![ConnectionId::new_unchecked(0)],
966+
topics: topics.clone(),
939967
},
940968
)
941969
})
942970
.collect();
943971

944-
let random_peers =
945-
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
946-
true
947-
});
972+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true);
948973
assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned");
949-
let random_peers = get_random_peers(
950-
&gs.topic_peers,
951-
&gs.connected_peers,
952-
&topic_hash,
953-
30,
954-
|_| true,
955-
);
974+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true);
956975
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
957976
assert!(
958977
random_peers == peers.iter().cloned().collect(),
959978
"Expected no shuffling"
960979
);
961-
let random_peers = get_random_peers(
962-
&gs.topic_peers,
963-
&gs.connected_peers,
964-
&topic_hash,
965-
20,
966-
|_| true,
967-
);
980+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true);
968981
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
969982
assert!(
970983
random_peers == peers.iter().cloned().collect(),
971984
"Expected no shuffling"
972985
);
973-
let random_peers =
974-
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| {
975-
true
976-
});
986+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true);
977987
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
978988
// test the filter
979-
let random_peers =
980-
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
981-
false
982-
});
989+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false);
983990
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
984-
let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, {
991+
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, {
985992
|peer| peers.contains(peer)
986993
});
987994
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");

protocols/gossipsub/src/metrics.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,17 @@ impl Metrics {
355355
}
356356
}
357357

358-
/// Register how many peers do we known are subscribed to this topic.
359-
pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
358+
/// Increase the number of peers that are subscribed to this topic.
359+
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
360360
if self.register_topic(topic).is_ok() {
361-
self.topic_peers_count
362-
.get_or_create(topic)
363-
.set(count as i64);
361+
self.topic_peers_count.get_or_create(topic).inc();
362+
}
363+
}
364+
365+
/// Decrease the number of peers that are subscribed to this topic.
366+
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
367+
if self.register_topic(topic).is_ok() {
368+
self.topic_peers_count.get_or_create(topic).dec();
364369
}
365370
}
366371

protocols/gossipsub/src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use libp2p_identity::PeerId;
2424
use libp2p_swarm::ConnectionId;
2525
use prometheus_client::encoding::EncodeLabelValue;
2626
use quick_protobuf::MessageWrite;
27-
use std::fmt;
2827
use std::fmt::Debug;
28+
use std::{collections::BTreeSet, fmt};
2929

3030
use crate::rpc_proto::proto;
3131
#[cfg(feature = "serde")]
@@ -77,6 +77,8 @@ pub(crate) struct PeerConnections {
7777
pub(crate) kind: PeerKind,
7878
/// Its current connections.
7979
pub(crate) connections: Vec<ConnectionId>,
80+
/// Subscribed topics.
81+
pub(crate) topics: BTreeSet<TopicHash>,
8082
}
8183

8284
/// Describes the types of peers that can exist in the gossipsub context.

0 commit comments

Comments
 (0)