Revert "Prevent concurrent IWANT of the same message (#943)" (#977)

This commit is contained in:
diegomrsantos 2023-11-03 15:24:27 +01:00 committed by Roman
parent 2efa4b7d3d
commit c876904425
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75
4 changed files with 1 additions and 124 deletions

View File

@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
disconnectBadPeers: false, disconnectBadPeers: false,
enablePX: false, enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval,
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]), overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false disconnectPeerAboveRateLimit: false
) )
@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub,
let let
msgId = msgIdResult.get msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)
# addSeen adds salt to msgId to avoid # addSeen adds salt to msgId to avoid
# remote attacking the hash function # remote attacking the hash function

View File

@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub,
if not g.hasSeen(msgId): if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0: if peer.iHaveBudget <= 0:
break break
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs: elif msgId notin res.messageIds:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
res.messageIds.add(msgId) res.messageIds.add(msgId)
dec peer.iHaveBudget dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId trace "requested message via ihave", messageID=msgId
@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub,
messages.add(msg) messages.add(msg)
return messages return messages
proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} =
for trigger in g.heartbeatEvents: for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g) trace "firing heartbeat event", instance = cast[int](g)
trigger.fire() trigger.fire()
checkIWANTTimeouts(g, g.parameters.iwantTimeout)

View File

@ -143,7 +143,6 @@ type
enablePX*: bool enablePX*: bool
bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]] overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool disconnectPeerAboveRateLimit*: bool
@ -181,7 +180,6 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
heartbeatEvents*: seq[AsyncEvent] heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]
MeshMetrics* = object MeshMetrics* = object
# scratch buffers for metrics # scratch buffers for metrics
@ -192,8 +190,3 @@ type
lowPeersTopics*: int64 # npeers < dlow lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64 underDoutTopics*: int64
IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment

View File

@ -718,104 +718,6 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop() await gossipSub.switch.stop()
asyncTest "two IHAVEs should generate only one IWANT":
let gossipSub = TestGossipSub.init(newStandardSwitch())
var iwantCount = 0
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)
# Setup two connections and two peers
var ihaveMessageId: string
var firstPeer: PubSubPeer
let seqno = @[0'u8, 1, 2, 3]
for i in 0..<2:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
if isNil(firstPeer):
firstPeer = peer
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
peer.handler = handler
# Simulate that each peer sends an IHAVE message to our node
let msg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId.toBytes()]
)
let iwants = gossipSub.handleIHave(peer, @[msg])
if iwants.messageIds.len > 0:
iwantCount += 1
# Verify that our node responds with only one IWANT message
check: iwantCount == 1
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
# Simulate that our node receives the RPCMsg in response to the IWANT
let actualMessageData = "Hello, World!".toBytes
let rpcMsg = RPCMsg(
messages: @[Message(
fromPeer: firstPeer.peerId,
seqno: seqno,
data: actualMessageData
)]
)
await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "handle unanswered IWANT messages":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.heartbeatInterval = 50.milliseconds
gossipSub.parameters.iwantTimeout = 10.milliseconds
await gossipSub.start()
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)
# Setup a connection and a peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
# Simulate that the peer sends an IHAVE message to our node
let ihaveMessageId = @[0'u8, 1, 2, 3]
let ihaveMsg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId]
)
discard gossipSub.handleIHave(peer, @[ihaveMsg])
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.0
await sleepAsync(60.milliseconds)
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.1
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let let
nodes = generateNodes(2, gossip = true, verifySignature = false) nodes = generateNodes(2, gossip = true, verifySignature = false)