Scrappy gossipsub lazy send implementation

This commit is contained in:
Tanguy 2023-03-10 10:58:16 +01:00
parent 8d5ea43e2b
commit cfce50aa04
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
3 changed files with 29 additions and 4 deletions

View File

@ -337,9 +337,25 @@ proc validateAndRelay(g: GossipSub,
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
let lazyPushMinSize = g.parameters.lazyPushMinSize
if lazyPushMinSize > 0 and msg.data.len > lazyPushMinSize and
toSendPeers.len > g.parameters.dLow:
var stp = toSeq(toSendPeers)
g.rng.shuffle(stp)
let
toPush = stp[0 ..< g.parameters.dLow]
toIHave = stp[g.parameters.dLow .. ^1]
g.broadcast(toIHave, RPCMsg(control:
some(ControlMessage(ihave: @[
ControlIHave(topicID: msg.topicIds[0], messageIDs: @[msgId])
]))
))
g.broadcast(toPush, RPCMsg(messages: @[msg]))
else:
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue

View File

@ -257,9 +257,11 @@ proc handleIHave*(g: GossipSub,
# also avoid duplicates here!
let deIhavesMsgs = ihave.messageIds.deduplicate()
for msgId in deIhavesMsgs:
if not g.hasSeen(msgId):
let msgTuple = (ihave.topicId, msgId)
if not g.hasSeen(msgId) and msgTuple notin g.inflightIWant:
if peer.iHaveBudget > 0:
res.messageIds.add(msgId)
g.inflightIWant.add(msgTuple)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
else:
@ -267,6 +269,9 @@ proc handleIHave*(g: GossipSub,
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
g.rng.shuffle(res.messageIds)
if g.inflightIWant.len > 2000:
g.inflightIWant = g.inflightIWant[1000..^1]
return res
proc handleIWant*(g: GossipSub,

View File

@ -117,6 +117,8 @@ type
dOut*: int
dLazy*: int
lazyPushMinSize*: int
heartbeatInterval*: Duration
historyLength*: int
@ -170,6 +172,8 @@ type
scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval
heartbeatRunning*: bool
inflightIWant*: seq[(string, MessageId)]
peerStats*: Table[PeerId, PeerStats]
parameters*: GossipSubParams
topicParams*: Table[string, TopicParams]