diff --git a/eth.nimble b/eth.nimble index c3e2300..c6ab841 100644 --- a/eth.nimble +++ b/eth.nimble @@ -54,7 +54,6 @@ proc runP2pTests() = "test_waku_connect", "test_waku_bridge", "test_waku_mail", - "test_waku_mode", "test_protocol_handlers", "test_enr", ]: diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 78bdf41..c8fac30 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -72,16 +72,6 @@ const topicInterestMax = 1000 type - WakuMode* = enum - # TODO: is there a reason to allow such "none" mode? This was originally - # put here when it was still supposed to be compatible with Whisper. - None, # No Waku mode - WakuChan, # Waku client - WakuSan # Waku node - # TODO: Light mode could also become part of this enum - # TODO: With discv5, this could be capabilities also announced at level of - # discovery. - WakuConfig* = object powRequirement*: float64 bloom*: Bloom @@ -89,8 +79,7 @@ type maxMsgSize*: uint32 confirmationsEnabled*: bool rateLimits*: RateLimits - wakuMode*: WakuMode - topics*: seq[Topic] + topics*: Option[seq[Topic]] WakuPeer = ref object initialized: bool # when successfully completed the handshake @@ -98,8 +87,7 @@ type bloom*: Bloom isLightNode*: bool trusted*: bool - wakuMode*: WakuMode - topics*: seq[Topic] + topics*: Option[seq[Topic]] received: HashSet[Hash] P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} @@ -116,6 +104,89 @@ type limitPeerId*: uint limitTopic*: uint + StatusOptions* = object + powRequirement*: Option[(float64)] + bloomFilter*: Option[Bloom] + lightNode*: Option[bool] + confirmationsEnabled*: Option[bool] + rateLimits*: Option[RateLimits] + topicInterest*: Option[seq[Topic]] + + KeyKind* = enum + powRequirementKey, + bloomFilterKey, + lightNodeKey, + confirmationsEnabledKey, + rateLimitsKey, + topicInterestKey + +template countSomeFields*(x: StatusOptions): int = + var count = 0 + for f in fields(x): + if f.isSome(): + inc count + count + +proc append*(rlpWriter: var RlpWriter, value: StatusOptions) = + var list = initRlpList(countSomeFields(value)) + if value.powRequirement.isSome(): + list.append((powRequirementKey, cast[uint64](value.powRequirement.get()))) + if value.bloomFilter.isSome(): + list.append((bloomFilterKey, @(value.bloomFilter.get()))) + if value.lightNode.isSome(): + list.append((lightNodeKey, value.lightNode.get())) + if value.confirmationsEnabled.isSome(): + list.append((confirmationsEnabledKey, value.confirmationsEnabled.get())) + if value.rateLimits.isSome(): + list.append((rateLimitsKey, value.rateLimits.get())) + if value.topicInterest.isSome(): + list.append((topicInterestKey, value.topicInterest.get())) + + let bytes = list.finish() + + rlpWriter.append(rlpFromBytes(bytes.toRange)) + +proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T = + if not rlp.isList(): + raise newException(RlpTypeMismatch, + "List expected, but the source RLP is not a list.") + + let sz = rlp.listLen() + rlp.enterList() + for i in 0 ..< sz: + if not rlp.isList(): + raise newException(RlpTypeMismatch, + "List expected, but the source RLP is not a list.") + + rlp.enterList() + var k: KeyKind + try: + k = rlp.read(KeyKind) + except RlpTypeMismatch: + # skip unknown keys and their value + rlp.skipElem() + rlp.skipElem() + continue + + case k + of powRequirementKey: + let pow = rlp.read(uint64) + result.powRequirement = some(cast[float64](pow)) + of bloomFilterKey: + let bloom = rlp.read(seq[byte]) + if bloom.len != bloomSize: + raise newException(UselessPeerError, "Bloomfilter size mismatch") + var bloomFilter: Bloom + bloomFilter.bytesCopy(bloom) + result.bloomFilter = some(bloomFilter) + of lightNodeKey: + result.lightNode = some(rlp.read(bool)) + of confirmationsEnabledKey: + result.confirmationsEnabled = some(rlp.read(bool)) + of rateLimitsKey: + result.rateLimits = some(rlp.read(RateLimits)) + of topicInterestKey: + result.topicInterest = some(rlp.read(seq[Topic])) proc allowed*(msg: Message, config: WakuConfig): bool = # Check max msg size, already happens in RLPx but there is a specific waku @@ -135,8 +206,8 @@ proc allowed*(msg: Message, config: WakuConfig): bool = warn "Message does not match node bloom filter" return false - if config.wakuMode == WakuChan: - if msg.env.topic notin config.topics: + if config.topics.isSome(): + if msg.env.topic notin config.topics.get(): dropped_topic_mismatch_envelopes.inc() warn "Message topic does not match Waku topic list" return false @@ -160,8 +231,7 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = network.config.rateLimits = RateLimits(limitIp: 0, limitPeerId: 0, limitTopic:0) network.config.maxMsgSize = defaultMaxMsgSize - network.config.wakuMode = None # default no waku mode - network.config.topics = @[] + network.config.topics = none(seq[Topic]) asyncCheck node.run(network) p2pProtocol Waku(version = wakuVersion, @@ -175,49 +245,37 @@ p2pProtocol Waku(version = wakuVersion, wakuNet = peer.networkState wakuPeer = peer.state - let m = await peer.status(wakuVersion, - cast[uint64](wakuNet.config.powRequirement), - @(wakuNet.config.bloom), - wakuNet.config.isLightNode, - wakuNet.config.confirmationsEnabled, - wakuNet.config.rateLimits, - wakuNet.config.wakuMode, - wakuNet.config.topics, - timeout = chronos.milliseconds(500)) + let list = StatusOptions( + powRequirement: some(wakuNet.config.powRequirement), + bloomFilter: some(wakuNet.config.bloom), + lightNode: some(wakuNet.config.isLightNode), + confirmationsEnabled: some(wakuNet.config.confirmationsEnabled), + rateLimits: some(wakuNet.config.rateLimits), + topicInterest: wakuNet.config.topics) + + let m = await peer.status(wakuVersion, list, + timeout = chronos.milliseconds(500)) if m.protocolVersion == wakuVersion: debug "Waku peer", peer, wakuVersion else: raise newException(UselessPeerError, "Incompatible Waku version") - wakuPeer.powRequirement = cast[float64](m.powConverted) + wakuPeer.powRequirement = m.list.powRequirement.get(defaultMinPow) + wakuPeer.bloom = m.list.bloomFilter.get(fullBloom()) - if m.bloom.len > 0: - if m.bloom.len != bloomSize: - raise newException(UselessPeerError, "Bloomfilter size mismatch") - else: - wakuPeer.bloom.bytesCopy(m.bloom) - else: - # If no bloom filter is send we allow all - wakuPeer.bloom = fullBloom() - - wakuPeer.isLightNode = m.isLightNode + wakuPeer.isLightNode = m.list.lightNode.get(false) if wakuPeer.isLightNode and wakuNet.config.isLightNode: # No sense in connecting two light nodes so we disconnect raise newException(UselessPeerError, "Two light nodes connected") - # When Waku-san connect to all. When None, connect to all, Waku-chan has - # to decide to disconnect. When Waku-chan, connect only to Waku-san. - wakuPeer.wakuMode = m.wakuMode - if wakuNet.config.wakuMode == WakuChan: - if wakuPeer.wakuMode == WakuChan: - raise newException(UselessPeerError, "Two Waku-chan connected") - elif wakuPeer.wakuMode == None: - raise newException(UselessPeerError, "Not in Waku mode") - if wakuNet.config.wakuMode == WakuSan and - wakuPeer.wakuMode == WakuChan: - # TODO: need some maximum check on amount of topics - wakuPeer.topics = m.topics + wakuPeer.topics = m.list.topicInterest + if wakuPeer.topics.isSome(): + if wakuPeer.topics.get().len > topicInterestMax: + raise newException(UselessPeerError, "Topic-interest is too large") + if wakuNet.config.topics.isSome(): + raise newException(UselessPeerError, + "Two Waku nodes with topic-interest connected") wakuPeer.received.init() wakuPeer.trusted = false @@ -230,15 +288,7 @@ p2pProtocol Waku(version = wakuVersion, debug "Waku peer initialized", peer handshake: - proc status(peer: Peer, - protocolVersion: uint, - powConverted: uint64, - bloom: Bytes, - isLightNode: bool, - confirmationsEnabled: bool, - rateLimits: RateLimits, - wakuMode: WakuMode, - topics: seq[Topic]) + proc status(peer: Peer, protocolVersion: uint, list: StatusOptions) proc messages(peer: Peer, envelopes: openarray[Envelope]) = if not peer.state.initialized: @@ -311,8 +361,11 @@ p2pProtocol Waku(version = wakuVersion, error "Too many topics in the topic-interest list" return - if peer.state.wakuMode == WakuChan: - peer.state.topics = topics + # TODO: We currently do not allow changing topic-interest. + # If we want the check here should be removed, however this would be no + # consistent (with Status packet) way of changing back to no topic-interest. + if peer.state.topics.isSome(): + peer.state.topics = some(topics) nextID 126 @@ -377,9 +430,8 @@ proc processQueue(peer: Peer) = trace "Message does not match peer bloom filter" continue - if wakuNet.config.wakuMode == WakuSan and - wakuPeer.wakuMode == WakuChan: - if message.env.topic notin wakuPeer.topics: + if wakuPeer.topics.isSome(): + if message.env.topic notin wakuPeer.topics.get(): trace "Message does not match topics list" continue @@ -561,7 +613,7 @@ proc setTopics*(node: EthereumNode, topics: seq[Topic]): if topics.len > topicInterestMax: return false - node.protocolState(Waku).config.topics = topics + node.protocolState(Waku).config.topics = some(topics) var futures: seq[Future[void]] = @[] for peer in node.peers(Waku): futures.add(peer.topicInterest(topics)) diff --git a/tests/p2p/test_waku_connect.nim b/tests/p2p/test_waku_connect.nim index b15f296..e3171e1 100644 --- a/tests/p2p/test_waku_connect.nim +++ b/tests/p2p/test_waku_connect.nim @@ -12,11 +12,60 @@ import eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool, ./p2p_test_helper -const safeTTL = 5'u32 +const + safeTTL = 5'u32 + waitInterval = messageInterval + 150.milliseconds # TODO: Just repeat all the test_shh_connect tests here that are applicable or # have some commonly shared test code for both protocols. suite "Waku connections": + asyncTest "Test Waku connections": + var n1 = setupTestNode(Waku) + var n2 = setupTestNode(Waku) + var n3 = setupTestNode(Waku) + var n4 = setupTestNode(Waku) + + var topics: seq[Topic] + n1.protocolState(Waku).config.topics = some(topics) + n2.protocolState(Waku).config.topics = some(topics) + n3.protocolState(Waku).config.topics = none(seq[Topic]) + n4.protocolState(Waku).config.topics = none(seq[Topic]) + + n1.startListening() + n3.startListening() + + let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey, n1.address))) + let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey, n3.address))) + let p3 = await n4.rlpxConnect(newNode(initENode(n3.keys.pubKey, n3.address))) + check: + p1.isNil + p2.isNil == false + p3.isNil == false + + asyncTest "Test Waku topic-interest": + var wakuTopicNode = setupTestNode(Waku) + var wakuNode = setupTestNode(Waku) + + let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2]) + + wakuNode.startListening() + await wakuTopicNode.peerPool.connectToNode(newNode(initENode(wakuNode.keys.pubKey, + wakuNode.address))) + + let payload = repeat(byte 0, 10) + check: + wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) + wakuNode.protocolState(Waku).queue.items.len == 3 + await sleepAsync(waitInterval) + check: + wakuTopicNode.protocolState(Waku).queue.items.len == 2 + asyncTest "Light node posting": var ln = setupTestNode(Waku) ln.setLightNode(true) diff --git a/tests/p2p/test_waku_mode.nim b/tests/p2p/test_waku_mode.nim deleted file mode 100644 index 8cd41cc..0000000 --- a/tests/p2p/test_waku_mode.nim +++ /dev/null @@ -1,77 +0,0 @@ -# -# Waku -# (c) Copyright 2019 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -import - sequtils, options, unittest, chronos, eth/[keys, p2p], - eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool, - ./p2p_test_helper - -const - safeTTL = 5'u32 - waitInterval = messageInterval + 150.milliseconds - -suite "Waku Mode": - asyncTest "Test Waku-chan with Waku-san": - var wakuChan = setupTestNode(Waku) - var wakuSan = setupTestNode(Waku) - - let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] - let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] - let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] - - wakuChan.protocolState(Waku).config.wakuMode = WakuChan - wakuChan.protocolState(Waku).config.topics = @[topic1, topic2] - wakuSan.protocolState(Waku).config.wakuMode = WakuSan - - wakuSan.startListening() - await wakuChan.peerPool.connectToNode(newNode(initENode(wakuSan.keys.pubKey, - wakuSan.address))) - - let payload = repeat(byte 0, 10) - check: - wakuSan.postMessage(ttl = safeTTL, topic = topic1, payload = payload) - wakuSan.postMessage(ttl = safeTTL, topic = topic2, payload = payload) - wakuSan.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) - wakuSan.protocolState(Waku).queue.items.len == 3 - await sleepAsync(waitInterval) - check: - wakuChan.protocolState(Waku).queue.items.len == 2 - - asyncTest "Test Waku connections": - var n1 = setupTestNode(Waku) - var n2 = setupTestNode(Waku) - var n3 = setupTestNode(Waku) - var n4 = setupTestNode(Waku) - var n5 = setupTestNode(Waku) - - n1.protocolState(Waku).config.wakuMode = WakuMode.None - n2.protocolState(Waku).config.wakuMode = WakuChan - n3.protocolState(Waku).config.wakuMode = WakuChan - n4.protocolState(Waku).config.wakuMode = WakuSan - n5.protocolState(Waku).config.wakuMode = WakuSan - - n1.startListening() - n3.startListening() - n5.startListening() - - let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey, - n1.address))) - let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey, - n3.address))) - check: - p1.isNil - p2.isNil - - let p3 = await n4.rlpxConnect(newNode(initENode(n1.keys.pubKey, - n1.address))) - let p4 = await n4.rlpxConnect(newNode(initENode(n5.keys.pubKey, - n5.address))) - check: - p3.isNil == false - p4.isNil == false