waku: whisper-> waku; version 0

This commit is contained in:
Oskar Thoren 2019-11-15 15:32:46 +08:00 committed by zah
parent 2c49d4adb8
commit 243d2d8b27
1 changed files with 82 additions and 80 deletions

View File

@ -1,5 +1,5 @@
# #
# Whisper # Waku
# (c) Copyright 2018-2019 # (c) Copyright 2018-2019
# Status Research & Development GmbH # Status Research & Development GmbH
# #
@ -8,10 +8,12 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
## Whisper ## Waku
## ******* ## *******
## ##
## Whisper is a gossip protocol that synchronizes a set of messages across nodes ## Waku is a fork of Whisper.
##
## Waku is a gossip protocol that synchronizes a set of messages across nodes
## with attention given to sender and recipient anonymitiy. Messages are ## with attention given to sender and recipient anonymitiy. Messages are
## categorized by a topic and stay alive in the network based on a time-to-live ## categorized by a topic and stay alive in the network based on a time-to-live
## measured in seconds. Spam prevention is based on proof-of-work, where large ## measured in seconds. Spam prevention is based on proof-of-work, where large
@ -20,13 +22,13 @@
## Example usage ## Example usage
## ---------- ## ----------
## First an `EthereumNode` needs to be created, either with all capabilities set ## First an `EthereumNode` needs to be created, either with all capabilities set
## or with specifically the Whisper capability set. ## or with specifically the Waku capability set.
## The latter can be done like this: ## The latter can be done like this:
## ##
## .. code-block::nim ## .. code-block::nim
## var node = newEthereumNode(keypair, address, netId, nil, ## var node = newEthereumNode(keypair, address, netId, nil,
## addAllCapabilities = false) ## addAllCapabilities = false)
## node.addCapability Whisper ## node.addCapability Waku
## ##
## Now calls such as ``postMessage`` and ``subscribeFilter`` can be done. ## Now calls such as ``postMessage`` and ``subscribeFilter`` can be done.
## However, they only make real sense after ``connectToNetwork`` was started. As ## However, they only make real sense after ``connectToNetwork`` was started. As
@ -39,7 +41,7 @@ import
eth/common/eth_types, eth/[keys, rlp, async_utils, p2p], eth/p2p/ecies eth/common/eth_types, eth/[keys, rlp, async_utils, p2p], eth/p2p/ecies
logScope: logScope:
topics = "whisper" topics = "waku"
const const
flagsLen = 1 ## payload flags field length, bytes flagsLen = 1 ## payload flags field length, bytes
@ -51,8 +53,8 @@ const
bloomSize = 512 div 8 bloomSize = 512 div 8
defaultQueueCapacity = 256 defaultQueueCapacity = 256
defaultFilterQueueCapacity = 64 defaultFilterQueueCapacity = 64
whisperVersion* = 6 ## Whisper version. wakuVersion* = 0 ## Waku version.
whisperVersionStr* = $whisperVersion ## Whisper version. wakuVersionStr* = $wakuVersion ## Waku version.
defaultMinPow* = 0.2'f64 ## The default minimum PoW requirement for this node. defaultMinPow* = 0.2'f64 ## The default minimum PoW requirement for this node.
defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## The current default and max defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## The current default and max
## message size. This can never be larger than the maximum RLPx message size. ## message size. This can never be larger than the maximum RLPx message size.
@ -90,7 +92,7 @@ type
padding*: Option[Bytes] ## Message padding padding*: Option[Bytes] ## Message padding
Envelope* = object Envelope* = object
## What goes on the wire in the whisper protocol - a payload and some ## What goes on the wire in the waku protocol - a payload and some
## book-keeping ## book-keeping
# Don't touch field order, there's lots of macro magic that depends on it # Don't touch field order, there's lots of macro magic that depends on it
expiry*: uint32 ## Unix timestamp when message expires expiry*: uint32 ## Unix timestamp when message expires
@ -123,7 +125,7 @@ type
Queue* = object Queue* = object
## Bounded message repository ## Bounded message repository
## ##
## Whisper uses proof-of-work to judge the usefulness of a message staying ## Waku uses proof-of-work to judge the usefulness of a message staying
## in the "cloud" - messages with low proof-of-work will be removed to make ## in the "cloud" - messages with low proof-of-work will be removed to make
## room for those with higher pow, even if they haven't expired yet. ## room for those with higher pow, even if they haven't expired yet.
## Larger messages and those with high time-to-live will require more pow. ## Larger messages and those with high time-to-live will require more pow.
@ -156,7 +158,7 @@ type
Filters* = Table[string, Filter] Filters* = Table[string, Filter]
WhisperConfig* = object WakuConfig* = object
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
@ -175,7 +177,7 @@ proc leadingZeroBits(hash: MDigest): int =
break break
proc calcPow*(size, ttl: uint64, hash: Hash): float64 = proc calcPow*(size, ttl: uint64, hash: Hash): float64 =
## Whisper proof-of-work is defined as the best bit of a hash divided by ## Waku proof-of-work is defined as the best bit of a hash divided by
## encoded size and time-to-live, such that large and long-lived messages get ## encoded size and time-to-live, such that large and long-lived messages get
## penalized ## penalized
@ -183,7 +185,7 @@ proc calcPow*(size, ttl: uint64, hash: Hash): float64 =
return pow(2.0, bits.float64) / (size.float64 * ttl.float64) return pow(2.0, bits.float64) / (size.float64 * ttl.float64)
proc topicBloom*(topic: Topic): Bloom = proc topicBloom*(topic: Topic): Bloom =
## Whisper uses 512-bit bloom filters meaning 9 bits of indexing - 3 9-bit ## Waku uses 512-bit bloom filters meaning 9 bits of indexing - 3 9-bit
## indexes into the bloom are created using the first 3 bytes of the topic and ## indexes into the bloom are created using the first 3 bytes of the topic and
## complementing each byte with an extra bit from the last topic byte ## complementing each byte with an extra bit from the last topic byte
for i in 0..<3: for i in 0..<3:
@ -518,8 +520,8 @@ proc initMessage*(env: Envelope, powCalc = true): Message =
proc hash*(msg: Message): hashes.Hash = hash(msg.hash.data) proc hash*(msg: Message): hashes.Hash = hash(msg.hash.data)
proc allowed*(msg: Message, config: WhisperConfig): bool = proc allowed*(msg: Message, config: WakuConfig): bool =
# Check max msg size, already happens in RLPx but there is a specific shh # Check max msg size, already happens in RLPx but there is a specific waku
# max msg size which should always be < RLPx max msg size # max msg size which should always be < RLPx max msg size
if msg.size > config.maxMsgSize: if msg.size > config.maxMsgSize:
warn "Message size too large", size = msg.size warn "Message size too large", size = msg.size
@ -719,7 +721,7 @@ proc toBloom*(filters: Filters): Bloom =
result = result or filter.bloom result = result or filter.bloom
type type
WhisperPeer = ref object WakuPeer = ref object
initialized: bool # when successfully completed the handshake initialized: bool # when successfully completed the handshake
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Bloom
@ -727,15 +729,15 @@ type
trusted*: bool trusted*: bool
received: HashSet[Message] received: HashSet[Message]
WhisperNetwork = ref object WakuNetwork = ref object
queue*: Queue queue*: Queue
filters*: Filters filters*: Filters
config*: WhisperConfig config*: WakuConfig
proc run(peer: Peer) {.gcsafe, async.} proc run(peer: Peer) {.gcsafe, async.}
proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.} proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} = proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
network.queue = initQueue(defaultQueueCapacity) network.queue = initQueue(defaultQueueCapacity)
network.filters = initTable[string, Filter]() network.filters = initTable[string, Filter]()
network.config.bloom = fullBloom() network.config.bloom = fullBloom()
@ -744,52 +746,52 @@ proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.}
network.config.maxMsgSize = defaultMaxMsgSize network.config.maxMsgSize = defaultMaxMsgSize
asyncCheck node.run(network) asyncCheck node.run(network)
p2pProtocol Whisper(version = whisperVersion, p2pProtocol Waku(version = wakuVersion,
rlpxName = "shh", rlpxName = "waku",
peerState = WhisperPeer, peerState = WakuPeer,
networkState = WhisperNetwork): networkState = WakuNetwork):
onPeerConnected do (peer: Peer): onPeerConnected do (peer: Peer):
trace "onPeerConnected Whisper" trace "onPeerConnected Waku"
let let
whisperNet = peer.networkState wakuNet = peer.networkState
whisperPeer = peer.state wakuPeer = peer.state
let m = await peer.status(whisperVersion, let m = await peer.status(wakuVersion,
cast[uint](whisperNet.config.powRequirement), cast[uint](wakuNet.config.powRequirement),
@(whisperNet.config.bloom), @(wakuNet.config.bloom),
whisperNet.config.isLightNode, wakuNet.config.isLightNode,
timeout = chronos.milliseconds(500)) timeout = chronos.milliseconds(500))
if m.protocolVersion == whisperVersion: if m.protocolVersion == wakuVersion:
debug "Whisper peer", peer, whisperVersion debug "Waku peer", peer, wakuVersion
else: else:
raise newException(UselessPeerError, "Incompatible Whisper version") raise newException(UselessPeerError, "Incompatible Waku version")
whisperPeer.powRequirement = cast[float64](m.powConverted) wakuPeer.powRequirement = cast[float64](m.powConverted)
if m.bloom.len > 0: if m.bloom.len > 0:
if m.bloom.len != bloomSize: if m.bloom.len != bloomSize:
raise newException(UselessPeerError, "Bloomfilter size mismatch") raise newException(UselessPeerError, "Bloomfilter size mismatch")
else: else:
whisperPeer.bloom.bytesCopy(m.bloom) wakuPeer.bloom.bytesCopy(m.bloom)
else: else:
# If no bloom filter is send we allow all # If no bloom filter is send we allow all
whisperPeer.bloom = fullBloom() wakuPeer.bloom = fullBloom()
whisperPeer.isLightNode = m.isLightNode wakuPeer.isLightNode = m.isLightNode
if whisperPeer.isLightNode and whisperNet.config.isLightNode: if wakuPeer.isLightNode and wakuNet.config.isLightNode:
# No sense in connecting two light nodes so we disconnect # No sense in connecting two light nodes so we disconnect
raise newException(UselessPeerError, "Two light nodes connected") raise newException(UselessPeerError, "Two light nodes connected")
whisperPeer.received.init() wakuPeer.received.init()
whisperPeer.trusted = false wakuPeer.trusted = false
whisperPeer.initialized = true wakuPeer.initialized = true
if not whisperNet.config.isLightNode: if not wakuNet.config.isLightNode:
traceAsyncErrors peer.run() traceAsyncErrors peer.run()
debug "Whisper peer initialized", peer debug "Waku peer initialized", peer
handshake: handshake:
proc status(peer: Peer, proc status(peer: Peer,
@ -856,7 +858,7 @@ p2pProtocol Whisper(version = whisperVersion,
proc p2pRequest(peer: Peer, envelope: Envelope) = proc p2pRequest(peer: Peer, envelope: Envelope) =
# TODO: here we would have to allow to insert some specific implementation # TODO: here we would have to allow to insert some specific implementation
# such as e.g. Whisper Mail Server # such as e.g. Waku Mail Server
discard discard
proc p2pMessage(peer: Peer, envelope: Envelope) = proc p2pMessage(peer: Peer, envelope: Envelope) =
@ -884,26 +886,26 @@ proc processQueue(peer: Peer) =
# Send to peer all valid and previously not send envelopes in the queue. # Send to peer all valid and previously not send envelopes in the queue.
var var
envelopes: seq[Envelope] = @[] envelopes: seq[Envelope] = @[]
whisperPeer = peer.state(Whisper) wakuPeer = peer.state(Waku)
whisperNet = peer.networkState(Whisper) wakuNet = peer.networkState(Waku)
for message in whisperNet.queue.items: for message in wakuNet.queue.items:
if whisperPeer.received.contains(message): if wakuPeer.received.contains(message):
# debug "message was already send to peer" # debug "message was already send to peer"
continue continue
if message.pow < whisperPeer.powRequirement: if message.pow < wakuPeer.powRequirement:
debug "Message PoW too low for peer", pow = message.pow, debug "Message PoW too low for peer", pow = message.pow,
powReq = whisperPeer.powRequirement powReq = wakuPeer.powRequirement
continue continue
if not bloomFilterMatch(whisperPeer.bloom, message.bloom): if not bloomFilterMatch(wakuPeer.bloom, message.bloom):
debug "Message does not match peer bloom filter" debug "Message does not match peer bloom filter"
continue continue
trace "Adding envelope" trace "Adding envelope"
envelopes.add(message.env) envelopes.add(message.env)
whisperPeer.received.incl(message) wakuPeer.received.incl(message)
trace "Sending envelopes", amount=envelopes.len trace "Sending envelopes", amount=envelopes.len
# Ignore failure of sending messages, this could occur when the connection # Ignore failure of sending messages, this could occur when the connection
@ -917,18 +919,18 @@ proc run(peer: Peer) {.async.} =
proc pruneReceived(node: EthereumNode) {.raises: [].} = proc pruneReceived(node: EthereumNode) {.raises: [].} =
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ... if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
var whisperNet = node.protocolState(Whisper) var wakuNet = node.protocolState(Waku)
for peer in node.protocolPeers(Whisper): for peer in node.protocolPeers(Waku):
if not peer.initialized: if not peer.initialized:
continue continue
# NOTE: Perhaps alter the queue prune call to keep track of a HashSet # NOTE: Perhaps alter the queue prune call to keep track of a HashSet
# of pruned messages (as these should be smaller), and diff this with # of pruned messages (as these should be smaller), and diff this with
# the received sets. # the received sets.
peer.received = intersection(peer.received, whisperNet.queue.itemHashes) peer.received = intersection(peer.received, wakuNet.queue.itemHashes)
proc run(node: EthereumNode, network: WhisperNetwork) {.async.} = proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
while true: while true:
# prune message queue every second # prune message queue every second
# TTL unit is in seconds, so this should be sufficient? # TTL unit is in seconds, so this should be sufficient?
@ -941,24 +943,24 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
# Private EthereumNode calls --------------------------------------------------- # Private EthereumNode calls ---------------------------------------------------
proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool = proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(Whisper): for peer in node.peers(Waku):
if peer.remote.id == peerId: if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env) asyncCheck peer.p2pMessage(env)
return true return true
proc queueMessage(node: EthereumNode, msg: Message): bool = proc queueMessage(node: EthereumNode, msg: Message): bool =
var whisperNet = node.protocolState(Whisper) var wakuNet = node.protocolState(Waku)
# We have to do the same checks here as in the messages proc not to leak # We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node. # any information that the message originates from this node.
if not msg.allowed(whisperNet.config): if not msg.allowed(wakuNet.config):
return false return false
trace "Adding message to queue" trace "Adding message to queue"
if whisperNet.queue.add(msg): if wakuNet.queue.add(msg):
# Also notify our own filters of the message we are sending, # Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp # e.g. msg from local Dapp to Dapp
whisperNet.filters.notify(msg) wakuNet.filters.notify(msg)
return true return true
@ -984,7 +986,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
# Allow lightnode to post only direct p2p messages # Allow lightnode to post only direct p2p messages
if targetPeer.isSome(): if targetPeer.isSome():
return node.sendP2PMessage(targetPeer.get(), env) return node.sendP2PMessage(targetPeer.get(), env)
elif not node.protocolState(Whisper).config.isLightNode: elif not node.protocolState(Waku).config.isLightNode:
# non direct p2p message can not have ttl of 0 # non direct p2p message can not have ttl of 0
if env.ttl == 0: if env.ttl == 0:
return false return false
@ -1019,21 +1021,21 @@ proc subscribeFilter*(node: EthereumNode, filter: Filter,
## ##
## NOTE: This call allows for a filter without decryption. If encryption is ## NOTE: This call allows for a filter without decryption. If encryption is
## mandatory it should be enforced a layer up. ## mandatory it should be enforced a layer up.
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler) return node.protocolState(Waku).filters.subscribeFilter(filter, handler)
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
## Remove a previously subscribed filter. ## Remove a previously subscribed filter.
var filter: Filter var filter: Filter
return node.protocolState(Whisper).filters.take(filterId, filter) return node.protocolState(Waku).filters.take(filterId, filter)
proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] = proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
## Get all the messages currently in the filter queue. This will reset the ## Get all the messages currently in the filter queue. This will reset the
## filter message queue. ## filter message queue.
return node.protocolState(Whisper).filters.getFilterMessages(filterId) return node.protocolState(Waku).filters.getFilterMessages(filterId)
proc filtersToBloom*(node: EthereumNode): Bloom = proc filtersToBloom*(node: EthereumNode): Bloom =
## Returns the bloom filter of all topics of all subscribed filters. ## Returns the bloom filter of all topics of all subscribed filters.
return node.protocolState(Whisper).filters.toBloom() return node.protocolState(Waku).filters.toBloom()
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
## Sets the PoW requirement for this node, will also send ## Sets the PoW requirement for this node, will also send
@ -1041,9 +1043,9 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
## ##
## Failures when sending messages to peers will not be reported. ## Failures when sending messages to peers will not be reported.
# NOTE: do we need a tolerance of old PoW for some time? # NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Whisper).config.powRequirement = powReq node.protocolState(Waku).config.powRequirement = powReq
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper): for peer in node.peers(Waku):
futures.add(peer.powRequirement(cast[uint](powReq))) futures.add(peer.powRequirement(cast[uint](powReq)))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
@ -1055,9 +1057,9 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
## ##
## Failures when sending messages to peers will not be reported. ## Failures when sending messages to peers will not be reported.
# NOTE: do we need a tolerance of old bloom filter for some time? # NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Whisper).config.bloom = bloom node.protocolState(Waku).config.bloom = bloom
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper): for peer in node.peers(Waku):
futures.add(peer.bloomFilterExchange(@bloom)) futures.add(peer.bloomFilterExchange(@bloom))
# Exceptions from sendMsg will not be raised # Exceptions from sendMsg will not be raised
@ -1069,32 +1071,32 @@ proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
if size > defaultMaxMsgSize: if size > defaultMaxMsgSize:
warn "size > defaultMaxMsgSize" warn "size > defaultMaxMsgSize"
return false return false
node.protocolState(Whisper).config.maxMsgSize = size node.protocolState(Waku).config.maxMsgSize = size
return true return true
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
## Set a connected peer as trusted. ## Set a connected peer as trusted.
for peer in node.peers(Whisper): for peer in node.peers(Waku):
if peer.remote.id == peerId: if peer.remote.id == peerId:
peer.state(Whisper).trusted = true peer.state(Waku).trusted = true
return true return true
proc setLightNode*(node: EthereumNode, isLightNode: bool) = proc setLightNode*(node: EthereumNode, isLightNode: bool) =
## Set this node as a Whisper light node. ## Set this node as a Waku light node.
## ##
## NOTE: Should be run before connection is made with peers as this ## NOTE: Should be run before connection is made with peers as this
## setting is only communicated at peer handshake. ## setting is only communicated at peer handshake.
node.protocolState(Whisper).config.isLightNode = isLightNode node.protocolState(Waku).config.isLightNode = isLightNode
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) = proc configureWaku*(node: EthereumNode, config: WakuConfig) =
## Apply a Whisper configuration. ## Apply a Waku configuration.
## ##
## NOTE: Should be run before connection is made with peers as some ## NOTE: Should be run before connection is made with peers as some
## of the settings are only communicated at peer handshake. ## of the settings are only communicated at peer handshake.
node.protocolState(Whisper).config = config node.protocolState(Waku).config = config
proc resetMessageQueue*(node: EthereumNode) = proc resetMessageQueue*(node: EthereumNode) =
## Full reset of the message queue. ## Full reset of the message queue.
## ##
## NOTE: Not something that should be run in normal circumstances. ## NOTE: Not something that should be run in normal circumstances.
node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity) node.protocolState(Waku).queue = initQueue(defaultQueueCapacity)