Prevent concurrent IWANT of the same message (#943)
This commit is contained in:
parent
e03547ea3e
commit
c6aa085e98
|
@ -75,7 +75,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
|||
behaviourPenaltyDecay: 0.999,
|
||||
disconnectBadPeers: false,
|
||||
enablePX: false,
|
||||
bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps
|
||||
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
|
||||
iwantTimeout: 3 * GossipSubHeartbeatInterval
|
||||
)
|
||||
|
||||
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
|
||||
|
@ -401,6 +402,9 @@ method rpcHandler*(g: GossipSub,
|
|||
let
|
||||
msgId = msgIdResult.get
|
||||
msgIdSalted = msgId & g.seenSalt
|
||||
g.outstandingIWANTs.withValue(msgId, iwantRequest):
|
||||
if iwantRequest.peer.peerId == peer.peerId:
|
||||
g.outstandingIWANTs.del(msgId)
|
||||
|
||||
# addSeen adds salt to msgId to avoid
|
||||
# remote attacking the hash function
|
||||
|
|
|
@ -253,7 +253,8 @@ proc handleIHave*(g: GossipSub,
|
|||
if not g.hasSeen(msgId):
|
||||
if peer.iHaveBudget <= 0:
|
||||
break
|
||||
elif msgId notin res.messageIds:
|
||||
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
|
||||
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
|
||||
res.messageIds.add(msgId)
|
||||
dec peer.iHaveBudget
|
||||
trace "requested message via ihave", messageID=msgId
|
||||
|
@ -299,6 +300,17 @@ proc handleIWant*(g: GossipSub,
|
|||
messages.add(msg)
|
||||
return messages
|
||||
|
||||
proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
|
||||
let currentTime = Moment.now()
|
||||
var idsToRemove = newSeq[MessageId]()
|
||||
for msgId, request in g.outstandingIWANTs.pairs():
|
||||
if currentTime - request.timestamp > timeoutDuration:
|
||||
trace "IWANT request timed out", messageID=msgId, peer=request.peer
|
||||
request.peer.behaviourPenalty += 0.1
|
||||
idsToRemove.add(msgId)
|
||||
for msgId in idsToRemove:
|
||||
g.outstandingIWANTs.del(msgId)
|
||||
|
||||
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
|
||||
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
|
||||
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
|
||||
|
@ -704,3 +716,5 @@ proc heartbeat*(g: GossipSub) {.async.} =
|
|||
for trigger in g.heartbeatEvents:
|
||||
trace "firing heartbeat event", instance = cast[int](g)
|
||||
trigger.fire()
|
||||
|
||||
checkIWANTTimeouts(g, g.parameters.iwantTimeout)
|
||||
|
|
|
@ -143,6 +143,7 @@ type
|
|||
enablePX*: bool
|
||||
|
||||
bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
|
||||
iwantTimeout*: Duration
|
||||
|
||||
BackoffTable* = Table[string, Table[PeerId, Moment]]
|
||||
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
|
||||
|
@ -177,6 +178,7 @@ type
|
|||
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
|
||||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
outstandingIWANTs*: Table[MessageId, IWANTRequest]
|
||||
|
||||
MeshMetrics* = object
|
||||
# scratch buffers for metrics
|
||||
|
@ -187,3 +189,8 @@ type
|
|||
lowPeersTopics*: int64 # npeers < dlow
|
||||
healthyPeersTopics*: int64 # npeers >= dlow
|
||||
underDoutTopics*: int64
|
||||
|
||||
IWANTRequest* = object
|
||||
messageId*: MessageId
|
||||
peer*: PubSubPeer
|
||||
timestamp*: Moment
|
||||
|
|
|
@ -727,3 +727,101 @@ suite "GossipSub internal":
|
|||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "two IHAVEs should generate only one IWANT":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
var iwantCount = 0
|
||||
|
||||
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
|
||||
check false
|
||||
|
||||
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
|
||||
|
||||
let topic = "foobar"
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Setup two connections and two peers
|
||||
var ihaveMessageId: string
|
||||
var firstPeer: PubSubPeer
|
||||
let seqno = @[0'u8, 1, 2, 3]
|
||||
for i in 0..<2:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
if isNil(firstPeer):
|
||||
firstPeer = peer
|
||||
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
|
||||
peer.handler = handler
|
||||
|
||||
# Simulate that each peer sends an IHAVE message to our node
|
||||
let msg = ControlIHave(
|
||||
topicID: topic,
|
||||
messageIDs: @[ihaveMessageId.toBytes()]
|
||||
)
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
if iwants.messageIds.len > 0:
|
||||
iwantCount += 1
|
||||
|
||||
# Verify that our node responds with only one IWANT message
|
||||
check: iwantCount == 1
|
||||
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
|
||||
|
||||
# Simulate that our node receives the RPCMsg in response to the IWANT
|
||||
let actualMessageData = "Hello, World!".toBytes
|
||||
let rpcMsg = RPCMsg(
|
||||
messages: @[Message(
|
||||
fromPeer: firstPeer.peerId,
|
||||
seqno: seqno,
|
||||
data: actualMessageData
|
||||
)]
|
||||
)
|
||||
await gossipSub.rpcHandler(firstPeer, rpcMsg)
|
||||
|
||||
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "handle unanswered IWANT messages":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
gossipSub.parameters.heartbeatInterval = 50.milliseconds
|
||||
gossipSub.parameters.iwantTimeout = 10.milliseconds
|
||||
await gossipSub.start()
|
||||
|
||||
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard
|
||||
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
|
||||
|
||||
let topic = "foobar"
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Setup a connection and a peer
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
|
||||
# Simulate that the peer sends an IHAVE message to our node
|
||||
let ihaveMessageId = @[0'u8, 1, 2, 3]
|
||||
let ihaveMsg = ControlIHave(
|
||||
topicID: topic,
|
||||
messageIDs: @[ihaveMessageId]
|
||||
)
|
||||
discard gossipSub.handleIHave(peer, @[ihaveMsg])
|
||||
|
||||
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
|
||||
check: peer.behaviourPenalty == 0.0
|
||||
|
||||
await sleepAsync(60.milliseconds)
|
||||
|
||||
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
|
||||
check: peer.behaviourPenalty == 0.1
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
|
Loading…
Reference in New Issue