parent
fc4e9a8bb8
commit
3fc1236659
|
@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
|||
disconnectBadPeers: false,
|
||||
enablePX: false,
|
||||
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
|
||||
iwantTimeout: 3 * GossipSubHeartbeatInterval,
|
||||
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
|
||||
disconnectPeerAboveRateLimit: false
|
||||
)
|
||||
|
@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub,
|
|||
let
|
||||
msgId = msgIdResult.get
|
||||
msgIdSalted = msgId & g.seenSalt
|
||||
g.outstandingIWANTs.withValue(msgId, iwantRequest):
|
||||
if iwantRequest.peer.peerId == peer.peerId:
|
||||
g.outstandingIWANTs.del(msgId)
|
||||
|
||||
# addSeen adds salt to msgId to avoid
|
||||
# remote attacking the hash function
|
||||
|
|
|
@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub,
|
|||
if not g.hasSeen(msgId):
|
||||
if peer.iHaveBudget <= 0:
|
||||
break
|
||||
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
|
||||
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
|
||||
elif msgId notin res.messageIds:
|
||||
res.messageIds.add(msgId)
|
||||
dec peer.iHaveBudget
|
||||
trace "requested message via ihave", messageID=msgId
|
||||
|
@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub,
|
|||
messages.add(msg)
|
||||
return messages
|
||||
|
||||
proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
|
||||
let currentTime = Moment.now()
|
||||
var idsToRemove = newSeq[MessageId]()
|
||||
for msgId, request in g.outstandingIWANTs.pairs():
|
||||
if currentTime - request.timestamp > timeoutDuration:
|
||||
trace "IWANT request timed out", messageID=msgId, peer=request.peer
|
||||
request.peer.behaviourPenalty += 0.1
|
||||
idsToRemove.add(msgId)
|
||||
for msgId in idsToRemove:
|
||||
g.outstandingIWANTs.del(msgId)
|
||||
|
||||
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
|
||||
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
|
||||
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
|
||||
|
@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} =
|
|||
for trigger in g.heartbeatEvents:
|
||||
trace "firing heartbeat event", instance = cast[int](g)
|
||||
trigger.fire()
|
||||
|
||||
checkIWANTTimeouts(g, g.parameters.iwantTimeout)
|
||||
|
|
|
@ -143,7 +143,6 @@ type
|
|||
enablePX*: bool
|
||||
|
||||
bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
|
||||
iwantTimeout*: Duration
|
||||
|
||||
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
|
||||
disconnectPeerAboveRateLimit*: bool
|
||||
|
@ -181,7 +180,6 @@ type
|
|||
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
|
||||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
outstandingIWANTs*: Table[MessageId, IWANTRequest]
|
||||
|
||||
MeshMetrics* = object
|
||||
# scratch buffers for metrics
|
||||
|
@ -192,8 +190,3 @@ type
|
|||
lowPeersTopics*: int64 # npeers < dlow
|
||||
healthyPeersTopics*: int64 # npeers >= dlow
|
||||
underDoutTopics*: int64
|
||||
|
||||
IWANTRequest* = object
|
||||
messageId*: MessageId
|
||||
peer*: PubSubPeer
|
||||
timestamp*: Moment
|
||||
|
|
|
@ -718,104 +718,6 @@ suite "GossipSub internal":
|
|||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "two IHAVEs should generate only one IWANT":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
var iwantCount = 0
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
|
||||
check false
|
||||
|
||||
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
|
||||
|
||||
let topic = "foobar"
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Setup two connections and two peers
|
||||
var ihaveMessageId: string
|
||||
var firstPeer: PubSubPeer
|
||||
let seqno = @[0'u8, 1, 2, 3]
|
||||
for i in 0..<2:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
if isNil(firstPeer):
|
||||
firstPeer = peer
|
||||
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
|
||||
peer.handler = handler
|
||||
|
||||
# Simulate that each peer sends an IHAVE message to our node
|
||||
let msg = ControlIHave(
|
||||
topicID: topic,
|
||||
messageIDs: @[ihaveMessageId.toBytes()]
|
||||
)
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
if iwants.messageIds.len > 0:
|
||||
iwantCount += 1
|
||||
|
||||
# Verify that our node responds with only one IWANT message
|
||||
check: iwantCount == 1
|
||||
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
|
||||
|
||||
# Simulate that our node receives the RPCMsg in response to the IWANT
|
||||
let actualMessageData = "Hello, World!".toBytes
|
||||
let rpcMsg = RPCMsg(
|
||||
messages: @[Message(
|
||||
fromPeer: firstPeer.peerId,
|
||||
seqno: seqno,
|
||||
data: actualMessageData
|
||||
)]
|
||||
)
|
||||
await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))
|
||||
|
||||
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "handle unanswered IWANT messages":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
gossipSub.parameters.heartbeatInterval = 50.milliseconds
|
||||
gossipSub.parameters.iwantTimeout = 10.milliseconds
|
||||
await gossipSub.start()
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
|
||||
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
|
||||
|
||||
let topic = "foobar"
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Setup a connection and a peer
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
|
||||
# Simulate that the peer sends an IHAVE message to our node
|
||||
let ihaveMessageId = @[0'u8, 1, 2, 3]
|
||||
let ihaveMsg = ControlIHave(
|
||||
topicID: topic,
|
||||
messageIDs: @[ihaveMessageId]
|
||||
)
|
||||
discard gossipSub.handleIHave(peer, @[ihaveMsg])
|
||||
|
||||
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
|
||||
check: peer.behaviourPenalty == 0.0
|
||||
|
||||
await sleepAsync(60.milliseconds)
|
||||
|
||||
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
|
||||
check: peer.behaviourPenalty == 0.1
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
|
||||
let
|
||||
nodes = generateNodes(2, gossip = true, verifySignature = false)
|
||||
|
|
Loading…
Reference in New Issue