More whisper changes (#43)

* Make messages with invalid ID a catchable error as we should not disconnect on this

* Add unimplemented message IDs used by Status

* Make whisper log less verbose + add comment on duplicate messages

* Cleanup + add documentation
This commit is contained in:
kdeme 2019-04-26 15:36:54 +02:00 committed by Jacek Sieka
parent ecc1a995aa
commit 442c3d9f7b
3 changed files with 99 additions and 53 deletions

View File

@ -70,6 +70,7 @@ type
# protocol to a peer that doesn't support the protocol.
MalformedMessageError* = object of CatchableError
UnsupportedMessageError* = object of CatchableError
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason

View File

@ -255,7 +255,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
template invalidIdError: untyped =
raise newException(ValueError,
raise newException(UnsupportedMessageError,
"RLPx message with an invalid id " & $msgId &
" on a connection supporting " & peer.dispatcher.describeProtocols)

View File

@ -12,6 +12,9 @@ import
hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand],
eth/p2p, ../ecies
logScope:
topics = "whisper"
const
flagsLen = 1 ## payload flags field length, bytes
gcmIVLen = 12 ## Length of IV (seed) used for AES
@ -23,7 +26,7 @@ const
defaultQueueCapacity = 256
defaultFilterQueueCapacity = 64
whisperVersion* = 6
whisperVersionStr* = "6.0"
whisperVersionStr* = $whisperVersion
defaultMinPow* = 0.2'f64
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
messageInterval* = 300 ## Interval at which messages are send to peers, in ms
@ -512,6 +515,38 @@ proc allowed*(msg: Message, config: WhisperConfig): bool =
return true
# NOTE: PoW calculations are different from go-ethereum implementation,
# which is not conform EIP-627.
# See here: https://github.com/ethereum/go-ethereum/issues/18070
# However, this implementation is also not conform EIP-627 as we do not use the
# size of the RLP-encoded envelope, but the size of the envelope object itself.
# This is done to be able to correctly calculate the bestBitTarget.
# Other options would be:
# - work directly with powTarget in minePow, but this requires recalculation of
# rlp size + calcPow
# - Use worst case size of envelope nonce
proc sealEnvelope(msg: var Message, powTime: float, powTarget: float): bool =
let size = msg.env.len
if powTarget > 0:
let x = powTarget * size.float * msg.env.ttl.float
var bestBitTarget: int
if x <= 1: # log() would return negative numbers or 0
bestBitTarget = 1
else:
bestBitTarget = ceil(log(x, 2)).int
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget)
else:
# If no target is set, we are certain of executed powTime
msg.env.expiry += powTime.uint32
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime)
msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash)
trace "Message PoW", pow = msg.pow
if msg.pow < powTarget:
return false
return true
# Queues -----------------------------------------------------------------------
proc initQueue*(capacity: int): Queue =
@ -616,7 +651,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
elif filter.symKey.isSome():
keyHash = keccak256.digest(filter.symKey.get())
# else:
# NOTE: should we error on messages without encryption?
# NOTE: In this case the message was not encrypted
else:
if filter.privateKey.isSome():
if keyHash != keccak256.digest(filter.privateKey.get().data):
@ -625,7 +660,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
if keyHash != keccak256.digest(filter.symKey.get()):
continue
# else:
# NOTE: should we error on messages without encryption?
# NOTE: In this case the message was not encrypted
# When decoding is done we can check the src (signature)
if filter.src.isSome():
@ -663,13 +698,13 @@ proc toBloom*(filters: Filters): Bloom =
type
WhisperPeer = ref object
initialized*: bool # when successfully completed the handshake
initialized: bool # when successfully completed the handshake
powRequirement*: float64
bloom*: Bloom
isLightNode*: bool
trusted*: bool
received: HashSet[Message]
running*: bool
running: bool
WhisperNetwork = ref object
queue*: Queue
@ -694,7 +729,7 @@ p2pProtocol Whisper(version = whisperVersion,
networkState = WhisperNetwork):
onPeerConnected do (peer: Peer):
debug "onPeerConnected Whisper"
trace "onPeerConnected Whisper"
let
whisperNet = peer.networkState
whisperPeer = peer.state
@ -753,7 +788,7 @@ p2pProtocol Whisper(version = whisperVersion,
for envelope in envelopes:
# check if expired or in future, or ttl not 0
if not envelope.valid():
warn "Expired or future timed envelope"
warn "Expired or future timed envelope", peer
# disconnect from peers sending bad envelopes
# await peer.disconnect(SubprotocolReason)
continue
@ -769,8 +804,12 @@ p2pProtocol Whisper(version = whisperVersion,
# it was either already received here from this peer or send to this peer.
# Either way it will be in our queue already (and the peer should know
# this) and this peer is sending duplicates.
# Note: geth does not check if a peer has send a message to them before
# broadcasting this message. This too is seen here as a duplicate message
# (see above comment). If we want to seperate these cases (e.g. when peer
# rating), then we have to add a "peer.state.send" HashSet.
if peer.state.received.containsOrIncl(msg):
warn "Peer sending duplicate messages"
debug "Peer sending duplicate messages", peer, hash = msg.hash
# await peer.disconnect(SubprotocolReason)
continue
@ -807,6 +846,17 @@ p2pProtocol Whisper(version = whisperVersion,
let msg = Message(env: envelope, isP2P: true)
peer.networkState.filters.notify(msg)
# Following message IDs are not part of EIP-627, but are added and used by
# the Status application, we ignore them for now.
nextID 11
proc batchAcknowledged(peer: Peer) = discard
proc messageResponse(peer: Peer) = discard
nextID 123
requestResponse:
proc p2pSyncRequest(peer: Peer) = discard
proc p2pSyncResponse(peer: Peer) = discard
proc p2pRequestComplete(peer: Peer) = discard
# 'Runner' calls ---------------------------------------------------------------
proc processQueue(peer: Peer) =
@ -821,18 +871,19 @@ proc processQueue(peer: Peer) =
continue
if message.pow < whisperPeer.powRequirement:
debug "Message PoW too low for peer"
debug "Message PoW too low for peer", pow = message.pow,
powReq = whisperPeer.powRequirement
continue
if not bloomFilterMatch(whisperPeer.bloom, message.bloom):
debug "Message does not match peer bloom filter"
continue
debug "Adding envelope"
trace "Adding envelope"
envelopes.add(message.env)
whisperPeer.received.incl(message)
debug "Sending envelopes", amount=envelopes.len
trace "Sending envelopes", amount=envelopes.len
# await peer.messages(envelopes)
asyncCheck peer.messages(envelopes)
@ -869,46 +920,14 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
node.pruneReceived()
await sleepAsync(pruneInterval)
# Public EthereumNode calls ----------------------------------------------------
# Private EthereumNode calls ---------------------------------------------------
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(Whisper):
if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env)
return true
# NOTE: PoW calculations are different from go-ethereum implementation,
# which is not conform EIP-627.
# See here: https://github.com/ethereum/go-ethereum/issues/18070
# However, this implementation is also not conform EIP-627 as we do not use the
# size of the RLP-encoded envelope, but the size of the envelope object itself.
# This is done to be able to correctly calculate the bestBitTarget.
# Other options would be:
# - work directly with powTarget in minePow, but this requires recalculation of
# rlp size + calcPow
# - Use worst case size of envelope nonce
proc sealEnvelope*(msg: var Message, powTime: float, powTarget: float): bool =
let size = msg.env.len
if powTarget > 0:
let x = powTarget * size.float * msg.env.ttl.float
var bestBitTarget: int
if x <= 1: # log() would return negative numbers or 0
bestBitTarget = 1
else:
bestBitTarget = ceil(log(x, 2)).int
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget)
else:
# If no target is set, we are certain of executed powTime
msg.env.expiry += powTime.uint32
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime)
msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash)
trace "Message PoW", pow = msg.pow
if msg.pow < powTarget:
return false
return true
proc queueMessage(node: EthereumNode, msg: Message): bool =
var whisperNet = node.protocolState(Whisper)
@ -917,7 +936,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
if not msg.allowed(whisperNet.config):
return false
debug "Adding message to queue"
trace "Adding message to queue"
if whisperNet.queue.add(msg):
# Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp
@ -925,13 +944,18 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
return true
# Public EthereumNode calls ----------------------------------------------------
proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
symKey = none[SymKey](), src = none[PrivateKey](),
ttl: uint32, topic: Topic, payload: Bytes,
padding = none[Bytes](), powTime = 1'f,
powTarget = defaultMinPow,
targetPeer = none[NodeId]()): bool =
# NOTE: Allow a post without a key? Encryption is mandatory in v6?
## Post a message on the message queue which will be processed at the
## next `messageInterval`.
## NOTE: This call allows a post without encryption. If encryption is
## mandatory it should be enforced a layer up
let payload = encode(Payload(payload: payload, src: src, dst: pubKey,
symKey: symKey, padding: padding))
if payload.isSome():
@ -962,7 +986,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
return node.queueMessage(msg)
else:
error "Light node not allowed to post messages"
warn "Light node not allowed to post messages"
return false
else:
error "Encoding of payload failed"
@ -970,19 +994,30 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler:FilterMsgHandler = nil): string =
## Initiate a filter for incoming/outgoing messages. Messages can be
## retrieved with the `getFilterMessages` call or with a provided
## `FilterMsgHandler`.
## NOTE: This call allows for a filter without decryption. If encryption is
## mandatory it should be enforced a layer up
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler)
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
## Remove a previously subscribed filter.
var filter: Filter
return node.protocolState(Whisper).filters.take(filterId, filter)
proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
## Get all the messages currently in the filter queue. This will reset the
## filter message queue
return node.protocolState(Whisper).filters.getFilterMessages(filterId)
proc filtersToBloom*(node: EthereumNode): Bloom =
## returns the bloom filter of all topics of all subscribed filters
return node.protocolState(Whisper).filters.toBloom()
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
## Sets the PoW requirement for this node, will also send
## this new PoW requirement to all connected peers
# NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Whisper).config.powRequirement = powReq
var futures: seq[Future[void]] = @[]
@ -992,6 +1027,8 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
await all(futures)
proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
## Sets the bloom filter for this node, will also send
## this new bloom filter to all connected peers
# NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Whisper).config.bloom = bloom
var futures: seq[Future[void]] = @[]
@ -1001,26 +1038,34 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
await all(futures)
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
## Set the maximum allowed message size
if size > defaultMaxMsgSize:
error "size > maxMsgSize"
warn "size > defaultMaxMsgSize"
return false
node.protocolState(Whisper).config.maxMsgSize = size
return true
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
## Set a connected peer as trusted
for peer in node.peers(Whisper):
if peer.remote.id == peerId:
peer.state(Whisper).trusted = true
return true
# NOTE: Should be run before connection is made with peers
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
## Set this node as a Whisper light node
## NOTE: Should be run before connection is made with peers as this
## setting is only communicated at peer handshake
node.protocolState(Whisper).config.isLightNode = isLightNode
# NOTE: Should be run before connection is made with peers
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
## Apply a Whisper configuration
## NOTE: Should be run before connection is made with peers as some
## of the settings are only communicated at peer handshake
node.protocolState(Whisper).config = config
# Not something that should be run in normal circumstances
proc resetMessageQueue*(node: EthereumNode) =
## Full reset of the message queue
## NOTE: Not something that should be run in normal circumstances
node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)