mirror of
https://github.com/status-im/nim-eth.git
synced 2025-01-26 06:21:17 +00:00
Merge pull request #178 from cammellos/feature/handle-status-update
Handle waku status-update code 22 & increase max topics
This commit is contained in:
commit
5b140af1cd
@ -69,12 +69,12 @@ const
|
||||
## send to peers, in ms.
|
||||
pruneInterval* = chronos.milliseconds(1000) ## Interval at which message
|
||||
## queue is pruned, in ms.
|
||||
topicInterestMax = 1000
|
||||
topicInterestMax = 10000
|
||||
|
||||
type
|
||||
WakuConfig* = object
|
||||
powRequirement*: float64
|
||||
bloom*: Bloom
|
||||
bloom*: Option[Bloom]
|
||||
isLightNode*: bool
|
||||
maxMsgSize*: uint32
|
||||
confirmationsEnabled*: bool
|
||||
@ -199,7 +199,7 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
|
||||
warn "Message topic does not match Waku topic list"
|
||||
return false
|
||||
else:
|
||||
if not bloomFilterMatch(config.bloom, msg.bloom):
|
||||
if config.bloom.isSome() and not bloomFilterMatch(config.bloom.get(), msg.bloom):
|
||||
dropped_bloom_filter_mismatch_envelopes.inc()
|
||||
warn "Message does not match node bloom filter"
|
||||
return false
|
||||
@ -213,7 +213,7 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
|
||||
new(network.queue)
|
||||
network.queue[] = initQueue(defaultQueueCapacity)
|
||||
network.filters = initTable[string, Filter]()
|
||||
network.config.bloom = fullBloom()
|
||||
network.config.bloom = some(fullBloom())
|
||||
network.config.powRequirement = defaultMinPow
|
||||
network.config.isLightNode = false
|
||||
# RateLimits and confirmations are not yet implemented so we set confirmations
|
||||
@ -237,7 +237,7 @@ p2pProtocol Waku(version = wakuVersion,
|
||||
|
||||
let list = StatusOptions(
|
||||
powRequirement: some(wakuNet.config.powRequirement),
|
||||
bloomFilter: some(wakuNet.config.bloom),
|
||||
bloomFilter: wakuNet.config.bloom,
|
||||
lightNode: some(wakuNet.config.isLightNode),
|
||||
confirmationsEnabled: some(wakuNet.config.confirmationsEnabled),
|
||||
rateLimits: wakuNet.config.rateLimits,
|
||||
@ -328,40 +328,24 @@ p2pProtocol Waku(version = wakuVersion,
|
||||
# notify filters of this message
|
||||
peer.networkState.filters.notify(msg)
|
||||
|
||||
proc powRequirement(peer: Peer, value: uint64) =
|
||||
nextID 22
|
||||
|
||||
proc statusOptions(peer: Peer, options: StatusOptions) =
|
||||
if not peer.state.initialized:
|
||||
warn "Handshake not completed yet, discarding powRequirement"
|
||||
warn "Handshake not completed yet, discarding statusOptions"
|
||||
return
|
||||
|
||||
peer.state.powRequirement = cast[float64](value)
|
||||
if options.topicInterest.isSome():
|
||||
peer.state.topics = options.topicInterest
|
||||
elif options.bloomFilter.isSome():
|
||||
peer.state.bloom = options.bloomFilter.get()
|
||||
peer.state.topics = none(seq[Topic])
|
||||
|
||||
proc bloomFilterExchange(peer: Peer, bloom: Bytes) =
|
||||
if not peer.state.initialized:
|
||||
warn "Handshake not completed yet, discarding bloomFilterExchange"
|
||||
return
|
||||
if options.powRequirement.isSome():
|
||||
peer.state.powRequirement = options.powRequirement.get()
|
||||
|
||||
if bloom.len == bloomSize:
|
||||
peer.state.bloom.bytesCopy(bloom)
|
||||
|
||||
nextID 20
|
||||
|
||||
proc rateLimits(peer: Peer, rateLimits: RateLimits) = discard
|
||||
|
||||
proc topicInterest(peer: Peer, topics: openarray[Topic]) =
|
||||
if not peer.state.initialized:
|
||||
warn "Handshake not completed yet, discarding topicInterest"
|
||||
return
|
||||
|
||||
if topics.len > topicInterestMax:
|
||||
error "Too many topics in the topic-interest list"
|
||||
return
|
||||
|
||||
# TODO: We currently do not allow changing topic-interest.
|
||||
# We could allow this by also resetting topic-interest on a bloom filter
|
||||
# exchange. But we don't for now and will instead introduce another packet
|
||||
# type in the Waku Specification.
|
||||
if peer.state.topics.isSome():
|
||||
peer.state.topics = some(topics)
|
||||
if options.lightNode.isSome():
|
||||
peer.state.isLightNode = options.lightNode.get()
|
||||
|
||||
nextID 126
|
||||
|
||||
@ -584,8 +568,9 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
|
||||
# NOTE: do we need a tolerance of old PoW for some time?
|
||||
node.protocolState(Waku).config.powRequirement = powReq
|
||||
var futures: seq[Future[void]] = @[]
|
||||
let list = StatusOptions(powRequirement: some(powReq))
|
||||
for peer in node.peers(Waku):
|
||||
futures.add(peer.powRequirement(cast[uint64](powReq)))
|
||||
futures.add(peer.statusOptions(list))
|
||||
|
||||
# Exceptions from sendMsg will not be raised
|
||||
await allFutures(futures)
|
||||
@ -596,10 +581,14 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
|
||||
##
|
||||
## Failures when sending messages to peers will not be reported.
|
||||
# NOTE: do we need a tolerance of old bloom filter for some time?
|
||||
node.protocolState(Waku).config.bloom = bloom
|
||||
node.protocolState(Waku).config.bloom = some(bloom)
|
||||
# reset topics
|
||||
node.protocolState(Waku).config.topics = none(seq[Topic])
|
||||
|
||||
var futures: seq[Future[void]] = @[]
|
||||
let list = StatusOptions(bloomFilter: some(bloom))
|
||||
for peer in node.peers(Waku):
|
||||
futures.add(peer.bloomFilterExchange(@bloom))
|
||||
futures.add(peer.statusOptions(list))
|
||||
|
||||
# Exceptions from sendMsg will not be raised
|
||||
await allFutures(futures)
|
||||
@ -609,13 +598,12 @@ proc setTopicInterest*(node: EthereumNode, topics: seq[Topic]):
|
||||
if topics.len > topicInterestMax:
|
||||
return false
|
||||
|
||||
if node.protocolState(Waku).config.topics.isNone():
|
||||
return false
|
||||
|
||||
node.protocolState(Waku).config.topics = some(topics)
|
||||
|
||||
var futures: seq[Future[void]] = @[]
|
||||
let list = StatusOptions(topicInterest: some(topics))
|
||||
for peer in node.peers(Waku):
|
||||
futures.add(peer.topicInterest(topics))
|
||||
futures.add(peer.statusOptions(list))
|
||||
|
||||
# Exceptions from sendMsg will not be raised
|
||||
await allFutures(futures)
|
||||
@ -638,12 +626,17 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
|
||||
peer.state(Waku).trusted = true
|
||||
return true
|
||||
|
||||
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
|
||||
proc setLightNode*(node: EthereumNode, isLightNode: bool) {.async.} =
|
||||
## Set this node as a Waku light node.
|
||||
##
|
||||
## NOTE: Should be run before connection is made with peers as this
|
||||
## setting is only communicated at peer handshake.
|
||||
node.protocolState(Waku).config.isLightNode = isLightNode
|
||||
# TODO: Add starting/stopping of `processQueue` loop depending on value of isLightNode.
|
||||
var futures: seq[Future[void]] = @[]
|
||||
let list = StatusOptions(lightNode: some(isLightNode))
|
||||
for peer in node.peers(Waku):
|
||||
futures.add(peer.statusOptions(list))
|
||||
|
||||
# Exceptions from sendMsg will not be raised
|
||||
await allFutures(futures)
|
||||
|
||||
proc configureWaku*(node: EthereumNode, config: WakuConfig) =
|
||||
## Apply a Waku configuration.
|
||||
|
@ -15,6 +15,18 @@ import
|
||||
const
|
||||
safeTTL = 5'u32
|
||||
waitInterval = messageInterval + 150.milliseconds
|
||||
conditionTimeoutMs = 3000
|
||||
|
||||
# check on a condition until true or return a future containing false
|
||||
# if timeout expires first
|
||||
proc eventually(timeout: int, condition: proc(): bool): Future[bool] =
|
||||
let wrappedCondition = proc(): Future[bool] {.async.} =
|
||||
let f = newFuture[bool]()
|
||||
while not condition():
|
||||
await sleepAsync(100.milliseconds)
|
||||
f.complete(true)
|
||||
return await f
|
||||
return withTimeout(wrappedCondition(), timeout)
|
||||
|
||||
# TODO: Just repeat all the test_shh_connect tests here that are applicable or
|
||||
# have some commonly shared test code for both protocols.
|
||||
@ -44,6 +56,125 @@ suite "Waku connections":
|
||||
p2.isNil == false
|
||||
p3.isNil == false
|
||||
|
||||
asyncTest "Waku set-topic-interest":
|
||||
var
|
||||
wakuTopicNode = setupTestNode(Waku)
|
||||
wakuNode = setupTestNode(Waku)
|
||||
|
||||
let
|
||||
topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
|
||||
topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
|
||||
wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]
|
||||
|
||||
# Set one topic so we are not considered a full node
|
||||
wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1])
|
||||
|
||||
wakuNode.startListening()
|
||||
await wakuTopicNode.peerPool.connectToNode(newNode(
|
||||
initENode(wakuNode.keys.pubKey, wakuNode.address)))
|
||||
|
||||
# Update topic interest
|
||||
check:
|
||||
await setTopicInterest(wakuTopicNode, @[topic1, topic2])
|
||||
|
||||
let payload = repeat(byte 0, 10)
|
||||
check:
|
||||
wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
|
||||
wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
|
||||
wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
|
||||
wakuNode.protocolState(Waku).queue.items.len == 3
|
||||
await sleepAsync(waitInterval)
|
||||
check:
|
||||
wakuTopicNode.protocolState(Waku).queue.items.len == 2
|
||||
|
||||
asyncTest "Waku set-minimum-pow":
|
||||
var
|
||||
wakuPowNode = setupTestNode(Waku)
|
||||
wakuNode = setupTestNode(Waku)
|
||||
|
||||
wakuNode.startListening()
|
||||
await wakuPowNode.peerPool.connectToNode(newNode(
|
||||
initENode(wakuNode.keys.pubKey, wakuNode.address)))
|
||||
|
||||
# Update minimum pow
|
||||
await setPowRequirement(wakuPowNode, 1.0)
|
||||
await sleepAsync(waitInterval)
|
||||
|
||||
check:
|
||||
wakuNode.peerPool.len == 1
|
||||
|
||||
# check powRequirement is updated
|
||||
for peer in wakuNode.peerPool.peers:
|
||||
check:
|
||||
peer.state(Waku).powRequirement == 1.0
|
||||
|
||||
asyncTest "Waku set-light-node":
|
||||
var
|
||||
wakuLightNode = setupTestNode(Waku)
|
||||
wakuNode = setupTestNode(Waku)
|
||||
|
||||
wakuNode.startListening()
|
||||
await wakuLightNode.peerPool.connectToNode(newNode(
|
||||
initENode(wakuNode.keys.pubKey, wakuNode.address)))
|
||||
|
||||
# Update minimum pow
|
||||
await setLightNode(wakuLightNode, true)
|
||||
await sleepAsync(waitInterval)
|
||||
|
||||
check:
|
||||
wakuNode.peerPool.len == 1
|
||||
|
||||
# check lightNode is updated
|
||||
for peer in wakuNode.peerPool.peers:
|
||||
check:
|
||||
peer.state(Waku).isLightNode
|
||||
|
||||
asyncTest "Waku set-bloom-filter":
|
||||
var
|
||||
wakuBloomNode = setupTestNode(Waku)
|
||||
wakuNode = setupTestNode(Waku)
|
||||
bloom = fullBloom()
|
||||
topics = @[[byte 0xDA, 0xDA, 0xDA, 0xAA]]
|
||||
|
||||
# Set topic interest
|
||||
discard await wakuBloomNode.setTopicInterest(topics)
|
||||
|
||||
wakuBloomNode.startListening()
|
||||
await wakuNode.peerPool.connectToNode(newNode(
|
||||
initENode(wakuBloomNode.keys.pubKey, wakuBloomNode.address)))
|
||||
|
||||
# Sanity check
|
||||
check:
|
||||
wakuNode.peerPool.len == 1
|
||||
|
||||
# check bloom filter is updated
|
||||
for peer in wakuNode.peerPool.peers:
|
||||
check:
|
||||
peer.state(Waku).bloom == bloom
|
||||
peer.state(Waku).topics == some(topics)
|
||||
|
||||
let hasBloomNodeConnectedCondition = proc(): bool = wakuBloomNode.peerPool.len == 1
|
||||
# wait for the peer to be connected on the other side
|
||||
let hasBloomNodeConnected = await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition)
|
||||
# check bloom filter is updated
|
||||
check:
|
||||
hasBloomNodeConnected
|
||||
|
||||
# disable one bit in the bloom filter
|
||||
bloom[0] = 0x0
|
||||
|
||||
# and set it
|
||||
await setBloomFilter(wakuBloomNode, bloom)
|
||||
|
||||
let bloomFilterUpdatedCondition = proc(): bool =
|
||||
for peer in wakuNode.peerPool.peers:
|
||||
return peer.state(Waku).bloom == bloom and peer.state(Waku).topics == none(seq[Topic])
|
||||
|
||||
let bloomFilterUpdated = await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition)
|
||||
# check bloom filter is updated
|
||||
check:
|
||||
bloomFilterUpdated
|
||||
|
||||
asyncTest "Waku topic-interest":
|
||||
var
|
||||
wakuTopicNode = setupTestNode(Waku)
|
||||
@ -66,9 +197,10 @@ suite "Waku connections":
|
||||
wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
|
||||
wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
|
||||
wakuNode.protocolState(Waku).queue.items.len == 3
|
||||
await sleepAsync(waitInterval)
|
||||
|
||||
let response = await eventually(conditionTimeoutMs, proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2)
|
||||
check:
|
||||
wakuTopicNode.protocolState(Waku).queue.items.len == 2
|
||||
response
|
||||
|
||||
asyncTest "Waku topic-interest versus bloom filter":
|
||||
var
|
||||
@ -82,7 +214,7 @@ suite "Waku connections":
|
||||
|
||||
# It was checked that the topics don't trigger false positives on the bloom.
|
||||
wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2])
|
||||
wakuTopicNode.protocolState(Waku).config.bloom = toBloom([bloomTopic])
|
||||
wakuTopicNode.protocolState(Waku).config.bloom = some(toBloom([bloomTopic]))
|
||||
|
||||
wakuNode.startListening()
|
||||
await wakuTopicNode.peerPool.connectToNode(newNode(
|
||||
@ -100,7 +232,7 @@ suite "Waku connections":
|
||||
|
||||
asyncTest "Light node posting":
|
||||
var ln = setupTestNode(Waku)
|
||||
ln.setLightNode(true)
|
||||
await ln.setLightNode(true)
|
||||
var fn = setupTestNode(Waku)
|
||||
fn.startListening()
|
||||
await ln.peerPool.connectToNode(newNode(initENode(fn.keys.pubKey,
|
||||
|
Loading…
x
Reference in New Issue
Block a user