diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 079808129..3ef78d9bb 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -275,6 +275,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = var respControl: ControlMessage g.handleIDontWant(peer, control.idontwant) + g.handleIMReceiving(peer, control.imreceiving) let iwant = g.handleIHave(peer, control.ihave) if iwant.messageIds.len > 0: respControl.iwant.add(iwant) @@ -320,10 +321,39 @@ proc validateAndRelay(g: GossipSub, msg: Message, msgId, msgIdSalted: MessageId, peer: PubSubPeer) {.async.} = - try: - let validation = await g.validate(msg) - var seenPeers: HashSet[PubSubPeer] + var seenPeers: HashSet[PubSubPeer] + var toSendPeers = HashSet[PubSubPeer]() + for t in msg.topicIds: # for every topic in the message + if t notin g.topics: + continue + + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + # add direct peers + toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t)) + # Don't send it to source peer + toSendPeers.excl(peer) + + for peer in toSendPeers: + for heDontWant in peer.heDontWants: + if msgId in heDontWant: + seenPeers.incl(peer) + libp2p_gossipsub_idontwant_saved_messages.inc + libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"]) + break + toSendPeers.excl(seenPeers) + + try: + # IDontWant is only worth it if the message is substantially + # bigger than the messageId + if msg.data.len > msgId.len * 10: + g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( + idontwant: @[ControlIWant(messageIds: @[msgId])] + ))), isHighPriority = true) + + seenPeers.clear() + let validation = await g.validate(msg) discard g.validationSeen.pop(msgIdSalted, seenPeers) libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) libp2p_gossipsub_saved_bytes.inc((msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]) @@ -346,35 +376,15 @@ proc validateAndRelay(g: GossipSub, g.rewardDelivered(peer, msg.topicIds, true) - var toSendPeers = HashSet[PubSubPeer]() - for t in msg.topicIds: # for every topic in the message - if t notin g.topics: - continue - - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) - g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - - # add direct peers - toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t)) - - # Don't send it to source peer, or peers that - # sent it during validation - toSendPeers.excl(peer) + # Don't send it to peers that sent it during validation toSendPeers.excl(seenPeers) - # IDontWant is only worth it if the message is substantially - # bigger than the messageId - if msg.data.len > msgId.len * 10: - g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( - idontwant: @[ControlIWant(messageIds: @[msgId])] - ))), isHighPriority = true) - + #We have received IMReceiving from these peers, We should not exclude them + #Ideally we should wait (TxTime + large safety cushion) before sending to these peers for peer in toSendPeers: - for heDontWant in peer.heDontWants: - if msgId in heDontWant: + for heIsReceiving in peer.heIsReceivings: + if msgId in heIsReceiving: seenPeers.incl(peer) - libp2p_gossipsub_idontwant_saved_messages.inc - libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"]) break toSendPeers.excl(seenPeers) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index b626cb1da..63b67a8d9 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -272,6 +272,37 @@ proc handleIDontWant*(g: GossipSub, if messageId.len > 100: continue peer.heDontWants[^1].incl(messageId) + var toSendPeers = HashSet[PubSubPeer]() + + #Experimental change for quick performance evaluation only (Ideally for very large messages): + #[ + 1) IDontWant is followed by the message. IMReceiving informs peers that we are receiving this message + 2) Prototype implementation for a single topic ("test"). Need topic ID in IDontWant + + 3) More reasonable solution can be to send Message detail before message, That can be used for IMReceiving + ]# + g.floodsub.withValue("test", peers): toSendPeers.incl(peers[]) + g.mesh.withValue("test", peers): toSendPeers.incl(peers[]) + + # add direct peers + toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test")) + + g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( + imreceiving: @[ControlIWant(messageIds: @[messageId])] + ))), isHighPriority = true) + + + +proc handleIMReceiving*(g: GossipSub, + peer: PubSubPeer, + imreceivings: seq[ControlIWant]) = + for imreceiving in imreceivings: + for messageId in imreceiving.messageIds: + if peer.heIsReceivings[^1].len > 1000: break + if messageId.len > 100: continue + peer.heIsReceivings[^1].incl(messageId) + + proc handleIWant*(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] {.raises: [].} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 1de6ea79e..39fcf685f 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -75,6 +75,7 @@ type score*: float64 sentIHaves*: Deque[HashSet[MessageId]] heDontWants*: Deque[HashSet[MessageId]] + heIsReceivings*:Deque[HashSet[MessageId]] iHaveBudget*: int pingBudget*: int maxMessageSize: int @@ -480,4 +481,5 @@ proc new*( ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) + result.heIsReceivings.addFirst(default(HashSet[MessageId])) result.startSendNonPriorityTask() diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 77baded78..407017fec 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -51,6 +51,7 @@ type graft*: seq[ControlGraft] prune*: seq[ControlPrune] idontwant*: seq[ControlIWant] + imreceiving*: seq[ControlIWant] ControlIHave* = object topicId*: string @@ -163,11 +164,12 @@ static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"]) proc byteSize(controlPrune: ControlPrune): int = controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64 -static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"]) +static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"]) proc byteSize(control: ControlMessage): int = control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) + control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) + - control.idontwant.foldl(a + b.byteSize, 0) + control.idontwant.foldl(a + b.byteSize, 0) + + control.imreceiving.foldl(a + b.byteSize, 0) static: expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"]) proc byteSize*(rpc: RPCMsg): int = diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 4aa2e5210..24ff51556 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = ipb.write(4, prune) for idontwant in control.idontwant: ipb.write(5, idontwant) + for imreceiving in control.imreceiving: + ipb.write(6, imreceiving) if len(ipb.buffer) > 0: ipb.finish() pb.write(field, ipb) @@ -213,6 +215,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. var graftpbs: seq[seq[byte]] var prunepbs: seq[seq[byte]] var idontwant: seq[seq[byte]] + var imreceiving: seq[seq[byte]] if ? cpb.getRepeatedField(1, ihavepbs): for item in ihavepbs: control.ihave.add(? decodeIHave(initProtoBuffer(item))) @@ -228,6 +231,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. if ? cpb.getRepeatedField(5, idontwant): for item in idontwant: control.idontwant.add(? decodeIWant(initProtoBuffer(item))) + if ? cpb.getRepeatedField(6, imreceiving): + for item in imreceiving: + control.imreceiving.add(? decodeIWant(initProtoBuffer(item))) trace "decodeControl: message statistics", graft_count = len(control.graft), prune_count = len(control.prune), ihave_count = len(control.ihave),