From a8a55f16dc35070d4a90673f11239859198b0b74 Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 19 Nov 2019 17:22:35 +0100 Subject: [PATCH] Implement quick Waku - Whisper bridge by sharing the queue + adjust test --- eth.nimble | 1 + eth/p2p/rlpx_protocols/waku_protocol.nim | 18 +- eth/p2p/rlpx_protocols/whisper_protocol.nim | 13 +- tests/p2p/test_waku.nim | 416 -------------------- tests/p2p/test_waku_bridge.nim | 93 +++++ tests/p2p/test_waku_connect.nim | 390 ------------------ 6 files changed, 113 insertions(+), 818 deletions(-) delete mode 100644 tests/p2p/test_waku.nim create mode 100644 tests/p2p/test_waku_bridge.nim delete mode 100644 tests/p2p/test_waku_connect.nim diff --git a/eth.nimble b/eth.nimble index 296b3cd..6215f14 100644 --- a/eth.nimble +++ b/eth.nimble @@ -52,6 +52,7 @@ proc runP2pTests() = "test_shh", "test_shh_config", "test_shh_connect", + "test_waku_bridge", "test_protocol_handlers", ]: runTest("tests/p2p/" & filename) diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 96cf1ea..ff04886 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -38,6 +38,8 @@ import options, tables, times, chronos, chronicles, eth/[keys, async_utils, p2p], whisper/whisper_types +import eth/p2p/rlpx_protocols/whisper_protocol + export whisper_types @@ -72,7 +74,7 @@ type received: HashSet[Message] WakuNetwork = ref object - queue*: Queue + queue*: ref Queue filters*: Filters config*: WakuConfig @@ -98,7 +100,11 @@ proc run(peer: Peer) {.gcsafe, async.} proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.} proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = - network.queue = initQueue(defaultQueueCapacity) + if node.protocolState(Whisper).isNil: + new(network.queue) + network.queue[] = initQueue(defaultQueueCapacity) + else: + network.queue = node.protocolState(Whisper).queue network.filters = initTable[string, Filter]() network.config.bloom = fullBloom() network.config.powRequirement = defaultMinPow @@ -195,7 +201,7 @@ p2pProtocol Waku(version = wakuVersion, # This can still be a duplicate message, but from another peer than # the peer who send the message. - if peer.networkState.queue.add(msg): + if peer.networkState.queue[].add(msg): # notify filters of this message peer.networkState.filters.notify(msg) @@ -294,7 +300,7 @@ proc run(node: EthereumNode, network: WakuNetwork) {.async.} = while true: # prune message queue every second # TTL unit is in seconds, so this should be sufficient? - network.queue.prune() + network.queue[].prune() # pruning the received sets is not necessary for correct workings # but simply from keeping the sets growing indefinitely node.pruneReceived() @@ -317,7 +323,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = return false trace "Adding message to queue" - if wakuNet.queue.add(msg): + if wakuNet.queue[].add(msg): # Also notify our own filters of the message we are sending, # e.g. msg from local Dapp to Dapp wakuNet.filters.notify(msg) @@ -459,4 +465,4 @@ proc resetMessageQueue*(node: EthereumNode) = ## Full reset of the message queue. ## ## NOTE: Not something that should be run in normal circumstances. - node.protocolState(Waku).queue = initQueue(defaultQueueCapacity) + node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity) diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim index c23d4e5..d92919d 100644 --- a/eth/p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim @@ -70,7 +70,7 @@ type received: HashSet[Message] WhisperNetwork = ref object - queue*: Queue + queue*: ref Queue filters*: Filters config*: WhisperConfig @@ -95,7 +95,8 @@ proc run(peer: Peer) {.gcsafe, async.} proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.} proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} = - network.queue = initQueue(defaultQueueCapacity) + new(network.queue) + network.queue[] = initQueue(defaultQueueCapacity) network.filters = initTable[string, Filter]() network.config.bloom = fullBloom() network.config.powRequirement = defaultMinPow @@ -192,7 +193,7 @@ p2pProtocol Whisper(version = whisperVersion, # This can still be a duplicate message, but from another peer than # the peer who send the message. - if peer.networkState.queue.add(msg): + if peer.networkState.queue[].add(msg): # notify filters of this message peer.networkState.filters.notify(msg) @@ -291,7 +292,7 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} = while true: # prune message queue every second # TTL unit is in seconds, so this should be sufficient? - network.queue.prune() + network.queue[].prune() # pruning the received sets is not necessary for correct workings # but simply from keeping the sets growing indefinitely node.pruneReceived() @@ -314,7 +315,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = return false trace "Adding message to queue" - if whisperNet.queue.add(msg): + if whisperNet.queue[].add(msg): # Also notify our own filters of the message we are sending, # e.g. msg from local Dapp to Dapp whisperNet.filters.notify(msg) @@ -456,4 +457,4 @@ proc resetMessageQueue*(node: EthereumNode) = ## Full reset of the message queue. ## ## NOTE: Not something that should be run in normal circumstances. - node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity) + node.protocolState(Whisper).queue[] = initQueue(defaultQueueCapacity) diff --git a/tests/p2p/test_waku.nim b/tests/p2p/test_waku.nim deleted file mode 100644 index 37704bf..0000000 --- a/tests/p2p/test_waku.nim +++ /dev/null @@ -1,416 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -import - sequtils, options, unittest, times, tables, - nimcrypto/hash, - eth/[keys, rlp], - eth/p2p/rlpx_protocols/waku_protocol as waku - -suite "Waku payload": - test "should roundtrip without keys": - let payload = Payload(payload: @[byte 0, 1, 2]) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().src.isNone() - decoded.get().padding.get().len == 251 # 256 -1 -1 -3 - - test "should roundtrip with symmetric encryption": - var symKey: SymKey - let payload = Payload(symKey: some(symKey), payload: @[byte 0, 1, 2]) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get(), symKey = some(symKey)) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().src.isNone() - decoded.get().padding.get().len == 251 # 256 -1 -1 -3 - - test "should roundtrip with signature": - let privKey = keys.newPrivateKey() - - let payload = Payload(src: some(privKey), payload: @[byte 0, 1, 2]) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - privKey.getPublicKey() == decoded.get().src.get() - decoded.get().padding.get().len == 186 # 256 -1 -1 -3 -65 - - test "should roundtrip with asymmetric encryption": - let privKey = keys.newPrivateKey() - - let payload = Payload(dst: some(privKey.getPublicKey()), - payload: @[byte 0, 1, 2]) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get(), dst = some(privKey)) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().src.isNone() - decoded.get().padding.get().len == 251 # 256 -1 -1 -3 - - test "should return specified bloom": - # Geth test: https://github.com/ethersphere/go-ethereum/blob/d3441ebb563439bac0837d70591f92e2c6080303/waku/wakuv6/waku_test.go#L834 - let top0 = [byte 0, 0, 255, 6] - var x: Bloom - x[0] = byte 1 - x[32] = byte 1 - x[^1] = byte 128 - check @(top0.topicBloom) == @x - -suite "Waku payload padding": - test "should do max padding": - let payload = Payload(payload: repeat(byte 1, 254)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().padding.isSome() - decoded.get().padding.get().len == 256 # as dataLen == 256 - - test "should do max padding with signature": - let privKey = keys.newPrivateKey() - - let payload = Payload(src: some(privKey), payload: repeat(byte 1, 189)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - privKey.getPublicKey() == decoded.get().src.get() - decoded.get().padding.isSome() - decoded.get().padding.get().len == 256 # as dataLen == 256 - - test "should do min padding": - let payload = Payload(payload: repeat(byte 1, 253)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().padding.isSome() - decoded.get().padding.get().len == 1 # as dataLen == 255 - - test "should do min padding with signature": - let privKey = keys.newPrivateKey() - - let payload = Payload(src: some(privKey), payload: repeat(byte 1, 188)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - privKey.getPublicKey() == decoded.get().src.get() - decoded.get().padding.isSome() - decoded.get().padding.get().len == 1 # as dataLen == 255 - - test "should roundtrip custom padding": - let payload = Payload(payload: repeat(byte 1, 10), - padding: some(repeat(byte 2, 100))) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().padding.isSome() - payload.padding.get() == decoded.get().padding.get() - - test "should roundtrip custom 0 padding": - let padding: seq[byte] = @[] - let payload = Payload(payload: repeat(byte 1, 10), - padding: some(padding)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - decoded.get().padding.isNone() - - test "should roundtrip custom padding with signature": - let privKey = keys.newPrivateKey() - let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10), - padding: some(repeat(byte 2, 100))) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - privKey.getPublicKey() == decoded.get().src.get() - decoded.get().padding.isSome() - payload.padding.get() == decoded.get().padding.get() - - test "should roundtrip custom 0 padding with signature": - let padding: seq[byte] = @[] - let privKey = keys.newPrivateKey() - let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10), - padding: some(padding)) - let encoded = waku.encode(payload) - - let decoded = waku.decode(encoded.get()) - check: - decoded.isSome() - payload.payload == decoded.get().payload - privKey.getPublicKey() == decoded.get().src.get() - decoded.get().padding.isNone() - -# example from https://github.com/paritytech/parity-ethereum/blob/93e1040d07e385d1219d00af71c46c720b0a1acf/waku/src/message.rs#L439 -let - env0 = Envelope( - expiry:100000, ttl: 30, topic: [byte 0, 0, 0, 0], - data: repeat(byte 9, 256), nonce: 1010101) - env1 = Envelope( - expiry:100000, ttl: 30, topic: [byte 0, 0, 0, 0], - data: repeat(byte 9, 256), nonce: 1010102) - -suite "Waku envelope": - - proc hashAndPow(env: Envelope): (string, float64) = - # This is the current implementation of go-ethereum - let size = env.toShortRlp().len().uint32 - # This is our current implementation in `waku_protocol.nim` - # let size = env.len().uint32 - # This is the EIP-627 specification - # let size = env.toRlp().len().uint32 - let hash = env.calcPowHash() - ($hash, calcPow(size, env.ttl, hash)) - - test "PoW calculation leading zeroes tests": - # Test values from Parity, in message.rs - let testHashes = [ - # 256 leading zeroes - "0x0000000000000000000000000000000000000000000000000000000000000000", - # 255 leading zeroes - "0x0000000000000000000000000000000000000000000000000000000000000001", - # no leading zeroes - "0xff00000000000000000000000000000000000000000000000000000000000000" - ] - check: - calcPow(1, 1, Hash.fromHex(testHashes[0])) == - 115792089237316200000000000000000000000000000000000000000000000000000000000000.0 - calcPow(1, 1, Hash.fromHex(testHashes[1])) == - 57896044618658100000000000000000000000000000000000000000000000000000000000000.0 - calcPow(1, 1, Hash.fromHex(testHashes[2])) == 1.0 - - # Test values from go-ethereum wakuv6 in envelope_test - var env = Envelope(ttl: 1, data: @[byte 0xde, 0xad, 0xbe, 0xef]) - # PoW calculation with no leading zeroes - env.nonce = 100000 - check hashAndPoW(env) == ("A788E02A95BFC673709E97CA81E39CA903BAD5638D3388964C51EB64952172D6", - 0.07692307692307693) - # PoW calculation with 8 leading zeroes - env.nonce = 276 - check hashAndPoW(env) == ("00E2374C6353C243E4073E209A7F2ACB2506522AF318B3B78CF9A88310A2A11C", - 19.692307692307693) - - test "should validate and allow envelope according to config": - let ttl = 1'u32 - let topic = [byte 1, 2, 3, 4] - let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(), - isLightNode: false, maxMsgSize: defaultMaxMsgSize) - - let env = Envelope(expiry:epochTime().uint32 + ttl, ttl: ttl, topic: topic, - data: repeat(byte 9, 256), nonce: 0) - check env.valid() - - let msg = initMessage(env) - check msg.allowed(config) - - test "should invalidate envelope due to ttl 0": - let ttl = 0'u32 - let topic = [byte 1, 2, 3, 4] - let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(), - isLightNode: false, maxMsgSize: defaultMaxMsgSize) - - let env = Envelope(expiry:epochTime().uint32 + ttl, ttl: ttl, topic: topic, - data: repeat(byte 9, 256), nonce: 0) - check env.valid() == false - - test "should invalidate envelope due to expired": - let ttl = 1'u32 - let topic = [byte 1, 2, 3, 4] - let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(), - isLightNode: false, maxMsgSize: defaultMaxMsgSize) - - let env = Envelope(expiry:epochTime().uint32, ttl: ttl, topic: topic, - data: repeat(byte 9, 256), nonce: 0) - check env.valid() == false - - test "should invalidate envelope due to in the future": - let ttl = 1'u32 - let topic = [byte 1, 2, 3, 4] - let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(), - isLightNode: false, maxMsgSize: defaultMaxMsgSize) - - # there is currently a 2 second tolerance, hence the + 3 - let env = Envelope(expiry:epochTime().uint32 + ttl + 3, ttl: ttl, topic: topic, - data: repeat(byte 9, 256), nonce: 0) - check env.valid() == false - - test "should not allow envelope due to bloom filter": - let topic = [byte 1, 2, 3, 4] - let wrongTopic = [byte 9, 8, 7, 6] - let config = WakuConfig(powRequirement: 0, bloom: wrongTopic.topicBloom(), - isLightNode: false, maxMsgSize: defaultMaxMsgSize) - - let env = Envelope(expiry:100000 , ttl: 30, topic: topic, - data: repeat(byte 9, 256), nonce: 0) - - let msg = initMessage(env) - check msg.allowed(config) == false - - -suite "Waku queue": - test "should throw out lower proof-of-work item when full": - var queue = initQueue(1) - - let msg0 = initMessage(env0) - let msg1 = initMessage(env1) - - discard queue.add(msg0) - discard queue.add(msg1) - - check: - queue.items.len() == 1 - queue.items[0].env.nonce == - (if msg0.pow > msg1.pow: msg0.env.nonce else: msg1.env.nonce) - - test "should not throw out messages as long as there is capacity": - var queue = initQueue(2) - - check: - queue.add(initMessage(env0)) == true - queue.add(initMessage(env1)) == true - - queue.items.len() == 2 - - test "check field order against expected rlp order": - check rlp.encode(env0) == - rlp.encodeList(env0.expiry, env0.ttl, env0.topic, env0.data, env0.nonce) - -# To test filters we do not care if the msg is valid or allowed -proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](), - src = none[PrivateKey](), topic: Topic, - padding = none[seq[byte]]()): Message = - let payload = Payload(dst: pubKey, symKey: symKey, src: src, - payload: @[byte 0, 1, 2], padding: padding) - let encoded = waku.encode(payload) - let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(), - nonce: 0) - result = initMessage(env) - -suite "Waku filter": - test "should notify filter on message with symmetric encryption": - var symKey: SymKey - let topic = [byte 0, 0, 0, 0] - let msg = prepFilterTestMsg(symKey = some(symKey), topic = topic) - - var filters = initTable[string, Filter]() - let filter = newFilter(symKey = some(symKey), topics = @[topic]) - let filterId = filters.subscribeFilter(filter) - - notify(filters, msg) - - let messages = filters.getFilterMessages(filterId) - check: - messages.len == 1 - messages[0].decoded.src.isNone() - messages[0].dst.isNone() - - test "should notify filter on message with asymmetric encryption": - let privKey = keys.newPrivateKey() - let topic = [byte 0, 0, 0, 0] - let msg = prepFilterTestMsg(pubKey = some(privKey.getPublicKey()), - topic = topic) - - var filters = initTable[string, Filter]() - let filter = newFilter(privateKey = some(privKey), topics = @[topic]) - let filterId = filters.subscribeFilter(filter) - - notify(filters, msg) - - let messages = filters.getFilterMessages(filterId) - check: - messages.len == 1 - messages[0].decoded.src.isNone() - messages[0].dst.isSome() - - test "should notify filter on message with signature": - let privKey = keys.newPrivateKey() - let topic = [byte 0, 0, 0, 0] - let msg = prepFilterTestMsg(src = some(privKey), topic = topic) - - var filters = initTable[string, Filter]() - let filter = newFilter(src = some(privKey.getPublicKey()), - topics = @[topic]) - let filterId = filters.subscribeFilter(filter) - - notify(filters, msg) - - let messages = filters.getFilterMessages(filterId) - check: - messages.len == 1 - messages[0].decoded.src.isSome() - messages[0].dst.isNone() - - test "test notify of filter against PoW requirement": - let topic = [byte 0, 0, 0, 0] - let padding = some(repeat(byte 0, 251)) - # this message has a PoW of 0.02962962962962963, number should be updated - # in case PoW algorithm changes or contents of padding, payload, topic, etc. - # update: now with NON rlp encoded envelope size the PoW of this message is - # 0.014492753623188406 - let msg = prepFilterTestMsg(topic = topic, padding = padding) - - var filters = initTable[string, Filter]() - let - filterId1 = filters.subscribeFilter( - newFilter(topics = @[topic], powReq = 0.014492753623188406)) - filterId2 = filters.subscribeFilter( - newFilter(topics = @[topic], powReq = 0.014492753623188407)) - - notify(filters, msg) - - check: - filters.getFilterMessages(filterId1).len == 1 - filters.getFilterMessages(filterId2).len == 0 - - test "test notify of filter on message with certain topic": - let - topic1 = [byte 0xAB, 0x12, 0xCD, 0x34] - topic2 = [byte 0, 0, 0, 0] - - let msg = prepFilterTestMsg(topic = topic1) - - var filters = initTable[string, Filter]() - let - filterId1 = filters.subscribeFilter(newFilter(topics = @[topic1])) - filterId2 = filters.subscribeFilter(newFilter(topics = @[topic2])) - - notify(filters, msg) - - check: - filters.getFilterMessages(filterId1).len == 1 - filters.getFilterMessages(filterId2).len == 0 diff --git a/tests/p2p/test_waku_bridge.nim b/tests/p2p/test_waku_bridge.nim new file mode 100644 index 0000000..abf7e45 --- /dev/null +++ b/tests/p2p/test_waku_bridge.nim @@ -0,0 +1,93 @@ +# +# Ethereum P2P +# (c) Copyright 2018 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import + sequtils, unittest, tables, chronos, eth/p2p, eth/p2p/peer_pool, + ./p2p_test_helper + +import eth/p2p/rlpx_protocols/waku_protocol as waku +import eth/p2p/rlpx_protocols/whisper_protocol as whisper + +let safeTTL = 5'u32 +let waitInterval = waku.messageInterval + 150.milliseconds + +suite "Waku - Whisper bridge tests": + # Waku Whisper node has both capabilities, listens to Whisper and Waku and + # relays traffic between the two. + var + nodeWakuWhisper = setupTestNode(Whisper, Waku) # This will be the bridge + nodeWhisper = setupTestNode(Whisper) + nodeWaku = setupTestNode(Waku) + + nodeWakuWhisper.startListening() + let bridgeNode = newNode(initENode(nodeWakuWhisper.keys.pubKey, + nodeWakuWhisper.address)) + waitFor nodeWhisper.peerPool.connectToNode(bridgeNode) + waitFor nodeWaku.peerPool.connectToNode(bridgeNode) + + asyncTest "WakuWhisper and Whisper peers connected": + check: + nodeWakuWhisper.peerPool.connectedNodes.len() == 2 + + asyncTest "Whisper - Waku communcation via bridge": + # topic whisper node subscribes to, waku node posts to + let topic1 = [byte 0x12, 0, 0, 0] + # topic waku node subscribes to, whisper node posts to + let topic2 = [byte 0x34, 0, 0, 0] + var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] + var futures = [newFuture[int](), newFuture[int]()] + + proc handler1(msg: whisper.ReceivedMessage) = + check msg.decoded.payload == payloads[0] + futures[0].complete(1) + proc handler2(msg: waku.ReceivedMessage) = + check msg.decoded.payload == payloads[1] + futures[1].complete(1) + + var filter1 = whisper.subscribeFilter(nodeWhisper, + whisper.newFilter(topics = @[topic1]), handler1) + var filter2 = waku.subscribeFilter(nodeWaku, + waku.newFilter(topics = @[topic2]), handler2) + + check: + # Message should also end up in the Whisper node its queue via the bridge + waku.postMessage(nodeWaku, ttl = safeTTL + 1, topic = topic1, + payload = payloads[0]) == true + # Message should also end up in the Waku node its queue via the bridge + whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2, + payload = payloads[1]) == true + nodeWhisper.protocolState(Whisper).queue.items.len == 1 + nodeWaku.protocolState(Waku).queue.items.len == 1 + + # waitInterval*2 as messages have to pass the bridge also (2 hops) + await allFutures(futures).withTimeout(waitInterval*2) + + # Relay can receive Whisper & Waku messages + nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2 + nodeWakuWhisper.protocolState(Waku).queue.items.len == 2 + + # Whisper node can receive Waku messages (via bridge) + nodeWhisper.protocolState(Whisper).queue.items.len == 2 + # Waku node can receive Whisper messages (via bridge) + nodeWaku.protocolState(Waku).queue.items.len == 2 + + whisper.unsubscribeFilter(nodeWhisper, filter1) == true + waku.unsubscribeFilter(nodeWaku, filter2) == true + + # XXX: This reads a bit weird, but eh + waku.resetMessageQueue(nodeWaku) + whisper.resetMessageQueue(nodeWhisper) + # shared queue so Waku and Whisper should be set to 0 + waku.resetMessageQueue(nodeWakuWhisper) + + check: + nodeWhisper.protocolState(Whisper).queue.items.len == 0 + nodeWaku.protocolState(Waku).queue.items.len == 0 + nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0 + nodeWakuWhisper.protocolState(Waku).queue.items.len == 0 diff --git a/tests/p2p/test_waku_connect.nim b/tests/p2p/test_waku_connect.nim deleted file mode 100644 index 88ad58f..0000000 --- a/tests/p2p/test_waku_connect.nim +++ /dev/null @@ -1,390 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -import - sequtils, options, unittest, tables, chronos, eth/[keys, p2p], - eth/p2p/peer_pool, ./p2p_test_helper - -import eth/p2p/rlpx_protocols/waku_protocol as waku -import eth/p2p/rlpx_protocols/whisper_protocol as whisper - -# proc resetMessageQueues(nodes: varargs[EthereumNode]) = -# for node in nodes: -# resetMessageQueue(node) - -let safeTTL = 5'u32 -let waitInterval = waku.messageInterval + 150.milliseconds - -suite "Waku connections": - var node1 = setupTestNode(Waku) - var node2 = setupTestNode(Waku) - node2.startListening() - waitFor node1.peerPool.connectToNode(newNode(initENode(node2.keys.pubKey, - node2.address))) - - # Waku Whisper has both capabilities and listens to Whisper, then relays traffic - var nodeWakuWhisper = setupTestNode(Waku, Whisper) - # XXX: Assuming we added Whisper capability here - var nodeWhisper = setupTestNode(Whisper) - # TODO: Connect them - nodeWakuWhisper.startListening() - waitFor nodeWhisper.peerPool.connectToNode(newNode(initENode(nodeWakuWhisper.keys.pubKey, - nodeWakuWhisper.address))) - - # NOTE: Commented out Whisper equivalent tests - # To enable, fully qualify nodes - - # asyncTest "Two peers connected": - # check: - # node1.peerPool.connectedNodes.len() == 1 - - # asyncTest "Filters with encryption and signing": - # let encryptKeyPair = newKeyPair() - # let signKeyPair = newKeyPair() - # var symKey: SymKey - # let topic = [byte 0x12, 0, 0, 0] - # var filters: seq[string] = @[] - # var payloads = [repeat(byte 1, 10), repeat(byte 2, 10), - # repeat(byte 3, 10), repeat(byte 4, 10)] - # var futures = [newFuture[int](), newFuture[int](), - # newFuture[int](), newFuture[int]()] - - # proc handler1(msg: ReceivedMessage) = - # var count {.global.}: int - # check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1] - # count += 1 - # if count == 2: futures[0].complete(1) - # proc handler2(msg: ReceivedMessage) = - # check msg.decoded.payload == payloads[1] - # futures[1].complete(1) - # proc handler3(msg: ReceivedMessage) = - # var count {.global.}: int - # check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3] - # count += 1 - # if count == 2: futures[2].complete(1) - # proc handler4(msg: ReceivedMessage) = - # check msg.decoded.payload == payloads[3] - # futures[3].complete(1) - - # # Filters - # # filter for encrypted asym - # filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey), - # topics = @[topic]), handler1)) - # # filter for encrypted asym + signed - # filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), - # privateKey = some(encryptKeyPair.seckey), - # topics = @[topic]), handler2)) - # # filter for encrypted sym - # filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey), - # topics = @[topic]), handler3)) - # # filter for encrypted sym + signed - # filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), - # symKey = some(symKey), - # topics = @[topic]), handler4)) - # # Messages - # check: - # # encrypted asym - # node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL, - # topic = topic, payload = payloads[0]) == true - # # encrypted asym + signed - # node2.postMessage(some(encryptKeyPair.pubkey), - # src = some(signKeyPair.seckey), ttl = safeTTL, - # topic = topic, payload = payloads[1]) == true - # # encrypted sym - # node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic, - # payload = payloads[2]) == true - # # encrypted sym + signed - # node2.postMessage(symKey = some(symKey), - # src = some(signKeyPair.seckey), - # ttl = safeTTL, topic = topic, - # payload = payloads[3]) == true - - # node2.protocolState(Waku).queue.items.len == 4 - - # check: - # await allFutures(futures).withTimeout(waitInterval) - # node1.protocolState(Waku).queue.items.len == 4 - - # for filter in filters: - # check node1.unsubscribeFilter(filter) == true - - # resetMessageQueues(node1, node2) - - # asyncTest "Filters with topics": - # let topic1 = [byte 0x12, 0, 0, 0] - # let topic2 = [byte 0x34, 0, 0, 0] - # var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] - # var futures = [newFuture[int](), newFuture[int]()] - # proc handler1(msg: ReceivedMessage) = - # check msg.decoded.payload == payloads[0] - # futures[0].complete(1) - # proc handler2(msg: ReceivedMessage) = - # check msg.decoded.payload == payloads[1] - # futures[1].complete(1) - - # var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) - # var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) - - # check: - # node2.postMessage(ttl = safeTTL + 1, topic = topic1, - # payload = payloads[0]) == true - # node2.postMessage(ttl = safeTTL, topic = topic2, - # payload = payloads[1]) == true - # node2.protocolState(Waku).queue.items.len == 2 - - # await allFutures(futures).withTimeout(waitInterval) - # node1.protocolState(Waku).queue.items.len == 2 - - # node1.unsubscribeFilter(filter1) == true - # node1.unsubscribeFilter(filter2) == true - - # resetMessageQueues(node1, node2) - - # asyncTest "Filters with PoW": - # let topic = [byte 0x12, 0, 0, 0] - # var payload = repeat(byte 0, 10) - # var futures = [newFuture[int](), newFuture[int]()] - # proc handler1(msg: ReceivedMessage) = - # check msg.decoded.payload == payload - # futures[0].complete(1) - # proc handler2(msg: ReceivedMessage) = - # check msg.decoded.payload == payload - # futures[1].complete(1) - - # var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0), - # handler1) - # var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], - # powReq = 1_000_000), handler2) - - # check: - # node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true - - # (await futures[0].withTimeout(waitInterval)) == true - # (await futures[1].withTimeout(waitInterval)) == false - # node1.protocolState(Waku).queue.items.len == 1 - - # node1.unsubscribeFilter(filter1) == true - # node1.unsubscribeFilter(filter2) == true - - # resetMessageQueues(node1, node2) - - # asyncTest "Filters with queues": - # let topic = [byte 0, 0, 0, 0] - # let payload = repeat(byte 0, 10) - - # var filter = node1.subscribeFilter(newFilter(topics = @[topic])) - # for i in countdown(10, 1): - # check node2.postMessage(ttl = safeTTL, topic = topic, - # payload = payload) == true - - # await sleepAsync(waitInterval) - # check: - # node1.getFilterMessages(filter).len() == 10 - # node1.getFilterMessages(filter).len() == 0 - # node1.unsubscribeFilter(filter) == true - - # resetMessageQueues(node1, node2) - - # asyncTest "Local filter notify": - # let topic = [byte 0, 0, 0, 0] - - # var filter = node1.subscribeFilter(newFilter(topics = @[topic])) - # check: - # node1.postMessage(ttl = safeTTL, topic = topic, - # payload = repeat(byte 4, 10)) == true - # node1.getFilterMessages(filter).len() == 1 - # node1.unsubscribeFilter(filter) == true - - # await sleepAsync(waitInterval) - # resetMessageQueues(node1, node2) - - # asyncTest "Bloomfilter blocking": - # let sendTopic1 = [byte 0x12, 0, 0, 0] - # let sendTopic2 = [byte 0x34, 0, 0, 0] - # let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]] - # let payload = repeat(byte 0, 10) - # var f: Future[int] = newFuture[int]() - # proc handler(msg: ReceivedMessage) = - # check msg.decoded.payload == payload - # f.complete(1) - # var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) - # await node1.setBloomFilter(node1.filtersToBloom()) - - # check: - # node2.postMessage(ttl = safeTTL, topic = sendTopic1, - # payload = payload) == true - # node2.protocolState(Waku).queue.items.len == 1 - - # (await f.withTimeout(waitInterval)) == false - # node1.protocolState(Waku).queue.items.len == 0 - - # resetMessageQueues(node1, node2) - - # f = newFuture[int]() - - # check: - # node2.postMessage(ttl = safeTTL, topic = sendTopic2, - # payload = payload) == true - # node2.protocolState(Waku).queue.items.len == 1 - - # await f.withTimeout(waitInterval) - # f.read() == 1 - # node1.protocolState(Waku).queue.items.len == 1 - - # node1.unsubscribeFilter(filter) == true - - # await node1.setBloomFilter(fullBloom()) - - # resetMessageQueues(node1, node2) - - # asyncTest "PoW blocking": - # let topic = [byte 0, 0, 0, 0] - # let payload = repeat(byte 0, 10) - - # await node1.setPowRequirement(1_000_000) - # check: - # node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true - # node2.protocolState(Waku).queue.items.len == 1 - # await sleepAsync(waitInterval) - # check: - # node1.protocolState(Waku).queue.items.len == 0 - - # resetMessageQueues(node1, node2) - - # await node1.setPowRequirement(0.0) - # check: - # node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true - # node2.protocolState(Waku).queue.items.len == 1 - # await sleepAsync(waitInterval) - # check: - # node1.protocolState(Waku).queue.items.len == 1 - - # resetMessageQueues(node1, node2) - - # asyncTest "Queue pruning": - # let topic = [byte 0, 0, 0, 0] - # let payload = repeat(byte 0, 10) - # # We need a minimum TTL of 2 as when set to 1 there is a small chance that - # # it is already expired after messageInterval due to rounding down of float - # # to uint32 in postMessage() - # let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire - # for i in countdown(10, 1): - # check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true - # check node2.protocolState(Waku).queue.items.len == 10 - - # await sleepAsync(waitInterval) - # check node1.protocolState(Waku).queue.items.len == 10 - - # await sleepAsync(milliseconds((lowerTTL+1)*1000)) - # check node1.protocolState(Waku).queue.items.len == 0 - # check node2.protocolState(Waku).queue.items.len == 0 - - # resetMessageQueues(node1, node2) - - # asyncTest "P2P post": - # let topic = [byte 0, 0, 0, 0] - # var f: Future[int] = newFuture[int]() - # proc handler(msg: ReceivedMessage) = - # check msg.decoded.payload == repeat(byte 4, 10) - # f.complete(1) - - # var filter = node1.subscribeFilter(newFilter(topics = @[topic], - # allowP2P = true), handler) - # check: - # node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true - # node2.postMessage(ttl = 10, topic = topic, - # payload = repeat(byte 4, 10), - # targetPeer = some(toNodeId(node1.keys.pubkey))) == true - - # await f.withTimeout(waitInterval) - # f.read() == 1 - # node1.protocolState(Waku).queue.items.len == 0 - # node2.protocolState(Waku).queue.items.len == 0 - - # node1.unsubscribeFilter(filter) == true - - # asyncTest "Light node posting": - # var ln1 = setupTestNode(Waku) - # ln1.setLightNode(true) - - # await ln1.peerPool.connectToNode(newNode(initENode(node2.keys.pubKey, - # node2.address))) - - # let topic = [byte 0, 0, 0, 0] - - # check: - # # normal post - # ln1.postMessage(ttl = safeTTL, topic = topic, - # payload = repeat(byte 0, 10)) == false - # ln1.protocolState(Waku).queue.items.len == 0 - # # P2P post - # ln1.postMessage(ttl = safeTTL, topic = topic, - # payload = repeat(byte 0, 10), - # targetPeer = some(toNodeId(node2.keys.pubkey))) == true - # ln1.protocolState(Waku).queue.items.len == 0 - - # asyncTest "Connect two light nodes": - # var ln1 = setupTestNode(Waku) - # var ln2 = setupTestNode(Waku) - - # ln1.setLightNode(true) - # ln2.setLightNode(true) - - # ln2.startListening() - # let peer = await ln1.rlpxConnect(newNode(initENode(ln2.keys.pubKey, - # ln2.address))) - # check peer.isNil == true - - asyncTest "WakuWhisper and Whisper peers connected": - check: - nodeWakuWhisper.peerPool.connectedNodes.len() == 1 - - asyncTest "WhisperWaku and Whisper filters with topics": - let topic1 = [byte 0x12, 0, 0, 0] - let topic2 = [byte 0x34, 0, 0, 0] - var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] - var futures = [newFuture[int](), newFuture[int]()] - - proc handler1(msg: whisper.ReceivedMessage) = - check msg.decoded.payload == payloads[0] - futures[0].complete(1) - proc handler2(msg: whisper.ReceivedMessage) = - check msg.decoded.payload == payloads[1] - futures[1].complete(1) - - var filter1 = nodeWakuWhisper.subscribeFilter(whisper.newFilter(topics = @[topic1]), handler1) - var filter2 = nodeWakuWhisper.subscribeFilter(whisper.newFilter(topics = @[topic2]), handler2) - - check: - whisper.postMessage(nodeWhisper, ttl = safeTTL + 1, topic = topic1, - payload = payloads[0]) == true - whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2, - payload = payloads[1]) == true - nodeWhisper.protocolState(Whisper).queue.items.len == 2 - - await allFutures(futures).withTimeout(waitInterval) - - # This shows WakuWhisper can receive Whisper messages - # TODO: This should also make its way to Waku state! Where? - nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2 - - # XXX: How does this look with protocol state for waku and whisper? - whisper.unsubscribeFilter(nodeWakuWhisper, filter1) == true - whisper.unsubscribeFilter(nodeWakuWhisper, filter2) == true - - # XXX: This reads a bit weird, but eh - waku.resetMessageQueue(nodeWakuWhisper) - whisper.resetMessageQueue(nodeWakuWhisper) - whisper.resetMessageQueue(nodeWhisper) - - check: - nodeWhisper.protocolState(Whisper).queue.items.len == 0 - nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0 - - # TODO: Add test for Waku node also listening on Whisper topic