diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 1948456c2..046f7bedc 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -1,2 +1,5 @@ import + ./test_waku_connect, + ./test_waku_bridge, + ./test_waku_mail, ./test_rpc_waku \ No newline at end of file diff --git a/tests/test_helpers.nim b/tests/test_helpers.nim new file mode 100644 index 000000000..b7c5bf351 --- /dev/null +++ b/tests/test_helpers.nim @@ -0,0 +1,32 @@ +import + unittest, chronos, strutils, + eth/[keys, p2p] + +var nextPort = 30303 + +proc localAddress*(port: int): Address = + let port = Port(port) + result = Address(udpPort: port, tcpPort: port, + ip: parseIpAddress("127.0.0.1")) + +proc setupTestNode*(capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode = + let keys1 = KeyPair.random()[] + result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, + addAllCapabilities = false) + nextPort.inc + for capability in capabilities: + result.addCapability capability + +template asyncTest*(name, body: untyped) = + test name: + proc scenario {.async.} = body + waitFor scenario() + +template procSuite*(name, body: untyped) = + proc suitePayload = + suite name: + body + + suitePayload() + +template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] diff --git a/tests/test_rpc_waku.nim b/tests/test_rpc_waku.nim index eb8e26049..f2829d79b 100644 --- a/tests/test_rpc_waku.nim +++ b/tests/test_rpc_waku.nim @@ -1,16 +1,15 @@ import unittest, strformat, options, stew/byteutils, json_rpc/[rpcserver, rpcclient], eth/common as eth_common, eth/[rlp, keys, p2p], - eth/p2p/rlpx_protocols/waku_protocol, - ../waku/node/v0/rpc/[hexstrings, rpc_types, waku, key_storage] + ../waku/protocol/v1/waku_protocol, + ../waku/node/v1/rpc/[hexstrings, rpc_types, waku, key_storage], + ./test_helpers from os import DirSep, ParDir -from strutils import rsplit -template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] ## Generate client convenience marshalling wrappers from forward declarations -## For testing, ethcallsigs needs to be kept in sync with ../waku/node/v0/rpc/waku -const sigPath = &"{sourceDir}{DirSep}{ParDir}{DirSep}waku{DirSep}node{DirSep}v0{DirSep}rpc{DirSep}wakucallsigs.nim" +## For testing, ethcallsigs needs to be kept in sync with ../waku/node/v1/rpc/waku +const sigPath = &"{sourceDir}{DirSep}{ParDir}{DirSep}waku{DirSep}node{DirSep}v1{DirSep}rpc{DirSep}wakucallsigs.nim" createRpcSigs(RpcSocketClient, sigPath) proc setupNode(capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode = diff --git a/tests/test_waku_bridge.nim b/tests/test_waku_bridge.nim new file mode 100644 index 000000000..53ad2607a --- /dev/null +++ b/tests/test_waku_bridge.nim @@ -0,0 +1,95 @@ +# +# Waku +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import + sequtils, unittest, tables, chronos, eth/p2p, eth/p2p/peer_pool, + eth/p2p/rlpx_protocols/whisper_protocol as whisper, + ../waku/protocol/v1/waku_protocol as waku, + ../waku/protocol/v1/waku_bridge, + ./test_helpers + +let safeTTL = 5'u32 +let waitInterval = waku.messageInterval + 150.milliseconds + +procSuite "Waku - Whisper bridge tests": + # Waku Whisper node has both capabilities, listens to Whisper and Waku and + # relays traffic between the two. + var + nodeWakuWhisper = setupTestNode(Whisper, Waku) # This will be the bridge + nodeWhisper = setupTestNode(Whisper) + nodeWaku = setupTestNode(Waku) + + nodeWakuWhisper.startListening() + let bridgeNode = newNode(nodeWakuWhisper.toENode()) + nodeWakuWhisper.shareMessageQueue() + + waitFor nodeWhisper.peerPool.connectToNode(bridgeNode) + waitFor nodeWaku.peerPool.connectToNode(bridgeNode) + + asyncTest "WakuWhisper and Whisper peers connected": + check: + nodeWhisper.peerPool.connectedNodes.len() == 1 + nodeWaku.peerPool.connectedNodes.len() == 1 + + asyncTest "Whisper - Waku communcation via bridge": + # topic whisper node subscribes to, waku node posts to + let topic1 = [byte 0x12, 0, 0, 0] + # topic waku node subscribes to, whisper node posts to + let topic2 = [byte 0x34, 0, 0, 0] + var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)] + var futures = [newFuture[int](), newFuture[int]()] + + proc handler1(msg: whisper.ReceivedMessage) = + check msg.decoded.payload == payloads[0] + futures[0].complete(1) + proc handler2(msg: waku.ReceivedMessage) = + check msg.decoded.payload == payloads[1] + futures[1].complete(1) + + var filter1 = whisper.subscribeFilter(nodeWhisper, + whisper.initFilter(topics = @[topic1]), handler1) + var filter2 = waku.subscribeFilter(nodeWaku, + waku.initFilter(topics = @[topic2]), handler2) + + check: + # Message should also end up in the Whisper node its queue via the bridge + waku.postMessage(nodeWaku, ttl = safeTTL + 1, topic = topic1, + payload = payloads[0]) == true + # Message should also end up in the Waku node its queue via the bridge + whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2, + payload = payloads[1]) == true + nodeWhisper.protocolState(Whisper).queue.items.len == 1 + nodeWaku.protocolState(Waku).queue.items.len == 1 + + # waitInterval*2 as messages have to pass the bridge also (2 hops) + await allFutures(futures).withTimeout(waitInterval*2) + + # Relay can receive Whisper & Waku messages + nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2 + nodeWakuWhisper.protocolState(Waku).queue.items.len == 2 + + # Whisper node can receive Waku messages (via bridge) + nodeWhisper.protocolState(Whisper).queue.items.len == 2 + # Waku node can receive Whisper messages (via bridge) + nodeWaku.protocolState(Waku).queue.items.len == 2 + + whisper.unsubscribeFilter(nodeWhisper, filter1) == true + waku.unsubscribeFilter(nodeWaku, filter2) == true + + # XXX: This reads a bit weird, but eh + waku.resetMessageQueue(nodeWaku) + whisper.resetMessageQueue(nodeWhisper) + # shared queue so Waku and Whisper should be set to 0 + waku.resetMessageQueue(nodeWakuWhisper) + + check: + nodeWhisper.protocolState(Whisper).queue.items.len == 0 + nodeWaku.protocolState(Waku).queue.items.len == 0 + nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0 + nodeWakuWhisper.protocolState(Waku).queue.items.len == 0 diff --git a/tests/test_waku_connect.nim b/tests/test_waku_connect.nim new file mode 100644 index 000000000..6d0f18cde --- /dev/null +++ b/tests/test_waku_connect.nim @@ -0,0 +1,242 @@ +# +# Waku +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import + sequtils, tables, unittest, chronos, eth/[keys, p2p], eth/p2p/peer_pool, + ../waku/protocol/v1/waku_protocol, + ./test_helpers + +const + safeTTL = 5'u32 + waitInterval = messageInterval + 150.milliseconds + conditionTimeoutMs = 3000 + +# check on a condition until true or return a future containing false +# if timeout expires first +proc eventually(timeout: int, condition: proc(): bool {.gcsafe.}): Future[bool] = + let wrappedCondition = proc(): Future[bool] {.async.} = + let f = newFuture[bool]() + while not condition(): + await sleepAsync(100.milliseconds) + f.complete(true) + return await f + return withTimeout(wrappedCondition(), timeout) + +# TODO: Just repeat all the test_shh_connect tests here that are applicable or +# have some commonly shared test code for both protocols. +suite "Waku connections": + asyncTest "Waku connections": + var + n1 = setupTestNode(Waku) + n2 = setupTestNode(Waku) + n3 = setupTestNode(Waku) + n4 = setupTestNode(Waku) + + var topics: seq[Topic] + n1.protocolState(Waku).config.topics = some(topics) + n2.protocolState(Waku).config.topics = some(topics) + n3.protocolState(Waku).config.topics = none(seq[Topic]) + n4.protocolState(Waku).config.topics = none(seq[Topic]) + + n1.startListening() + n3.startListening() + + let + p1 = await n2.rlpxConnect(newNode(n1.toENode())) + p2 = await n2.rlpxConnect(newNode(n3.toENode())) + p3 = await n4.rlpxConnect(newNode(n3.toENode())) + check: + p1.isNil + p2.isNil == false + p3.isNil == false + + asyncTest "Waku set-topic-interest": + var + wakuTopicNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + let + topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + # Set one topic so we are not considered a full node + wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1]) + + wakuNode.startListening() + await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode())) + + # Update topic interest + check: + await setTopicInterest(wakuTopicNode, @[topic1, topic2]) + + let payload = repeat(byte 0, 10) + check: + wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) + wakuNode.protocolState(Waku).queue.items.len == 3 + await sleepAsync(waitInterval) + check: + wakuTopicNode.protocolState(Waku).queue.items.len == 2 + + asyncTest "Waku set-minimum-pow": + var + wakuPowNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + wakuNode.startListening() + await wakuPowNode.peerPool.connectToNode(newNode(wakuNode.toENode())) + + # Update minimum pow + await setPowRequirement(wakuPowNode, 1.0) + await sleepAsync(waitInterval) + + check: + wakuNode.peerPool.len == 1 + + # check powRequirement is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).powRequirement == 1.0 + + asyncTest "Waku set-light-node": + var + wakuLightNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + wakuNode.startListening() + await wakuLightNode.peerPool.connectToNode(newNode(wakuNode.toENode())) + + # Update minimum pow + await setLightNode(wakuLightNode, true) + await sleepAsync(waitInterval) + + check: + wakuNode.peerPool.len == 1 + + # check lightNode is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).isLightNode + + asyncTest "Waku set-bloom-filter": + var + wakuBloomNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + bloom = fullBloom() + topics = @[[byte 0xDA, 0xDA, 0xDA, 0xAA]] + + # Set topic interest + discard await wakuBloomNode.setTopicInterest(topics) + + wakuBloomNode.startListening() + await wakuNode.peerPool.connectToNode(newNode(wakuBloomNode.toENode())) + + # Sanity check + check: + wakuNode.peerPool.len == 1 + + # check bloom filter is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).bloom == bloom + peer.state(Waku).topics == some(topics) + + let hasBloomNodeConnectedCondition = proc(): bool = wakuBloomNode.peerPool.len == 1 + # wait for the peer to be connected on the other side + let hasBloomNodeConnected = await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition) + # check bloom filter is updated + check: + hasBloomNodeConnected + + # disable one bit in the bloom filter + bloom[0] = 0x0 + + # and set it + await setBloomFilter(wakuBloomNode, bloom) + + let bloomFilterUpdatedCondition = proc(): bool = + for peer in wakuNode.peerPool.peers: + return peer.state(Waku).bloom == bloom and peer.state(Waku).topics == none(seq[Topic]) + + let bloomFilterUpdated = await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition) + # check bloom filter is updated + check: + bloomFilterUpdated + + asyncTest "Waku topic-interest": + var + wakuTopicNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + let + topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2]) + + wakuNode.startListening() + await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode())) + + let payload = repeat(byte 0, 10) + check: + wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) + wakuNode.protocolState(Waku).queue.items.len == 3 + + let response = await eventually(conditionTimeoutMs, proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2) + check: + response + + asyncTest "Waku topic-interest versus bloom filter": + var + wakuTopicNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + let + topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + bloomTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + # It was checked that the topics don't trigger false positives on the bloom. + wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2]) + wakuTopicNode.protocolState(Waku).config.bloom = some(toBloom([bloomTopic])) + + wakuNode.startListening() + await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode())) + + let payload = repeat(byte 0, 10) + check: + wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = bloomTopic, payload = payload) + wakuNode.protocolState(Waku).queue.items.len == 3 + await sleepAsync(waitInterval) + check: + wakuTopicNode.protocolState(Waku).queue.items.len == 2 + + asyncTest "Light node posting": + var ln = setupTestNode(Waku) + await ln.setLightNode(true) + var fn = setupTestNode(Waku) + fn.startListening() + await ln.peerPool.connectToNode(newNode(fn.toENode())) + + let topic = [byte 0, 0, 0, 0] + + check: + ln.peerPool.connectedNodes.len() == 1 + # normal post + ln.postMessage(ttl = safeTTL, topic = topic, + payload = repeat(byte 0, 10)) == true + ln.protocolState(Waku).queue.items.len == 1 + # TODO: add test on message relaying diff --git a/tests/test_waku_mail.nim b/tests/test_waku_mail.nim new file mode 100644 index 000000000..a19df5cd6 --- /dev/null +++ b/tests/test_waku_mail.nim @@ -0,0 +1,116 @@ +import + unittest, chronos, tables, sequtils, times, + eth/[p2p, async_utils], eth/p2p/peer_pool, + ../waku/protocol/v1/[waku_protocol, waku_mail], + ./test_helpers + +const + transmissionTimeout = chronos.milliseconds(100) + +proc waitForConnected(node: EthereumNode) {.async.} = + while node.peerPool.connectedNodes.len == 0: + await sleepAsync(chronos.milliseconds(1)) + +procSuite "Waku Mail Client": + var client = setupTestNode(Waku) + var simpleServer = setupTestNode(Waku) + + simpleServer.startListening() + let simpleServerNode = newNode(simpleServer.toENode()) + let clientNode = newNode(client.toENode()) + waitFor client.peerPool.connectToNode(simpleServerNode) + require: + waitFor simpleServer.waitForConnected().withTimeout(transmissionTimeout) + + asyncTest "Two peers connected": + check: + client.peerPool.connectedNodes.len() == 1 + simpleServer.peerPool.connectedNodes.len() == 1 + + asyncTest "Mail Request and Request Complete": + let + topic = [byte 0, 0, 0, 0] + bloom = toBloom(@[topic]) + lower = 0'u32 + upper = epochTime().uint32 + limit = 100'u32 + request = MailRequest(lower: lower, upper: upper, bloom: @bloom, + limit: limit) + + var symKey: SymKey + check client.setPeerTrusted(simpleServerNode.id) + var cursorFut = client.requestMail(simpleServerNode.id, request, symKey, 1) + + # Simple mailserver part + let peer = simpleServer.peerPool.connectedNodes[clientNode] + var f = peer.nextMsg(Waku.p2pRequest) + require await f.withTimeout(transmissionTimeout) + let response = f.read() + let decoded = decode(response.envelope.data, symKey = some(symKey)) + require decoded.isSome() + + var rlp = rlpFromBytes(decoded.get().payload) + let output = rlp.read(MailRequest) + check: + output.lower == lower + output.upper == upper + output.bloom == bloom + output.limit == limit + + var dummy: Hash + await peer.p2pRequestComplete(dummy, dummy, @[]) + + check await cursorFut.withTimeout(transmissionTimeout) + + asyncTest "Mail Send": + let topic = [byte 0x12, 0x34, 0x56, 0x78] + let payload = repeat(byte 0, 10) + var f = newFuture[int]() + + proc handler(msg: ReceivedMessage) = + check msg.decoded.payload == payload + f.complete(1) + + let filter = subscribeFilter(client, + initFilter(topics = @[topic], allowP2P = true), handler) + + check: + client.setPeerTrusted(simpleServerNode.id) + # ttl 0 to show that ttl should be ignored + # TODO: perhaps not the best way to test this, means no PoW calculation + # may be done, and not sure if that is OK? + simpleServer.postMessage(ttl = 0, topic = topic, payload = payload, + targetPeer = some(clientNode.id)) + + await f.withTimeout(transmissionTimeout) + + client.unsubscribeFilter(filter) + + asyncTest "Multiple Client Request and Complete": + var count = 5 + proc customHandler(peer: Peer, envelope: Envelope)= + var envelopes: seq[Envelope] + traceAsyncErrors peer.p2pMessage(envelopes) + + var cursor: seq[byte] + count = count - 1 + if count == 0: + cursor = @[] + else: + cursor = @[byte count] + + var dummy: Hash + traceAsyncErrors peer.p2pRequestComplete(dummy, dummy, cursor) + + simpleServer.enableMailServer(customHandler) + check client.setPeerTrusted(simpleServerNode.id) + var request: MailRequest + var symKey: SymKey + let cursor = + await client.requestMail(simpleServerNode.id, request, symKey, 5) + require cursor.isSome() + check: + cursor.get().len == 0 + count == 0 + + # TODO: Also check for received envelopes. diff --git a/waku.nimble b/waku.nimble index 4bdba6cdf..b99d8dbcc 100644 --- a/waku.nimble +++ b/waku.nimble @@ -41,11 +41,11 @@ task test, "Run tests": test "all_tests" task wakunode, "Build Waku cli": - buildBinary "wakunode", "waku/node/v0/", "-d:chronicles_log_level=TRACE" + buildBinary "wakunode", "waku/node/v1/", "-d:chronicles_log_level=TRACE" task wakusim, "Build Waku simulation tools": - buildBinary "quicksim", "waku/node/v0/", "-d:chronicles_log_level=INFO" - buildBinary "start_network", "waku/node/v0/", "-d:chronicles_log_level=DEBUG" + buildBinary "quicksim", "waku/node/v1/", "-d:chronicles_log_level=INFO" + buildBinary "start_network", "waku/node/v1/", "-d:chronicles_log_level=DEBUG" task protocol2, "Build the experimental Waku protocol": buildBinary "waku_protocol", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG" diff --git a/waku/node/README.md b/waku/node/README.md index ff169893a..318b90e91 100644 --- a/waku/node/README.md +++ b/waku/node/README.md @@ -2,4 +2,4 @@ TODO. -See README in `v0` folder for instructions on how to run a node. +See README in `v1` folder for instructions on how to run a node. diff --git a/waku/node/v0/README.md b/waku/node/v1/README.md similarity index 100% rename from waku/node/v0/README.md rename to waku/node/v1/README.md diff --git a/waku/node/v0/config.nim b/waku/node/v1/config.nim similarity index 100% rename from waku/node/v0/config.nim rename to waku/node/v1/config.nim diff --git a/waku/node/v0/docker/Dockerfile b/waku/node/v1/docker/Dockerfile similarity index 100% rename from waku/node/v0/docker/Dockerfile rename to waku/node/v1/docker/Dockerfile diff --git a/waku/node/v0/docker/Makefile b/waku/node/v1/docker/Makefile similarity index 100% rename from waku/node/v0/docker/Makefile rename to waku/node/v1/docker/Makefile diff --git a/waku/node/v0/examples/waku-grafana-dashboard.json b/waku/node/v1/examples/waku-grafana-dashboard.json similarity index 100% rename from waku/node/v0/examples/waku-grafana-dashboard.json rename to waku/node/v1/examples/waku-grafana-dashboard.json diff --git a/waku/node/v0/metrics/prometheus/prometheus.yml b/waku/node/v1/metrics/prometheus/prometheus.yml similarity index 100% rename from waku/node/v0/metrics/prometheus/prometheus.yml rename to waku/node/v1/metrics/prometheus/prometheus.yml diff --git a/waku/node/v0/metrics/waku-sim-all-nodes-grafana-dashboard.json b/waku/node/v1/metrics/waku-sim-all-nodes-grafana-dashboard.json similarity index 100% rename from waku/node/v0/metrics/waku-sim-all-nodes-grafana-dashboard.json rename to waku/node/v1/metrics/waku-sim-all-nodes-grafana-dashboard.json diff --git a/waku/node/v0/nim.cfg b/waku/node/v1/nim.cfg similarity index 100% rename from waku/node/v0/nim.cfg rename to waku/node/v1/nim.cfg diff --git a/waku/node/v0/quicksim.nim b/waku/node/v1/quicksim.nim similarity index 96% rename from waku/node/v0/quicksim.nim rename to waku/node/v1/quicksim.nim index 200bc5200..49eae4c65 100644 --- a/waku/node/v0/quicksim.nim +++ b/waku/node/v1/quicksim.nim @@ -1,7 +1,7 @@ import os, strformat, chronicles, json_rpc/[rpcclient, rpcserver], nimcrypto/sysrand, - eth/common as eth_common, eth/keys, eth/p2p/rlpx_protocols/waku_protocol, - ./rpc/[hexstrings, rpc_types], + eth/common as eth_common, eth/keys, + ../../protocol/v1/waku_protocol, ./rpc/[hexstrings, rpc_types], options as what # TODO: Huh? Redefinition? from os import DirSep diff --git a/waku/node/v0/rpc/hexstrings.nim b/waku/node/v1/rpc/hexstrings.nim similarity index 99% rename from waku/node/v0/rpc/hexstrings.nim rename to waku/node/v1/rpc/hexstrings.nim index d5051996e..c015ec5a3 100644 --- a/waku/node/v0/rpc/hexstrings.nim +++ b/waku/node/v1/rpc/hexstrings.nim @@ -26,7 +26,7 @@ import stint, stew/byteutils, eth/[keys, rlp], eth/common/eth_types, - eth/p2p/rlpx_protocols/waku_protocol + ../../../protocol/v1/waku_protocol type HexDataStr* = distinct string diff --git a/waku/node/v0/rpc/key_storage.nim b/waku/node/v1/rpc/key_storage.nim similarity index 100% rename from waku/node/v0/rpc/key_storage.nim rename to waku/node/v1/rpc/key_storage.nim diff --git a/waku/node/v0/rpc/rpc_types.nim b/waku/node/v1/rpc/rpc_types.nim similarity index 98% rename from waku/node/v0/rpc/rpc_types.nim rename to waku/node/v1/rpc/rpc_types.nim index 03b57e4af..582a9cb1b 100644 --- a/waku/node/v0/rpc/rpc_types.nim +++ b/waku/node/v1/rpc/rpc_types.nim @@ -1,6 +1,6 @@ import hexstrings, options, eth/[keys, rlp], - eth/p2p/rlpx_protocols/waku_protocol + ../../../protocol/v1/waku_protocol #[ Notes: diff --git a/waku/node/v0/rpc/waku.nim b/waku/node/v1/rpc/waku.nim similarity index 99% rename from waku/node/v0/rpc/waku.nim rename to waku/node/v1/rpc/waku.nim index af721b784..b8439f5e1 100644 --- a/waku/node/v0/rpc/waku.nim +++ b/waku/node/v1/rpc/waku.nim @@ -1,8 +1,9 @@ import json_rpc/rpcserver, tables, options, sequtils, - eth/[common, rlp, keys, p2p], eth/p2p/rlpx_protocols/waku_protocol, + eth/[common, rlp, keys, p2p], nimcrypto/[sysrand, hmac, sha2, pbkdf2], - rpc_types, hexstrings, key_storage + rpc_types, hexstrings, key_storage, + ../../../protocol/v1/waku_protocol from stew/byteutils import hexToSeqByte, hexToByteArray diff --git a/waku/node/v0/rpc/wakucallsigs.nim b/waku/node/v1/rpc/wakucallsigs.nim similarity index 100% rename from waku/node/v0/rpc/wakucallsigs.nim rename to waku/node/v1/rpc/wakucallsigs.nim diff --git a/waku/node/v0/rpc/wakusim.nim b/waku/node/v1/rpc/wakusim.nim similarity index 94% rename from waku/node/v0/rpc/wakusim.nim rename to waku/node/v1/rpc/wakusim.nim index 07ee950b9..08446ad2b 100644 --- a/waku/node/v0/rpc/wakusim.nim +++ b/waku/node/v1/rpc/wakusim.nim @@ -1,6 +1,7 @@ import json_rpc/rpcserver, stew/endians2, nimcrypto/sysrand, - eth/[p2p, async_utils], eth/p2p/rlpx_protocols/waku_protocol + eth/[p2p, async_utils], + ../../../protocol/v1/waku_protocol proc generateTraffic(node: EthereumNode, amount = 100) {.async.} = var topicNumber = 0'u32 diff --git a/waku/node/v0/start_network.nim b/waku/node/v1/start_network.nim similarity index 100% rename from waku/node/v0/start_network.nim rename to waku/node/v1/start_network.nim diff --git a/waku/node/v0/wakunode.nim b/waku/node/v1/wakunode.nim similarity index 98% rename from waku/node/v0/wakunode.nim rename to waku/node/v1/wakunode.nim index 1b3571be3..346318dac 100644 --- a/waku/node/v0/wakunode.nim +++ b/waku/node/v1/wakunode.nim @@ -3,7 +3,8 @@ import chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird. eth/[keys, p2p, async_utils], eth/common/utils, eth/net/nat, eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes], - eth/p2p/rlpx_protocols/[whisper_protocol, waku_protocol, waku_bridge], + eth/p2p/rlpx_protocols/whisper_protocol, + ../../protocol/v1/[waku_protocol, waku_bridge], ./rpc/[waku, wakusim, key_storage] const clientId = "Nimbus waku node" diff --git a/waku/node/v2/config.nim b/waku/node/v2/config.nim index f12951468..71cd78c9c 100644 --- a/waku/node/v2/config.nim +++ b/waku/node/v2/config.nim @@ -1,6 +1,6 @@ import confutils/defs, chronicles, chronos, - eth/keys, eth/p2p/rlpx_protocols/waku_protocol + eth/keys type Fleet* = enum diff --git a/waku/node/v2/quicksim.nim b/waku/node/v2/quicksim.nim index 40f280c0f..54c5f8e07 100644 --- a/waku/node/v2/quicksim.nim +++ b/waku/node/v2/quicksim.nim @@ -2,8 +2,8 @@ import os, strformat, chronicles, json_rpc/[rpcclient, rpcserver], nimcrypto/sysrand, eth/common as eth_common, eth/keys, # XXX: Replace me - eth/p2p/rlpx_protocols/waku_protocol, - ../v0/rpc/[hexstrings, rpc_types, waku], + ../../protocol/v1/waku_protocol, + ../v1/rpc/[hexstrings, rpc_types, waku], rpc/wakurpc, options as what # TODO: Huh? Redefinition? @@ -14,7 +14,7 @@ template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] const sigWakuPath = &"{sourceDir}{DirSep}rpc{DirSep}wakucallsigs.nim" createRpcSigs(RpcHttpClient, sigWakuPath) -# More minimal than v0 quicksim, just RPC client for now +# More minimal than v1 quicksim, just RPC client for now let node1 = newRpcHttpClient() #let node2 = newRpcHttpClient() diff --git a/waku/node/v2/rpc/wakucallsigs.nim b/waku/node/v2/rpc/wakucallsigs.nim index d3c4571d6..ba6ba5185 100644 --- a/waku/node/v2/rpc/wakucallsigs.nim +++ b/waku/node/v2/rpc/wakucallsigs.nim @@ -1,4 +1,4 @@ -# NOTE: Taken from v0, only version exists right now +# NOTE: Taken from v1, only version exists right now proc waku_version(): string proc waku_info(): WakuInfo diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 9f79387c3..c6f6ae33f 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -2,10 +2,10 @@ import json_rpc/rpcserver, tables, options, eth/[common, rlp, keys, p2p], #DevP2P impl - #eth/p2p/rlpx_protocols/waku_protocol, + # ../../../protocol/v1/waku_protocol, ../../../protocol/v2/waku_protocol, nimcrypto/[sysrand, hmac, sha2, pbkdf2], - ../../v0/rpc/[rpc_types, hexstrings, key_storage] + ../../v1/rpc/[rpc_types, hexstrings, key_storage] from stew/byteutils import hexToSeqByte, hexToByteArray diff --git a/waku/node/v2/wakunode.nim b/waku/node/v2/wakunode.nim index cedcd679d..c1883a4d3 100644 --- a/waku/node/v2/wakunode.nim +++ b/waku/node/v2/wakunode.nim @@ -5,7 +5,7 @@ import eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes], eth/p2p/rlpx_protocols/[whisper_protocol, waku_protocol, waku_bridge], # TODO remove me - ../v0/rpc/[wakusim, key_storage], + ../v1/rpc/[wakusim, key_storage], ../../vendor/nim-libp2p/libp2p/standard_setup, ../../vendor/nim-libp2p/libp2p/multiaddress, ../../vendor/nim-libp2p/libp2p/crypto/crypto, diff --git a/waku/protocol/v1/waku_bridge.nim b/waku/protocol/v1/waku_bridge.nim new file mode 100644 index 000000000..3227c242d --- /dev/null +++ b/waku/protocol/v1/waku_bridge.nim @@ -0,0 +1,17 @@ +# +# Waku - Whisper Bridge +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# + +import + eth/p2p, + eth/p2p/rlpx_protocols/whisper_protocol, + ./waku_protocol + +proc shareMessageQueue*(node: EthereumNode) = + node.protocolState(Waku).queue = node.protocolState(Whisper).queue diff --git a/waku/protocol/v1/waku_mail.nim b/waku/protocol/v1/waku_mail.nim new file mode 100644 index 000000000..535d387fc --- /dev/null +++ b/waku/protocol/v1/waku_mail.nim @@ -0,0 +1,86 @@ +# +# Waku Mail Client & Server +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# +import + chronos, + eth/[p2p, async_utils], + ./waku_protocol + +const + requestCompleteTimeout = chronos.seconds(5) + +type + Cursor = seq[byte] + + MailRequest* = object + lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time + upper*: uint32 ## Unix timestamp; newest requested envelope's creation time + bloom*: seq[byte] ## Bloom filter to apply on the envelopes + limit*: uint32 ## Maximum amount of envelopes to return + cursor*: Cursor ## Optional cursor + +proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest, + symKey: SymKey, requests = 10): Future[Option[Cursor]] {.async.} = + ## Send p2p mail request and check request complete. + ## If result is none, and error occured. If result is a none empty cursor, + ## more envelopes are available. + # TODO: Perhaps don't go the recursive route or could use the actual response + # proc to implement this (via a handler) and store the necessary data in the + # WakuPeer object. + # TODO: Several requestMail calls in parallel can create issues with handling + # the wrong response to a request. Can additionaly check the requestId but + # that would only solve it half. Better to use the requestResponse mechanism. + + # TODO: move this check out of requestMail? + let peer = node.getPeer(peerId, Waku) + if not peer.isSome(): + error "Invalid peer" + return result + elif not peer.get().state(Waku).trusted: + return result + + var writer = initRlpWriter() + writer.append(request) + let payload = writer.finish() + let data = encode(Payload(payload: payload, symKey: some(symKey))) + if not data.isSome(): + error "Encoding of payload failed" + return result + + # TODO: should this envelope be valid in terms of ttl, PoW, etc.? + let env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0) + # Send the request + traceAsyncErrors peer.get().p2pRequest(env) + + # Wait for the Request Complete packet + var f = peer.get().nextMsg(Waku.p2pRequestComplete) + if await f.withTimeout(requestCompleteTimeout): + let response = f.read() + # TODO: I guess the idea is to check requestId (Hash) also? + let requests = requests - 1 + # If there is cursor data, do another request + if response.cursor.len > 0 and requests > 0: + var newRequest = request + newRequest.cursor = response.cursor + return await requestMail(node, peerId, newRequest, symKey, requests) + else: + return some(response.cursor) + else: + error "p2pRequestComplete timeout" + return result + +proc p2pRequestHandler(peer: Peer, envelope: Envelope) = + # Mail server p2p request implementation + discard + +proc enableMailServer*(node: EthereumNode, customHandler: P2PRequestHandler) = + node.protocolState(Waku).p2pRequestHandler = customHandler + +proc enableMailServer*(node: EthereumNode) = + node.protocolState(Waku).p2pRequestHandler = p2pRequestHandler diff --git a/waku/protocol/v1/waku_protocol.nim b/waku/protocol/v1/waku_protocol.nim new file mode 100644 index 000000000..3a1fbe672 --- /dev/null +++ b/waku/protocol/v1/waku_protocol.nim @@ -0,0 +1,649 @@ +# +# Waku +# (c) Copyright 2018-2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# + +## Waku +## ******* +## +## Waku is a fork of Whisper. +## +## Waku is a gossip protocol that synchronizes a set of messages across nodes +## with attention given to sender and recipient anonymitiy. Messages are +## categorized by a topic and stay alive in the network based on a time-to-live +## measured in seconds. Spam prevention is based on proof-of-work, where large +## or long-lived messages must spend more work. +## +## Implementation should be according to Waku specification defined here: +## https://github.com/vacp2p/specs/blob/master/waku/waku.md +## +## Example usage +## ---------- +## First an `EthereumNode` needs to be created, either with all capabilities set +## or with specifically the Waku capability set. +## The latter can be done like this: +## +## .. code-block::nim +## var node = newEthereumNode(keypair, address, netId, nil, +## addAllCapabilities = false) +## node.addCapability Waku +## +## Now calls such as ``postMessage`` and ``subscribeFilter`` can be done. +## However, they only make real sense after ``connectToNetwork`` was started. As +## else there will be no peers to send and receive messages from. + +import + options, tables, times, chronos, chronicles, metrics, + eth/[keys, async_utils, p2p], eth/p2p/rlpx_protocols/whisper/whisper_types, + eth/trie/trie_defs + +export + whisper_types + +logScope: + topics = "waku" + +declarePublicCounter dropped_low_pow_envelopes, + "Dropped envelopes because of too low PoW" +declarePublicCounter dropped_too_large_envelopes, + "Dropped envelopes because larger than maximum allowed size" +declarePublicCounter dropped_bloom_filter_mismatch_envelopes, + "Dropped envelopes because not matching with bloom filter" +declarePublicCounter dropped_topic_mismatch_envelopes, + "Dropped envelopes because of not matching topics" +declarePublicCounter dropped_duplicate_envelopes, + "Dropped duplicate envelopes" + +const + defaultQueueCapacity = 2048 + wakuVersion* = 1 ## Waku version. + wakuVersionStr* = $wakuVersion ## Waku version. + defaultMinPow* = 0.2'f64 ## The default minimum PoW requirement for this node. + defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## The current default and max + ## message size. This can never be larger than the maximum RLPx message size. + messageInterval* = chronos.milliseconds(300) ## Interval at which messages are + ## send to peers, in ms. + pruneInterval* = chronos.milliseconds(1000) ## Interval at which message + ## queue is pruned, in ms. + topicInterestMax = 10000 + +type + WakuConfig* = object + powRequirement*: float64 + bloom*: Option[Bloom] + isLightNode*: bool + maxMsgSize*: uint32 + confirmationsEnabled*: bool + rateLimits*: Option[RateLimits] + topics*: Option[seq[Topic]] + + WakuPeer = ref object + initialized: bool # when successfully completed the handshake + powRequirement*: float64 + bloom*: Bloom + isLightNode*: bool + trusted*: bool + topics*: Option[seq[Topic]] + received: HashSet[Hash] + + P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} + + WakuNetwork = ref object + queue*: ref Queue + filters*: Filters + config*: WakuConfig + p2pRequestHandler*: P2PRequestHandler + + RateLimits* = object + # TODO: uint or specifically uint32? + limitIp*: uint + limitPeerId*: uint + limitTopic*: uint + + StatusOptions* = object + powRequirement*: Option[(float64)] + bloomFilter*: Option[Bloom] + lightNode*: Option[bool] + confirmationsEnabled*: Option[bool] + rateLimits*: Option[RateLimits] + topicInterest*: Option[seq[Topic]] + + KeyKind* = enum + powRequirementKey, + bloomFilterKey, + lightNodeKey, + confirmationsEnabledKey, + rateLimitsKey, + topicInterestKey + +template countSomeFields*(x: StatusOptions): int = + var count = 0 + for f in fields(x): + if f.isSome(): + inc count + count + +proc append*(rlpWriter: var RlpWriter, value: StatusOptions) = + var list = initRlpList(countSomeFields(value)) + if value.powRequirement.isSome(): + list.append((powRequirementKey, cast[uint64](value.powRequirement.get()))) + if value.bloomFilter.isSome(): + list.append((bloomFilterKey, @(value.bloomFilter.get()))) + if value.lightNode.isSome(): + list.append((lightNodeKey, value.lightNode.get())) + if value.confirmationsEnabled.isSome(): + list.append((confirmationsEnabledKey, value.confirmationsEnabled.get())) + if value.rateLimits.isSome(): + list.append((rateLimitsKey, value.rateLimits.get())) + if value.topicInterest.isSome(): + list.append((topicInterestKey, value.topicInterest.get())) + + let bytes = list.finish() + + rlpWriter.append(rlpFromBytes(bytes)) + +proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T = + if not rlp.isList(): + raise newException(RlpTypeMismatch, + "List expected, but the source RLP is not a list.") + + let sz = rlp.listLen() + # We already know that we are working with a list + doAssert rlp.enterList() + for i in 0 ..< sz: + rlp.tryEnterList() + + var k: KeyKind + try: + k = rlp.read(KeyKind) + except RlpTypeMismatch: + # skip unknown keys and their value + rlp.skipElem() + rlp.skipElem() + continue + + case k + of powRequirementKey: + let pow = rlp.read(uint64) + result.powRequirement = some(cast[float64](pow)) + of bloomFilterKey: + let bloom = rlp.read(seq[byte]) + if bloom.len != bloomSize: + raise newException(UselessPeerError, "Bloomfilter size mismatch") + var bloomFilter: Bloom + bloomFilter.bytesCopy(bloom) + result.bloomFilter = some(bloomFilter) + of lightNodeKey: + result.lightNode = some(rlp.read(bool)) + of confirmationsEnabledKey: + result.confirmationsEnabled = some(rlp.read(bool)) + of rateLimitsKey: + result.rateLimits = some(rlp.read(RateLimits)) + of topicInterestKey: + result.topicInterest = some(rlp.read(seq[Topic])) + +proc allowed*(msg: Message, config: WakuConfig): bool = + # Check max msg size, already happens in RLPx but there is a specific waku + # max msg size which should always be < RLPx max msg size + if msg.size > config.maxMsgSize: + dropped_too_large_envelopes.inc() + warn "Message size too large", size = msg.size + return false + + if msg.pow < config.powRequirement: + dropped_low_pow_envelopes.inc() + warn "Message PoW too low", pow = msg.pow, minPow = config.powRequirement + return false + + if config.topics.isSome(): + if msg.env.topic notin config.topics.get(): + dropped_topic_mismatch_envelopes.inc() + warn "Message topic does not match Waku topic list" + return false + else: + if config.bloom.isSome() and not bloomFilterMatch(config.bloom.get(), msg.bloom): + dropped_bloom_filter_mismatch_envelopes.inc() + warn "Message does not match node bloom filter" + return false + + return true + +proc run(peer: Peer) {.gcsafe, async.} +proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.} + +proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = + new(network.queue) + network.queue[] = initQueue(defaultQueueCapacity) + network.filters = initTable[string, Filter]() + network.config.bloom = some(fullBloom()) + network.config.powRequirement = defaultMinPow + network.config.isLightNode = false + # RateLimits and confirmations are not yet implemented so we set confirmations + # to false and we don't pass RateLimits at all. + network.config.confirmationsEnabled = false + network.config.rateLimits = none(RateLimits) + network.config.maxMsgSize = defaultMaxMsgSize + network.config.topics = none(seq[Topic]) + asyncCheck node.run(network) + +p2pProtocol Waku(version = wakuVersion, + rlpxName = "waku", + peerState = WakuPeer, + networkState = WakuNetwork): + + onPeerConnected do (peer: Peer): + trace "onPeerConnected Waku" + let + wakuNet = peer.networkState + wakuPeer = peer.state + + let options = StatusOptions( + powRequirement: some(wakuNet.config.powRequirement), + bloomFilter: wakuNet.config.bloom, + lightNode: some(wakuNet.config.isLightNode), + confirmationsEnabled: some(wakuNet.config.confirmationsEnabled), + rateLimits: wakuNet.config.rateLimits, + topicInterest: wakuNet.config.topics) + + let m = await peer.status(options, + timeout = chronos.milliseconds(5000)) + + wakuPeer.powRequirement = m.options.powRequirement.get(defaultMinPow) + wakuPeer.bloom = m.options.bloomFilter.get(fullBloom()) + + wakuPeer.isLightNode = m.options.lightNode.get(false) + if wakuPeer.isLightNode and wakuNet.config.isLightNode: + # No sense in connecting two light nodes so we disconnect + raise newException(UselessPeerError, "Two light nodes connected") + + wakuPeer.topics = m.options.topicInterest + if wakuPeer.topics.isSome(): + if wakuPeer.topics.get().len > topicInterestMax: + raise newException(UselessPeerError, "Topic-interest is too large") + if wakuNet.config.topics.isSome(): + raise newException(UselessPeerError, + "Two Waku nodes with topic-interest connected") + + wakuPeer.received.init() + wakuPeer.trusted = false + wakuPeer.initialized = true + + # No timer based queue processing for a light node. + if not wakuNet.config.isLightNode: + traceAsyncErrors peer.run() + + debug "Waku peer initialized", peer + + handshake: + proc status(peer: Peer, options: StatusOptions) + + proc messages(peer: Peer, envelopes: openarray[Envelope]) = + if not peer.state.initialized: + warn "Handshake not completed yet, discarding messages" + return + + for envelope in envelopes: + # check if expired or in future, or ttl not 0 + if not envelope.valid(): + warn "Expired or future timed envelope", peer + # disconnect from peers sending bad envelopes + # await peer.disconnect(SubprotocolReason) + continue + + let msg = initMessage(envelope) + if not msg.allowed(peer.networkState.config): + # disconnect from peers sending bad envelopes + # await peer.disconnect(SubprotocolReason) + continue + + # This peer send this message thus should not receive it again. + # If this peer has the message in the `received` set already, this means + # it was either already received here from this peer or send to this peer. + # Either way it will be in our queue already (and the peer should know + # this) and this peer is sending duplicates. + # Note: geth does not check if a peer has send a message to them before + # broadcasting this message. This too is seen here as a duplicate message + # (see above comment). If we want to seperate these cases (e.g. when peer + # rating), then we have to add a "peer.state.send" HashSet. + # Note: it could also be a race between the arrival of a message send by + # this node to a peer and that same message arriving from that peer (after + # it was received from another peer) here. + if peer.state.received.containsOrIncl(msg.hash): + dropped_duplicate_envelopes.inc() + trace "Peer sending duplicate messages", peer, hash = $msg.hash + # await peer.disconnect(SubprotocolReason) + continue + + # This can still be a duplicate message, but from another peer than + # the peer who send the message. + if peer.networkState.queue[].add(msg): + # notify filters of this message + peer.networkState.filters.notify(msg) + + nextID 22 + + proc statusOptions(peer: Peer, options: StatusOptions) = + if not peer.state.initialized: + warn "Handshake not completed yet, discarding statusOptions" + return + + if options.topicInterest.isSome(): + peer.state.topics = options.topicInterest + elif options.bloomFilter.isSome(): + peer.state.bloom = options.bloomFilter.get() + peer.state.topics = none(seq[Topic]) + + if options.powRequirement.isSome(): + peer.state.powRequirement = options.powRequirement.get() + + if options.lightNode.isSome(): + peer.state.isLightNode = options.lightNode.get() + + nextID 126 + + proc p2pRequest(peer: Peer, envelope: Envelope) = + if not peer.networkState.p2pRequestHandler.isNil(): + peer.networkState.p2pRequestHandler(peer, envelope) + + proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) = + if peer.state.trusted: + # when trusted we can bypass any checks on envelope + for envelope in envelopes: + let msg = Message(env: envelope, isP2P: true) + peer.networkState.filters.notify(msg) + + # Following message IDs are not part of EIP-627, but are added and used by + # the Status application, we ignore them for now. + nextID 11 + proc batchAcknowledged(peer: Peer) = discard + proc messageResponse(peer: Peer) = discard + + nextID 123 + requestResponse: + proc p2pSyncRequest(peer: Peer) = discard + proc p2pSyncResponse(peer: Peer) = discard + + + proc p2pRequestComplete(peer: Peer, requestId: Hash, lastEnvelopeHash: Hash, + cursor: seq[byte]) = discard + # TODO: + # In the current specification the parameters are not wrapped in a regular + # envelope as is done for the P2P Request packet. If we could alter this in + # the spec it would be a cleaner separation between Waku and Mail server / + # client. + # Also, if a requestResponse block is used, a reqestId will automatically + # be added by the protocol DSL. + # However the requestResponse block in combination with p2pRequest cannot be + # used due to the unfortunate fact that the packet IDs are not consecutive, + # and nextID is not recognized in between these. The nextID behaviour could + # be fixed, however it would be cleaner if the specification could be + # changed to have these IDs to be consecutive. + +# 'Runner' calls --------------------------------------------------------------- + +proc processQueue(peer: Peer) = + # Send to peer all valid and previously not send envelopes in the queue. + var + envelopes: seq[Envelope] = @[] + wakuPeer = peer.state(Waku) + wakuNet = peer.networkState(Waku) + + for message in wakuNet.queue.items: + if wakuPeer.received.contains(message.hash): + # trace "message was already send to peer", hash = $message.hash, peer + continue + + if message.pow < wakuPeer.powRequirement: + trace "Message PoW too low for peer", pow = message.pow, + powReq = wakuPeer.powRequirement + continue + + if wakuPeer.topics.isSome(): + if message.env.topic notin wakuPeer.topics.get(): + trace "Message does not match topics list" + continue + else: + if not bloomFilterMatch(wakuPeer.bloom, message.bloom): + trace "Message does not match peer bloom filter" + continue + + trace "Adding envelope" + envelopes.add(message.env) + wakuPeer.received.incl(message.hash) + + if envelopes.len() > 0: + trace "Sending envelopes", amount=envelopes.len + # Ignore failure of sending messages, this could occur when the connection + # gets dropped + traceAsyncErrors peer.messages(envelopes) + +proc run(peer: Peer) {.async.} = + while peer.connectionState notin {Disconnecting, Disconnected}: + peer.processQueue() + await sleepAsync(messageInterval) + +proc pruneReceived(node: EthereumNode) {.raises: [].} = + if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ... + var wakuNet = node.protocolState(Waku) + + for peer in node.protocolPeers(Waku): + if not peer.initialized: + continue + + # NOTE: Perhaps alter the queue prune call to keep track of a HashSet + # of pruned messages (as these should be smaller), and diff this with + # the received sets. + peer.received = intersection(peer.received, wakuNet.queue.itemHashes) + +proc run(node: EthereumNode, network: WakuNetwork) {.async.} = + while true: + # prune message queue every second + # TTL unit is in seconds, so this should be sufficient? + network.queue[].prune() + # pruning the received sets is not necessary for correct workings + # but simply from keeping the sets growing indefinitely + node.pruneReceived() + await sleepAsync(pruneInterval) + +# Private EthereumNode calls --------------------------------------------------- + +proc sendP2PMessage(node: EthereumNode, peerId: NodeId, + envelopes: openarray[Envelope]): bool = + for peer in node.peers(Waku): + if peer.remote.id == peerId: + asyncCheck peer.p2pMessage(envelopes) + return true + +proc queueMessage(node: EthereumNode, msg: Message): bool = + + var wakuNet = node.protocolState(Waku) + # We have to do the same checks here as in the messages proc not to leak + # any information that the message originates from this node. + if not msg.allowed(wakuNet.config): + return false + + trace "Adding message to queue", hash = $msg.hash + if wakuNet.queue[].add(msg): + # Also notify our own filters of the message we are sending, + # e.g. msg from local Dapp to Dapp + wakuNet.filters.notify(msg) + + return true + +# Public EthereumNode calls ---------------------------------------------------- + +proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), + symKey = none[SymKey](), src = none[PrivateKey](), + ttl: uint32, topic: Topic, payload: seq[byte], + padding = none[seq[byte]](), powTime = 1'f, + powTarget = defaultMinPow, + targetPeer = none[NodeId]()): bool = + ## Post a message on the message queue which will be processed at the + ## next `messageInterval`. + ## + ## NOTE: This call allows a post without encryption. If encryption is + ## mandatory it should be enforced a layer up + let payload = encode(Payload(payload: payload, src: src, dst: pubKey, + symKey: symKey, padding: padding)) + if payload.isSome(): + var env = Envelope(expiry:epochTime().uint32 + ttl, + ttl: ttl, topic: topic, data: payload.get(), nonce: 0) + + # Allow lightnode to post only direct p2p messages + if targetPeer.isSome(): + return node.sendP2PMessage(targetPeer.get(), [env]) + else: + # non direct p2p message can not have ttl of 0 + if env.ttl == 0: + return false + var msg = initMessage(env, powCalc = false) + # XXX: make this non blocking or not? + # In its current blocking state, it could be noticed by a peer that no + # messages are send for a while, and thus that mining PoW is done, and + # that next messages contains a message originated from this peer + # zah: It would be hard to execute this in a background thread at the + # moment. We'll need a way to send custom "tasks" to the async message + # loop (e.g. AD2 support for AsyncChannels). + if not msg.sealEnvelope(powTime, powTarget): + return false + + # need to check expiry after mining PoW + if not msg.env.valid(): + return false + + result = node.queueMessage(msg) + + # Allows light nodes to post via untrusted messages packet. + # Queue gets processed immediatly as the node sends only its own messages, + # so the privacy ship has already sailed anyhow. + # TODO: + # - Could be still a concern in terms of efficiency, if multiple messages + # need to be send. + # - For Waku Mode, the checks in processQueue are rather useless as the + # idea is to connect only to 1 node? Also refactor in that case. + if node.protocolState(Waku).config.isLightNode: + for peer in node.peers(Waku): + peer.processQueue() + else: + error "Encoding of payload failed" + return false + +proc subscribeFilter*(node: EthereumNode, filter: Filter, + handler:FilterMsgHandler = nil): string = + ## Initiate a filter for incoming/outgoing messages. Messages can be + ## retrieved with the `getFilterMessages` call or with a provided + ## `FilterMsgHandler`. + ## + ## NOTE: This call allows for a filter without decryption. If encryption is + ## mandatory it should be enforced a layer up. + return node.protocolState(Waku).filters.subscribeFilter(filter, handler) + +proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = + ## Remove a previously subscribed filter. + var filter: Filter + return node.protocolState(Waku).filters.take(filterId, filter) + +proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] = + ## Get all the messages currently in the filter queue. This will reset the + ## filter message queue. + return node.protocolState(Waku).filters.getFilterMessages(filterId) + +proc filtersToBloom*(node: EthereumNode): Bloom = + ## Returns the bloom filter of all topics of all subscribed filters. + return node.protocolState(Waku).filters.toBloom() + +proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = + ## Sets the PoW requirement for this node, will also send + ## this new PoW requirement to all connected peers. + ## + ## Failures when sending messages to peers will not be reported. + # NOTE: do we need a tolerance of old PoW for some time? + node.protocolState(Waku).config.powRequirement = powReq + var futures: seq[Future[void]] = @[] + let list = StatusOptions(powRequirement: some(powReq)) + for peer in node.peers(Waku): + futures.add(peer.statusOptions(list)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) + +proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = + ## Sets the bloom filter for this node, will also send + ## this new bloom filter to all connected peers. + ## + ## Failures when sending messages to peers will not be reported. + # NOTE: do we need a tolerance of old bloom filter for some time? + node.protocolState(Waku).config.bloom = some(bloom) + # reset topics + node.protocolState(Waku).config.topics = none(seq[Topic]) + + var futures: seq[Future[void]] = @[] + let list = StatusOptions(bloomFilter: some(bloom)) + for peer in node.peers(Waku): + futures.add(peer.statusOptions(list)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) + +proc setTopicInterest*(node: EthereumNode, topics: seq[Topic]): + Future[bool] {.async.} = + if topics.len > topicInterestMax: + return false + + node.protocolState(Waku).config.topics = some(topics) + + var futures: seq[Future[void]] = @[] + let list = StatusOptions(topicInterest: some(topics)) + for peer in node.peers(Waku): + futures.add(peer.statusOptions(list)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) + + return true + +proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = + ## Set the maximum allowed message size. + ## Can not be set higher than ``defaultMaxMsgSize``. + if size > defaultMaxMsgSize: + warn "size > defaultMaxMsgSize" + return false + node.protocolState(Waku).config.maxMsgSize = size + return true + +proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = + ## Set a connected peer as trusted. + for peer in node.peers(Waku): + if peer.remote.id == peerId: + peer.state(Waku).trusted = true + return true + +proc setLightNode*(node: EthereumNode, isLightNode: bool) {.async.} = + ## Set this node as a Waku light node. + node.protocolState(Waku).config.isLightNode = isLightNode +# TODO: Add starting/stopping of `processQueue` loop depending on value of isLightNode. + var futures: seq[Future[void]] = @[] + let list = StatusOptions(lightNode: some(isLightNode)) + for peer in node.peers(Waku): + futures.add(peer.statusOptions(list)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) + +proc configureWaku*(node: EthereumNode, config: WakuConfig) = + ## Apply a Waku configuration. + ## + ## NOTE: Should be run before connection is made with peers as some + ## of the settings are only communicated at peer handshake. + node.protocolState(Waku).config = config + +proc resetMessageQueue*(node: EthereumNode) = + ## Full reset of the message queue. + ## + ## NOTE: Not something that should be run in normal circumstances. + node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)