Handle waku status-update code 22

This commits adds handling of the status-update code, 22, according to
waku specs 0.4 a6a9d6bcb1

Removes unused codes 20,21
This commit is contained in:
Andrea Maria Piana 2020-02-25 15:37:52 +01:00
parent 3a4f97cb13
commit 9dcbb91ae0
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
2 changed files with 152 additions and 46 deletions

View File

@ -74,7 +74,7 @@ const
type type
WakuConfig* = object WakuConfig* = object
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Option[Bloom]
isLightNode*: bool isLightNode*: bool
maxMsgSize*: uint32 maxMsgSize*: uint32
confirmationsEnabled*: bool confirmationsEnabled*: bool
@ -199,7 +199,7 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
warn "Message topic does not match Waku topic list" warn "Message topic does not match Waku topic list"
return false return false
else: else:
if not bloomFilterMatch(config.bloom, msg.bloom): if config.bloom.isSome() and not bloomFilterMatch(config.bloom.get(), msg.bloom):
dropped_bloom_filter_mismatch_envelopes.inc() dropped_bloom_filter_mismatch_envelopes.inc()
warn "Message does not match node bloom filter" warn "Message does not match node bloom filter"
return false return false
@ -213,7 +213,7 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
new(network.queue) new(network.queue)
network.queue[] = initQueue(defaultQueueCapacity) network.queue[] = initQueue(defaultQueueCapacity)
network.filters = initTable[string, Filter]() network.filters = initTable[string, Filter]()
network.config.bloom = fullBloom() network.config.bloom = some(fullBloom())
network.config.powRequirement = defaultMinPow network.config.powRequirement = defaultMinPow
network.config.isLightNode = false network.config.isLightNode = false
# RateLimits and confirmations are not yet implemented so we set confirmations # RateLimits and confirmations are not yet implemented so we set confirmations
@ -237,7 +237,7 @@ p2pProtocol Waku(version = wakuVersion,
let list = StatusOptions( let list = StatusOptions(
powRequirement: some(wakuNet.config.powRequirement), powRequirement: some(wakuNet.config.powRequirement),
bloomFilter: some(wakuNet.config.bloom), bloomFilter: wakuNet.config.bloom,
lightNode: some(wakuNet.config.isLightNode), lightNode: some(wakuNet.config.isLightNode),
confirmationsEnabled: some(wakuNet.config.confirmationsEnabled), confirmationsEnabled: some(wakuNet.config.confirmationsEnabled),
rateLimits: wakuNet.config.rateLimits, rateLimits: wakuNet.config.rateLimits,
@ -328,40 +328,24 @@ p2pProtocol Waku(version = wakuVersion,
# notify filters of this message # notify filters of this message
peer.networkState.filters.notify(msg) peer.networkState.filters.notify(msg)
proc powRequirement(peer: Peer, value: uint64) = nextID 22
proc statusOptions(peer: Peer, options: StatusOptions) =
if not peer.state.initialized: if not peer.state.initialized:
warn "Handshake not completed yet, discarding powRequirement" warn "Handshake not completed yet, discarding statusOptions"
return return
peer.state.powRequirement = cast[float64](value) if options.topicInterest.isSome():
peer.state.topics = options.topicInterest
elif options.bloomFilter.isSome():
peer.state.bloom = options.bloomFilter.get()
peer.state.topics = none(seq[Topic])
proc bloomFilterExchange(peer: Peer, bloom: Bytes) = if options.powRequirement.isSome():
if not peer.state.initialized: peer.state.powRequirement = options.powRequirement.get()
warn "Handshake not completed yet, discarding bloomFilterExchange"
return
if bloom.len == bloomSize: if options.lightNode.isSome():
peer.state.bloom.bytesCopy(bloom) peer.state.isLightNode = options.lightNode.get()
nextID 20
proc rateLimits(peer: Peer, rateLimits: RateLimits) = discard
proc topicInterest(peer: Peer, topics: openarray[Topic]) =
if not peer.state.initialized:
warn "Handshake not completed yet, discarding topicInterest"
return
if topics.len > topicInterestMax:
error "Too many topics in the topic-interest list"
return
# TODO: We currently do not allow changing topic-interest.
# We could allow this by also resetting topic-interest on a bloom filter
# exchange. But we don't for now and will instead introduce another packet
# type in the Waku Specification.
if peer.state.topics.isSome():
peer.state.topics = some(topics)
nextID 126 nextID 126
@ -584,8 +568,9 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
# NOTE: do we need a tolerance of old PoW for some time? # NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Waku).config.powRequirement = powReq node.protocolState(Waku).config.powRequirement = powReq
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
let list = StatusOptions(powRequirement: some(powReq))
for peer in node.peers(Waku): for peer in node.peers(Waku):
futures.add(peer.powRequirement(cast[uint64](powReq))) futures.add(peer.statusOptions(list))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
@ -596,10 +581,14 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
## ##
## Failures when sending messages to peers will not be reported. ## Failures when sending messages to peers will not be reported.
# NOTE: do we need a tolerance of old bloom filter for some time? # NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Waku).config.bloom = bloom node.protocolState(Waku).config.bloom = some(bloom)
# reset topics
node.protocolState(Waku).config.topics = none(seq[Topic])
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
let list = StatusOptions(bloomFilter: some(bloom))
for peer in node.peers(Waku): for peer in node.peers(Waku):
futures.add(peer.bloomFilterExchange(@bloom)) futures.add(peer.statusOptions(list))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
@ -609,13 +598,12 @@ proc setTopicInterest*(node: EthereumNode, topics: seq[Topic]):
if topics.len > topicInterestMax: if topics.len > topicInterestMax:
return false return false
if node.protocolState(Waku).config.topics.isNone():
return false
node.protocolState(Waku).config.topics = some(topics) node.protocolState(Waku).config.topics = some(topics)
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
let list = StatusOptions(topicInterest: some(topics))
for peer in node.peers(Waku): for peer in node.peers(Waku):
futures.add(peer.topicInterest(topics)) futures.add(peer.statusOptions(list))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
@ -638,12 +626,17 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
peer.state(Waku).trusted = true peer.state(Waku).trusted = true
return true return true
proc setLightNode*(node: EthereumNode, isLightNode: bool) = proc setLightNode*(node: EthereumNode, isLightNode: bool) {.async.} =
## Set this node as a Waku light node. ## Set this node as a Waku light node.
##
## NOTE: Should be run before connection is made with peers as this
## setting is only communicated at peer handshake.
node.protocolState(Waku).config.isLightNode = isLightNode node.protocolState(Waku).config.isLightNode = isLightNode
# TODO: Add starting/stopping of `processQueue` loop depending on value of isLightNode.
var futures: seq[Future[void]] = @[]
let list = StatusOptions(lightNode: some(isLightNode))
for peer in node.peers(Waku):
futures.add(peer.statusOptions(list))
# Exceptions from sendMsg will not be raised
await allFutures(futures)
proc configureWaku*(node: EthereumNode, config: WakuConfig) = proc configureWaku*(node: EthereumNode, config: WakuConfig) =
## Apply a Waku configuration. ## Apply a Waku configuration.

View File

@ -44,6 +44,119 @@ suite "Waku connections":
p2.isNil == false p2.isNil == false
p3.isNil == false p3.isNil == false
asyncTest "Waku set-topic-interest":
var
wakuTopicNode = setupTestNode(Waku)
wakuNode = setupTestNode(Waku)
let
topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]
# Set one topic so we are not considered a full node
wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1])
wakuNode.startListening()
await wakuTopicNode.peerPool.connectToNode(newNode(
initENode(wakuNode.keys.pubKey, wakuNode.address)))
# Update topic interest
check:
await setTopicInterest(wakuTopicNode, @[topic1, topic2])
let payload = repeat(byte 0, 10)
check:
wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
wakuNode.protocolState(Waku).queue.items.len == 3
await sleepAsync(waitInterval)
check:
wakuTopicNode.protocolState(Waku).queue.items.len == 2
asyncTest "Waku set-minimum-pow":
var
wakuPowNode = setupTestNode(Waku)
wakuNode = setupTestNode(Waku)
wakuNode.startListening()
await wakuPowNode.peerPool.connectToNode(newNode(
initENode(wakuNode.keys.pubKey, wakuNode.address)))
# Update minimum pow
await setPowRequirement(wakuPowNode, 1.0)
await sleepAsync(waitInterval)
check:
wakuNode.peerPool.len == 1
# check powRequirement is updated
for peer in wakuNode.peerPool.peers:
check:
peer.state(Waku).powRequirement == 1.0
asyncTest "Waku set-light-node":
var
wakuLightNode = setupTestNode(Waku)
wakuNode = setupTestNode(Waku)
wakuNode.startListening()
await wakuLightNode.peerPool.connectToNode(newNode(
initENode(wakuNode.keys.pubKey, wakuNode.address)))
# Update minimum pow
await setLightNode(wakuLightNode, true)
await sleepAsync(waitInterval)
check:
wakuNode.peerPool.len == 1
# check lightNode is updated
for peer in wakuNode.peerPool.peers:
check:
peer.state(Waku).isLightNode
asyncTest "Waku set-bloom-filter":
var
wakuBloomNode = setupTestNode(Waku)
wakuNode = setupTestNode(Waku)
bloom = fullBloom()
topics = @[[byte 0xDA, 0xDA, 0xDA, 0xAA]]
# Set topic interest
discard await wakuBloomNode.setTopicInterest(topics)
wakuBloomNode.startListening()
await wakuNode.peerPool.connectToNode(newNode(
initENode(wakuBloomNode.keys.pubKey, wakuBloomNode.address)))
# Sanity check
check:
wakuNode.peerPool.len == 1
# check bloom filter is updated
for peer in wakuNode.peerPool.peers:
check:
peer.state(Waku).bloom == bloom
peer.state(Waku).topics == some(topics)
# disable one bit in the bloom filter
bloom[0] = 0x0
# and set it
await setBloomFilter(wakuBloomNode, bloom)
await sleepAsync(waitInterval)
# check bloom filter is updated
check:
wakuNode.peerPool.len == 1
for peer in wakuNode.peerPool.peers:
check:
peer.state(Waku).bloom == bloom
peer.state(Waku).topics == none(seq[Topic])
asyncTest "Waku topic-interest": asyncTest "Waku topic-interest":
var var
wakuTopicNode = setupTestNode(Waku) wakuTopicNode = setupTestNode(Waku)
@ -82,7 +195,7 @@ suite "Waku connections":
# It was checked that the topics don't trigger false positives on the bloom. # It was checked that the topics don't trigger false positives on the bloom.
wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2]) wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2])
wakuTopicNode.protocolState(Waku).config.bloom = toBloom([bloomTopic]) wakuTopicNode.protocolState(Waku).config.bloom = some(toBloom([bloomTopic]))
wakuNode.startListening() wakuNode.startListening()
await wakuTopicNode.peerPool.connectToNode(newNode( await wakuTopicNode.peerPool.connectToNode(newNode(
@ -100,7 +213,7 @@ suite "Waku connections":
asyncTest "Light node posting": asyncTest "Light node posting":
var ln = setupTestNode(Waku) var ln = setupTestNode(Waku)
ln.setLightNode(true) await ln.setLightNode(true)
var fn = setupTestNode(Waku) var fn = setupTestNode(Waku)
fn.startListening() fn.startListening()
await ln.peerPool.connectToNode(newNode(initENode(fn.keys.pubKey, await ln.peerPool.connectToNode(newNode(initENode(fn.keys.pubKey,