diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 0719071..e2aa669 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -74,7 +74,7 @@ const type WakuConfig* = object powRequirement*: float64 - bloom*: Bloom + bloom*: Option[Bloom] isLightNode*: bool maxMsgSize*: uint32 confirmationsEnabled*: bool @@ -199,7 +199,7 @@ proc allowed*(msg: Message, config: WakuConfig): bool = warn "Message topic does not match Waku topic list" return false else: - if not bloomFilterMatch(config.bloom, msg.bloom): + if config.bloom.isSome() and not bloomFilterMatch(config.bloom.get(), msg.bloom): dropped_bloom_filter_mismatch_envelopes.inc() warn "Message does not match node bloom filter" return false @@ -213,7 +213,7 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} = new(network.queue) network.queue[] = initQueue(defaultQueueCapacity) network.filters = initTable[string, Filter]() - network.config.bloom = fullBloom() + network.config.bloom = some(fullBloom()) network.config.powRequirement = defaultMinPow network.config.isLightNode = false # RateLimits and confirmations are not yet implemented so we set confirmations @@ -237,7 +237,7 @@ p2pProtocol Waku(version = wakuVersion, let list = StatusOptions( powRequirement: some(wakuNet.config.powRequirement), - bloomFilter: some(wakuNet.config.bloom), + bloomFilter: wakuNet.config.bloom, lightNode: some(wakuNet.config.isLightNode), confirmationsEnabled: some(wakuNet.config.confirmationsEnabled), rateLimits: wakuNet.config.rateLimits, @@ -328,40 +328,24 @@ p2pProtocol Waku(version = wakuVersion, # notify filters of this message peer.networkState.filters.notify(msg) - proc powRequirement(peer: Peer, value: uint64) = + nextID 22 + + proc statusOptions(peer: Peer, options: StatusOptions) = if not peer.state.initialized: - warn "Handshake not completed yet, discarding powRequirement" + warn "Handshake not completed yet, discarding statusOptions" return - peer.state.powRequirement = cast[float64](value) + if options.topicInterest.isSome(): + peer.state.topics = options.topicInterest + elif options.bloomFilter.isSome(): + peer.state.bloom = options.bloomFilter.get() + peer.state.topics = none(seq[Topic]) - proc bloomFilterExchange(peer: Peer, bloom: Bytes) = - if not peer.state.initialized: - warn "Handshake not completed yet, discarding bloomFilterExchange" - return + if options.powRequirement.isSome(): + peer.state.powRequirement = options.powRequirement.get() - if bloom.len == bloomSize: - peer.state.bloom.bytesCopy(bloom) - - nextID 20 - - proc rateLimits(peer: Peer, rateLimits: RateLimits) = discard - - proc topicInterest(peer: Peer, topics: openarray[Topic]) = - if not peer.state.initialized: - warn "Handshake not completed yet, discarding topicInterest" - return - - if topics.len > topicInterestMax: - error "Too many topics in the topic-interest list" - return - - # TODO: We currently do not allow changing topic-interest. - # We could allow this by also resetting topic-interest on a bloom filter - # exchange. But we don't for now and will instead introduce another packet - # type in the Waku Specification. - if peer.state.topics.isSome(): - peer.state.topics = some(topics) + if options.lightNode.isSome(): + peer.state.isLightNode = options.lightNode.get() nextID 126 @@ -584,8 +568,9 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = # NOTE: do we need a tolerance of old PoW for some time? node.protocolState(Waku).config.powRequirement = powReq var futures: seq[Future[void]] = @[] + let list = StatusOptions(powRequirement: some(powReq)) for peer in node.peers(Waku): - futures.add(peer.powRequirement(cast[uint64](powReq))) + futures.add(peer.statusOptions(list)) # Exceptions from sendMsg will not be raised await allFutures(futures) @@ -596,10 +581,14 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = ## ## Failures when sending messages to peers will not be reported. # NOTE: do we need a tolerance of old bloom filter for some time? - node.protocolState(Waku).config.bloom = bloom + node.protocolState(Waku).config.bloom = some(bloom) + # reset topics + node.protocolState(Waku).config.topics = none(seq[Topic]) + var futures: seq[Future[void]] = @[] + let list = StatusOptions(bloomFilter: some(bloom)) for peer in node.peers(Waku): - futures.add(peer.bloomFilterExchange(@bloom)) + futures.add(peer.statusOptions(list)) # Exceptions from sendMsg will not be raised await allFutures(futures) @@ -609,13 +598,12 @@ proc setTopicInterest*(node: EthereumNode, topics: seq[Topic]): if topics.len > topicInterestMax: return false - if node.protocolState(Waku).config.topics.isNone(): - return false - node.protocolState(Waku).config.topics = some(topics) + var futures: seq[Future[void]] = @[] + let list = StatusOptions(topicInterest: some(topics)) for peer in node.peers(Waku): - futures.add(peer.topicInterest(topics)) + futures.add(peer.statusOptions(list)) # Exceptions from sendMsg will not be raised await allFutures(futures) @@ -638,12 +626,17 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = peer.state(Waku).trusted = true return true -proc setLightNode*(node: EthereumNode, isLightNode: bool) = +proc setLightNode*(node: EthereumNode, isLightNode: bool) {.async.} = ## Set this node as a Waku light node. - ## - ## NOTE: Should be run before connection is made with peers as this - ## setting is only communicated at peer handshake. node.protocolState(Waku).config.isLightNode = isLightNode +# TODO: Add starting/stopping of `processQueue` loop depending on value of isLightNode. + var futures: seq[Future[void]] = @[] + let list = StatusOptions(lightNode: some(isLightNode)) + for peer in node.peers(Waku): + futures.add(peer.statusOptions(list)) + + # Exceptions from sendMsg will not be raised + await allFutures(futures) proc configureWaku*(node: EthereumNode, config: WakuConfig) = ## Apply a Waku configuration. diff --git a/tests/p2p/test_waku_connect.nim b/tests/p2p/test_waku_connect.nim index 62b792d..c1f6f04 100644 --- a/tests/p2p/test_waku_connect.nim +++ b/tests/p2p/test_waku_connect.nim @@ -44,6 +44,119 @@ suite "Waku connections": p2.isNil == false p3.isNil == false + asyncTest "Waku set-topic-interest": + var + wakuTopicNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + let + topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA] + topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00] + wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D] + + # Set one topic so we are not considered a full node + wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1]) + + wakuNode.startListening() + await wakuTopicNode.peerPool.connectToNode(newNode( + initENode(wakuNode.keys.pubKey, wakuNode.address))) + + # Update topic interest + check: + await setTopicInterest(wakuTopicNode, @[topic1, topic2]) + + let payload = repeat(byte 0, 10) + check: + wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload) + wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) + wakuNode.protocolState(Waku).queue.items.len == 3 + await sleepAsync(waitInterval) + check: + wakuTopicNode.protocolState(Waku).queue.items.len == 2 + + asyncTest "Waku set-minimum-pow": + var + wakuPowNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + wakuNode.startListening() + await wakuPowNode.peerPool.connectToNode(newNode( + initENode(wakuNode.keys.pubKey, wakuNode.address))) + + # Update minimum pow + await setPowRequirement(wakuPowNode, 1.0) + await sleepAsync(waitInterval) + + check: + wakuNode.peerPool.len == 1 + + # check powRequirement is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).powRequirement == 1.0 + + asyncTest "Waku set-light-node": + var + wakuLightNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + + wakuNode.startListening() + await wakuLightNode.peerPool.connectToNode(newNode( + initENode(wakuNode.keys.pubKey, wakuNode.address))) + + # Update minimum pow + await setLightNode(wakuLightNode, true) + await sleepAsync(waitInterval) + + check: + wakuNode.peerPool.len == 1 + + # check lightNode is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).isLightNode + + asyncTest "Waku set-bloom-filter": + var + wakuBloomNode = setupTestNode(Waku) + wakuNode = setupTestNode(Waku) + bloom = fullBloom() + topics = @[[byte 0xDA, 0xDA, 0xDA, 0xAA]] + + # Set topic interest + discard await wakuBloomNode.setTopicInterest(topics) + + wakuBloomNode.startListening() + await wakuNode.peerPool.connectToNode(newNode( + initENode(wakuBloomNode.keys.pubKey, wakuBloomNode.address))) + + # Sanity check + check: + wakuNode.peerPool.len == 1 + + # check bloom filter is updated + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).bloom == bloom + peer.state(Waku).topics == some(topics) + + # disable one bit in the bloom filter + bloom[0] = 0x0 + + # and set it + await setBloomFilter(wakuBloomNode, bloom) + await sleepAsync(waitInterval) + + # check bloom filter is updated + check: + wakuNode.peerPool.len == 1 + + for peer in wakuNode.peerPool.peers: + check: + peer.state(Waku).bloom == bloom + peer.state(Waku).topics == none(seq[Topic]) + asyncTest "Waku topic-interest": var wakuTopicNode = setupTestNode(Waku) @@ -82,7 +195,7 @@ suite "Waku connections": # It was checked that the topics don't trigger false positives on the bloom. wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2]) - wakuTopicNode.protocolState(Waku).config.bloom = toBloom([bloomTopic]) + wakuTopicNode.protocolState(Waku).config.bloom = some(toBloom([bloomTopic])) wakuNode.startListening() await wakuTopicNode.peerPool.connectToNode(newNode( @@ -100,7 +213,7 @@ suite "Waku connections": asyncTest "Light node posting": var ln = setupTestNode(Waku) - ln.setLightNode(true) + await ln.setLightNode(true) var fn = setupTestNode(Waku) fn.startListening() await ln.peerPool.connectToNode(newNode(initENode(fn.keys.pubKey,