Merge pull request #154 from status-im/waku-changes

Waku changes to be more according to spec
This commit is contained in:
kdeme 2019-12-20 13:22:25 -08:00 committed by GitHub
commit 658e5a3cc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 35 deletions

View File

@ -52,6 +52,7 @@ proc runP2pTests() =
"test_shh", "test_shh",
"test_shh_config", "test_shh_config",
"test_shh_connect", "test_shh_connect",
"test_waku_connect",
"test_waku_bridge", "test_waku_bridge",
"test_waku_mail", "test_waku_mail",
"test_waku_mode", "test_waku_mode",

View File

@ -64,12 +64,12 @@ proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest,
# TODO: I guess the idea is to check requestId (Hash) also? # TODO: I guess the idea is to check requestId (Hash) also?
let requests = requests - 1 let requests = requests - 1
# If there is cursor data, do another request # If there is cursor data, do another request
if response.data.cursor.len > 0 and requests > 0: if response.cursor.len > 0 and requests > 0:
var newRequest = request var newRequest = request
newRequest.cursor = response.data.cursor newRequest.cursor = response.cursor
return await requestMail(node, peerId, newRequest, symKey, requests) return await requestMail(node, peerId, newRequest, symKey, requests)
else: else:
return some(response.data.cursor) return some(response.cursor)
else: else:
error "p2pRequestComplete timeout" error "p2pRequestComplete timeout"
return result return result

View File

@ -19,6 +19,9 @@
## measured in seconds. Spam prevention is based on proof-of-work, where large ## measured in seconds. Spam prevention is based on proof-of-work, where large
## or long-lived messages must spend more work. ## or long-lived messages must spend more work.
## ##
## Implementation should be according to Waku specification defined here:
## https://github.com/vacp2p/specs/blob/master/waku/waku.md
##
## Example usage ## Example usage
## ---------- ## ----------
## First an `EthereumNode` needs to be created, either with all capabilities set ## First an `EthereumNode` needs to be created, either with all capabilities set
@ -70,6 +73,7 @@ const
## send to peers, in ms. ## send to peers, in ms.
pruneInterval* = chronos.milliseconds(1000) ## Interval at which message pruneInterval* = chronos.milliseconds(1000) ## Interval at which message
## queue is pruned, in ms. ## queue is pruned, in ms.
topicInterestMax = 1000
type type
WakuMode* = enum WakuMode* = enum
@ -87,6 +91,8 @@ type
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
maxMsgSize*: uint32 maxMsgSize*: uint32
confirmationsEnabled*: bool
rateLimits*: RateLimits
wakuMode*: WakuMode wakuMode*: WakuMode
topics*: seq[Topic] topics*: seq[Topic]
@ -108,16 +114,12 @@ type
config*: WakuConfig config*: WakuConfig
p2pRequestHandler*: P2PRequestHandler p2pRequestHandler*: P2PRequestHandler
# TODO: In the current specification this is not wrapped in a regular envelope RateLimits* = object
# as is done for the P2P Request packet. If we could alter this in the spec it # TODO: uint or specifically uint32?
# would be a cleaner separation between Waku and Mail server / client and then limitIp*: uint
# this could be moved to waku_mail.nim limitPeerId*: uint
# Also, the requestId could live at layer lower. And the protocol DSL limitTopic*: uint
# currently supports this, if used in a requestResponse block.
P2PRequestCompleteObject* = object
requestId*: Hash
lastEnvelopeHash*: Hash
cursor*: Bytes
proc allowed*(msg: Message, config: WakuConfig): bool = proc allowed*(msg: Message, config: WakuConfig): bool =
# Check max msg size, already happens in RLPx but there is a specific waku # Check max msg size, already happens in RLPx but there is a specific waku
@ -155,6 +157,12 @@ proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
network.config.bloom = fullBloom() network.config.bloom = fullBloom()
network.config.powRequirement = defaultMinPow network.config.powRequirement = defaultMinPow
network.config.isLightNode = false network.config.isLightNode = false
# confirmationsEnabled and rateLimits is not yet used but we add it here and
# in the status message to be compatible with the Status go implementation.
network.config.confirmationsEnabled = false
# TODO: Limits of 0 are ignored I hope, this is not clearly written in spec.
network.config.rateLimits =
RateLimits(limitIp: 0, limitPeerId: 0, limitTopic:0)
network.config.maxMsgSize = defaultMaxMsgSize network.config.maxMsgSize = defaultMaxMsgSize
network.config.wakuMode = None # default no waku mode network.config.wakuMode = None # default no waku mode
network.config.topics = @[] network.config.topics = @[]
@ -175,6 +183,8 @@ p2pProtocol Waku(version = wakuVersion,
cast[uint64](wakuNet.config.powRequirement), cast[uint64](wakuNet.config.powRequirement),
@(wakuNet.config.bloom), @(wakuNet.config.bloom),
wakuNet.config.isLightNode, wakuNet.config.isLightNode,
wakuNet.config.confirmationsEnabled,
wakuNet.config.rateLimits,
wakuNet.config.wakuMode, wakuNet.config.wakuMode,
wakuNet.config.topics, wakuNet.config.topics,
timeout = chronos.milliseconds(500)) timeout = chronos.milliseconds(500))
@ -217,6 +227,7 @@ p2pProtocol Waku(version = wakuVersion,
wakuPeer.trusted = false wakuPeer.trusted = false
wakuPeer.initialized = true wakuPeer.initialized = true
# No timer based queue processing for a light node.
if not wakuNet.config.isLightNode: if not wakuNet.config.isLightNode:
traceAsyncErrors peer.run() traceAsyncErrors peer.run()
@ -228,6 +239,8 @@ p2pProtocol Waku(version = wakuVersion,
powConverted: uint64, powConverted: uint64,
bloom: Bytes, bloom: Bytes,
isLightNode: bool, isLightNode: bool,
confirmationsEnabled: bool,
rateLimits: RateLimits,
wakuMode: WakuMode, wakuMode: WakuMode,
topics: seq[Topic]) topics: seq[Topic])
@ -289,9 +302,17 @@ p2pProtocol Waku(version = wakuVersion,
if bloom.len == bloomSize: if bloom.len == bloomSize:
peer.state.bloom.bytesCopy(bloom) peer.state.bloom.bytesCopy(bloom)
proc topicsExchange(peer: Peer, topics: seq[Topic]) = nextID 20
proc rateLimits(peer: Peer, rateLimits: RateLimits) = discard
proc topicInterest(peer: Peer, topics: openarray[Topic]) =
if not peer.state.initialized: if not peer.state.initialized:
warn "Handshake not completed yet, discarding topicsExchange" warn "Handshake not completed yet, discarding topicInterest"
return
if topics.len > topicInterestMax:
error "Too many topics in the topic-interest list"
return return
if peer.state.wakuMode == WakuChan: if peer.state.wakuMode == WakuChan:
@ -321,13 +342,21 @@ p2pProtocol Waku(version = wakuVersion,
proc p2pSyncRequest(peer: Peer) = discard proc p2pSyncRequest(peer: Peer) = discard
proc p2pSyncResponse(peer: Peer) = discard proc p2pSyncResponse(peer: Peer) = discard
proc p2pRequestComplete(peer: Peer, data: P2PRequestCompleteObject) = discard
# TODO: This is actually rather a requestResponse in combination with proc p2pRequestComplete(peer: Peer, requestId: Hash, lastEnvelopeHash: Hash,
# p2pRequest. However, we can't use that system due to the unfortunate fact cursor: Bytes) = discard
# that the packet IDs are not consecutive, and nextID is not recognized in # TODO:
# between these. The nextID behaviour could be fixed, however it would be # In the current specification the parameters are not wrapped in a regular
# cleaner if the specification could be changed to have these IDs to be # envelope as is done for the P2P Request packet. If we could alter this in
# consecutive. # the spec it would be a cleaner separation between Waku and Mail server /
# client.
# Also, if a requestResponse block is used, a reqestId will automatically
# be added by the protocol DSL.
# However the requestResponse block in combination with p2pRequest cannot be
# used due to the unfortunate fact that the packet IDs are not consecutive,
# and nextID is not recognized in between these. The nextID behaviour could
# be fixed, however it would be cleaner if the specification could be
# changed to have these IDs to be consecutive.
# 'Runner' calls --------------------------------------------------------------- # 'Runner' calls ---------------------------------------------------------------
@ -443,7 +472,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
# Allow lightnode to post only direct p2p messages # Allow lightnode to post only direct p2p messages
if targetPeer.isSome(): if targetPeer.isSome():
return node.sendP2PMessage(targetPeer.get(), [env]) return node.sendP2PMessage(targetPeer.get(), [env])
elif not node.protocolState(Waku).config.isLightNode: else:
# non direct p2p message can not have ttl of 0 # non direct p2p message can not have ttl of 0
if env.ttl == 0: if env.ttl == 0:
return false return false
@ -462,10 +491,19 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
if not msg.env.valid(): if not msg.env.valid():
return false return false
return node.queueMessage(msg) result = node.queueMessage(msg)
else:
warn "Light node not allowed to post messages" # Allows light nodes to post via untrusted messages packet.
return false # Queue gets processed immediatly as the node sends only its own messages,
# so the privacy ship has already sailed anyhow.
# TODO:
# - Could be still a concern in terms of efficiency, if multiple messages
# need to be send.
# - For Waku Mode, the checks in processQueue are rather useless as the
# idea is to connect only to 1 node? Also refactor in that case.
if node.protocolState(Waku).config.isLightNode:
for peer in node.peers(Waku):
peer.processQueue()
else: else:
error "Encoding of payload failed" error "Encoding of payload failed"
return false return false
@ -522,15 +560,21 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
proc setTopics*(node: EthereumNode, topics: seq[Topic]) {.async.} = proc setTopics*(node: EthereumNode, topics: seq[Topic]):
Future[bool] {.async.} =
if topics.len > topicInterestMax:
return false
node.protocolState(Waku).config.topics = topics node.protocolState(Waku).config.topics = topics
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
for peer in node.peers(Waku): for peer in node.peers(Waku):
futures.add(peer.topicsExchange(topics)) futures.add(peer.topicInterest(topics))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
await allFutures(futures) await allFutures(futures)
return true
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
## Set the maximum allowed message size. ## Set the maximum allowed message size.
## Can not be set higher than ``defaultMaxMsgSize``. ## Can not be set higher than ``defaultMaxMsgSize``.

View File

@ -0,0 +1,36 @@
#
# Waku
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import
sequtils, tables, unittest, chronos, eth/[keys, p2p],
eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool,
./p2p_test_helper
const safeTTL = 5'u32
# TODO: Just repeat all the test_shh_connect tests here that are applicable or
# have some commonly shared test code for both protocols.
suite "Waku connections":
asyncTest "Light node posting":
var ln = setupTestNode(Waku)
ln.setLightNode(true)
var fn = setupTestNode(Waku)
fn.startListening()
await ln.peerPool.connectToNode(newNode(initENode(fn.keys.pubKey,
fn.address)))
let topic = [byte 0, 0, 0, 0]
check:
ln.peerPool.connectedNodes.len() == 1
# normal post
ln.postMessage(ttl = safeTTL, topic = topic,
payload = repeat(byte 0, 10)) == true
ln.protocolState(Waku).queue.items.len == 1
# TODO: add test on message relaying

View File

@ -58,8 +58,8 @@ suite "Waku Mail Client":
output.bloom == bloom output.bloom == bloom
output.limit == limit output.limit == limit
var test: P2PRequestCompleteObject var dummy: Hash
await peer.p2pRequestComplete(test) await peer.p2pRequestComplete(dummy, dummy, @[])
check await cursorFut.withTimeout(transmissionTimeout) check await cursorFut.withTimeout(transmissionTimeout)
@ -93,13 +93,15 @@ suite "Waku Mail Client":
var envelopes: seq[Envelope] var envelopes: seq[Envelope]
traceAsyncErrors peer.p2pMessage(envelopes) traceAsyncErrors peer.p2pMessage(envelopes)
var test: P2PRequestCompleteObject var cursor: Bytes
count = count - 1 count = count - 1
if count == 0: if count == 0:
test.cursor = @[] cursor = @[]
else: else:
test.cursor = @[byte count] cursor = @[byte count]
traceAsyncErrors peer.p2pRequestComplete(test)
var dummy: Hash
traceAsyncErrors peer.p2pRequestComplete(dummy, dummy, cursor)
simpleServer.enableMailServer(customHandler) simpleServer.enableMailServer(customHandler)
check client.setPeerTrusted(simpleServerNode.id) check client.setPeerTrusted(simpleServerNode.id)