diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ef4cbafea..1fd9f1877 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -75,7 +75,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = behaviourPenaltyDecay: 0.999, disconnectBadPeers: false, enablePX: false, - bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps + bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps + iwantTimeout: 3 * GossipSubHeartbeatInterval ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -401,6 +402,9 @@ method rpcHandler*(g: GossipSub, let msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt + g.outstandingIWANTs.withValue(msgId, iwantRequest): + if iwantRequest.peer.peerId == peer.peerId: + g.outstandingIWANTs.del(msgId) # addSeen adds salt to msgId to avoid # remote attacking the hash function diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 326c48763..232e1aa7e 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -253,7 +253,8 @@ proc handleIHave*(g: GossipSub, if not g.hasSeen(msgId): if peer.iHaveBudget <= 0: break - elif msgId notin res.messageIds: + elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs: + g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now()) res.messageIds.add(msgId) dec peer.iHaveBudget trace "requested message via ihave", messageID=msgId @@ -299,6 +300,17 @@ proc handleIWant*(g: GossipSub, messages.add(msg) return messages +proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} = + let currentTime = Moment.now() + var idsToRemove = newSeq[MessageId]() + for msgId, request in g.outstandingIWANTs.pairs(): + if currentTime - request.timestamp > timeoutDuration: + trace "IWANT request timed out", messageID=msgId, peer=request.peer + request.peer.behaviourPenalty += 0.1 + idsToRemove.add(msgId) + for msgId in idsToRemove: + g.outstandingIWANTs.del(msgId) + proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) @@ -704,3 +716,5 @@ proc heartbeat*(g: GossipSub) {.async.} = for trigger in g.heartbeatEvents: trace "firing heartbeat event", instance = cast[int](g) trigger.fire() + + checkIWANTTimeouts(g, g.parameters.iwantTimeout) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 333b9fae1..25a9e636a 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -143,6 +143,7 @@ type enablePX*: bool bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely + iwantTimeout*: Duration BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] @@ -177,6 +178,7 @@ type routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange heartbeatEvents*: seq[AsyncEvent] + outstandingIWANTs*: Table[MessageId, IWANTRequest] MeshMetrics* = object # scratch buffers for metrics @@ -187,3 +189,8 @@ type lowPeersTopics*: int64 # npeers < dlow healthyPeersTopics*: int64 # npeers >= dlow underDoutTopics*: int64 + + IWANTRequest* = object + messageId*: MessageId + peer*: PubSubPeer + timestamp*: Moment diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index e574ce611..909e1d613 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -727,3 +727,101 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + asyncTest "two IHAVEs should generate only one IWANT": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + var iwantCount = 0 + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + check false + + proc handler2(topic: string, data: seq[byte]) {.async.} = discard + + let topic = "foobar" + var conns = newSeq[Connection]() + gossipSub.subscribe(topic, handler2) + + # Setup two connections and two peers + var ihaveMessageId: string + var firstPeer: PubSubPeer + let seqno = @[0'u8, 1, 2, 3] + for i in 0..<2: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + if isNil(firstPeer): + firstPeer = peer + ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId + peer.handler = handler + + # Simulate that each peer sends an IHAVE message to our node + let msg = ControlIHave( + topicID: topic, + messageIDs: @[ihaveMessageId.toBytes()] + ) + let iwants = gossipSub.handleIHave(peer, @[msg]) + if iwants.messageIds.len > 0: + iwantCount += 1 + + # Verify that our node responds with only one IWANT message + check: iwantCount == 1 + check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) + + # Simulate that our node receives the RPCMsg in response to the IWANT + let actualMessageData = "Hello, World!".toBytes + let rpcMsg = RPCMsg( + messages: @[Message( + fromPeer: firstPeer.peerId, + seqno: seqno, + data: actualMessageData + )] + ) + await gossipSub.rpcHandler(firstPeer, rpcMsg) + + check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + asyncTest "handle unanswered IWANT messages": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + gossipSub.parameters.heartbeatInterval = 50.milliseconds + gossipSub.parameters.iwantTimeout = 10.milliseconds + await gossipSub.start() + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard + proc handler2(topic: string, data: seq[byte]) {.async.} = discard + + let topic = "foobar" + var conns = newSeq[Connection]() + gossipSub.subscribe(topic, handler2) + + # Setup a connection and a peer + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.handler = handler + + # Simulate that the peer sends an IHAVE message to our node + let ihaveMessageId = @[0'u8, 1, 2, 3] + let ihaveMsg = ControlIHave( + topicID: topic, + messageIDs: @[ihaveMessageId] + ) + discard gossipSub.handleIHave(peer, @[ihaveMsg]) + + check: gossipSub.outstandingIWANTs.contains(ihaveMessageId) + check: peer.behaviourPenalty == 0.0 + + await sleepAsync(60.milliseconds) + + check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId) + check: peer.behaviourPenalty == 0.1 + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop()