Stagger send

This commit is contained in:
Tanguy 2023-03-24 10:18:15 +01:00
parent bac754e2ad
commit 9b11fa7332
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
6 changed files with 68 additions and 4 deletions

View File

@ -25,6 +25,7 @@ import ./pubsub,
./rpc/[messages, message], ./rpc/[messages, message],
../protocol, ../protocol,
../../stream/connection, ../../stream/connection,
../../utils/semaphore,
../../peerinfo, ../../peerinfo,
../../peerid, ../../peerid,
../../utility, ../../utility,
@ -268,6 +269,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.iwant.add(iwant) respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft)) respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant) let messages = g.handleIWant(peer, control.iwant)
g.handleDontSend(peer, control.dontSend)
if if
respControl.prune.len > 0 or respControl.prune.len > 0 or
@ -332,14 +334,36 @@ proc validateAndRelay(g: GossipSub,
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
if msg.data.len >= g.parameters.lazyPushThreshold:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])]))))
# Don't send it to source peer, or peers that # Don't send it to source peer, or peers that
# sent it during validation # sent it during validation
toSendPeers.excl(peer) toSendPeers.excl(peer)
toSendPeers.excl(seenPeers) toSendPeers.excl(seenPeers)
if msg.data.len < g.parameters.lazyPushThreshold:
# In theory, if topics are the same in all messages, we could batch - we'd # In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages # also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
else:
let sem = newAsyncSemaphore(1)
var peers = toSeq(toSendPeers)
g.rng.shuffle(peers)
proc sendToOne(p: PubSubPeer) {.async.} =
await sem.acquire()
defer: sem.release()
let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]())
if fut.completed: return
g.broadcast(@[p], RPCMsg(messages: @[msg]))
await fut or sleepAsync(200.milliseconds)
if not fut.completed:
echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
for p in peers:
asyncSpawn sendToOne(p)
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds: for topic in msg.topicIds:
if topic notin g.topics: continue if topic notin g.topics: continue
@ -563,7 +587,28 @@ method publish*(g: GossipSub,
g.mcache.put(msgId, msg) g.mcache.put(msgId, msg)
if data.len < g.parameters.lazyPushThreshold:
g.broadcast(peers, RPCMsg(messages: @[msg])) g.broadcast(peers, RPCMsg(messages: @[msg]))
else:
g.broadcast(peers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])]))))
var peersSeq = toSeq(peers)
g.rng.shuffle(peersSeq)
let sem = newAsyncSemaphore(1)
proc sendToOne(p: PubSubPeer) {.async.} =
await sem.acquire()
defer: sem.release()
let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]())
if fut.completed: return
g.broadcast(@[p], RPCMsg(messages: @[msg]))
await fut or sleepAsync(200.milliseconds)
if not fut.completed:
echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
for p in peersSeq:
asyncSpawn sendToOne(p)
if g.knownTopics.contains(topic): if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])

View File

@ -245,6 +245,15 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
for handler in g.routingRecordsHandler: for handler in g.routingRecordsHandler:
handler(peer.peerId, topic, routingRecords) handler(peer.peerId, topic, routingRecords)
proc handleDontSend*(g: GossipSub,
peer: PubSubPeer,
dontSend: seq[ControlIHave]) {.raises: [Defect].} =
for ds in dontSend:
for x in ds.messageIds:
let fut = peer.gotMsgs.mgetOrPut(x, newFuture[void]())
if not fut.completed:
fut.complete()
proc handleIHave*(g: GossipSub, proc handleIHave*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} =

View File

@ -117,6 +117,8 @@ type
dOut*: int dOut*: int
dLazy*: int dLazy*: int
lazyPushThreshold*: int
heartbeatInterval*: Duration heartbeatInterval*: Duration
historyLength*: int historyLength*: int

View File

@ -60,6 +60,7 @@ type
peerId*: PeerId peerId*: PeerId
handler*: RPCHandler handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
gotMsgs*: Table[MessageId, Future[void]]
score*: float64 score*: float64
iWantBudget*: int iWantBudget*: int

View File

@ -42,6 +42,7 @@ type
ControlMessage* = object ControlMessage* = object
ihave*: seq[ControlIHave] ihave*: seq[ControlIHave]
dontSend*: seq[ControlIHave]
iwant*: seq[ControlIWant] iwant*: seq[ControlIWant]
graft*: seq[ControlGraft] graft*: seq[ControlGraft]
prune*: seq[ControlPrune] prune*: seq[ControlPrune]

View File

@ -90,6 +90,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(3, graft) ipb.write(3, graft)
for prune in control.prune: for prune in control.prune:
ipb.write(4, prune) ipb.write(4, prune)
for ihave in control.dontSend:
ipb.write(5, ihave)
if len(ipb.buffer) > 0: if len(ipb.buffer) > 0:
ipb.finish() ipb.finish()
pb.write(field, ipb) pb.write(field, ipb)
@ -210,6 +212,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
var control: ControlMessage var control: ControlMessage
var cpb = initProtoBuffer(buffer) var cpb = initProtoBuffer(buffer)
var ihavepbs: seq[seq[byte]] var ihavepbs: seq[seq[byte]]
var dontsendpbs: seq[seq[byte]]
var iwantpbs: seq[seq[byte]] var iwantpbs: seq[seq[byte]]
var graftpbs: seq[seq[byte]] var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]] var prunepbs: seq[seq[byte]]
@ -225,6 +228,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
if ? cpb.getRepeatedField(4, prunepbs): if ? cpb.getRepeatedField(4, prunepbs):
for item in prunepbs: for item in prunepbs:
control.prune.add(? decodePrune(initProtoBuffer(item))) control.prune.add(? decodePrune(initProtoBuffer(item)))
if ? cpb.getRepeatedField(5, dontsendpbs):
for item in dontsendpbs:
control.dontSend.add(? decodeIHave(initProtoBuffer(item)))
trace "decodeControl: message statistics", graft_count = len(control.graft), trace "decodeControl: message statistics", graft_count = len(control.graft),
prune_count = len(control.prune), prune_count = len(control.prune),
ihave_count = len(control.ihave), ihave_count = len(control.ihave),