mirror of https://github.com/status-im/nim-eth.git
Remove waku protocol from the repository
This commit is contained in:
parent cea370c4fa
commit 17586c05d7

@@ -45,9 +45,6 @@ proc runP2pTests() =
     "test_shh",
     "test_shh_config",
     "test_shh_connect",
-    "test_waku_connect",
-    "test_waku_bridge",
-    "test_waku_mail",
     "test_protocol_handlers",
     "test_enr",
     "test_discoveryv5",

@@ -1,17 +0,0 @@
#
# Waku - Whisper Bridge
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
#  Apache License, version 2.0, (LICENSE-APACHEv2)
#  MIT license (LICENSE-MIT)
#

import
  eth/p2p,
  eth/p2p/rlpx_protocols/waku_protocol,
  eth/p2p/rlpx_protocols/whisper_protocol

proc shareMessageQueue*(node: EthereumNode) =
  node.protocolState(Waku).queue = node.protocolState(Whisper).queue

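The bridge works by making the Waku and Whisper protocol states share a single
message queue, so envelopes accepted on either protocol are relayed to peers of
the other. A minimal setup sketch (hedged: `keypair`, `address` and `netId` are
placeholders, prepared as in the Waku module's usage example below):

.. code-block::nim
  import
    eth/p2p,
    eth/p2p/rlpx_protocols/[whisper_protocol, waku_protocol, waku_bridge]

  var bridge = newEthereumNode(keypair, address, netId, nil,
    addAllCapabilities = false)
  bridge.addCapability Whisper  # speak Whisper ...
  bridge.addCapability Waku     # ... and Waku
  bridge.shareMessageQueue()    # one queue, so traffic flows both ways
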
@@ -1,85 +0,0 @@
#
# Waku Mail Client & Server
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
#  Apache License, version 2.0, (LICENSE-APACHEv2)
#  MIT license (LICENSE-MIT)
#
import
  chronos,
  eth/[p2p, async_utils], eth/p2p/rlpx_protocols/waku_protocol

const
  requestCompleteTimeout = chronos.seconds(5)

type
  Cursor = seq[byte]

  MailRequest* = object
    lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time
    upper*: uint32 ## Unix timestamp; newest requested envelope's creation time
    bloom*: seq[byte] ## Bloom filter to apply on the envelopes
    limit*: uint32 ## Maximum amount of envelopes to return
    cursor*: Cursor ## Optional cursor

proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest,
    symKey: SymKey, requests = 10): Future[Option[Cursor]] {.async.} =
  ## Send a p2p mail request and wait for the request to complete.
  ## If the result is none, an error occurred. If the result is a non-empty
  ## cursor, more envelopes are available.
  # TODO: Perhaps don't go the recursive route, or use the actual response
  # proc to implement this (via a handler) and store the necessary data in the
  # WakuPeer object.
  # TODO: Several requestMail calls in parallel can create issues with handling
  # the wrong response to a request. We could additionally check the requestId,
  # but that would only solve it halfway. Better to use the requestResponse
  # mechanism.

  # TODO: move this check out of requestMail?
  let peer = node.getPeer(peerId, Waku)
  if not peer.isSome():
    error "Invalid peer"
    return result
  elif not peer.get().state(Waku).trusted:
    return result

  var writer = initRlpWriter()
  writer.append(request)
  let payload = writer.finish()
  let data = encode(Payload(payload: payload, symKey: some(symKey)))
  if not data.isSome():
    error "Encoding of payload failed"
    return result

  # TODO: should this envelope be valid in terms of ttl, PoW, etc.?
  let env = Envelope(expiry: 0, ttl: 0, data: data.get(), nonce: 0)
  # Send the request
  traceAsyncErrors peer.get().p2pRequest(env)

  # Wait for the Request Complete packet
  var f = peer.get().nextMsg(Waku.p2pRequestComplete)
  if await f.withTimeout(requestCompleteTimeout):
    let response = f.read()
    # TODO: I guess the idea is to also check the requestId (Hash)?
    let requests = requests - 1
    # If there is cursor data, do another request
    if response.cursor.len > 0 and requests > 0:
      var newRequest = request
      newRequest.cursor = response.cursor
      return await requestMail(node, peerId, newRequest, symKey, requests)
    else:
      return some(response.cursor)
  else:
    error "p2pRequestComplete timeout"
    return result

proc p2pRequestHandler(peer: Peer, envelope: Envelope) =
  # Mail server p2p request implementation
  discard

proc enableMailServer*(node: EthereumNode, customHandler: P2PRequestHandler) =
  node.protocolState(Waku).p2pRequestHandler = customHandler

proc enableMailServer*(node: EthereumNode) =
  node.protocolState(Waku).p2pRequestHandler = p2pRequestHandler

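For context, a hedged sketch of driving the mail client above from inside an
async proc; `serverNodeId`, `symKey` and `topic` are illustrative placeholders
(not part of this module), and the flow mirrors test_waku_mail. `epochTime`
comes from the `times` module.

.. code-block::nim
  # The mail server must be set as trusted, otherwise requestMail returns early.
  doAssert node.setPeerTrusted(serverNodeId)
  let request = MailRequest(lower: 0'u32, upper: epochTime().uint32,
    bloom: @(toBloom(@[topic])), limit: 100'u32)
  let cursor = await node.requestMail(serverNodeId, request, symKey)
  if cursor.isNone():
    echo "mail request failed or timed out"
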
@@ -1,648 +0,0 @@
#
# Waku
# (c) Copyright 2018-2019
# Status Research & Development GmbH
#
# Licensed under either of
#  Apache License, version 2.0, (LICENSE-APACHEv2)
#  MIT license (LICENSE-MIT)
#

## Waku
## *******
##
## Waku is a fork of Whisper.
##
## Waku is a gossip protocol that synchronizes a set of messages across nodes
## with attention given to sender and recipient anonymity. Messages are
## categorized by a topic and stay alive in the network based on a time-to-live
## measured in seconds. Spam prevention is based on proof-of-work, where large
## or long-lived messages must spend more work.
##
## The implementation should follow the Waku specification defined here:
## https://github.com/vacp2p/specs/blob/master/waku/waku.md
##
## Example usage
## -------------
## First an `EthereumNode` needs to be created, either with all capabilities
## set or with specifically the Waku capability set.
## The latter can be done like this:
##
## .. code-block::nim
##   var node = newEthereumNode(keypair, address, netId, nil,
##     addAllCapabilities = false)
##   node.addCapability Waku
##
## Now calls such as ``postMessage`` and ``subscribeFilter`` can be made.
## However, they only make real sense after ``connectToNetwork`` has been
## called, as otherwise there will be no peers to send messages to and
## receive messages from.
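##
## As a further, purely illustrative sketch (``myTopic`` and the handler are
## made up for this example and a connected peer is assumed; they are not
## part of this module):
##
## .. code-block::nim
##   proc handler(msg: ReceivedMessage) =
##     echo "received payload of ", msg.decoded.payload.len, " bytes"
##   let filterId = node.subscribeFilter(
##     initFilter(topics = @[myTopic]), handler)
##   discard node.postMessage(ttl = 30, topic = myTopic,
##     payload = @[byte 1, 2, 3])
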
import
  options, tables, times, chronos, chronicles, metrics,
  eth/[keys, async_utils, p2p], whisper/whisper_types, eth/trie/trie_defs

export
  whisper_types

logScope:
  topics = "waku"

declarePublicCounter dropped_low_pow_envelopes,
  "Dropped envelopes because of too low PoW"
declarePublicCounter dropped_too_large_envelopes,
  "Dropped envelopes because larger than maximum allowed size"
declarePublicCounter dropped_bloom_filter_mismatch_envelopes,
  "Dropped envelopes because not matching with bloom filter"
declarePublicCounter dropped_topic_mismatch_envelopes,
  "Dropped envelopes because of not matching topics"
declarePublicCounter dropped_duplicate_envelopes,
  "Dropped duplicate envelopes"

const
  defaultQueueCapacity = 2048
  wakuVersion* = 1 ## Waku version.
  wakuVersionStr* = $wakuVersion ## Waku version.
  defaultMinPow* = 0.2'f64 ## The default minimum PoW requirement for this node.
  defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## The current default and maximum
  ## message size. This can never be larger than the maximum RLPx message size.
  messageInterval* = chronos.milliseconds(300) ## Interval at which messages are
  ## sent to peers, in ms.
  pruneInterval* = chronos.milliseconds(1000) ## Interval at which the message
  ## queue is pruned, in ms.
  topicInterestMax = 10000

type
  WakuConfig* = object
    powRequirement*: float64
    bloom*: Option[Bloom]
    isLightNode*: bool
    maxMsgSize*: uint32
    confirmationsEnabled*: bool
    rateLimits*: Option[RateLimits]
    topics*: Option[seq[Topic]]

  WakuPeer = ref object
    initialized: bool # set when the handshake has been successfully completed
    powRequirement*: float64
    bloom*: Bloom
    isLightNode*: bool
    trusted*: bool
    topics*: Option[seq[Topic]]
    received: HashSet[Hash]

  P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}

  WakuNetwork = ref object
    queue*: ref Queue
    filters*: Filters
    config*: WakuConfig
    p2pRequestHandler*: P2PRequestHandler

  RateLimits* = object
    # TODO: uint or specifically uint32?
    limitIp*: uint
    limitPeerId*: uint
    limitTopic*: uint

  StatusOptions* = object
    powRequirement*: Option[(float64)]
    bloomFilter*: Option[Bloom]
    lightNode*: Option[bool]
    confirmationsEnabled*: Option[bool]
    rateLimits*: Option[RateLimits]
    topicInterest*: Option[seq[Topic]]

  KeyKind* = enum
    powRequirementKey,
    bloomFilterKey,
    lightNodeKey,
    confirmationsEnabledKey,
    rateLimitsKey,
    topicInterestKey

template countSomeFields*(x: StatusOptions): int =
  var count = 0
  for f in fields(x):
    if f.isSome():
      inc count
  count

proc append*(rlpWriter: var RlpWriter, value: StatusOptions) =
  var list = initRlpList(countSomeFields(value))
  if value.powRequirement.isSome():
    list.append((powRequirementKey, cast[uint64](value.powRequirement.get())))
  if value.bloomFilter.isSome():
    list.append((bloomFilterKey, @(value.bloomFilter.get())))
  if value.lightNode.isSome():
    list.append((lightNodeKey, value.lightNode.get()))
  if value.confirmationsEnabled.isSome():
    list.append((confirmationsEnabledKey, value.confirmationsEnabled.get()))
  if value.rateLimits.isSome():
    list.append((rateLimitsKey, value.rateLimits.get()))
  if value.topicInterest.isSome():
    list.append((topicInterestKey, value.topicInterest.get()))

  let bytes = list.finish()

  rlpWriter.append(rlpFromBytes(bytes))

proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T =
  if not rlp.isList():
    raise newException(RlpTypeMismatch,
      "List expected, but the source RLP is not a list.")

  let sz = rlp.listLen()
  # We already know that we are working with a list
  doAssert rlp.enterList()
  for i in 0 ..< sz:
    rlp.tryEnterList()

    var k: KeyKind
    try:
      k = rlp.read(KeyKind)
    except RlpTypeMismatch:
      # skip unknown keys and their value
      rlp.skipElem()
      rlp.skipElem()
      continue

    case k
    of powRequirementKey:
      let pow = rlp.read(uint64)
      result.powRequirement = some(cast[float64](pow))
    of bloomFilterKey:
      let bloom = rlp.read(seq[byte])
      if bloom.len != bloomSize:
        raise newException(UselessPeerError, "Bloomfilter size mismatch")
      var bloomFilter: Bloom
      bloomFilter.bytesCopy(bloom)
      result.bloomFilter = some(bloomFilter)
    of lightNodeKey:
      result.lightNode = some(rlp.read(bool))
    of confirmationsEnabledKey:
      result.confirmationsEnabled = some(rlp.read(bool))
    of rateLimitsKey:
      result.rateLimits = some(rlp.read(RateLimits))
    of topicInterestKey:
      result.topicInterest = some(rlp.read(seq[Topic]))

proc allowed*(msg: Message, config: WakuConfig): bool =
  # Check the max message size. This already happens in RLPx, but there is a
  # specific Waku max message size which should always be smaller than the
  # RLPx max message size.
  if msg.size > config.maxMsgSize:
    dropped_too_large_envelopes.inc()
    warn "Message size too large", size = msg.size
    return false

  if msg.pow < config.powRequirement:
    dropped_low_pow_envelopes.inc()
    warn "Message PoW too low", pow = msg.pow, minPow = config.powRequirement
    return false

  if config.topics.isSome():
    if msg.env.topic notin config.topics.get():
      dropped_topic_mismatch_envelopes.inc()
      warn "Message topic does not match Waku topic list"
      return false
  else:
    if config.bloom.isSome() and not bloomFilterMatch(config.bloom.get(), msg.bloom):
      dropped_bloom_filter_mismatch_envelopes.inc()
      warn "Message does not match node bloom filter"
      return false

  return true

proc run(peer: Peer) {.gcsafe, async.}
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}

proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
  new(network.queue)
  network.queue[] = initQueue(defaultQueueCapacity)
  network.filters = initTable[string, Filter]()
  network.config.bloom = some(fullBloom())
  network.config.powRequirement = defaultMinPow
  network.config.isLightNode = false
  # RateLimits and confirmations are not yet implemented, so we set
  # confirmations to false and don't pass RateLimits at all.
  network.config.confirmationsEnabled = false
  network.config.rateLimits = none(RateLimits)
  network.config.maxMsgSize = defaultMaxMsgSize
  network.config.topics = none(seq[Topic])
  asyncCheck node.run(network)

p2pProtocol Waku(version = wakuVersion,
                 rlpxName = "waku",
                 peerState = WakuPeer,
                 networkState = WakuNetwork):

  onPeerConnected do (peer: Peer):
    trace "onPeerConnected Waku"
    let
      wakuNet = peer.networkState
      wakuPeer = peer.state

    let options = StatusOptions(
      powRequirement: some(wakuNet.config.powRequirement),
      bloomFilter: wakuNet.config.bloom,
      lightNode: some(wakuNet.config.isLightNode),
      confirmationsEnabled: some(wakuNet.config.confirmationsEnabled),
      rateLimits: wakuNet.config.rateLimits,
      topicInterest: wakuNet.config.topics)

    let m = await peer.status(options,
      timeout = chronos.milliseconds(5000))

    wakuPeer.powRequirement = m.options.powRequirement.get(defaultMinPow)
    wakuPeer.bloom = m.options.bloomFilter.get(fullBloom())

    wakuPeer.isLightNode = m.options.lightNode.get(false)
    if wakuPeer.isLightNode and wakuNet.config.isLightNode:
      # No sense in connecting two light nodes, so we disconnect
      raise newException(UselessPeerError, "Two light nodes connected")

    wakuPeer.topics = m.options.topicInterest
    if wakuPeer.topics.isSome():
      if wakuPeer.topics.get().len > topicInterestMax:
        raise newException(UselessPeerError, "Topic-interest is too large")
      if wakuNet.config.topics.isSome():
        raise newException(UselessPeerError,
          "Two Waku nodes with topic-interest connected")

    wakuPeer.received.init()
    wakuPeer.trusted = false
    wakuPeer.initialized = true

    # No timer-based queue processing for a light node.
    if not wakuNet.config.isLightNode:
      traceAsyncErrors peer.run()

    debug "Waku peer initialized", peer

  handshake:
    proc status(peer: Peer, options: StatusOptions)

  proc messages(peer: Peer, envelopes: openarray[Envelope]) =
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding messages"
      return

    for envelope in envelopes:
      # check if expired or in future, or ttl not 0
      if not envelope.valid():
        warn "Expired or future timed envelope", peer
        # disconnect from peers sending bad envelopes
        # await peer.disconnect(SubprotocolReason)
        continue

      let msg = initMessage(envelope)
      if not msg.allowed(peer.networkState.config):
        # disconnect from peers sending bad envelopes
        # await peer.disconnect(SubprotocolReason)
        continue

      # This peer sent this message, and thus should not receive it again.
      # If this peer has the message in its `received` set already, it was
      # either already received here from this peer or sent to this peer.
      # Either way it will already be in our queue (and the peer should know
      # this), so this peer is sending duplicates.
      # Note: geth does not check whether a peer has sent a message to them
      # before broadcasting it. That too is treated here as a duplicate
      # message (see the comment above). If we want to separate these cases
      # (e.g. for peer rating), we have to add a "peer.state.send" HashSet.
      # Note: it could also be a race between the arrival of a message sent
      # by this node to a peer and that same message arriving from that peer
      # (after it was received from another peer) here.
      if peer.state.received.containsOrIncl(msg.hash):
        dropped_duplicate_envelopes.inc()
        trace "Peer sending duplicate messages", peer, hash = $msg.hash
        # await peer.disconnect(SubprotocolReason)
        continue

      # This can still be a duplicate message, but from another peer than
      # the peer who sent the message.
      if peer.networkState.queue[].add(msg):
        # notify filters of this message
        peer.networkState.filters.notify(msg)

  nextID 22

  proc statusOptions(peer: Peer, options: StatusOptions) =
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding statusOptions"
      return

    if options.topicInterest.isSome():
      peer.state.topics = options.topicInterest
    elif options.bloomFilter.isSome():
      peer.state.bloom = options.bloomFilter.get()
      peer.state.topics = none(seq[Topic])

    if options.powRequirement.isSome():
      peer.state.powRequirement = options.powRequirement.get()

    if options.lightNode.isSome():
      peer.state.isLightNode = options.lightNode.get()

  nextID 126

  proc p2pRequest(peer: Peer, envelope: Envelope) =
    if not peer.networkState.p2pRequestHandler.isNil():
      peer.networkState.p2pRequestHandler(peer, envelope)

  proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) =
    if peer.state.trusted:
      # when trusted we can bypass any checks on the envelope
      for envelope in envelopes:
        let msg = Message(env: envelope, isP2P: true)
        peer.networkState.filters.notify(msg)

  # The following message IDs are not part of EIP-627, but are added and used
  # by the Status application; we ignore them for now.
  nextID 11
  proc batchAcknowledged(peer: Peer) = discard
  proc messageResponse(peer: Peer) = discard

  nextID 123
  requestResponse:
    proc p2pSyncRequest(peer: Peer) = discard
    proc p2pSyncResponse(peer: Peer) = discard

  proc p2pRequestComplete(peer: Peer, requestId: Hash, lastEnvelopeHash: Hash,
      cursor: seq[byte]) = discard
    # TODO:
    # In the current specification the parameters are not wrapped in a regular
    # envelope as is done for the P2P Request packet. If we could alter this in
    # the spec it would be a cleaner separation between Waku and Mail server /
    # client.
    # Also, if a requestResponse block were used, a requestId would
    # automatically be added by the protocol DSL.
    # However, the requestResponse block cannot be used in combination with
    # p2pRequest due to the unfortunate fact that the packet IDs are not
    # consecutive, and nextID is not recognized in between these. The nextID
    # behaviour could be fixed, but it would be cleaner if the specification
    # could be changed to make these IDs consecutive.

# 'Runner' calls ---------------------------------------------------------------

proc processQueue(peer: Peer) =
  # Send to the peer all valid envelopes in the queue that were not
  # previously sent to it.
  var
    envelopes: seq[Envelope] = @[]
    wakuPeer = peer.state(Waku)
    wakuNet = peer.networkState(Waku)

  for message in wakuNet.queue.items:
    if wakuPeer.received.contains(message.hash):
      # trace "message was already sent to peer", hash = $message.hash, peer
      continue

    if message.pow < wakuPeer.powRequirement:
      trace "Message PoW too low for peer", pow = message.pow,
        powReq = wakuPeer.powRequirement
      continue

    if wakuPeer.topics.isSome():
      if message.env.topic notin wakuPeer.topics.get():
        trace "Message does not match topics list"
        continue
    else:
      if not bloomFilterMatch(wakuPeer.bloom, message.bloom):
        trace "Message does not match peer bloom filter"
        continue

    trace "Adding envelope"
    envelopes.add(message.env)
    wakuPeer.received.incl(message.hash)

  if envelopes.len() > 0:
    trace "Sending envelopes", amount = envelopes.len
    # Ignore failure of sending messages; this could occur when the connection
    # gets dropped
    traceAsyncErrors peer.messages(envelopes)

proc run(peer: Peer) {.async.} =
  while peer.connectionState notin {Disconnecting, Disconnected}:
    peer.processQueue()
    await sleepAsync(messageInterval)

proc pruneReceived(node: EthereumNode) {.raises: [].} =
  if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
    var wakuNet = node.protocolState(Waku)

    for peer in node.protocolPeers(Waku):
      if not peer.initialized:
        continue

      # NOTE: Perhaps alter the queue prune call to keep track of a HashSet
      # of pruned messages (as these should be smaller), and diff this with
      # the received sets.
      peer.received = intersection(peer.received, wakuNet.queue.itemHashes)

proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
  while true:
    # prune the message queue every second
    # TTL unit is in seconds, so this should be sufficient?
    network.queue[].prune()
    # Pruning the received sets is not necessary for correct workings,
    # but it keeps the sets from growing indefinitely.
    node.pruneReceived()
    await sleepAsync(pruneInterval)

# Private EthereumNode calls ---------------------------------------------------

proc sendP2PMessage(node: EthereumNode, peerId: NodeId,
    envelopes: openarray[Envelope]): bool =
  for peer in node.peers(Waku):
    if peer.remote.id == peerId:
      asyncCheck peer.p2pMessage(envelopes)
      return true

proc queueMessage(node: EthereumNode, msg: Message): bool =

  var wakuNet = node.protocolState(Waku)
  # We have to do the same checks here as in the messages proc, so as not to
  # leak any information that the message originates from this node.
  if not msg.allowed(wakuNet.config):
    return false

  trace "Adding message to queue", hash = $msg.hash
  if wakuNet.queue[].add(msg):
    # Also notify our own filters of the message we are sending,
    # e.g. msg from local Dapp to Dapp
    wakuNet.filters.notify(msg)

  return true

# Public EthereumNode calls ----------------------------------------------------

proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
                  symKey = none[SymKey](), src = none[PrivateKey](),
                  ttl: uint32, topic: Topic, payload: seq[byte],
                  padding = none[seq[byte]](), powTime = 1'f,
                  powTarget = defaultMinPow,
                  targetPeer = none[NodeId]()): bool =
  ## Post a message on the message queue which will be processed at the
  ## next `messageInterval`.
  ##
  ## NOTE: This call allows a post without encryption. If encryption is
  ## mandatory it should be enforced a layer up.
  let payload = encode(Payload(payload: payload, src: src, dst: pubKey,
                               symKey: symKey, padding: padding))
  if payload.isSome():
    var env = Envelope(expiry: epochTime().uint32 + ttl,
                       ttl: ttl, topic: topic, data: payload.get(), nonce: 0)

    # Allow a light node to post only direct p2p messages
    if targetPeer.isSome():
      return node.sendP2PMessage(targetPeer.get(), [env])
    else:
      # A non-direct p2p message cannot have a ttl of 0
      if env.ttl == 0:
        return false
      var msg = initMessage(env, powCalc = false)
      # XXX: make this non-blocking or not?
      # In its current blocking state, it could be noticed by a peer that no
      # messages are sent for a while, and thus that PoW mining is happening,
      # and that the next messages will contain a message originating from
      # this peer.
      # zah: It would be hard to execute this in a background thread at the
      # moment. We'll need a way to send custom "tasks" to the async message
      # loop (e.g. AD2 support for AsyncChannels).
      if not msg.sealEnvelope(powTime, powTarget):
        return false

      # need to check expiry after mining PoW
      if not msg.env.valid():
        return false

      result = node.queueMessage(msg)

      # Allows light nodes to post via the untrusted messages packet.
      # The queue gets processed immediately as the node sends only its own
      # messages, so the privacy ship has already sailed anyhow.
      # TODO:
      # - Could still be a concern in terms of efficiency, if multiple
      #   messages need to be sent.
      # - For Waku Mode, the checks in processQueue are rather useless as the
      #   idea is to connect to only 1 node? Also refactor in that case.
      if node.protocolState(Waku).config.isLightNode:
        for peer in node.peers(Waku):
          peer.processQueue()
  else:
    error "Encoding of payload failed"
    return false

proc subscribeFilter*(node: EthereumNode, filter: Filter,
    handler: FilterMsgHandler = nil): string =
  ## Initiate a filter for incoming/outgoing messages. Messages can be
  ## retrieved with the `getFilterMessages` call or with a provided
  ## `FilterMsgHandler`.
  ##
  ## NOTE: This call allows for a filter without decryption. If encryption is
  ## mandatory it should be enforced a layer up.
  return node.protocolState(Waku).filters.subscribeFilter(filter, handler)

proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
  ## Remove a previously subscribed filter.
  var filter: Filter
  return node.protocolState(Waku).filters.take(filterId, filter)

proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
  ## Get all the messages currently in the filter queue. This will reset the
  ## filter message queue.
  return node.protocolState(Waku).filters.getFilterMessages(filterId)

proc filtersToBloom*(node: EthereumNode): Bloom =
  ## Returns the bloom filter of all topics of all subscribed filters.
  return node.protocolState(Waku).filters.toBloom()

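A hedged, polling-style sketch of the filter API above (`node` and `myTopic`
are placeholders; no handler is passed, so messages accumulate in the filter
queue until drained):

.. code-block::nim
  let filterId = node.subscribeFilter(initFilter(topics = @[myTopic]))
  # ... later: getFilterMessages drains and resets the filter queue
  for msg in node.getFilterMessages(filterId):
    echo "received payload of ", msg.decoded.payload.len, " bytes"
  discard node.unsubscribeFilter(filterId)
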
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
  ## Sets the PoW requirement for this node, and also sends this new PoW
  ## requirement to all connected peers.
  ##
  ## Failures when sending messages to peers will not be reported.
  # NOTE: do we need a tolerance of old PoW for some time?
  node.protocolState(Waku).config.powRequirement = powReq
  var futures: seq[Future[void]] = @[]
  let list = StatusOptions(powRequirement: some(powReq))
  for peer in node.peers(Waku):
    futures.add(peer.statusOptions(list))

  # Exceptions from sendMsg will not be raised
  await allFutures(futures)

proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
  ## Sets the bloom filter for this node, and also sends this new bloom
  ## filter to all connected peers.
  ##
  ## Failures when sending messages to peers will not be reported.
  # NOTE: do we need a tolerance of old bloom filter for some time?
  node.protocolState(Waku).config.bloom = some(bloom)
  # reset topics
  node.protocolState(Waku).config.topics = none(seq[Topic])

  var futures: seq[Future[void]] = @[]
  let list = StatusOptions(bloomFilter: some(bloom))
  for peer in node.peers(Waku):
    futures.add(peer.statusOptions(list))

  # Exceptions from sendMsg will not be raised
  await allFutures(futures)

proc setTopicInterest*(node: EthereumNode, topics: seq[Topic]):
    Future[bool] {.async.} =
  if topics.len > topicInterestMax:
    return false

  node.protocolState(Waku).config.topics = some(topics)

  var futures: seq[Future[void]] = @[]
  let list = StatusOptions(topicInterest: some(topics))
  for peer in node.peers(Waku):
    futures.add(peer.statusOptions(list))

  # Exceptions from sendMsg will not be raised
  await allFutures(futures)

  return true

proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
  ## Set the maximum allowed message size.
  ## Cannot be set higher than ``defaultMaxMsgSize``.
  if size > defaultMaxMsgSize:
    warn "size > defaultMaxMsgSize"
    return false
  node.protocolState(Waku).config.maxMsgSize = size
  return true

proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
  ## Set a connected peer as trusted.
  for peer in node.peers(Waku):
    if peer.remote.id == peerId:
      peer.state(Waku).trusted = true
      return true

proc setLightNode*(node: EthereumNode, isLightNode: bool) {.async.} =
  ## Set this node as a Waku light node.
  node.protocolState(Waku).config.isLightNode = isLightNode
  # TODO: Add starting/stopping of the `processQueue` loop depending on the
  # value of isLightNode.
  var futures: seq[Future[void]] = @[]
  let list = StatusOptions(lightNode: some(isLightNode))
  for peer in node.peers(Waku):
    futures.add(peer.statusOptions(list))

  # Exceptions from sendMsg will not be raised
  await allFutures(futures)

proc configureWaku*(node: EthereumNode, config: WakuConfig) =
  ## Apply a Waku configuration.
  ##
  ## NOTE: Should be run before a connection is made with peers, as some
  ## of the settings are only communicated at peer handshake.
  node.protocolState(Waku).config = config

proc resetMessageQueue*(node: EthereumNode) =
  ## Full reset of the message queue.
  ##
  ## NOTE: Not something that should be run in normal circumstances.
  node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)

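To recap the runtime configuration API above, a hedged sketch (values are
illustrative; the calls must run inside an async proc since several of them
await peer updates):

.. code-block::nim
  # Raise the PoW requirement and advertise it to connected peers.
  await node.setPowRequirement(1.0)
  # Switch to topic-interest mode; returns false if the list is too large.
  discard await node.setTopicInterest(@[[byte 0xDA, 0xDA, 0xDA, 0xAA]])
  # Become a light node: no timer-based relaying of the message queue.
  await node.setLightNode(true)
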
@@ -1,95 +0,0 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
#  Apache License, version 2.0, (LICENSE-APACHEv2)
#  MIT license (LICENSE-MIT)

import
  sequtils, unittest, tables, chronos, eth/p2p, eth/p2p/peer_pool,
  eth/p2p/rlpx_protocols/waku_bridge,
  eth/p2p/rlpx_protocols/waku_protocol as waku,
  eth/p2p/rlpx_protocols/whisper_protocol as whisper,
  ./p2p_test_helper

let safeTTL = 5'u32
let waitInterval = waku.messageInterval + 150.milliseconds

procSuite "Waku - Whisper bridge tests":
  # The Waku-Whisper node has both capabilities, listens to both Whisper and
  # Waku, and relays traffic between the two.
  var
    nodeWakuWhisper = setupTestNode(Whisper, Waku) # This will be the bridge
    nodeWhisper = setupTestNode(Whisper)
    nodeWaku = setupTestNode(Waku)

  nodeWakuWhisper.startListening()
  let bridgeNode = newNode(nodeWakuWhisper.toENode())
  nodeWakuWhisper.shareMessageQueue()

  waitFor nodeWhisper.peerPool.connectToNode(bridgeNode)
  waitFor nodeWaku.peerPool.connectToNode(bridgeNode)

  asyncTest "WakuWhisper and Whisper peers connected":
    check:
      nodeWhisper.peerPool.connectedNodes.len() == 1
      nodeWaku.peerPool.connectedNodes.len() == 1

  asyncTest "Whisper - Waku communication via bridge":
    # topic the whisper node subscribes to and the waku node posts to
    let topic1 = [byte 0x12, 0, 0, 0]
    # topic the waku node subscribes to and the whisper node posts to
    let topic2 = [byte 0x34, 0, 0, 0]
    var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
    var futures = [newFuture[int](), newFuture[int]()]

    proc handler1(msg: whisper.ReceivedMessage) =
      check msg.decoded.payload == payloads[0]
      futures[0].complete(1)
    proc handler2(msg: waku.ReceivedMessage) =
      check msg.decoded.payload == payloads[1]
      futures[1].complete(1)

    var filter1 = whisper.subscribeFilter(nodeWhisper,
      whisper.initFilter(topics = @[topic1]), handler1)
    var filter2 = waku.subscribeFilter(nodeWaku,
      waku.initFilter(topics = @[topic2]), handler2)

    check:
      # The message should also end up in the Whisper node's queue via the bridge
      waku.postMessage(nodeWaku, ttl = safeTTL + 1, topic = topic1,
                       payload = payloads[0]) == true
      # The message should also end up in the Waku node's queue via the bridge
      whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2,
                          payload = payloads[1]) == true
      nodeWhisper.protocolState(Whisper).queue.items.len == 1
      nodeWaku.protocolState(Waku).queue.items.len == 1

      # waitInterval*2 as messages also have to pass the bridge (2 hops)
      await allFutures(futures).withTimeout(waitInterval*2)

      # The relay can receive both Whisper & Waku messages
      nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2
      nodeWakuWhisper.protocolState(Waku).queue.items.len == 2

      # The Whisper node can receive Waku messages (via the bridge)
      nodeWhisper.protocolState(Whisper).queue.items.len == 2
      # The Waku node can receive Whisper messages (via the bridge)
      nodeWaku.protocolState(Waku).queue.items.len == 2

      whisper.unsubscribeFilter(nodeWhisper, filter1) == true
      waku.unsubscribeFilter(nodeWaku, filter2) == true

    # XXX: This reads a bit weird, but eh
    waku.resetMessageQueue(nodeWaku)
    whisper.resetMessageQueue(nodeWhisper)
    # shared queue, so both Waku and Whisper should be set to 0
    waku.resetMessageQueue(nodeWakuWhisper)

    check:
      nodeWhisper.protocolState(Whisper).queue.items.len == 0
      nodeWaku.protocolState(Waku).queue.items.len == 0
      nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0
      nodeWakuWhisper.protocolState(Waku).queue.items.len == 0

@@ -1,242 +0,0 @@
#
# Waku
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
#  Apache License, version 2.0, (LICENSE-APACHEv2)
#  MIT license (LICENSE-MIT)

import
  sequtils, tables, unittest, chronos, eth/[keys, p2p],
  eth/p2p/rlpx_protocols/waku_protocol, eth/p2p/peer_pool,
  ./p2p_test_helper

const
  safeTTL = 5'u32
  waitInterval = messageInterval + 150.milliseconds
  conditionTimeoutMs = 3000

# Check a condition until it is true, or return a future containing false
# if the timeout expires first.
proc eventually(timeout: int, condition: proc(): bool {.gcsafe.}): Future[bool] =
  let wrappedCondition = proc(): Future[bool] {.async.} =
    let f = newFuture[bool]()
    while not condition():
      await sleepAsync(100.milliseconds)
    f.complete(true)
    return await f
  return withTimeout(wrappedCondition(), timeout)

# TODO: Just repeat all the test_shh_connect tests here that are applicable, or
# have some commonly shared test code for both protocols.
suite "Waku connections":
  asyncTest "Waku connections":
    var
      n1 = setupTestNode(Waku)
      n2 = setupTestNode(Waku)
      n3 = setupTestNode(Waku)
      n4 = setupTestNode(Waku)

    var topics: seq[Topic]
    n1.protocolState(Waku).config.topics = some(topics)
    n2.protocolState(Waku).config.topics = some(topics)
    n3.protocolState(Waku).config.topics = none(seq[Topic])
    n4.protocolState(Waku).config.topics = none(seq[Topic])

    n1.startListening()
    n3.startListening()

    let
      p1 = await n2.rlpxConnect(newNode(n1.toENode()))
      p2 = await n2.rlpxConnect(newNode(n3.toENode()))
      p3 = await n4.rlpxConnect(newNode(n3.toENode()))
    check:
      p1.isNil
      p2.isNil == false
      p3.isNil == false

  asyncTest "Waku set-topic-interest":
    var
      wakuTopicNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)

    let
      topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
      topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
      wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]

    # Set one topic so we are not considered a full node
    wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1])

    wakuNode.startListening()
    await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode()))

    # Update topic interest
    check:
      await setTopicInterest(wakuTopicNode, @[topic1, topic2])

    let payload = repeat(byte 0, 10)
    check:
      wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
      wakuNode.protocolState(Waku).queue.items.len == 3
    await sleepAsync(waitInterval)
    check:
      wakuTopicNode.protocolState(Waku).queue.items.len == 2

  asyncTest "Waku set-minimum-pow":
    var
      wakuPowNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)

    wakuNode.startListening()
    await wakuPowNode.peerPool.connectToNode(newNode(wakuNode.toENode()))

    # Update the minimum PoW
    await setPowRequirement(wakuPowNode, 1.0)
    await sleepAsync(waitInterval)

    check:
      wakuNode.peerPool.len == 1

    # check powRequirement is updated
    for peer in wakuNode.peerPool.peers:
      check:
        peer.state(Waku).powRequirement == 1.0

  asyncTest "Waku set-light-node":
    var
      wakuLightNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)

    wakuNode.startListening()
    await wakuLightNode.peerPool.connectToNode(newNode(wakuNode.toENode()))

    # Update the light-node flag
    await setLightNode(wakuLightNode, true)
    await sleepAsync(waitInterval)

    check:
      wakuNode.peerPool.len == 1

    # check lightNode is updated
    for peer in wakuNode.peerPool.peers:
      check:
        peer.state(Waku).isLightNode

  asyncTest "Waku set-bloom-filter":
    var
      wakuBloomNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)
      bloom = fullBloom()
      topics = @[[byte 0xDA, 0xDA, 0xDA, 0xAA]]

    # Set topic interest
    discard await wakuBloomNode.setTopicInterest(topics)

    wakuBloomNode.startListening()
    await wakuNode.peerPool.connectToNode(newNode(wakuBloomNode.toENode()))

    # Sanity check
    check:
      wakuNode.peerPool.len == 1

    # check the bloom filter and topics were exchanged at the handshake
    for peer in wakuNode.peerPool.peers:
      check:
        peer.state(Waku).bloom == bloom
        peer.state(Waku).topics == some(topics)

    let hasBloomNodeConnectedCondition = proc(): bool =
      wakuBloomNode.peerPool.len == 1
    # wait for the peer to be connected on the other side
    let hasBloomNodeConnected =
      await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition)
    check:
      hasBloomNodeConnected

    # disable one bit in the bloom filter
    bloom[0] = 0x0

    # and set it
    await setBloomFilter(wakuBloomNode, bloom)

    let bloomFilterUpdatedCondition = proc(): bool =
      for peer in wakuNode.peerPool.peers:
        return peer.state(Waku).bloom == bloom and
          peer.state(Waku).topics == none(seq[Topic])

    let bloomFilterUpdated =
      await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition)
    # check the bloom filter was updated
    check:
      bloomFilterUpdated

  asyncTest "Waku topic-interest":
    var
      wakuTopicNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)

    let
      topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
      topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
      wrongTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]

    wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2])

    wakuNode.startListening()
    await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode()))

    let payload = repeat(byte 0, 10)
    check:
      wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload)
      wakuNode.protocolState(Waku).queue.items.len == 3

    let response = await eventually(conditionTimeoutMs,
      proc(): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2)
    check:
      response

  asyncTest "Waku topic-interest versus bloom filter":
    var
      wakuTopicNode = setupTestNode(Waku)
      wakuNode = setupTestNode(Waku)

    let
      topic1 = [byte 0xDA, 0xDA, 0xDA, 0xAA]
      topic2 = [byte 0xD0, 0xD0, 0xD0, 0x00]
      bloomTopic = [byte 0x4B, 0x1D, 0x4B, 0x1D]

    # It was checked that the topics don't trigger false positives on the bloom.
    wakuTopicNode.protocolState(Waku).config.topics = some(@[topic1, topic2])
    wakuTopicNode.protocolState(Waku).config.bloom = some(toBloom([bloomTopic]))

    wakuNode.startListening()
    await wakuTopicNode.peerPool.connectToNode(newNode(wakuNode.toENode()))

    let payload = repeat(byte 0, 10)
    check:
      wakuNode.postMessage(ttl = safeTTL, topic = topic1, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = topic2, payload = payload)
      wakuNode.postMessage(ttl = safeTTL, topic = bloomTopic, payload = payload)
      wakuNode.protocolState(Waku).queue.items.len == 3
    await sleepAsync(waitInterval)
    check:
      wakuTopicNode.protocolState(Waku).queue.items.len == 2

  asyncTest "Light node posting":
    var ln = setupTestNode(Waku)
    await ln.setLightNode(true)
    var fn = setupTestNode(Waku)
    fn.startListening()
    await ln.peerPool.connectToNode(newNode(fn.toENode()))

    let topic = [byte 0, 0, 0, 0]

    check:
      ln.peerPool.connectedNodes.len() == 1
      # normal post
      ln.postMessage(ttl = safeTTL, topic = topic,
                     payload = repeat(byte 0, 10)) == true
      ln.protocolState(Waku).queue.items.len == 1
      # TODO: add test on message relaying

@@ -1,116 +0,0 @@
import
  unittest, chronos, tables, sequtils, times,
  eth/[p2p, async_utils], eth/p2p/peer_pool,
  eth/p2p/rlpx_protocols/[waku_protocol, waku_mail],
  ./p2p_test_helper

const
  transmissionTimeout = chronos.milliseconds(100)

proc waitForConnected(node: EthereumNode) {.async.} =
  while node.peerPool.connectedNodes.len == 0:
    await sleepAsync(chronos.milliseconds(1))

procSuite "Waku Mail Client":
  var client = setupTestNode(Waku)
  var simpleServer = setupTestNode(Waku)

  simpleServer.startListening()
  let simpleServerNode = newNode(simpleServer.toENode())
  let clientNode = newNode(client.toENode())
  waitFor client.peerPool.connectToNode(simpleServerNode)
  require:
    waitFor simpleServer.waitForConnected().withTimeout(transmissionTimeout)

  asyncTest "Two peers connected":
    check:
      client.peerPool.connectedNodes.len() == 1
      simpleServer.peerPool.connectedNodes.len() == 1

  asyncTest "Mail Request and Request Complete":
    let
      topic = [byte 0, 0, 0, 0]
      bloom = toBloom(@[topic])
      lower = 0'u32
      upper = epochTime().uint32
      limit = 100'u32
      request = MailRequest(lower: lower, upper: upper, bloom: @bloom,
                            limit: limit)

    var symKey: SymKey
    check client.setPeerTrusted(simpleServerNode.id)
    var cursorFut = client.requestMail(simpleServerNode.id, request, symKey, 1)

    # Simple mail server part
    let peer = simpleServer.peerPool.connectedNodes[clientNode]
    var f = peer.nextMsg(Waku.p2pRequest)
    require await f.withTimeout(transmissionTimeout)
    let response = f.read()
    let decoded = decode(response.envelope.data, symKey = some(symKey))
    require decoded.isSome()

    var rlp = rlpFromBytes(decoded.get().payload)
    let output = rlp.read(MailRequest)
    check:
      output.lower == lower
      output.upper == upper
      output.bloom == bloom
      output.limit == limit

    var dummy: Hash
    await peer.p2pRequestComplete(dummy, dummy, @[])

    check await cursorFut.withTimeout(transmissionTimeout)

  asyncTest "Mail Send":
    let topic = [byte 0x12, 0x34, 0x56, 0x78]
    let payload = repeat(byte 0, 10)
    var f = newFuture[int]()

    proc handler(msg: ReceivedMessage) =
      check msg.decoded.payload == payload
      f.complete(1)

    let filter = subscribeFilter(client,
      initFilter(topics = @[topic], allowP2P = true), handler)

    check:
      client.setPeerTrusted(simpleServerNode.id)
      # ttl 0 to show that the ttl should be ignored
      # TODO: perhaps not the best way to test this; it means no PoW
      # calculation may be done, and it is not sure whether that is OK
      simpleServer.postMessage(ttl = 0, topic = topic, payload = payload,
                               targetPeer = some(clientNode.id))

      await f.withTimeout(transmissionTimeout)

      client.unsubscribeFilter(filter)

  asyncTest "Multiple Client Request and Complete":
    var count = 5
    proc customHandler(peer: Peer, envelope: Envelope) =
      var envelopes: seq[Envelope]
      traceAsyncErrors peer.p2pMessage(envelopes)

      var cursor: seq[byte]
      count = count - 1
      if count == 0:
        cursor = @[]
      else:
        cursor = @[byte count]

      var dummy: Hash
      traceAsyncErrors peer.p2pRequestComplete(dummy, dummy, cursor)

    simpleServer.enableMailServer(customHandler)
    check client.setPeerTrusted(simpleServerNode.id)
    var request: MailRequest
    var symKey: SymKey
    let cursor =
      await client.requestMail(simpleServerNode.id, request, symKey, 5)
    require cursor.isSome()
    check:
      cursor.get().len == 0
      count == 0

    # TODO: Also check for received envelopes.