diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d9b4bc2c0..2142c7c3b 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -158,8 +158,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) = peer.appScore = stats.appScore peer.behaviourPenalty = stats.behaviourPenalty - peer.iWantBudget = IWantPeerBudget - peer.iHaveBudget = IHavePeerBudget + peer.iHaveBudget = IHavePeerBudget method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = case event.kind diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index bbdcd27e7..f711c0c7a 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[tables, sequtils, sets, algorithm] +import std/[tables, sequtils, sets, algorithm, deques] import chronos, chronicles, metrics import "."/[types, scoring] import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] @@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics in mesh with no declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers") declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)") declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"]) -declareSummary(libp2p_gossipsub_mcache_hit, "ratio of successful IWANT message cache lookups") +declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"]) proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = g.withPeerStats(p.peerId) do (stats: var PeerStats): @@ -280,28 +280,30 @@ proc handleIHave*(g: GossipSub, proc handleIWant*(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} = - var messages: seq[Message] + var + messages: seq[Message] + invalidRequests = 0 if peer.score < g.parameters.gossipThreshold: trace "iwant: ignoring low score peer", peer, score = peer.score - elif peer.iWantBudget <= 0: - trace "iwant: ignoring out of budget peer", peer, score = peer.score else: - let deIwants = iwants.deduplicate() - for iwant in deIwants: - let deIwantsMsgs = iwant.messageIds.deduplicate() - for mid in deIwantsMsgs: + for iwant in iwants: + for mid in iwant.messageIds: trace "peer sent iwant", peer, messageID = mid + # canAskIWant will only return true once for a specific message + if not peer.canAskIWant(mid): + libp2p_gossipsub_received_iwants.inc(1, labelValues=["notsent"]) + + invalidRequests.inc() + if invalidRequests > 20: + libp2p_gossipsub_received_iwants.inc(1, labelValues=["skipped"]) + return messages + continue let msg = g.mcache.get(mid) if msg.isSome: - libp2p_gossipsub_mcache_hit.observe(1) - # avoid spam - if peer.iWantBudget > 0: - messages.add(msg.get()) - dec peer.iWantBudget - else: - break + libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"]) + messages.add(msg.get()) else: - libp2p_gossipsub_mcache_hit.observe(0) + libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"]) return messages proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} = @@ -624,8 +626,11 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: g.rng.shuffle(allPeers) allPeers.setLen(target) + let msgIdsAsSet = ihave.messageIds.toHashSet() + for peer in allPeers: control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave) + peer.sentIHaves[^1].incl(msgIdsAsSet) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) @@ -636,7 +641,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = # reset IHAVE cap block: for peer in g.peers.values: - peer.iWantBudget = IWantPeerBudget + peer.sentIHaves.addFirst(default(HashSet[MessageId])) + if peer.sentIHaves.len > g.parameters.historyLength: + discard peer.sentIHaves.popLast() peer.iHaveBudget = IHavePeerBudget var meshMetrics = MeshMetrics() diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 79d263444..e82b85af8 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -48,7 +48,6 @@ const const BackoffSlackTime* = 2 # seconds - IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat ) IHavePeerBudget* = 10 # the max amount of IHave to expose, not by spec, but go as example # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572 diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b925b2378..cea5107ef 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[sequtils, strutils, tables, hashes, options] +import std/[sequtils, strutils, tables, hashes, options, sets, deques] import stew/results import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], @@ -62,7 +62,7 @@ type observers*: ref seq[PubSubObserver] # ref as in smart_ptr score*: float64 - iWantBudget*: int + sentIHaves*: Deque[HashSet[MessageId]] iHaveBudget*: int maxMessageSize: int appScore*: float64 # application specific score @@ -286,6 +286,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = asyncSpawn p.sendEncoded(encoded) +proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = + for sentIHave in p.sentIHaves.mitems(): + if msgId in sentIHave: + sentIHave.excl(msgId) + return true + return false + proc new*( T: typedesc[PubSubPeer], peerId: PeerId, @@ -294,7 +301,7 @@ proc new*( codec: string, maxMessageSize: int): T = - T( + result = T( getConn: getConn, onEvent: onEvent, codec: codec, @@ -302,3 +309,4 @@ proc new*( connectedFut: newFuture[void](), maxMessageSize: maxMessageSize ) + result.sentIHaves.addFirst(default(HashSet[MessageId])) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index a48caad8a..2e79e2355 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -2,7 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub {.used.} -import options +import std/[options, deques] import stew/byteutils import ../../libp2p/builders import ../../libp2p/errors @@ -713,6 +713,7 @@ suite "GossipSub internal": let peer = gossipSub.getPubSubPeer(peerId) let id = @[0'u8, 1, 2, 3] gossipSub.mcache.put(id, Message()) + peer.sentIHaves[^1].incl(id) let msg = ControlIWant( messageIDs: @[id, id, id] )