From 8cad437112d647c067ed0860729e59f05586978c Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 6 Dec 2018 12:30:29 +0100 Subject: [PATCH] Fix, improve and enable tshh_connect + other improvements: - minePow fix - random padding - random IV (phew!) - other small changes + comments --- eth_p2p.nimble | 2 +- eth_p2p/rlpx_protocols/whisper_protocol.nim | 41 +- tests/tshh.nim | 10 +- tests/tshh_connect.nim | 537 +++++++++++--------- 4 files changed, 329 insertions(+), 261 deletions(-) diff --git a/eth_p2p.nimble b/eth_p2p.nimble index 55d3a86..5e49bae 100644 --- a/eth_p2p.nimble +++ b/eth_p2p.nimble @@ -32,5 +32,5 @@ task test, "Runs the test suite": runTest "tdiscovery" runTest "tserver" runTest "tserver", "-d:useSnappy" - # runTest "tshh_connect" + runTest "tshh_connect" runTest "tshh_connect_mocked" diff --git a/eth_p2p/rlpx_protocols/whisper_protocol.nim b/eth_p2p/rlpx_protocols/whisper_protocol.nim index 43b09d9..bf84c10 100644 --- a/eth_p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth_p2p/rlpx_protocols/whisper_protocol.nim @@ -25,6 +25,8 @@ const whisperVersion* = 6 defaultMinPow* = 0.001'f64 defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size + messageInterval* = 300 ## Interval at which messages are send to peers, in ms + pruneInterval* = 1000 ## Interval at which message queue is pruned, in ms type Hash* = MDigest[256] @@ -181,13 +183,9 @@ proc `or`(a, b: Bloom): Bloom = proc bytesCopy(bloom: var Bloom, b: Bytes) = assert b.len == bloomSize - # memcopy? - for i in 0.. bestPow: # XXX: could also compare hashes as numbers instead bestPow = pow result = i.uint64 + i.inc + proc calcPowHash*(self: Envelope): Hash = ## Calculate the message hash, as done during mining - this can be used to ## verify proof-of-work @@ -555,6 +560,9 @@ proc add*(self: var Queue, msg: Message): bool = proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](), symKey = none[SymKey](), topics: seq[Topic] = @[], powReq = 0.0, allowP2P = false): Filter = + # Zero topics will give an empty bloom filter which is fine as this bloom + # filter is only used to `or` with existing/other bloom filters. Not to do + # matching. Filter(src: src, privateKey: privateKey, symKey: symKey, topics: topics, powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics)) @@ -689,7 +697,7 @@ p2pProtocol Whisper(version = whisperVersion, whisperNet.config.isLightNode)) if m.protocolVersion == whisperVersion: - debug "Suitable Whisper peer", peer, whisperVersion + debug "Whisper peer", peer, whisperVersion else: raise newException(UselessPeerError, "Incompatible Whisper version") @@ -827,7 +835,7 @@ proc run(peer: Peer) {.async.} = whisperPeer.running = true while whisperPeer.running: peer.processQueue() - await sleepAsync(300) + await sleepAsync(messageInterval) proc pruneReceived(node: EthereumNode) = if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ... @@ -850,7 +858,7 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} = # pruning the received sets is not necessary for correct workings # but simply from keeping the sets growing indefinitely node.pruneReceived() - await sleepAsync(1000) + await sleepAsync(pruneInterval) # Public EthereumNode calls ---------------------------------------------------- @@ -964,3 +972,6 @@ proc setLightNode*(node: EthereumNode, isLightNode: bool) = proc configureWhisper*(node: EthereumNode, config: WhisperConfig) = node.protocolState(Whisper).config = config +# Not something that should be run in normal circumstances +proc resetMessageQueue*(node: EthereumNode) = + node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity) diff --git a/tests/tshh.nim b/tests/tshh.nim index be05ad3..9cc746f 100644 --- a/tests/tshh.nim +++ b/tests/tshh.nim @@ -277,9 +277,10 @@ suite "Whisper queue": # To test filters we do not care if the msg is valid or allowed proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](), - src = none[PrivateKey](), topic: Topic): Message = + src = none[PrivateKey](), topic: Topic, + padding = none[seq[byte]]()): Message = let payload = Payload(dst: pubKey, symKey: symKey, src: src, - payload: @[byte 0, 1, 2]) + payload: @[byte 0, 1, 2], padding: padding) let encoded = whisper.encode(payload) let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(), nonce: 0) @@ -332,9 +333,10 @@ suite "Whisper filter": test "test notify of filter against PoW requirement": let topic = [byte 0, 0, 0, 0] + let padding = some(repeat(byte 0, 251)) # this message has a PoW of 0.02962962962962963, number should be updated - # in case PoW algorithm changes - let msg = prepFilterTestMsg(topic = topic) + # in case PoW algorithm changes or contents of padding, payload, topic, etc. + let msg = prepFilterTestMsg(topic = topic, padding = padding) var filters = initTable[string, Filter]() let diff --git a/tests/tshh_connect.nim b/tests/tshh_connect.nim index 4cd6287..e07b69a 100644 --- a/tests/tshh_connect.nim +++ b/tests/tshh_connect.nim @@ -11,6 +11,11 @@ import sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys, eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode] +const + useCompression = defined(useSnappy) + +var nextPort = 30303 + proc localAddress(port: int): Address = let port = Port(port) result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) @@ -33,298 +38,348 @@ template asyncTest(name, body: untyped) = proc scenario {.async.} = body waitFor scenario() -const useCompression = defined(useSnappy) -let - keys1 = newKeyPair() - keys2 = newKeyPair() -var node1 = newEthereumNode(keys1, localAddress(30303), 1, nil, - addAllCapabilities = false, - useCompression = useCompression) -node1.addCapability Whisper +proc resetMessageQueues(nodes: varargs[EthereumNode]) = + for node in nodes: + node.resetMessageQueue() -var node2 = newEthereumNode(keys2, localAddress(30304), 1, nil, - addAllCapabilities = false, - useCompression = useCompression) -node2.addCapability Whisper +proc prepTestNode(): EthereumNode = + let keys1 = newKeyPair() + result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, + addAllCapabilities = false, + useCompression = useCompression) + nextPort.inc + result.addCapability Whisper -template waitForEmptyQueues() = - while node1.protocolState(Whisper).queue.items.len != 0 or - node2.protocolState(Whisper).queue.items.len != 0: poll() +let bootENode = waitFor setupBootNode() -when not defined(directConnect): - let bootENode = waitFor setupBootNode() - - # node2 listening and node1 not, to avoid many incoming vs outgoing - var node1Connected = node1.connectToNetwork(@[bootENode], false, true) - var node2Connected = node2.connectToNetwork(@[bootENode], true, true) - waitFor node1Connected - waitFor node2Connected +var node1 = prepTestNode() +var node2 = prepTestNode() +# node2 listening and node1 not, to avoid many incoming vs outgoing +var node1Connected = node1.connectToNetwork(@[bootENode], false, true) +var node2Connected = node2.connectToNetwork(@[bootENode], true, true) +waitFor node1Connected +waitFor node2Connected +suite "Whisper connections": asyncTest "Two peers connected": check: node1.peerPool.connectedNodes.len() == 1 node2.peerPool.connectedNodes.len() == 1 -else: # XXX: tricky without peerPool - node2.startListening() - discard waitFor node1.rlpxConnect(newNode(initENode(node2.keys.pubKey, - node2.address))) -asyncTest "Filters with encryption and signing": - let encryptKeyPair = newKeyPair() - let signKeyPair = newKeyPair() - var symKey: SymKey - let topic = [byte 0x12, 0, 0, 0] - var filters: seq[string] = @[] - var payloads = [repeat(byte 1, 10), repeat(byte 2, 10), - repeat(byte 3, 10), repeat(byte 4, 10)] - var futures = [newFuture[int](), newFuture[int](), - newFuture[int](), newFuture[int]()] + asyncTest "Filters with encryption and signing": + let encryptKeyPair = newKeyPair() + let signKeyPair = newKeyPair() + var symKey: SymKey + let topic = [byte 0x12, 0, 0, 0] + var filters: seq[string] = @[] + var payloads = [repeat(byte 1, 10), repeat(byte 2, 10), + repeat(byte 3, 10), repeat(byte 4, 10)] + var futures = [newFuture[int](), newFuture[int](), + newFuture[int](), newFuture[int]()] - proc handler1(msg: ReceivedMessage) = - var count {.global.}: int - check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1] - count += 1 - if count == 2: futures[0].complete(1) - proc handler2(msg: ReceivedMessage) = - check msg.decoded.payload == payloads[1] - futures[1].complete(1) - proc handler3(msg: ReceivedMessage) = - var count {.global.}: int - check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3] - count += 1 - if count == 2: futures[2].complete(1) - proc handler4(msg: ReceivedMessage) = - check msg.decoded.payload == payloads[3] - futures[3].complete(1) + proc handler1(msg: ReceivedMessage) = + var count {.global.}: int + check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1] + count += 1 + if count == 2: futures[0].complete(1) + proc handler2(msg: ReceivedMessage) = + check msg.decoded.payload == payloads[1] + futures[1].complete(1) + proc handler3(msg: ReceivedMessage) = + var count {.global.}: int + check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3] + count += 1 + if count == 2: futures[2].complete(1) + proc handler4(msg: ReceivedMessage) = + check msg.decoded.payload == payloads[3] + futures[3].complete(1) - # Filters - # filter for encrypted asym - filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), handler1)) - # filter for encrypted asym + signed - filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), - privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), handler2)) - # filter for encrypted sym - filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey), - topics = @[topic]), handler3)) - # filter for encrypted sym + signed - filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), - symKey = some(symKey), - topics = @[topic]), handler4)) - # Messages - # encrypted asym - check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5, - topic = topic, payload = payloads[0]) - # encrypted asym + signed - check true == node2.postMessage(some(encryptKeyPair.pubkey), - src = some(signKeyPair.seckey), ttl = 4, - topic = topic, payload = payloads[1]) - # encrypted sym - check true == node2.postMessage(symKey = some(symKey), ttl = 3, topic = topic, - payload = payloads[2]) - # encrypted sym + signed - check true == node2.postMessage(symKey = some(symKey), - src = some(signKeyPair.seckey), ttl = 2, - topic = topic, payload = payloads[3]) + # Filters + # filter for encrypted asym + filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey), + topics = @[topic]), handler1)) + # filter for encrypted asym + signed + filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), + privateKey = some(encryptKeyPair.seckey), + topics = @[topic]), handler2)) + # filter for encrypted sym + filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey), + topics = @[topic]), handler3)) + # filter for encrypted sym + signed + filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), + symKey = some(symKey), + topics = @[topic]), handler4)) + var safeTTL = 5'u32 + # Messages + check: + # encrypted asym + node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL, + topic = topic, payload = payloads[0]) == true + # encrypted asym + signed + node2.postMessage(some(encryptKeyPair.pubkey), + src = some(signKeyPair.seckey), ttl = safeTTL, + topic = topic, payload = payloads[1]) == true + # encrypted sym + node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic, + payload = payloads[2]) == true + # encrypted sym + signed + node2.postMessage(symKey = some(symKey), + src = some(signKeyPair.seckey), + ttl = safeTTL, topic = topic, + payload = payloads[3]) == true - check node2.protocolState(Whisper).queue.items.len == 4 + node2.protocolState(Whisper).queue.items.len == 4 - var f = all(futures) - await f or sleepAsync(300) - check: - f.finished == true - node1.protocolState(Whisper).queue.items.len == 4 + var f = all(futures) + await f or sleepAsync(messageInterval) + check: + f.finished == true + node1.protocolState(Whisper).queue.items.len == 4 - for filter in filters: - check node1.unsubscribeFilter(filter) == true + for filter in filters: + check node1.unsubscribeFilter(filter) == true - waitForEmptyQueues() + resetMessageQueues(node1, node2) -asyncTest "Filters with topics": - let topic1 = [byte 0x12, 0, 0, 0] - let topic2 = [byte 0x34, 0, 0, 0] - var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] - var futures = [newFuture[int](), newFuture[int]()] - proc handler1(msg: ReceivedMessage) = - check msg.decoded.payload == payloads[0] - futures[0].complete(1) - proc handler2(msg: ReceivedMessage) = - check msg.decoded.payload == payloads[1] - futures[1].complete(1) + asyncTest "Filters with topics": + let topic1 = [byte 0x12, 0, 0, 0] + let topic2 = [byte 0x34, 0, 0, 0] + var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] + var futures = [newFuture[int](), newFuture[int]()] + proc handler1(msg: ReceivedMessage) = + check msg.decoded.payload == payloads[0] + futures[0].complete(1) + proc handler2(msg: ReceivedMessage) = + check msg.decoded.payload == payloads[1] + futures[1].complete(1) - var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) - var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) + var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) + var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) - check: - true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0]) - true == node2.postMessage(ttl = 2, topic = topic2, payload = payloads[1]) + var safeTTL = 3'u32 + check: + node2.postMessage(ttl = safeTTL + 1, topic = topic1, + payload = payloads[0]) == true + node2.postMessage(ttl = safeTTL, topic = topic2, + payload = payloads[1]) == true + node2.protocolState(Whisper).queue.items.len == 2 - var f = all(futures) - await f or sleepAsync(300) - check: - f.finished == true - node1.protocolState(Whisper).queue.items.len == 2 + var f = all(futures) + await f or sleepAsync(messageInterval) + check: + f.finished == true + node1.protocolState(Whisper).queue.items.len == 2 - node1.unsubscribeFilter(filter1) == true - node1.unsubscribeFilter(filter2) == true + node1.unsubscribeFilter(filter1) == true + node1.unsubscribeFilter(filter2) == true - waitForEmptyQueues() + resetMessageQueues(node1, node2) -asyncTest "Filters with PoW": - let topic = [byte 0x12, 0, 0, 0] - var payload = repeat(byte 0, 10) - var futures = [newFuture[int](), newFuture[int]()] - proc handler1(msg: ReceivedMessage) = - check msg.decoded.payload == payload - futures[0].complete(1) - proc handler2(msg: ReceivedMessage) = - check msg.decoded.payload == payload - futures[1].complete(1) + asyncTest "Filters with PoW": + let topic = [byte 0x12, 0, 0, 0] + var payload = repeat(byte 0, 10) + var futures = [newFuture[int](), newFuture[int]()] + proc handler1(msg: ReceivedMessage) = + check msg.decoded.payload == payload + futures[0].complete(1) + proc handler2(msg: ReceivedMessage) = + check msg.decoded.payload == payload + futures[1].complete(1) - var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0), - handler1) - var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10), - handler2) + var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0), + handler1) + var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], + powReq = 1_000_000), handler2) - check: - true == node2.postMessage(ttl = 2, topic = topic, payload = payload) + let safeTTL = 2'u32 + check: + node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true - await futures[0] or sleepAsync(300) - await futures[1] or sleepAsync(300) - check: - futures[0].finished == true - futures[1].finished == false - node1.protocolState(Whisper).queue.items.len == 1 + await futures[0] or sleepAsync(messageInterval) + await futures[1] or sleepAsync(messageInterval) + check: + futures[0].finished == true + futures[1].finished == false + node1.protocolState(Whisper).queue.items.len == 1 - node1.unsubscribeFilter(filter1) == true - node1.unsubscribeFilter(filter2) == true + node1.unsubscribeFilter(filter1) == true + node1.unsubscribeFilter(filter2) == true - waitForEmptyQueues() + resetMessageQueues(node1, node2) -asyncTest "Filters with queues": - let topic = [byte 0, 0, 0, 0] - let payload = repeat(byte 0, 10) + asyncTest "Filters with queues": + let topic = [byte 0, 0, 0, 0] + let payload = repeat(byte 0, 10) - var filter = node1.subscribeFilter(newFilter(topics = @[topic])) - for i in countdown(10, 1): - check true == node2.postMessage(ttl = i.uint32, topic = topic, - payload = payload) + var filter = node1.subscribeFilter(newFilter(topics = @[topic])) + for i in countdown(10, 1): + check node2.postMessage(ttl = i.uint32, topic = topic, + payload = payload) == true - await sleepAsync(300) - check: - node1.getFilterMessages(filter).len() == 10 - node1.getFilterMessages(filter).len() == 0 - node1.unsubscribeFilter(filter) == true + await sleepAsync(messageInterval) + check: + node1.getFilterMessages(filter).len() == 10 + node1.getFilterMessages(filter).len() == 0 + node1.unsubscribeFilter(filter) == true - waitForEmptyQueues() + resetMessageQueues(node1, node2) -asyncTest "Bloomfilter blocking": - let sendTopic1 = [byte 0x12, 0, 0, 0] - let sendTopic2 = [byte 0x34, 0, 0, 0] - let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]] - let payload = repeat(byte 0, 10) - var f: Future[int] = newFuture[int]() - proc handler(msg: ReceivedMessage) = - check msg.decoded.payload == payload - f.complete(1) - var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) - await node1.setBloomFilter(node1.filtersToBloom()) + asyncTest "Local filter notify": + let topic = [byte 0, 0, 0, 0] - check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload) + var filter = node1.subscribeFilter(newFilter(topics = @[topic])) + let safeTTL = 2'u32 + check: + node1.postMessage(ttl = safeTTL, topic = topic, + payload = repeat(byte 4, 10)) == true + node1.getFilterMessages(filter).len() == 1 + node1.unsubscribeFilter(filter) == true - await f or sleepAsync(300) - check: - f.finished == false - node1.protocolState(Whisper).queue.items.len == 0 - node2.protocolState(Whisper).queue.items.len == 1 + await sleepAsync(messageInterval) + resetMessageQueues(node1, node2) - f = newFuture[int]() - waitForEmptyQueues() + asyncTest "Bloomfilter blocking": + let sendTopic1 = [byte 0x12, 0, 0, 0] + let sendTopic2 = [byte 0x34, 0, 0, 0] + let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]] + let payload = repeat(byte 0, 10) + var f: Future[int] = newFuture[int]() + proc handler(msg: ReceivedMessage) = + check msg.decoded.payload == payload + f.complete(1) + var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) + await node1.setBloomFilter(node1.filtersToBloom()) - check true == node2.postMessage(ttl = 1, topic = sendTopic2, payload = payload) + let safeTTL = 2'u32 + check: + node2.postMessage(ttl = safeTTL, topic = sendTopic1, + payload = payload) == true + node2.protocolState(Whisper).queue.items.len == 1 - await f or sleepAsync(300) - check: - f.finished == true - f.read() == 1 - node1.protocolState(Whisper).queue.items.len == 1 - node2.protocolState(Whisper).queue.items.len == 1 + await f or sleepAsync(messageInterval) + check: + f.finished == false + node1.protocolState(Whisper).queue.items.len == 0 - node1.unsubscribeFilter(filter) == true + resetMessageQueues(node1, node2) - await node1.setBloomFilter(fullBloom()) + f = newFuture[int]() - waitForEmptyQueues() + check: + node2.postMessage(ttl = safeTTL, topic = sendTopic2, + payload = payload) == true + node2.protocolState(Whisper).queue.items.len == 1 -asyncTest "PoW blocking": - let topic = [byte 0, 0, 0, 0] - let payload = repeat(byte 0, 10) - await node1.setPowRequirement(1.0) - check true == node2.postMessage(ttl = 1, topic = topic, payload = payload) - await sleepAsync(300) - check: - node1.protocolState(Whisper).queue.items.len == 0 - node2.protocolState(Whisper).queue.items.len == 1 + await f or sleepAsync(messageInterval) + check: + f.finished == true + f.read() == 1 + node1.protocolState(Whisper).queue.items.len == 1 - waitForEmptyQueues() + node1.unsubscribeFilter(filter) == true - await node1.setPowRequirement(0.0) - check true == node2.postMessage(ttl = 1, topic = topic, payload = payload) - await sleepAsync(300) - check: - node1.protocolState(Whisper).queue.items.len == 1 - node2.protocolState(Whisper).queue.items.len == 1 + await node1.setBloomFilter(fullBloom()) - waitForEmptyQueues() + resetMessageQueues(node1, node2) -asyncTest "Queue pruning": - let topic = [byte 0, 0, 0, 0] - let payload = repeat(byte 0, 10) - for i in countdown(10, 1): - check true == node2.postMessage(ttl = i.uint32, topic = topic, - payload = payload) - check node2.protocolState(Whisper).queue.items.len == 10 + asyncTest "PoW blocking": + let topic = [byte 0, 0, 0, 0] + let payload = repeat(byte 0, 10) + let safeTTL = 2'u32 - await sleepAsync(300) - check: - node1.protocolState(Whisper).queue.items.len == 10 + await node1.setPowRequirement(1_000_000) + check: + node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true + node2.protocolState(Whisper).queue.items.len == 1 + await sleepAsync(messageInterval) + check: + node1.protocolState(Whisper).queue.items.len == 0 - await sleepAsync(1000) - check: - node1.protocolState(Whisper).queue.items.len == 0 - node2.protocolState(Whisper).queue.items.len == 0 + resetMessageQueues(node1, node2) -asyncTest "Light node posting": - let topic = [byte 0, 0, 0, 0] - node1.setLightNode(true) + await node1.setPowRequirement(0.0) + check: + node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true + node2.protocolState(Whisper).queue.items.len == 1 + await sleepAsync(messageInterval) + check: + node1.protocolState(Whisper).queue.items.len == 1 - check: - node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) == false - node1.protocolState(Whisper).queue.items.len == 0 + resetMessageQueues(node1, node2) - node1.setLightNode(false) + asyncTest "Queue pruning": + let topic = [byte 0, 0, 0, 0] + let payload = repeat(byte 0, 10) + # We need a minimum TTL of 2 as when set to 1 there is a small chance that + # it is already expired after messageInterval due to rounding down of float + # to uint32 in postMessage() + let minTTL = 2'u32 + for i in countdown(minTTL + 9, minTTL): + check node2.postMessage(ttl = i, topic = topic, payload = payload) == true + check node2.protocolState(Whisper).queue.items.len == 10 -asyncTest "P2P": - let topic = [byte 0, 0, 0, 0] - var f: Future[int] = newFuture[int]() - proc handler(msg: ReceivedMessage) = - check msg.decoded.payload == repeat(byte 4, 10) - f.complete(1) + await sleepAsync(messageInterval) + check node1.protocolState(Whisper).queue.items.len == 10 - var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true), - handler) - check: - true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) - true == node2.postMessage(ttl = 2, topic = topic, - payload = repeat(byte 4, 10), - targetPeer = some(toNodeId(node1.keys.pubkey))) + await sleepAsync(int(minTTL*1000)) + check node1.protocolState(Whisper).queue.items.len == 0 + check node2.protocolState(Whisper).queue.items.len == 0 - await f or sleepAsync(300) - check: - f.finished == true - f.read() == 1 - node1.protocolState(Whisper).queue.items.len == 0 - node2.protocolState(Whisper).queue.items.len == 0 + resetMessageQueues(node1, node2) - node1.unsubscribeFilter(filter) == true + asyncTest "P2P post": + let topic = [byte 0, 0, 0, 0] + var f: Future[int] = newFuture[int]() + proc handler(msg: ReceivedMessage) = + check msg.decoded.payload == repeat(byte 4, 10) + f.complete(1) + + var filter = node1.subscribeFilter(newFilter(topics = @[topic], + allowP2P = true), handler) + check: + node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true + node2.postMessage(ttl = 10, topic = topic, + payload = repeat(byte 4, 10), + targetPeer = some(toNodeId(node1.keys.pubkey))) == true + + await f or sleepAsync(messageInterval) + check: + f.finished == true + f.read() == 1 + node1.protocolState(Whisper).queue.items.len == 0 + node2.protocolState(Whisper).queue.items.len == 0 + + node1.unsubscribeFilter(filter) == true + + test "Light node posting": + var ln1 = prepTestNode() + ln1.setLightNode(true) + + # not listening, so will only connect to others that are listening (node2) + waitFor ln1.connectToNetwork(@[bootENode], false, true) + + let topic = [byte 0, 0, 0, 0] + + let safeTTL = 2'u32 + check: + # normal post + ln1.postMessage(ttl = safeTTL, topic = topic, + payload = repeat(byte 0, 10)) == false + ln1.protocolState(Whisper).queue.items.len == 0 + # P2P post + ln1.postMessage(ttl = safeTTL, topic = topic, + payload = repeat(byte 0, 10), + targetPeer = some(toNodeId(node2.keys.pubkey))) == true + ln1.protocolState(Whisper).queue.items.len == 0 + + test "Connect two light nodes": + var ln1 = prepTestNode() + var ln2 = prepTestNode() + + ln1.setLightNode(true) + ln2.setLightNode(true) + + ln2.startListening() + let peer = waitFor ln1.rlpxConnect(newNode(initENode(ln2.keys.pubKey, + ln2.address))) + check peer.isNil == true