GossipSub: Better IWANT handling (#875)

This commit is contained in:
Tanguy 2023-04-03 10:56:20 +02:00 committed by GitHub
parent 53b060f8f0
commit 6b61ce8c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 25 deletions

View File

@ -158,8 +158,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.appScore = stats.appScore peer.appScore = stats.appScore
peer.behaviourPenalty = stats.behaviourPenalty peer.behaviourPenalty = stats.behaviourPenalty
peer.iWantBudget = IWantPeerBudget peer.iHaveBudget = IHavePeerBudget
peer.iHaveBudget = IHavePeerBudget
method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind case event.kind

View File

@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else: else:
{.push raises: [].} {.push raises: [].}
import std/[tables, sequtils, sets, algorithm] import std/[tables, sequtils, sets, algorithm, deques]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import "."/[types, scoring] import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics in mesh with no
declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers") declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers")
declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)") declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"]) declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareSummary(libp2p_gossipsub_mcache_hit, "ratio of successful IWANT message cache lookups") declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
g.withPeerStats(p.peerId) do (stats: var PeerStats): g.withPeerStats(p.peerId) do (stats: var PeerStats):
@ -280,28 +280,30 @@ proc handleIHave*(g: GossipSub,
proc handleIWant*(g: GossipSub, proc handleIWant*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} = iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
var messages: seq[Message] var
messages: seq[Message]
invalidRequests = 0
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score trace "iwant: ignoring low score peer", peer, score = peer.score
elif peer.iWantBudget <= 0:
trace "iwant: ignoring out of budget peer", peer, score = peer.score
else: else:
let deIwants = iwants.deduplicate() for iwant in iwants:
for iwant in deIwants: for mid in iwant.messageIds:
let deIwantsMsgs = iwant.messageIds.deduplicate()
for mid in deIwantsMsgs:
trace "peer sent iwant", peer, messageID = mid trace "peer sent iwant", peer, messageID = mid
# canAskIWant will only return true once for a specific message
if not peer.canAskIWant(mid):
libp2p_gossipsub_received_iwants.inc(1, labelValues=["notsent"])
invalidRequests.inc()
if invalidRequests > 20:
libp2p_gossipsub_received_iwants.inc(1, labelValues=["skipped"])
return messages
continue
let msg = g.mcache.get(mid) let msg = g.mcache.get(mid)
if msg.isSome: if msg.isSome:
libp2p_gossipsub_mcache_hit.observe(1) libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"])
# avoid spam messages.add(msg.get())
if peer.iWantBudget > 0:
messages.add(msg.get())
dec peer.iWantBudget
else:
break
else: else:
libp2p_gossipsub_mcache_hit.observe(0) libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"])
return messages return messages
proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} = proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
@ -624,8 +626,11 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers) g.rng.shuffle(allPeers)
allPeers.setLen(target) allPeers.setLen(target)
let msgIdsAsSet = ihave.messageIds.toHashSet()
for peer in allPeers: for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave) control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet)
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
@ -636,7 +641,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# reset IHAVE cap # reset IHAVE cap
block: block:
for peer in g.peers.values: for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget peer.sentIHaves.addFirst(default(HashSet[MessageId]))
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.iHaveBudget = IHavePeerBudget peer.iHaveBudget = IHavePeerBudget
var meshMetrics = MeshMetrics() var meshMetrics = MeshMetrics()

View File

@ -48,7 +48,6 @@ const
const const
BackoffSlackTime* = 2 # seconds BackoffSlackTime* = 2 # seconds
IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat )
IHavePeerBudget* = 10 IHavePeerBudget* = 10
# the max amount of IHave to expose, not by spec, but go as example # the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572 # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572

View File

@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else: else:
{.push raises: [].} {.push raises: [].}
import std/[sequtils, strutils, tables, hashes, options] import std/[sequtils, strutils, tables, hashes, options, sets, deques]
import stew/results import stew/results
import chronos, chronicles, nimcrypto/sha2, metrics import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf], import rpc/[messages, message, protobuf],
@ -62,7 +62,7 @@ type
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
score*: float64 score*: float64
iWantBudget*: int sentIHaves*: Deque[HashSet[MessageId]]
iHaveBudget*: int iHaveBudget*: int
maxMessageSize: int maxMessageSize: int
appScore*: float64 # application specific score appScore*: float64 # application specific score
@ -286,6 +286,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
asyncSpawn p.sendEncoded(encoded) asyncSpawn p.sendEncoded(encoded)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
if msgId in sentIHave:
sentIHave.excl(msgId)
return true
return false
proc new*( proc new*(
T: typedesc[PubSubPeer], T: typedesc[PubSubPeer],
peerId: PeerId, peerId: PeerId,
@ -294,7 +301,7 @@ proc new*(
codec: string, codec: string,
maxMessageSize: int): T = maxMessageSize: int): T =
T( result = T(
getConn: getConn, getConn: getConn,
onEvent: onEvent, onEvent: onEvent,
codec: codec, codec: codec,
@ -302,3 +309,4 @@ proc new*(
connectedFut: newFuture[void](), connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize maxMessageSize: maxMessageSize
) )
result.sentIHaves.addFirst(default(HashSet[MessageId]))

View File

@ -2,7 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
{.used.} {.used.}
import options import std/[options, deques]
import stew/byteutils import stew/byteutils
import ../../libp2p/builders import ../../libp2p/builders
import ../../libp2p/errors import ../../libp2p/errors
@ -713,6 +713,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerId) let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3] let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message()) gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
let msg = ControlIWant( let msg = ControlIWant(
messageIDs: @[id, id, id] messageIDs: @[id, id, id]
) )