diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d9b4bc2c0..261cdf06c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -25,6 +25,7 @@ import ./pubsub, ./rpc/[messages, message], ../protocol, ../../stream/connection, + ../../utils/semaphore, ../../peerinfo, ../../peerid, ../../utility, @@ -268,6 +269,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = respControl.iwant.add(iwant) respControl.prune.add(g.handleGraft(peer, control.graft)) let messages = g.handleIWant(peer, control.iwant) + g.handleDontSend(peer, control.dontSend) if respControl.prune.len > 0 or @@ -332,14 +334,36 @@ proc validateAndRelay(g: GossipSub, g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + if msg.data.len >= g.parameters.lazyPushThreshold: + g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])])))) + # Don't send it to source peer, or peers that # sent it during validation toSendPeers.excl(peer) toSendPeers.excl(seenPeers) - # In theory, if topics are the same in all messages, we could batch - we'd - # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + if msg.data.len < g.parameters.lazyPushThreshold: + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + else: + let sem = newAsyncSemaphore(1) + var peers = toSeq(toSendPeers) + g.rng.shuffle(peers) + + proc sendToOne(p: PubSubPeer) {.async.} = + await sem.acquire() + defer: sem.release() + + let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]()) + if fut.completed: return + g.broadcast(@[p], RPCMsg(messages: @[msg])) + await fut or sleepAsync(200.milliseconds) + if not fut.completed: + echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + + for p in peers: + asyncSpawn sendToOne(p) trace "forwared message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -563,7 +587,28 @@ method publish*(g: GossipSub, g.mcache.put(msgId, msg) - g.broadcast(peers, RPCMsg(messages: @[msg])) + if data.len < g.parameters.lazyPushThreshold: + g.broadcast(peers, RPCMsg(messages: @[msg])) + else: + g.broadcast(peers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])])))) + + var peersSeq = toSeq(peers) + g.rng.shuffle(peersSeq) + + let sem = newAsyncSemaphore(1) + proc sendToOne(p: PubSubPeer) {.async.} = + await sem.acquire() + defer: sem.release() + + let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]()) + if fut.completed: return + g.broadcast(@[p], RPCMsg(messages: @[msg])) + await fut or sleepAsync(200.milliseconds) + if not fut.completed: + echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + + for p in peersSeq: + asyncSpawn sendToOne(p) if g.knownTopics.contains(topic): libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index bbdcd27e7..e6c8506c5 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -245,6 +245,15 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r for handler in g.routingRecordsHandler: handler(peer.peerId, topic, routingRecords) +proc handleDontSend*(g: GossipSub, + peer: PubSubPeer, + dontSend: seq[ControlIHave]) {.raises: [Defect].} = + for ds in dontSend: + for x in ds.messageIds: + let fut = peer.gotMsgs.mgetOrPut(x, newFuture[void]()) + if not fut.completed: + fut.complete() + proc handleIHave*(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 79d263444..c25d8ee8a 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -117,6 +117,8 @@ type dOut*: int dLazy*: int + lazyPushThreshold*: int + heartbeatInterval*: Duration historyLength*: int diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b925b2378..ecc7e018f 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -60,6 +60,7 @@ type peerId*: PeerId handler*: RPCHandler observers*: ref seq[PubSubObserver] # ref as in smart_ptr + gotMsgs*: Table[MessageId, Future[void]] score*: float64 iWantBudget*: int diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 541782a8b..4b6fbebba 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -42,6 +42,7 @@ type ControlMessage* = object ihave*: seq[ControlIHave] + dontSend*: seq[ControlIHave] iwant*: seq[ControlIWant] graft*: seq[ControlGraft] prune*: seq[ControlPrune] diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index d87a6b928..5bc940220 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -90,6 +90,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = ipb.write(3, graft) for prune in control.prune: ipb.write(4, prune) + for ihave in control.dontSend: + ipb.write(5, ihave) if len(ipb.buffer) > 0: ipb.finish() pb.write(field, ipb) @@ -210,6 +212,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. var control: ControlMessage var cpb = initProtoBuffer(buffer) var ihavepbs: seq[seq[byte]] + var dontsendpbs: seq[seq[byte]] var iwantpbs: seq[seq[byte]] var graftpbs: seq[seq[byte]] var prunepbs: seq[seq[byte]] @@ -225,6 +228,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. if ? cpb.getRepeatedField(4, prunepbs): for item in prunepbs: control.prune.add(? decodePrune(initProtoBuffer(item))) + if ? cpb.getRepeatedField(5, dontsendpbs): + for item in dontsendpbs: + control.dontSend.add(? decodeIHave(initProtoBuffer(item))) trace "decodeControl: message statistics", graft_count = len(control.graft), prune_count = len(control.prune), ihave_count = len(control.ihave),