From cfce50aa04985f573a5fdd2285ca9522f7f43d9b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 10 Mar 2023 10:58:16 +0100 Subject: [PATCH] Scrappy gossipsub lazy send implementation --- libp2p/protocols/pubsub/gossipsub.nim | 22 ++++++++++++++++--- .../protocols/pubsub/gossipsub/behavior.nim | 7 +++++- libp2p/protocols/pubsub/gossipsub/types.nim | 4 ++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d9b4bc2c0..97496a706 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -337,9 +337,25 @@ proc validateAndRelay(g: GossipSub, toSendPeers.excl(peer) toSendPeers.excl(seenPeers) - # In theory, if topics are the same in all messages, we could batch - we'd - # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + let lazyPushMinSize = g.parameters.lazyPushMinSize + if lazyPushMinSize > 0 and msg.data.len > lazyPushMinSize and + toSendPeers.len > g.parameters.dLow: + var stp = toSeq(toSendPeers) + g.rng.shuffle(stp) + + let + toPush = stp[0 ..< g.parameters.dLow] + toIHave = stp[g.parameters.dLow .. ^1] + g.broadcast(toIHave, RPCMsg(control: + some(ControlMessage(ihave: @[ + ControlIHave(topicID: msg.topicIds[0], messageIDs: @[msgId]) + ])) + )) + g.broadcast(toPush, RPCMsg(messages: @[msg])) + else: + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) trace "forwared message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 57ea160ee..37633a34c 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -257,9 +257,11 @@ proc handleIHave*(g: GossipSub, # also avoid duplicates here! let deIhavesMsgs = ihave.messageIds.deduplicate() for msgId in deIhavesMsgs: - if not g.hasSeen(msgId): + let msgTuple = (ihave.topicId, msgId) + if not g.hasSeen(msgId) and msgTuple notin g.inflightIWant: if peer.iHaveBudget > 0: res.messageIds.add(msgId) + g.inflightIWant.add(msgTuple) dec peer.iHaveBudget trace "requested message via ihave", messageID=msgId else: @@ -267,6 +269,9 @@ proc handleIHave*(g: GossipSub, # shuffling res.messageIDs before sending it out to increase the likelihood # of getting an answer if the peer truncates the list due to internal size restrictions. g.rng.shuffle(res.messageIds) + + if g.inflightIWant.len > 2000: + g.inflightIWant = g.inflightIWant[1000..^1] return res proc handleIWant*(g: GossipSub, diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 79d263444..35cffe211 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -117,6 +117,8 @@ type dOut*: int dLazy*: int + lazyPushMinSize*: int + heartbeatInterval*: Duration historyLength*: int @@ -170,6 +172,8 @@ type scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval heartbeatRunning*: bool + inflightIWant*: seq[(string, MessageId)] + peerStats*: Table[PeerId, PeerStats] parameters*: GossipSubParams topicParams*: Table[string, TopicParams]