2019-11-15 07:31:06 +00:00
|
|
|
#
|
2019-11-15 07:32:46 +00:00
|
|
|
# Waku
|
2019-11-15 07:31:06 +00:00
|
|
|
# (c) Copyright 2018-2019
|
|
|
|
# Status Research & Development GmbH
|
|
|
|
#
|
|
|
|
# Licensed under either of
|
|
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
|
|
# MIT license (LICENSE-MIT)
|
|
|
|
#
|
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
## Waku
|
2019-11-15 07:31:06 +00:00
|
|
|
## *******
|
|
|
|
##
|
2019-11-15 07:32:46 +00:00
|
|
|
## Waku is a fork of Whisper.
|
|
|
|
##
|
|
|
|
## Waku is a gossip protocol that synchronizes a set of messages across nodes
|
2019-11-15 07:31:06 +00:00
|
|
|
## with attention given to sender and recipient anonymity. Messages are
|
|
|
|
## categorized by a topic and stay alive in the network based on a time-to-live
|
|
|
|
## measured in seconds. Spam prevention is based on proof-of-work, where large
|
|
|
|
## or long-lived messages must spend more work.
|
|
|
|
##
|
2019-12-19 21:48:51 +00:00
|
|
|
## Implementation should be according to Waku specification defined here:
|
|
|
|
## https://github.com/vacp2p/specs/blob/master/waku/waku.md
|
|
|
|
##
|
2019-11-15 07:31:06 +00:00
|
|
|
## Example usage
|
|
|
|
## ----------
|
|
|
|
## First an `EthereumNode` needs to be created, either with all capabilities set
|
2019-11-15 07:32:46 +00:00
|
|
|
## or with specifically the Waku capability set.
|
2019-11-15 07:31:06 +00:00
|
|
|
## The latter can be done like this:
|
|
|
|
##
|
|
|
|
## .. code-block::nim
|
|
|
|
## var node = newEthereumNode(keypair, address, netId, nil,
|
|
|
|
## addAllCapabilities = false)
|
2019-11-15 07:32:46 +00:00
|
|
|
## node.addCapability Waku
|
2019-11-15 07:31:06 +00:00
|
|
|
##
|
|
|
|
## Now calls such as ``postMessage`` and ``subscribeFilter`` can be done.
|
|
|
|
## However, they only make real sense after ``connectToNetwork`` was started,
## as otherwise there will be no peers to send messages to or receive them from.
|
|
|
|
|
|
|
|
import
|
2019-12-17 22:57:34 +00:00
|
|
|
options, tables, times, chronos, chronicles, metrics,
|
2019-12-10 20:49:54 +00:00
|
|
|
eth/[keys, async_utils, p2p], whisper/whisper_types, eth/trie/trie_defs
|
2019-11-19 12:53:38 +00:00
|
|
|
|
|
|
|
export
|
|
|
|
whisper_types
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
# Every log statement in this module is tagged with the "waku" topic.
logScope:
  topics = "waku"
|
2019-11-15 07:31:06 +00:00
|
|
|
|
2019-12-17 22:57:34 +00:00
|
|
|
# Metric counters. `valid_envelopes` counts envelopes that were accepted into
# the message queue; each `dropped_*` counter is incremented at the check that
# rejects an envelope for the stated reason.
declareCounter valid_envelopes, "Received & posted valid envelopes"
declareCounter dropped_low_pow_envelopes,
  "Dropped envelopes because of too low PoW"
declareCounter dropped_too_large_envelopes,
  "Dropped envelopes because larger than maximum allowed size"
declareCounter dropped_bloom_filter_mismatch_envelopes,
  "Dropped envelopes because not matching with bloom filter"
declareCounter dropped_topic_mismatch_envelopes,
  "Dropped envelopes because of not matching topics"
declareCounter dropped_benign_duplicate_envelopes,
  "Dropped benign duplicate envelopes"
declareCounter dropped_malicious_duplicate_envelopes,
  "Dropped malicious duplicate envelopes"
|
|
|
|
|
2019-11-15 07:31:06 +00:00
|
|
|
const
  # Capacity handed to `initQueue` whenever the message queue is (re)created.
  defaultQueueCapacity = 256
  wakuVersion* = 0 ## Waku protocol version.
  wakuVersionStr* = $wakuVersion ## Waku protocol version, as a string.
  defaultMinPow* = 0.2'f64 ## Default minimum PoW requirement of this node.
  defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## Default and, at the same time,
  ## maximum message size. This can never be larger than the maximum RLPx
  ## message size.
  messageInterval* = chronos.milliseconds(300) ## Interval at which queued
  ## messages are sent to peers.
  pruneInterval* = chronos.milliseconds(1000) ## Interval at which the message
  ## queue is pruned.
  # Upper bound on the number of topics accepted in a topic-interest list.
  topicInterestMax = 1000
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
type
  WakuMode* = enum
    ## Role a node plays in the Waku network; exchanged in the handshake.
    # TODO: is there a reason to allow such "none" mode? This was originally
    # put here when it was still supposed to be compatible with Whisper.
    None,     # No Waku mode
    WakuChan, # Waku client
    WakuSan   # Waku node
    # TODO: Light mode could also become part of this enum
    # TODO: With discv5, this could be capabilities also announced at level of
    # discovery.

  WakuConfig* = object
    ## Local node settings. They are announced to a peer in the handshake and
    ## used to validate messages (see `allowed`).
    powRequirement*: float64     # minimum PoW a message must carry
    bloom*: Bloom                # bloom filter a message must match
    isLightNode*: bool
    maxMsgSize*: uint32          # maximum accepted message size
    confirmationsEnabled*: bool  # not used yet, only sent in the handshake
    rateLimits*: RateLimits      # not used yet, only sent in the handshake
    wakuMode*: WakuMode
    topics*: seq[Topic]          # topic list checked when in WakuChan mode

  WakuPeer = ref object
    ## Per-peer protocol state, filled in from the peer's handshake message.
    initialized: bool # when successfully completed the handshake
    powRequirement*: float64
    bloom*: Bloom
    isLightNode*: bool
    trusted*: bool    # envelopes of its p2pMessage packets bypass all checks
    wakuMode*: WakuMode
    topics*: seq[Topic]
    received: HashSet[Message] # messages received from or sent to this peer

  P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
    ## Callback invoked for every incoming `p2pRequest` packet.

  WakuNetwork = ref object
    ## Per-node protocol state shared by all Waku peers.
    queue*: ref Queue
    filters*: Filters
    config*: WakuConfig
    p2pRequestHandler*: P2PRequestHandler # may be nil, then requests are ignored

  RateLimits* = object
    ## Rate limits as exchanged in the handshake and `rateLimits` packet.
    # TODO: uint or specifically uint32?
    limitIp*: uint
    limitPeerId*: uint
    limitTopic*: uint
|
|
|
|
|
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
proc allowed*(msg: Message, config: WakuConfig): bool =
  ## Validates `msg` against the node configuration `config`.
  ##
  ## The checks run in this order: message size, proof-of-work, bloom filter
  ## and, only when `config.wakuMode` is `WakuChan`, membership of the envelope
  ## topic in `config.topics`. The first failing check increments its
  ## `dropped_*` counter, logs a warning and makes the proc return `false`.
  ## `true` is returned only when every check passes.

  # `result` starts out as `false`, so the failing branches only have to count
  # and log; the final `else` is the one place where the message is accepted.
  if msg.size > config.maxMsgSize:
    # RLPx already limits the message size, but Waku has its own maximum which
    # should always be < RLPx max msg size.
    dropped_too_large_envelopes.inc()
    warn "Message size too large", size = msg.size
  elif msg.pow < config.powRequirement:
    dropped_low_pow_envelopes.inc()
    warn "Message PoW too low", pow = msg.pow, minPow = config.powRequirement
  elif not bloomFilterMatch(config.bloom, msg.bloom):
    dropped_bloom_filter_mismatch_envelopes.inc()
    warn "Message does not match node bloom filter"
  elif config.wakuMode == WakuChan and msg.env.topic notin config.topics:
    dropped_topic_mismatch_envelopes.inc()
    warn "Message topic does not match Waku topic list"
  else:
    result = true
|
|
|
|
|
|
|
|
# Forward declarations of the two background loops. They are referenced by
# `initProtocolState` and by the `onPeerConnected` handler before their bodies
# appear further down in this file.
proc run(peer: Peer) {.gcsafe, async.}
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
|
2019-11-15 07:31:06 +00:00
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
  ## Initialises the Waku network state: creates an empty message queue and
  ## filter table, installs the default configuration and starts the
  ## background pruning loop (`run(node, network)`).
  new(network.queue)
  network.queue[] = initQueue(defaultQueueCapacity)
  network.filters = initTable[string, Filter]()

  # Every field of `WakuConfig` is set explicitly here.
  network.config = WakuConfig(
    bloom: fullBloom(),
    powRequirement: defaultMinPow,
    isLightNode: false,
    # confirmationsEnabled and rateLimits are not yet used, but they are part
    # of the config and of the status message to be compatible with the Status
    # go implementation.
    confirmationsEnabled: false,
    # TODO: Limits of 0 are ignored I hope, this is not clearly written in
    # spec.
    rateLimits: RateLimits(limitIp: 0, limitPeerId: 0, limitTopic: 0),
    maxMsgSize: defaultMaxMsgSize,
    wakuMode: None, # default no waku mode
    topics: @[])

  asyncCheck node.run(network)
|
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
# Definition of the Waku RLPx sub-protocol: the handshake performed when a peer
# connects, followed by the handlers for each protocol packet. `peer.state` is
# the per-peer `WakuPeer` and `peer.networkState` the shared `WakuNetwork`.
p2pProtocol Waku(version = wakuVersion,
                 rlpxName = "waku",
                 peerState = WakuPeer,
                 networkState = WakuNetwork):

  onPeerConnected do (peer: Peer):
    trace "onPeerConnected Waku"
    let
      wakuNet = peer.networkState
      wakuPeer = peer.state

    # Exchange status messages. The PoW requirement travels as a uint64, so the
    # float64 is `cast` (not converted) on both the sending and receiving side.
    let m = await peer.status(wakuVersion,
                              cast[uint64](wakuNet.config.powRequirement),
                              @(wakuNet.config.bloom),
                              wakuNet.config.isLightNode,
                              wakuNet.config.confirmationsEnabled,
                              wakuNet.config.rateLimits,
                              wakuNet.config.wakuMode,
                              wakuNet.config.topics,
                              timeout = chronos.milliseconds(500))

    if m.protocolVersion == wakuVersion:
      debug "Waku peer", peer, wakuVersion
    else:
      raise newException(UselessPeerError, "Incompatible Waku version")

    wakuPeer.powRequirement = cast[float64](m.powConverted)

    if m.bloom.len > 0:
      if m.bloom.len != bloomSize:
        raise newException(UselessPeerError, "Bloomfilter size mismatch")
      else:
        wakuPeer.bloom.bytesCopy(m.bloom)
    else:
      # If no bloom filter is sent we allow all
      wakuPeer.bloom = fullBloom()

    wakuPeer.isLightNode = m.isLightNode
    if wakuPeer.isLightNode and wakuNet.config.isLightNode:
      # No sense in connecting two light nodes so we disconnect
      raise newException(UselessPeerError, "Two light nodes connected")

    # When Waku-san connect to all. When None, connect to all, Waku-chan has
    # to decide to disconnect. When Waku-chan, connect only to Waku-san.
    wakuPeer.wakuMode = m.wakuMode
    if wakuNet.config.wakuMode == WakuChan:
      if wakuPeer.wakuMode == WakuChan:
        raise newException(UselessPeerError, "Two Waku-chan connected")
      elif wakuPeer.wakuMode == None:
        raise newException(UselessPeerError, "Not in Waku mode")
    if wakuNet.config.wakuMode == WakuSan and
        wakuPeer.wakuMode == WakuChan:
      # The topic list comes from the remote peer, so bound its size with the
      # same limit the `topicInterest` handler enforces before storing it.
      if m.topics.len > topicInterestMax:
        raise newException(UselessPeerError,
          "Too many topics in the topic-interest list")
      wakuPeer.topics = m.topics

    wakuPeer.received.init()
    wakuPeer.trusted = false
    wakuPeer.initialized = true

    # A light node does not start the loop that sends queued messages to the
    # peer.
    if not wakuNet.config.isLightNode:
      traceAsyncErrors peer.run()

    debug "Waku peer initialized", peer

  handshake:
    proc status(peer: Peer,
                protocolVersion: uint,
                powConverted: uint64,
                bloom: Bytes,
                isLightNode: bool,
                confirmationsEnabled: bool,
                rateLimits: RateLimits,
                wakuMode: WakuMode,
                topics: seq[Topic])

  proc messages(peer: Peer, envelopes: openarray[Envelope]) =
    # Handles a batch of envelopes sent by `peer`: each valid, allowed and not
    # previously seen envelope is added to the queue and passed to the filters.
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding messages"
      return

    for envelope in envelopes:
      # check if expired or in future, or ttl not 0
      if not envelope.valid():
        warn "Expired or future timed envelope", peer
        # disconnect from peers sending bad envelopes
        # await peer.disconnect(SubprotocolReason)
        continue

      let msg = initMessage(envelope)
      if not msg.allowed(peer.networkState.config):
        # disconnect from peers sending bad envelopes
        # await peer.disconnect(SubprotocolReason)
        continue

      # This peer sent this message thus should not receive it again.
      # If this peer has the message in the `received` set already, this means
      # it was either already received here from this peer or sent to this
      # peer. Either way it will be in our queue already (and the peer should
      # know this) and this peer is sending duplicates.
      # Note: geth does not check if a peer has sent a message to them before
      # broadcasting this message. This too is seen here as a duplicate message
      # (see above comment). If we want to separate these cases (e.g. when peer
      # rating), then we have to add a "peer.state.send" HashSet.
      if peer.state.received.containsOrIncl(msg):
        dropped_malicious_duplicate_envelopes.inc()
        debug "Peer sending duplicate messages", peer, hash = msg.hash
        # await peer.disconnect(SubprotocolReason)
        continue

      # This can still be a duplicate message, but from another peer than
      # the peer who sent the message.
      if peer.networkState.queue[].add(msg):
        valid_envelopes.inc()
        # notify filters of this message
        peer.networkState.filters.notify(msg)
      else:
        dropped_benign_duplicate_envelopes.inc()

  proc powRequirement(peer: Peer, value: uint64) =
    # Stores the new PoW requirement announced by `peer`; `value` is `cast`
    # back to a float64, mirroring the cast done by the sender.
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding powRequirement"
      return

    peer.state.powRequirement = cast[float64](value)

  proc bloomFilterExchange(peer: Peer, bloom: Bytes) =
    # Stores the new bloom filter announced by `peer`. A filter whose length
    # differs from `bloomSize` is silently ignored.
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding bloomFilterExchange"
      return

    if bloom.len == bloomSize:
      peer.state.bloom.bytesCopy(bloom)

  nextID 20

  proc rateLimits(peer: Peer, rateLimits: RateLimits) = discard

  proc topicInterest(peer: Peer, topics: openarray[Topic]) =
    # Replaces the topic list of a Waku-chan peer. Lists longer than
    # `topicInterestMax` are rejected; the previous list is kept in that case.
    if not peer.state.initialized:
      warn "Handshake not completed yet, discarding topicInterest"
      return

    if topics.len > topicInterestMax:
      error "Too many topics in the topic-interest list"
      return

    if peer.state.wakuMode == WakuChan:
      peer.state.topics = topics

  nextID 126

  proc p2pRequest(peer: Peer, envelope: Envelope) =
    # Forwards the request to the registered handler, if there is one.
    if not peer.networkState.p2pRequestHandler.isNil():
      peer.networkState.p2pRequestHandler(peer, envelope)

  proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) =
    # Direct messages are only accepted from trusted peers; they go straight to
    # the filters and are never added to the queue.
    if peer.state.trusted:
      # when trusted we can bypass any checks on envelope
      for envelope in envelopes:
        let msg = Message(env: envelope, isP2P: true)
        peer.networkState.filters.notify(msg)

  # Following message IDs are not part of EIP-627, but are added and used by
  # the Status application, we ignore them for now.
  nextID 11
  proc batchAcknowledged(peer: Peer) = discard
  proc messageResponse(peer: Peer) = discard

  nextID 123
  requestResponse:
    proc p2pSyncRequest(peer: Peer) = discard
    proc p2pSyncResponse(peer: Peer) = discard

  # TODO:
  # In the current specification the parameters are not wrapped in a regular
  # envelope as is done for the P2P Request packet. If we could alter this in
  # the spec it would be a cleaner separation between Waku and Mail server /
  # client.
  # Also, if a requestResponse block is used, a requestId will automatically
  # be added by the protocol DSL.
  # However the requestResponse block in combination with p2pRequest cannot be
  # used due to the unfortunate fact that the packet IDs are not consecutive,
  # and nextID is not recognized in between these. The nextID behaviour could
  # be fixed, however it would be cleaner if the specification could be
  # changed to have these IDs to be consecutive.
  proc p2pRequestComplete(peer: Peer, requestId: Hash, lastEnvelopeHash: Hash,
                          cursor: Bytes) = discard
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
# 'Runner' calls ---------------------------------------------------------------
|
|
|
|
|
|
|
|
proc processQueue(peer: Peer) =
  ## Sends to `peer` every queued envelope that it has not received or been
  ## sent before and that satisfies the peer's PoW requirement, bloom filter
  ## and (for a Waku-chan peer of a Waku-san node) topic list.
  let
    peerState = peer.state(Waku)
    network = peer.networkState(Waku)
  var outgoing = newSeq[Envelope]()

  for message in network.queue.items:
    if message in peerState.received:
      # The message was already sent to, or received from, this peer.
      continue

    if message.pow < peerState.powRequirement:
      debug "Message PoW too low for peer", pow = message.pow,
                                            powReq = peerState.powRequirement
      continue

    if not bloomFilterMatch(peerState.bloom, message.bloom):
      debug "Message does not match peer bloom filter"
      continue

    # Topic filtering only applies in the Waku-san -> Waku-chan direction.
    if network.config.wakuMode == WakuSan and
        peerState.wakuMode == WakuChan and
        message.env.topic notin peerState.topics:
      continue

    trace "Adding envelope"
    outgoing.add(message.env)
    # Remember the message so it is not selected again for this peer.
    peerState.received.incl(message)

  if outgoing.len > 0:
    trace "Sending envelopes", amount=outgoing.len
    # Ignore failure of sending messages, this could occur when the connection
    # gets dropped
    traceAsyncErrors peer.messages(outgoing)
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc run(peer: Peer) {.async.} =
  ## Per-peer loop: processes the message queue for `peer` every
  ## `messageInterval` until the peer is disconnecting or disconnected.
  while true:
    if peer.connectionState in {Disconnecting, Disconnected}:
      break
    processQueue(peer)
    await sleepAsync(messageInterval)
|
|
|
|
|
|
|
|
proc pruneReceived(node: EthereumNode) {.raises: [].} =
  ## Shrinks the `received` set of every initialized Waku peer to the messages
  ## that are still present in the message queue.
  if node.peerPool == nil: # XXX: a bit dirty to need to check for this here ...
    return

  let network = node.protocolState(Waku)
  for peerState in node.protocolPeers(Waku):
    if peerState.initialized:
      # NOTE: Perhaps alter the queue prune call to keep track of a HashSet
      # of pruned messages (as these should be smaller), and diff this with
      # the received sets.
      peerState.received = intersection(peerState.received,
                                        network.queue.itemHashes)
|
2019-11-15 07:31:06 +00:00
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
  ## Node-wide maintenance loop; never returns. Every `pruneInterval` it
  ## prunes the message queue and then the peers' `received` sets.
  while true:
    # The TTL unit is seconds, so pruning once per `pruneInterval` (one
    # second) should be sufficient?
    prune(network.queue[])
    # Pruning the received sets is not needed for correctness; it only keeps
    # the sets from growing indefinitely.
    pruneReceived(node)
    await sleepAsync(pruneInterval)
|
|
|
|
|
|
|
|
# Private EthereumNode calls ---------------------------------------------------
|
|
|
|
|
2019-12-05 23:45:14 +00:00
|
|
|
proc sendP2PMessage(node: EthereumNode, peerId: NodeId,
                    envelopes: openarray[Envelope]): bool =
  ## Sends `envelopes` as a direct p2p message to the connected Waku peer with
  ## id `peerId`. Returns `true` when such a peer was found (the send itself
  ## is fire-and-forget), `false` otherwise.
  result = false
  for peer in node.peers(Waku):
    if peer.remote.id != peerId:
      continue
    asyncCheck peer.p2pMessage(envelopes)
    return true
|
|
|
|
|
2019-11-15 07:31:06 +00:00
|
|
|
proc queueMessage(node: EthereumNode, msg: Message): bool =
  ## Adds a locally created `msg` to the message queue. Returns `false` when
  ## the message fails the `allowed` checks; otherwise `true`, even when the
  ## queue did not accept the message.
  let network = node.protocolState(Waku)

  # We have to do the same checks here as in the messages proc not to leak
  # any information that the message originates from this node.
  result = msg.allowed(network.config)
  if not result:
    return

  trace "Adding message to queue"
  if network.queue[].add(msg):
    valid_envelopes.inc()
    # Also notify our own filters of the message we are sending,
    # e.g. msg from local Dapp to Dapp
    network.filters.notify(msg)
|
|
|
|
|
|
|
|
# Public EthereumNode calls ----------------------------------------------------
|
|
|
|
|
|
|
|
proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
                  symKey = none[SymKey](), src = none[PrivateKey](),
                  ttl: uint32, topic: Topic, payload: Bytes,
                  padding = none[Bytes](), powTime = 1'f,
                  powTarget = defaultMinPow,
                  targetPeer = none[NodeId]()): bool =
  ## Post a message on the message queue which will be processed at the
  ## next `messageInterval`. When `targetPeer` is set, the envelope is instead
  ## sent directly to that peer as a p2p message, without PoW and without
  ## being queued.
  ##
  ## Returns `false` when the payload cannot be encoded, the target peer is
  ## not connected, a light node tries to post a non-direct message, `ttl` is
  ## 0 for a non-direct message, sealing fails, the envelope is no longer
  ## valid after sealing, or the message is not allowed by the node config.
  ##
  ## NOTE: This call allows a post without encryption. If encryption is
  ## mandatory it should be enforced a layer up
  let encoded = encode(Payload(payload: payload, src: src, dst: pubKey,
                               symKey: symKey, padding: padding))
  if not encoded.isSome():
    error "Encoding of payload failed"
    return false

  let env = Envelope(expiry:epochTime().uint32 + ttl,
                     ttl: ttl, topic: topic, data: encoded.get(), nonce: 0)

  # Allow lightnode to post only direct p2p messages
  if targetPeer.isSome():
    return node.sendP2PMessage(targetPeer.get(), [env])

  if node.protocolState(Waku).config.isLightNode:
    warn "Light node not allowed to post messages"
    return false

  # non direct p2p message can not have ttl of 0
  if env.ttl == 0:
    return false

  var msg = initMessage(env, powCalc = false)
  # XXX: make this non blocking or not?
  # In its current blocking state, it could be noticed by a peer that no
  # messages are sent for a while, and thus that mining PoW is done, and
  # that next messages contains a message originated from this peer
  # zah: It would be hard to execute this in a background thread at the
  # moment. We'll need a way to send custom "tasks" to the async message
  # loop (e.g. AD2 support for AsyncChannels).
  if not msg.sealEnvelope(powTime, powTarget):
    return false

  # need to check expiry after mining PoW
  if not msg.env.valid():
    return false

  result = node.queueMessage(msg)
|
|
|
|
|
|
|
|
proc subscribeFilter*(node: EthereumNode, filter: Filter,
                      handler:FilterMsgHandler = nil): string =
  ## Registers `filter` for incoming/outgoing messages and returns the string
  ## produced by the underlying `subscribeFilter` call on the filter table.
  ## Matching messages can be fetched with `getFilterMessages` or are handed
  ## to `handler` when one is given.
  ##
  ## NOTE: This call allows for a filter without decryption. If encryption is
  ## mandatory it should be enforced a layer up.
  let network = node.protocolState(Waku)
  result = network.filters.subscribeFilter(filter, handler)
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
  ## Removes the previously subscribed filter `filterId`. The result is that
  ## of `take` on the filter table.
  var removed: Filter # receives the removed filter; not used further
  result = node.protocolState(Waku).filters.take(filterId, removed)
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc getFilterMessages*(node: EthereumNode,
                        filterId: string): seq[ReceivedMessage] =
  ## Returns all messages currently held in the queue of filter `filterId`.
  ## This resets that filter's message queue.
  let network = node.protocolState(Waku)
  result = network.filters.getFilterMessages(filterId)
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc filtersToBloom*(node: EthereumNode): Bloom =
  ## Returns the bloom filter covering all topics of all subscribed filters.
  let network = node.protocolState(Waku)
  result = network.filters.toBloom()
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
  ## Sets the PoW requirement of this node and announces the new value to all
  ## connected peers.
  ##
  ## Failures when sending messages to peers will not be reported.
  # NOTE: do we need a tolerance of old PoW for some time?
  node.protocolState(Waku).config.powRequirement = powReq

  # The packet carries the float64 as a uint64 obtained with `cast`, the same
  # representation the handshake uses.
  let powConverted = cast[uint64](powReq)
  var pending = newSeq[Future[void]]()
  for peer in node.peers(Waku):
    pending.add(peer.powRequirement(powConverted))

  # Exceptions from sendMsg will not be raised
  await allFutures(pending)
|
|
|
|
|
|
|
|
proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
  ## Sets the bloom filter of this node and announces the new filter to all
  ## connected peers.
  ##
  ## Failures when sending messages to peers will not be reported.
  # NOTE: do we need a tolerance of old bloom filter for some time?
  node.protocolState(Waku).config.bloom = bloom

  let bloomBytes = @bloom
  var pending = newSeq[Future[void]]()
  for peer in node.peers(Waku):
    pending.add(peer.bloomFilterExchange(bloomBytes))

  # Exceptions from sendMsg will not be raised
  await allFutures(pending)
|
|
|
|
|
2019-12-19 21:48:51 +00:00
|
|
|
proc setTopics*(node: EthereumNode, topics: seq[Topic]):
    Future[bool] {.async.} =
  ## Sets the topic list of this node and sends it as a topic-interest packet
  ## to all connected peers.
  ##
  ## Returns `false`, without changing or sending anything, when `topics`
  ## holds more than `topicInterestMax` entries; `true` otherwise. Failures
  ## when sending messages to peers will not be reported.
  if topics.len > topicInterestMax:
    return false

  node.protocolState(Waku).config.topics = topics

  var pending = newSeq[Future[void]]()
  for peer in node.peers(Waku):
    pending.add(peer.topicInterest(topics))

  # Exceptions from sendMsg will not be raised
  await allFutures(pending)

  return true
|
|
|
|
|
2019-11-15 07:31:06 +00:00
|
|
|
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
  ## Sets the maximum allowed message size.
  ## Values above ``defaultMaxMsgSize`` are rejected: a warning is logged, the
  ## setting is left untouched and `false` is returned.
  result = size <= defaultMaxMsgSize
  if result:
    node.protocolState(Waku).config.maxMsgSize = size
  else:
    warn "size > defaultMaxMsgSize"
|
|
|
|
|
|
|
|
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
  ## Marks the connected peer with id `peerId` as trusted. Returns `false`
  ## when no connected Waku peer has that id.
  result = false
  for peer in node.peers(Waku):
    if peer.remote.id != peerId:
      continue
    peer.state(Waku).trusted = true
    return true
|
|
|
|
|
|
|
|
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
  ## Switches the Waku light node setting of this node on or off.
  ##
  ## NOTE: Should be run before connection is made with peers as this
  ## setting is only communicated at peer handshake.
  let network = node.protocolState(Waku)
  network.config.isLightNode = isLightNode
|
2019-11-15 07:31:06 +00:00
|
|
|
|
2019-11-15 07:32:46 +00:00
|
|
|
proc configureWaku*(node: EthereumNode, config: WakuConfig) =
  ## Replaces the complete Waku configuration of this node with `config`.
  ## The given values are stored as-is.
  ##
  ## NOTE: Should be run before connection is made with peers as some
  ## of the settings are only communicated at peer handshake.
  let network = node.protocolState(Waku)
  network.config = config
|
2019-11-15 07:31:06 +00:00
|
|
|
|
|
|
|
proc resetMessageQueue*(node: EthereumNode) =
  ## Replaces the message queue with a fresh, empty queue of the default
  ## capacity.
  ##
  ## NOTE: Not something that should be run in normal circumstances.
  let network = node.protocolState(Waku)
  network.queue[] = initQueue(defaultQueueCapacity)
|