Add StatusOptions alist and remove WakuMode

This commit is contained in:
kdeme 2020-02-04 13:38:55 +01:00
parent 40b96e2d3c
commit 18799f1491
No known key found for this signature in database
GPG Key ID: 4E8DD21420AF43F5
4 changed files with 167 additions and 144 deletions

View File

@ -54,7 +54,6 @@ proc runP2pTests() =
"test_waku_connect", "test_waku_connect",
"test_waku_bridge", "test_waku_bridge",
"test_waku_mail", "test_waku_mail",
"test_waku_mode",
"test_protocol_handlers", "test_protocol_handlers",
"test_enr", "test_enr",
]: ]:

View File

@ -72,16 +72,6 @@ const
topicInterestMax = 1000 topicInterestMax = 1000
type type
WakuMode* = enum
# TODO: is there a reason to allow such "none" mode? This was originally
# put here when it was still supposed to be compatible with Whisper.
None, # No Waku mode
WakuChan, # Waku client
WakuSan # Waku node
# TODO: Light mode could also become part of this enum
# TODO: With discv5, this could be capabilities also announced at level of
# discovery.
WakuConfig* = object WakuConfig* = object
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Bloom
@ -89,8 +79,7 @@ type
maxMsgSize*: uint32 maxMsgSize*: uint32
confirmationsEnabled*: bool confirmationsEnabled*: bool
rateLimits*: RateLimits rateLimits*: RateLimits
wakuMode*: WakuMode topics*: Option[seq[Topic]]
topics*: seq[Topic]
WakuPeer = ref object WakuPeer = ref object
initialized: bool # when successfully completed the handshake initialized: bool # when successfully completed the handshake
@ -98,8 +87,7 @@ type
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
trusted*: bool trusted*: bool
wakuMode*: WakuMode topics*: Option[seq[Topic]]
topics*: seq[Topic]
received: HashSet[Hash] received: HashSet[Hash]
P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
@ -116,6 +104,89 @@ type
limitPeerId*: uint limitPeerId*: uint
limitTopic*: uint limitTopic*: uint
StatusOptions* = object
powRequirement*: Option[(float64)]
bloomFilter*: Option[Bloom]
lightNode*: Option[bool]
confirmationsEnabled*: Option[bool]
rateLimits*: Option[RateLimits]
topicInterest*: Option[seq[Topic]]
KeyKind* = enum
powRequirementKey,
bloomFilterKey,
lightNodeKey,
confirmationsEnabledKey,
rateLimitsKey,
topicInterestKey
template countSomeFields*(x: StatusOptions): int =
var count = 0
for f in fields(x):
if f.isSome():
inc count
count
proc append*(rlpWriter: var RlpWriter, value: StatusOptions) =
var list = initRlpList(countSomeFields(value))
if value.powRequirement.isSome():
list.append((powRequirementKey, cast[uint64](value.powRequirement.get())))
if value.bloomFilter.isSome():
list.append((bloomFilterKey, @(value.bloomFilter.get())))
if value.lightNode.isSome():
list.append((lightNodeKey, value.lightNode.get()))
if value.confirmationsEnabled.isSome():
list.append((confirmationsEnabledKey, value.confirmationsEnabled.get()))
if value.rateLimits.isSome():
list.append((rateLimitsKey, value.rateLimits.get()))
if value.topicInterest.isSome():
list.append((topicInterestKey, value.topicInterest.get()))
let bytes = list.finish()
rlpWriter.append(rlpFromBytes(bytes.toRange))
proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T =
if not rlp.isList():
raise newException(RlpTypeMismatch,
"List expected, but the source RLP is not a list.")
let sz = rlp.listLen()
rlp.enterList()
for i in 0 ..< sz:
if not rlp.isList():
raise newException(RlpTypeMismatch,
"List expected, but the source RLP is not a list.")
rlp.enterList()
var k: KeyKind
try:
k = rlp.read(KeyKind)
except RlpTypeMismatch:
# skip unknown keys and their value
rlp.skipElem()
rlp.skipElem()
continue
case k
of powRequirementKey:
let pow = rlp.read(uint64)
result.powRequirement = some(cast[float64](pow))
of bloomFilterKey:
let bloom = rlp.read(seq[byte])
if bloom.len != bloomSize:
raise newException(UselessPeerError, "Bloomfilter size mismatch")
var bloomFilter: Bloom
bloomFilter.bytesCopy(bloom)
result.bloomFilter = some(bloomFilter)
of lightNodeKey:
result.lightNode = some(rlp.read(bool))
of confirmationsEnabledKey:
result.confirmationsEnabled = some(rlp.read(bool))
of rateLimitsKey:
result.rateLimits = some(rlp.read(RateLimits))
of topicInterestKey:
result.topicInterest = some(rlp.read(seq[Topic]))
proc allowed*(msg: Message, config: WakuConfig): bool = proc allowed*(msg: Message, config: WakuConfig): bool =
# Check max msg size, already happens in RLPx but there is a specific waku # Check max msg size, already happens in RLPx but there is a specific waku
@ -135,8 +206,8 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
warn "Message does not match node bloom filter" warn "Message does not match node bloom filter"
return false return false
if config.wakuMode == WakuChan: if config.topics.isSome():
if msg.env.topic notin config.topics: if msg.env.topic notin config.topics.get():
dropped_topic_mismatch_envelopes.inc() dropped_topic_mismatch_envelopes.inc()
warn "Message topic does not match Waku topic list" warn "Message topic does not match Waku topic list"
return false return false
@ -160,8 +231,7 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
network.config.rateLimits = network.config.rateLimits =
RateLimits(limitIp: 0, limitPeerId: 0, limitTopic:0) RateLimits(limitIp: 0, limitPeerId: 0, limitTopic:0)
network.config.maxMsgSize = defaultMaxMsgSize network.config.maxMsgSize = defaultMaxMsgSize
network.config.wakuMode = None # default no waku mode network.config.topics = none(seq[Topic])
network.config.topics = @[]
asyncCheck node.run(network) asyncCheck node.run(network)
p2pProtocol Waku(version = wakuVersion, p2pProtocol Waku(version = wakuVersion,
@ -175,49 +245,37 @@ p2pProtocol Waku(version = wakuVersion,
wakuNet = peer.networkState wakuNet = peer.networkState
wakuPeer = peer.state wakuPeer = peer.state
let m = await peer.status(wakuVersion, let list = StatusOptions(
cast[uint64](wakuNet.config.powRequirement), powRequirement: some(wakuNet.config.powRequirement),
@(wakuNet.config.bloom), bloomFilter: some(wakuNet.config.bloom),
wakuNet.config.isLightNode, lightNode: some(wakuNet.config.isLightNode),
wakuNet.config.confirmationsEnabled, confirmationsEnabled: some(wakuNet.config.confirmationsEnabled),
wakuNet.config.rateLimits, rateLimits: some(wakuNet.config.rateLimits),
wakuNet.config.wakuMode, topicInterest: wakuNet.config.topics)
wakuNet.config.topics,
timeout = chronos.milliseconds(500)) let m = await peer.status(wakuVersion, list,
timeout = chronos.milliseconds(500))
if m.protocolVersion == wakuVersion: if m.protocolVersion == wakuVersion:
debug "Waku peer", peer, wakuVersion debug "Waku peer", peer, wakuVersion
else: else:
raise newException(UselessPeerError, "Incompatible Waku version") raise newException(UselessPeerError, "Incompatible Waku version")
wakuPeer.powRequirement = cast[float64](m.powConverted) wakuPeer.powRequirement = m.list.powRequirement.get(defaultMinPow)
wakuPeer.bloom = m.list.bloomFilter.get(fullBloom())
if m.bloom.len > 0: wakuPeer.isLightNode = m.list.lightNode.get(false)
if m.bloom.len != bloomSize:
raise newException(UselessPeerError, "Bloomfilter size mismatch")
else:
wakuPeer.bloom.bytesCopy(m.bloom)
else:
# If no bloom filter is send we allow all
wakuPeer.bloom = fullBloom()
wakuPeer.isLightNode = m.isLightNode
if wakuPeer.isLightNode and wakuNet.config.isLightNode: if wakuPeer.isLightNode and wakuNet.config.isLightNode:
# No sense in connecting two light nodes so we disconnect # No sense in connecting two light nodes so we disconnect
raise newException(UselessPeerError, "Two light nodes connected") raise newException(UselessPeerError, "Two light nodes connected")
# When Waku-san connect to all. When None, connect to all, Waku-chan has wakuPeer.topics = m.list.topicInterest
# to decide to disconnect. When Waku-chan, connect only to Waku-san. if wakuPeer.topics.isSome():
wakuPeer.wakuMode = m.wakuMode if wakuPeer.topics.get().len > topicInterestMax:
if wakuNet.config.wakuMode == WakuChan: raise newException(UselessPeerError, "Topic-interest is too large")
if wakuPeer.wakuMode == WakuChan: if wakuNet.config.topics.isSome():
raise newException(UselessPeerError, "Two Waku-chan connected") raise newException(UselessPeerError,
elif wakuPeer.wakuMode == None: "Two Waku nodes with topic-interest connected")
raise newException(UselessPeerError, "Not in Waku mode")
if wakuNet.config.wakuMode == WakuSan and
wakuPeer.wakuMode == WakuChan:
# TODO: need some maximum check on amount of topics
wakuPeer.topics = m.topics
wakuPeer.received.init() wakuPeer.received.init()
wakuPeer.trusted = false wakuPeer.trusted = false
@ -230,15 +288,7 @@ p2pProtocol Waku(version = wakuVersion,
debug "Waku peer initialized", peer debug "Waku peer initialized", peer
handshake: handshake:
proc status(peer: Peer, proc status(peer: Peer, protocolVersion: uint, list: StatusOptions)
protocolVersion: uint,
powConverted: uint64,
bloom: Bytes,
isLightNode: bool,
confirmationsEnabled: bool,
rateLimits: RateLimits,
wakuMode: WakuMode,
topics: seq[Topic])
proc messages(peer: Peer, envelopes: openarray[Envelope]) = proc messages(peer: Peer, envelopes: openarray[Envelope]) =
if not peer.state.initialized: if not peer.state.initialized:
@ -311,8 +361,11 @@ p2pProtocol Waku(version = wakuVersion,
error "Too many topics in the topic-interest list" error "Too many topics in the topic-interest list"
return return
if peer.state.wakuMode == WakuChan: # TODO: We currently do not allow changing topic-interest.
peer.state.topics = topics # If we want the check here should be removed, however this would be no
# consistent (with Status packet) way of changing back to no topic-interest.
if peer.state.topics.isSome():
peer.state.topics = some(topics)
nextID 126 nextID 126
@ -377,9 +430,8 @@ proc processQueue(peer: Peer) =
trace "Message does not match peer bloom filter" trace "Message does not match peer bloom filter"
continue continue
if wakuNet.config.wakuMode == WakuSan and if wakuPeer.topics.isSome():
wakuPeer.wakuMode == WakuChan: if message.env.topic notin wakuPeer.topics.get():
if message.env.topic notin wakuPeer.topics:
trace "Message does not match topics list" trace "Message does not match topics list"
continue continue
@ -561,7 +613,7 @@ proc setTopics*(node: EthereumNode, topics: seq[Topic]):
if topics.len > topicInterestMax: if topics.len > topicInterestMax:
return false return false
node.protocolState(Waku).config.topics = topics node.protocolState(Waku).config.topics = some(topics)
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
for peer in node.peers(Waku): for peer in node.peers(Waku):
futures.add(peer.topicInterest(topics)) futures.add(peer.topicInterest(topics))

View File

@ -12,11 +12,60 @@ import
eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool, eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool,
./p2p_test_helper ./p2p_test_helper
const safeTTL = 5'u32 const
safeTTL = 5'u32
waitInterval = messageInterval + 150.milliseconds
# TODO: Just repeat all the test_shh_connect tests here that are applicable or # TODO: Just repeat all the test_shh_connect tests here that are applicable or
# have some commonly shared test code for both protocols. # have some commonly shared test code for both protocols.
suite "Waku connections": suite "Waku connections":
asyncTest "Test Waku connections":
var n1 = setupTestNode(Waku)
var n2 = setupTestNode(Waku)
var n3 = setupTestNode(Waku)
var n4 = setupTestNode(Waku)
var topics: seq[Topic]
n1.protocolState(Waku).config.topics = some(topics)
n2.protocolState(Waku).config.topics = some(topics)
n3.protocolState(Waku).config.topics = none(seq[Topic])
n4.protocolState(Waku).config.topics = none(seq[Topic])
n1.startListening()
n3.startListening()
let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey, n1.address)))
let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey, n3.address)))
let p3 = await n4.rlpxConnect(newNode(initENode(n3.keys.pubKey, n3.address)))
check:
p1.isNil
p2.isNil == false
p3.isNil == false
asyncTest "Test Waku topic-interest":
var wakuTopicNode = setupTestNode(Waku)
var wakuNode = setupTestNode(Waku)
let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]
wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2])
wakuNode.startListening()
await wakuTopicNode.peerPool.connectToNode(newNode(initENode(wakuNode.keys.pubKey,
wakuNode.address)))
let payload = repeat(byte 0, 10)
check:
wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
wakuNode.protocolState(Waku).queue.items.len == 3
await sleepAsync(waitInterval)
check:
wakuTopicNode.protocolState(Waku).queue.items.len == 2
asyncTest "Light node posting": asyncTest "Light node posting":
var ln = setupTestNode(Waku) var ln = setupTestNode(Waku)
ln.setLightNode(true) ln.setLightNode(true)

View File

@ -1,77 +0,0 @@
#
# Waku
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import
sequtils, options, unittest, chronos, eth/[keys, p2p],
eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool,
./p2p_test_helper
const
safeTTL = 5'u32
waitInterval = messageInterval + 150.milliseconds
suite "Waku Mode":
asyncTest "Test Waku-chan with Waku-san":
var wakuChan = setupTestNode(Waku)
var wakuSan = setupTestNode(Waku)
let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]
wakuChan.protocolState(Waku).config.wakuMode = WakuChan
wakuChan.protocolState(Waku).config.topics = @[topic1, topic2]
wakuSan.protocolState(Waku).config.wakuMode = WakuSan
wakuSan.startListening()
await wakuChan.peerPool.connectToNode(newNode(initENode(wakuSan.keys.pubKey,
wakuSan.address)))
let payload = repeat(byte 0, 10)
check:
wakuSan.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
wakuSan.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
wakuSan.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
wakuSan.protocolState(Waku).queue.items.len == 3
await sleepAsync(waitInterval)
check:
wakuChan.protocolState(Waku).queue.items.len == 2
asyncTest "Test Waku connections":
var n1 = setupTestNode(Waku)
var n2 = setupTestNode(Waku)
var n3 = setupTestNode(Waku)
var n4 = setupTestNode(Waku)
var n5 = setupTestNode(Waku)
n1.protocolState(Waku).config.wakuMode = WakuMode.None
n2.protocolState(Waku).config.wakuMode = WakuChan
n3.protocolState(Waku).config.wakuMode = WakuChan
n4.protocolState(Waku).config.wakuMode = WakuSan
n5.protocolState(Waku).config.wakuMode = WakuSan
n1.startListening()
n3.startListening()
n5.startListening()
let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey,
n1.address)))
let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey,
n3.address)))
check:
p1.isNil
p2.isNil
let p3 = await n4.rlpxConnect(newNode(initENode(n1.keys.pubKey,
n1.address)))
let p4 = await n4.rlpxConnect(newNode(initENode(n5.keys.pubKey,
n5.address)))
check:
p3.isNil == false
p4.isNil == false