mirror of https://github.com/waku-org/nwaku.git
Add check for node in peerpool in P2P post test
This commit is contained in:
parent
c1fb7aeacd
commit
1f90bae10e
|
@ -15,7 +15,7 @@ import
|
|||
const
|
||||
safeTTL = 5'u32
|
||||
waitInterval = messageInterval + 150.milliseconds
|
||||
conditionTimeoutMs = 3000
|
||||
conditionTimeoutMs = 3000.milliseconds
|
||||
|
||||
proc resetMessageQueues(nodes: varargs[EthereumNode]) =
|
||||
for node in nodes:
|
||||
|
@ -23,7 +23,8 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) =
|
|||
|
||||
# check on a condition until true or return a future containing false
|
||||
# if timeout expires first
|
||||
proc eventually(timeout: int, condition: proc(): bool {.gcsafe.}): Future[bool] =
|
||||
proc eventually(timeout: Duration, condition: proc(): bool {.gcsafe.}):
|
||||
Future[bool] =
|
||||
let wrappedCondition = proc(): Future[bool] {.async.} =
|
||||
let f = newFuture[bool]()
|
||||
while not condition():
|
||||
|
@ -76,7 +77,8 @@ suite "Waku connections":
|
|||
|
||||
proc handler1(msg: ReceivedMessage) =
|
||||
var count {.global.}: int
|
||||
check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1]
|
||||
check msg.decoded.payload == payloads[0] or
|
||||
msg.decoded.payload == payloads[1]
|
||||
count += 1
|
||||
if count == 2: futures[0].complete(1)
|
||||
proc handler2(msg: ReceivedMessage) =
|
||||
|
@ -84,7 +86,8 @@ suite "Waku connections":
|
|||
futures[1].complete(1)
|
||||
proc handler3(msg: ReceivedMessage) =
|
||||
var count {.global.}: int
|
||||
check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3]
|
||||
check msg.decoded.payload == payloads[2] or
|
||||
msg.decoded.payload == payloads[3]
|
||||
count += 1
|
||||
if count == 2: futures[2].complete(1)
|
||||
proc handler4(msg: ReceivedMessage) =
|
||||
|
@ -93,19 +96,17 @@ suite "Waku connections":
|
|||
|
||||
# Filters
|
||||
# filter for encrypted asym
|
||||
filters.add(node1.subscribeFilter(initFilter(privateKey = some(encryptKeyPair.seckey),
|
||||
topics = @[topic]), handler1))
|
||||
filters.add(node1.subscribeFilter(initFilter(
|
||||
privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler1))
|
||||
# filter for encrypted asym + signed
|
||||
filters.add(node1.subscribeFilter(initFilter(some(signKeyPair.pubkey),
|
||||
privateKey = some(encryptKeyPair.seckey),
|
||||
topics = @[topic]), handler2))
|
||||
privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler2))
|
||||
# filter for encrypted sym
|
||||
filters.add(node1.subscribeFilter(initFilter(symKey = some(symKey),
|
||||
topics = @[topic]), handler3))
|
||||
topics = @[topic]), handler3))
|
||||
# filter for encrypted sym + signed
|
||||
filters.add(node1.subscribeFilter(initFilter(some(signKeyPair.pubkey),
|
||||
symKey = some(symKey),
|
||||
topics = @[topic]), handler4))
|
||||
symKey = some(symKey), topics = @[topic]), handler4))
|
||||
# Messages
|
||||
check:
|
||||
# encrypted asym
|
||||
|
@ -182,10 +183,10 @@ suite "Waku connections":
|
|||
check msg.decoded.payload == payload
|
||||
futures[1].complete(1)
|
||||
|
||||
var filter1 = node1.subscribeFilter(initFilter(topics = @[topic], powReq = 0),
|
||||
handler1)
|
||||
var filter2 = node1.subscribeFilter(initFilter(topics = @[topic],
|
||||
powReq = 1_000_000), handler2)
|
||||
var filter1 = node1.subscribeFilter(
|
||||
initFilter(topics = @[topic], powReq = 0), handler1)
|
||||
var filter2 = node1.subscribeFilter(
|
||||
initFilter(topics = @[topic], powReq = 1_000_000), handler2)
|
||||
|
||||
check:
|
||||
node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
||||
|
@ -246,7 +247,8 @@ suite "Waku connections":
|
|||
proc handler(msg: ReceivedMessage) =
|
||||
check msg.decoded.payload == payload
|
||||
f.complete(1)
|
||||
var filter = node1.subscribeFilter(initFilter(topics = filterTopics), handler)
|
||||
var filter = node1.subscribeFilter(
|
||||
initFilter(topics = filterTopics), handler)
|
||||
await node1.setBloomFilter(node1.filtersToBloom())
|
||||
|
||||
check:
|
||||
|
@ -314,7 +316,7 @@ suite "Waku connections":
|
|||
# to uint32 in postMessage()
|
||||
let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire
|
||||
for i in countdown(10, 1):
|
||||
check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true
|
||||
check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload)
|
||||
check node2.protocolState(Waku).queue.items.len == 10
|
||||
|
||||
await sleepAsync(waitInterval)
|
||||
|
@ -338,6 +340,10 @@ suite "Waku connections":
|
|||
|
||||
var filter = node1.subscribeFilter(initFilter(topics = @[topic],
|
||||
allowP2P = true), handler)
|
||||
# Need to be sure that node1 is added in the peerpool of node2 as
|
||||
# postMessage with target will iterate over the peers
|
||||
require await eventually(conditionTimeoutMs,
|
||||
proc(): bool = node2.peerPool.len == 1)
|
||||
check:
|
||||
node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true
|
||||
node2.postMessage(ttl = 10, topic = topic,
|
||||
|
@ -472,9 +478,11 @@ suite "Waku connections":
|
|||
peer.state(Waku).bloom == bloom
|
||||
peer.state(Waku).topics == some(topics)
|
||||
|
||||
let hasBloomNodeConnectedCondition = proc(): bool = wakuBloomNode.peerPool.len == 1
|
||||
let hasBloomNodeConnectedCondition = proc(): bool =
|
||||
wakuBloomNode.peerPool.len == 1
|
||||
# wait for the peer to be connected on the other side
|
||||
let hasBloomNodeConnected = await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition)
|
||||
let hasBloomNodeConnected =
|
||||
await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition)
|
||||
# check bloom filter is updated
|
||||
check:
|
||||
hasBloomNodeConnected
|
||||
|
@ -487,9 +495,11 @@ suite "Waku connections":
|
|||
|
||||
let bloomFilterUpdatedCondition = proc(): bool =
|
||||
for peer in wakuNode.peerPool.peers:
|
||||
return peer.state(Waku).bloom == bloom and peer.state(Waku).topics == none(seq[Topic])
|
||||
return peer.state(Waku).bloom == bloom and
|
||||
peer.state(Waku).topics == none(seq[Topic])
|
||||
|
||||
let bloomFilterUpdated = await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition)
|
||||
let bloomFilterUpdated =
|
||||
await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition)
|
||||
# check bloom filter is updated
|
||||
check:
|
||||
bloomFilterUpdated
|
||||
|
@ -516,9 +526,8 @@ suite "Waku connections":
|
|||
wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
|
||||
wakuNode.protocolState(Waku).queue.items.len == 3
|
||||
|
||||
let response = await eventually(conditionTimeoutMs, proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2)
|
||||
check:
|
||||
response
|
||||
await eventually(conditionTimeoutMs,
|
||||
proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2)
|
||||
|
||||
asyncTest "Waku topic-interest versus bloom filter":
|
||||
var
|
||||
|
|
Loading…
Reference in New Issue