From f940162b6b038fb63d855f46cdb419ac970a74f0 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 12 Dec 2019 23:23:26 +0100 Subject: [PATCH] Implement Waku mode PoC --- eth.nimble | 1 + eth/p2p/rlpx_protocols/waku_protocol.nim | 62 ++++++++++++++++++- tests/p2p/test_waku_mode.nim | 77 ++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 tests/p2p/test_waku_mode.nim diff --git a/eth.nimble b/eth.nimble index 455a7c6..21a422c 100644 --- a/eth.nimble +++ b/eth.nimble @@ -54,6 +54,7 @@ proc runP2pTests() = "test_shh_connect", "test_waku_bridge", "test_waku_mail", + "test_waku_mode", "test_protocol_handlers", ]: runTest("tests/p2p/" & filename) diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 088de5f..8851821 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -57,11 +57,23 @@ const ## queue is pruned, in ms. type + WakuMode* = enum + # TODO: is there a reason to allow such "none" mode? This was originally + # put here when it was still supposed to be compatible with Whisper. + None, # No Waku mode + WakuChan, # Waku client + WakuSan # Waku node + # TODO: Light mode could also become part of this enum + # TODO: With discv5, this could be capabilities also announced at level of + # discovery. + WakuConfig* = object powRequirement*: float64 bloom*: Bloom isLightNode*: bool maxMsgSize*: uint32 + wakuMode*: WakuMode + topics*: seq[Topic] WakuPeer = ref object initialized: bool # when successfully completed the handshake @@ -69,6 +81,8 @@ type bloom*: Bloom isLightNode*: bool trusted*: bool + wakuMode*: WakuMode + topics*: seq[Topic] received: HashSet[Message] P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} @@ -105,6 +119,11 @@ proc allowed*(msg: Message, config: WakuConfig): bool = warn "Message does not match node bloom filter" return false + if config.wakuMode == WakuChan: + if msg.env.topic notin config.topics: + warn "Message topic does not match Waku topic list" + return false + return true proc run(peer: Peer) {.gcsafe, async.} @@ -118,6 +137,8 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = network.config.powRequirement = defaultMinPow network.config.isLightNode = false network.config.maxMsgSize = defaultMaxMsgSize + network.config.wakuMode = None # default no waku mode + network.config.topics = @[] asyncCheck node.run(network) p2pProtocol Waku(version = wakuVersion, @@ -135,6 +156,8 @@ p2pProtocol Waku(version = wakuVersion, cast[uint](wakuNet.config.powRequirement), @(wakuNet.config.bloom), wakuNet.config.isLightNode, + wakuNet.config.wakuMode, + wakuNet.config.topics, timeout = chronos.milliseconds(500)) if m.protocolVersion == wakuVersion: @@ -158,6 +181,19 @@ p2pProtocol Waku(version = wakuVersion, # No sense in connecting two light nodes so we disconnect raise newException(UselessPeerError, "Two light nodes connected") + # When Waku-san connect to all. When None, connect to all, Waku-chan has + # to decide to disconnect. When Waku-chan, connect only to Waku-san. + wakuPeer.wakuMode = m.wakuMode + if wakuNet.config.wakuMode == WakuChan: + if wakuPeer.wakuMode == WakuChan: + raise newException(UselessPeerError, "Two Waku-chan connected") + elif wakuPeer.wakuMode == None: + raise newException(UselessPeerError, "Not in Waku mode") + if wakuNet.config.wakuMode == WakuSan and + wakuPeer.wakuMode == WakuChan: + # TODO: need some maximum check on amount of topics + wakuPeer.topics = m.topics + wakuPeer.received.init() wakuPeer.trusted = false wakuPeer.initialized = true @@ -172,7 +208,9 @@ p2pProtocol Waku(version = wakuVersion, protocolVersion: uint, powConverted: uint, bloom: Bytes, - isLightNode: bool) + isLightNode: bool, + wakuMode: WakuMode, + topics: seq[Topic]) proc messages(peer: Peer, envelopes: openarray[Envelope]) = if not peer.state.initialized: @@ -228,6 +266,14 @@ p2pProtocol Waku(version = wakuVersion, if bloom.len == bloomSize: peer.state.bloom.bytesCopy(bloom) + proc topicsExchange(peer: Peer, topics: seq[Topic]) = + if not peer.state.initialized: + warn "Handshake not completed yet, discarding topicsExchange" + return + + if peer.state.wakuMode == WakuChan: + peer.state.topics = topics + nextID 126 proc p2pRequest(peer: Peer, envelope: Envelope) = @@ -283,6 +329,11 @@ proc processQueue(peer: Peer) = debug "Message does not match peer bloom filter" continue + if wakuNet.config.wakuMode == WakuSan and + wakuPeer.wakuMode == WakuChan: + if message.env.topic notin wakuPeer.topics: + continue + trace "Adding envelope" envelopes.add(message.env) wakuPeer.received.incl(message) @@ -447,6 +498,15 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = # Exceptions from sendMsg will not be raised await allFutures(futures) +proc setTopics*(node: EthereumNode, topics: seq[Topic]) {.async.} = + node.protocolState(Waku).config.topics = topics + var futures: seq[Future[void]] = @[] + for peer in node.peers(Waku): + futures.add(peer.topicsExchange(topics)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) + proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = ## Set the maximum allowed message size. ## Can not be set higher than ``defaultMaxMsgSize``. diff --git a/tests/p2p/test_waku_mode.nim b/tests/p2p/test_waku_mode.nim new file mode 100644 index 0000000..8cd41cc --- /dev/null +++ b/tests/p2p/test_waku_mode.nim @@ -0,0 +1,77 @@ +# +# Waku +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import + sequtils, options, unittest, chronos, eth/[keys, p2p], + eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool, + ./p2p_test_helper + +const + safeTTL = 5'u32 + waitInterval = messageInterval + 150.milliseconds + +suite "Waku Mode": + asyncTest "Test Waku-chan with Waku-san": + var wakuChan = setupTestNode(Waku) + var wakuSan = setupTestNode(Waku) + + let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + wakuChan.protocolState(Waku).config.wakuMode = WakuChan + wakuChan.protocolState(Waku).config.topics = @[topic1, topic2] + wakuSan.protocolState(Waku).config.wakuMode = WakuSan + + wakuSan.startListening() + await wakuChan.peerPool.connectToNode(newNode(initENode(wakuSan.keys.pubKey, + wakuSan.address))) + + let payload = repeat(byte 0, 10) + check: + wakuSan.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuSan.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuSan.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) + wakuSan.protocolState(Waku).queue.items.len == 3 + await sleepAsync(waitInterval) + check: + wakuChan.protocolState(Waku).queue.items.len == 2 + + asyncTest "Test Waku connections": + var n1 = setupTestNode(Waku) + var n2 = setupTestNode(Waku) + var n3 = setupTestNode(Waku) + var n4 = setupTestNode(Waku) + var n5 = setupTestNode(Waku) + + n1.protocolState(Waku).config.wakuMode = WakuMode.None + n2.protocolState(Waku).config.wakuMode = WakuChan + n3.protocolState(Waku).config.wakuMode = WakuChan + n4.protocolState(Waku).config.wakuMode = WakuSan + n5.protocolState(Waku).config.wakuMode = WakuSan + + n1.startListening() + n3.startListening() + n5.startListening() + + let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey, + n1.address))) + let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey, + n3.address))) + check: + p1.isNil + p2.isNil + + let p3 = await n4.rlpxConnect(newNode(initENode(n1.keys.pubKey, + n1.address))) + let p4 = await n4.rlpxConnect(newNode(initENode(n5.keys.pubKey, + n5.address))) + check: + p3.isNil == false + p4.isNil == false