Implement Waku mode PoC

This commit is contained in:
kdeme 2019-12-12 23:23:26 +01:00
parent b2656cc3a9
commit f940162b6b
No known key found for this signature in database
GPG Key ID: 4E8DD21420AF43F5
3 changed files with 139 additions and 1 deletions

View File

@ -54,6 +54,7 @@ proc runP2pTests() =
"test_shh_connect", "test_shh_connect",
"test_waku_bridge", "test_waku_bridge",
"test_waku_mail", "test_waku_mail",
"test_waku_mode",
"test_protocol_handlers", "test_protocol_handlers",
]: ]:
runTest("tests/p2p/" & filename) runTest("tests/p2p/" & filename)

View File

@ -57,11 +57,23 @@ const
## queue is pruned, in ms. ## queue is pruned, in ms.
type type
WakuMode* = enum
# TODO: is there a reason to allow such "none" mode? This was originally
# put here when it was still supposed to be compatible with Whisper.
None, # No Waku mode
WakuChan, # Waku client
WakuSan # Waku node
# TODO: Light mode could also become part of this enum
# TODO: With discv5, this could be capabilities also announced at level of
# discovery.
WakuConfig* = object WakuConfig* = object
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
maxMsgSize*: uint32 maxMsgSize*: uint32
wakuMode*: WakuMode
topics*: seq[Topic]
WakuPeer = ref object WakuPeer = ref object
initialized: bool # when successfully completed the handshake initialized: bool # when successfully completed the handshake
@ -69,6 +81,8 @@ type
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
trusted*: bool trusted*: bool
wakuMode*: WakuMode
topics*: seq[Topic]
received: HashSet[Message] received: HashSet[Message]
P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
@ -105,6 +119,11 @@ proc allowed*(msg: Message, config: WakuConfig): bool =
warn "Message does not match node bloom filter" warn "Message does not match node bloom filter"
return false return false
if config.wakuMode == WakuChan:
if msg.env.topic notin config.topics:
warn "Message topic does not match Waku topic list"
return false
return true return true
proc run(peer: Peer) {.gcsafe, async.} proc run(peer: Peer) {.gcsafe, async.}
@ -118,6 +137,8 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
network.config.powRequirement = defaultMinPow network.config.powRequirement = defaultMinPow
network.config.isLightNode = false network.config.isLightNode = false
network.config.maxMsgSize = defaultMaxMsgSize network.config.maxMsgSize = defaultMaxMsgSize
network.config.wakuMode = None # default no waku mode
network.config.topics = @[]
asyncCheck node.run(network) asyncCheck node.run(network)
p2pProtocol Waku(version = wakuVersion, p2pProtocol Waku(version = wakuVersion,
@ -135,6 +156,8 @@ p2pProtocol Waku(version = wakuVersion,
cast[uint](wakuNet.config.powRequirement), cast[uint](wakuNet.config.powRequirement),
@(wakuNet.config.bloom), @(wakuNet.config.bloom),
wakuNet.config.isLightNode, wakuNet.config.isLightNode,
wakuNet.config.wakuMode,
wakuNet.config.topics,
timeout = chronos.milliseconds(500)) timeout = chronos.milliseconds(500))
if m.protocolVersion == wakuVersion: if m.protocolVersion == wakuVersion:
@ -158,6 +181,19 @@ p2pProtocol Waku(version = wakuVersion,
# No sense in connecting two light nodes so we disconnect # No sense in connecting two light nodes so we disconnect
raise newException(UselessPeerError, "Two light nodes connected") raise newException(UselessPeerError, "Two light nodes connected")
# When Waku-san connect to all. When None, connect to all, Waku-chan has
# to decide to disconnect. When Waku-chan, connect only to Waku-san.
wakuPeer.wakuMode = m.wakuMode
if wakuNet.config.wakuMode == WakuChan:
if wakuPeer.wakuMode == WakuChan:
raise newException(UselessPeerError, "Two Waku-chan connected")
elif wakuPeer.wakuMode == None:
raise newException(UselessPeerError, "Not in Waku mode")
if wakuNet.config.wakuMode == WakuSan and
wakuPeer.wakuMode == WakuChan:
# TODO: need some maximum check on amount of topics
wakuPeer.topics = m.topics
wakuPeer.received.init() wakuPeer.received.init()
wakuPeer.trusted = false wakuPeer.trusted = false
wakuPeer.initialized = true wakuPeer.initialized = true
@ -172,7 +208,9 @@ p2pProtocol Waku(version = wakuVersion,
protocolVersion: uint, protocolVersion: uint,
powConverted: uint, powConverted: uint,
bloom: Bytes, bloom: Bytes,
isLightNode: bool) isLightNode: bool,
wakuMode: WakuMode,
topics: seq[Topic])
proc messages(peer: Peer, envelopes: openarray[Envelope]) = proc messages(peer: Peer, envelopes: openarray[Envelope]) =
if not peer.state.initialized: if not peer.state.initialized:
@ -228,6 +266,14 @@ p2pProtocol Waku(version = wakuVersion,
if bloom.len == bloomSize: if bloom.len == bloomSize:
peer.state.bloom.bytesCopy(bloom) peer.state.bloom.bytesCopy(bloom)
proc topicsExchange(peer: Peer, topics: seq[Topic]) =
if not peer.state.initialized:
warn "Handshake not completed yet, discarding topicsExchange"
return
if peer.state.wakuMode == WakuChan:
peer.state.topics = topics
nextID 126 nextID 126
proc p2pRequest(peer: Peer, envelope: Envelope) = proc p2pRequest(peer: Peer, envelope: Envelope) =
@ -283,6 +329,11 @@ proc processQueue(peer: Peer) =
debug "Message does not match peer bloom filter" debug "Message does not match peer bloom filter"
continue continue
if wakuNet.config.wakuMode == WakuSan and
wakuPeer.wakuMode == WakuChan:
if message.env.topic notin wakuPeer.topics:
continue
trace "Adding envelope" trace "Adding envelope"
envelopes.add(message.env) envelopes.add(message.env)
wakuPeer.received.incl(message) wakuPeer.received.incl(message)
@ -447,6 +498,15 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
proc setTopics*(node: EthereumNode, topics: seq[Topic]) {.async.} =
node.protocolState(Waku).config.topics = topics
var futures: seq[Future[void]] = @[]
for peer in node.peers(Waku):
futures.add(peer.topicsExchange(topics))
# Exceptions from sendMsg will not be raised
await allFutures(futures)
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
## Set the maximum allowed message size. ## Set the maximum allowed message size.
## Can not be set higher than ``defaultMaxMsgSize``. ## Can not be set higher than ``defaultMaxMsgSize``.

View File

@ -0,0 +1,77 @@
#
# Waku
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import
sequtils, options, unittest, chronos, eth/[keys, p2p],
eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool,
./p2p_test_helper
const
safeTTL = 5'u32
waitInterval = messageInterval + 150.milliseconds
suite "Waku Mode":
asyncTest "Test Waku-chan with Waku-san":
var wakuChan = setupTestNode(Waku)
var wakuSan = setupTestNode(Waku)
let topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
let topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
let wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]
wakuChan.protocolState(Waku).config.wakuMode = WakuChan
wakuChan.protocolState(Waku).config.topics = @[topic1, topic2]
wakuSan.protocolState(Waku).config.wakuMode = WakuSan
wakuSan.startListening()
await wakuChan.peerPool.connectToNode(newNode(initENode(wakuSan.keys.pubKey,
wakuSan.address)))
let payload = repeat(byte 0, 10)
check:
wakuSan.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
wakuSan.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
wakuSan.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
wakuSan.protocolState(Waku).queue.items.len == 3
await sleepAsync(waitInterval)
check:
wakuChan.protocolState(Waku).queue.items.len == 2
asyncTest "Test Waku connections":
var n1 = setupTestNode(Waku)
var n2 = setupTestNode(Waku)
var n3 = setupTestNode(Waku)
var n4 = setupTestNode(Waku)
var n5 = setupTestNode(Waku)
n1.protocolState(Waku).config.wakuMode = WakuMode.None
n2.protocolState(Waku).config.wakuMode = WakuChan
n3.protocolState(Waku).config.wakuMode = WakuChan
n4.protocolState(Waku).config.wakuMode = WakuSan
n5.protocolState(Waku).config.wakuMode = WakuSan
n1.startListening()
n3.startListening()
n5.startListening()
let p1 = await n2.rlpxConnect(newNode(initENode(n1.keys.pubKey,
n1.address)))
let p2 = await n2.rlpxConnect(newNode(initENode(n3.keys.pubKey,
n3.address)))
check:
p1.isNil
p2.isNil
let p3 = await n4.rlpxConnect(newNode(initENode(n1.keys.pubKey,
n1.address)))
let p4 = await n4.rlpxConnect(newNode(initENode(n5.keys.pubKey,
n5.address)))
check:
p3.isNil == false
p4.isNil == false