Merge branch 'master' of github.com:status-im/nim-eth

This commit is contained in:
Ștefan Talpalaru 2019-04-29 17:06:09 +02:00
commit 4fbb4603fc
No known key found for this signature in database
GPG Key ID: CBF7934204F1B6F9
3 changed files with 99 additions and 53 deletions

View File

@ -70,6 +70,7 @@ type
# protocol to a peer that doesn't support the protocol. # protocol to a peer that doesn't support the protocol.
MalformedMessageError* = object of CatchableError MalformedMessageError* = object of CatchableError
UnsupportedMessageError* = object of CatchableError
PeerDisconnected* = object of CatchableError PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason reason*: DisconnectionReason

View File

@ -255,7 +255,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] = proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
template invalidIdError: untyped = template invalidIdError: untyped =
raise newException(ValueError, raise newException(UnsupportedMessageError,
"RLPx message with an invalid id " & $msgId & "RLPx message with an invalid id " & $msgId &
" on a connection supporting " & peer.dispatcher.describeProtocols) " on a connection supporting " & peer.dispatcher.describeProtocols)

View File

@ -12,6 +12,9 @@ import
hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand], hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand],
eth/p2p, ../ecies eth/p2p, ../ecies
logScope:
topics = "whisper"
const const
flagsLen = 1 ## payload flags field length, bytes flagsLen = 1 ## payload flags field length, bytes
gcmIVLen = 12 ## Length of IV (seed) used for AES gcmIVLen = 12 ## Length of IV (seed) used for AES
@ -23,7 +26,7 @@ const
defaultQueueCapacity = 256 defaultQueueCapacity = 256
defaultFilterQueueCapacity = 64 defaultFilterQueueCapacity = 64
whisperVersion* = 6 whisperVersion* = 6
whisperVersionStr* = "6.0" whisperVersionStr* = $whisperVersion
defaultMinPow* = 0.2'f64 defaultMinPow* = 0.2'f64
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
messageInterval* = 300 ## Interval at which messages are send to peers, in ms messageInterval* = 300 ## Interval at which messages are send to peers, in ms
@ -512,6 +515,38 @@ proc allowed*(msg: Message, config: WhisperConfig): bool =
return true return true
# NOTE: PoW calculations are different from go-ethereum implementation,
# which is not conform EIP-627.
# See here: https://github.com/ethereum/go-ethereum/issues/18070
# However, this implementation is also not conform EIP-627 as we do not use the
# size of the RLP-encoded envelope, but the size of the envelope object itself.
# This is done to be able to correctly calculate the bestBitTarget.
# Other options would be:
# - work directly with powTarget in minePow, but this requires recalculation of
# rlp size + calcPow
# - Use worst case size of envelope nonce
proc sealEnvelope(msg: var Message, powTime: float, powTarget: float): bool =
let size = msg.env.len
if powTarget > 0:
let x = powTarget * size.float * msg.env.ttl.float
var bestBitTarget: int
if x <= 1: # log() would return negative numbers or 0
bestBitTarget = 1
else:
bestBitTarget = ceil(log(x, 2)).int
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget)
else:
# If no target is set, we are certain of executed powTime
msg.env.expiry += powTime.uint32
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime)
msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash)
trace "Message PoW", pow = msg.pow
if msg.pow < powTarget:
return false
return true
# Queues ----------------------------------------------------------------------- # Queues -----------------------------------------------------------------------
proc initQueue*(capacity: int): Queue = proc initQueue*(capacity: int): Queue =
@ -616,7 +651,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
elif filter.symKey.isSome(): elif filter.symKey.isSome():
keyHash = keccak256.digest(filter.symKey.get()) keyHash = keccak256.digest(filter.symKey.get())
# else: # else:
# NOTE: should we error on messages without encryption? # NOTE: In this case the message was not encrypted
else: else:
if filter.privateKey.isSome(): if filter.privateKey.isSome():
if keyHash != keccak256.digest(filter.privateKey.get().data): if keyHash != keccak256.digest(filter.privateKey.get().data):
@ -625,7 +660,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
if keyHash != keccak256.digest(filter.symKey.get()): if keyHash != keccak256.digest(filter.symKey.get()):
continue continue
# else: # else:
# NOTE: should we error on messages without encryption? # NOTE: In this case the message was not encrypted
# When decoding is done we can check the src (signature) # When decoding is done we can check the src (signature)
if filter.src.isSome(): if filter.src.isSome():
@ -663,13 +698,13 @@ proc toBloom*(filters: Filters): Bloom =
type type
WhisperPeer = ref object WhisperPeer = ref object
initialized*: bool # when successfully completed the handshake initialized: bool # when successfully completed the handshake
powRequirement*: float64 powRequirement*: float64
bloom*: Bloom bloom*: Bloom
isLightNode*: bool isLightNode*: bool
trusted*: bool trusted*: bool
received: HashSet[Message] received: HashSet[Message]
running*: bool running: bool
WhisperNetwork = ref object WhisperNetwork = ref object
queue*: Queue queue*: Queue
@ -694,7 +729,7 @@ p2pProtocol Whisper(version = whisperVersion,
networkState = WhisperNetwork): networkState = WhisperNetwork):
onPeerConnected do (peer: Peer): onPeerConnected do (peer: Peer):
debug "onPeerConnected Whisper" trace "onPeerConnected Whisper"
let let
whisperNet = peer.networkState whisperNet = peer.networkState
whisperPeer = peer.state whisperPeer = peer.state
@ -753,7 +788,7 @@ p2pProtocol Whisper(version = whisperVersion,
for envelope in envelopes: for envelope in envelopes:
# check if expired or in future, or ttl not 0 # check if expired or in future, or ttl not 0
if not envelope.valid(): if not envelope.valid():
warn "Expired or future timed envelope" warn "Expired or future timed envelope", peer
# disconnect from peers sending bad envelopes # disconnect from peers sending bad envelopes
# await peer.disconnect(SubprotocolReason) # await peer.disconnect(SubprotocolReason)
continue continue
@ -769,8 +804,12 @@ p2pProtocol Whisper(version = whisperVersion,
# it was either already received here from this peer or send to this peer. # it was either already received here from this peer or send to this peer.
# Either way it will be in our queue already (and the peer should know # Either way it will be in our queue already (and the peer should know
# this) and this peer is sending duplicates. # this) and this peer is sending duplicates.
# Note: geth does not check if a peer has send a message to them before
# broadcasting this message. This too is seen here as a duplicate message
# (see above comment). If we want to seperate these cases (e.g. when peer
# rating), then we have to add a "peer.state.send" HashSet.
if peer.state.received.containsOrIncl(msg): if peer.state.received.containsOrIncl(msg):
warn "Peer sending duplicate messages" debug "Peer sending duplicate messages", peer, hash = msg.hash
# await peer.disconnect(SubprotocolReason) # await peer.disconnect(SubprotocolReason)
continue continue
@ -807,6 +846,17 @@ p2pProtocol Whisper(version = whisperVersion,
let msg = Message(env: envelope, isP2P: true) let msg = Message(env: envelope, isP2P: true)
peer.networkState.filters.notify(msg) peer.networkState.filters.notify(msg)
# Following message IDs are not part of EIP-627, but are added and used by
# the Status application, we ignore them for now.
nextID 11
proc batchAcknowledged(peer: Peer) = discard
proc messageResponse(peer: Peer) = discard
nextID 123
requestResponse:
proc p2pSyncRequest(peer: Peer) = discard
proc p2pSyncResponse(peer: Peer) = discard
proc p2pRequestComplete(peer: Peer) = discard
# 'Runner' calls --------------------------------------------------------------- # 'Runner' calls ---------------------------------------------------------------
proc processQueue(peer: Peer) = proc processQueue(peer: Peer) =
@ -821,18 +871,19 @@ proc processQueue(peer: Peer) =
continue continue
if message.pow < whisperPeer.powRequirement: if message.pow < whisperPeer.powRequirement:
debug "Message PoW too low for peer" debug "Message PoW too low for peer", pow = message.pow,
powReq = whisperPeer.powRequirement
continue continue
if not bloomFilterMatch(whisperPeer.bloom, message.bloom): if not bloomFilterMatch(whisperPeer.bloom, message.bloom):
debug "Message does not match peer bloom filter" debug "Message does not match peer bloom filter"
continue continue
debug "Adding envelope" trace "Adding envelope"
envelopes.add(message.env) envelopes.add(message.env)
whisperPeer.received.incl(message) whisperPeer.received.incl(message)
debug "Sending envelopes", amount=envelopes.len trace "Sending envelopes", amount=envelopes.len
# await peer.messages(envelopes) # await peer.messages(envelopes)
asyncCheck peer.messages(envelopes) asyncCheck peer.messages(envelopes)
@ -869,46 +920,14 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
node.pruneReceived() node.pruneReceived()
await sleepAsync(pruneInterval) await sleepAsync(pruneInterval)
# Public EthereumNode calls ---------------------------------------------------- # Private EthereumNode calls ---------------------------------------------------
proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool = proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(Whisper): for peer in node.peers(Whisper):
if peer.remote.id == peerId: if peer.remote.id == peerId:
asyncCheck peer.p2pMessage(env) asyncCheck peer.p2pMessage(env)
return true return true
# NOTE: PoW calculations are different from go-ethereum implementation,
# which is not conform EIP-627.
# See here: https://github.com/ethereum/go-ethereum/issues/18070
# However, this implementation is also not conform EIP-627 as we do not use the
# size of the RLP-encoded envelope, but the size of the envelope object itself.
# This is done to be able to correctly calculate the bestBitTarget.
# Other options would be:
# - work directly with powTarget in minePow, but this requires recalculation of
# rlp size + calcPow
# - Use worst case size of envelope nonce
proc sealEnvelope*(msg: var Message, powTime: float, powTarget: float): bool =
let size = msg.env.len
if powTarget > 0:
let x = powTarget * size.float * msg.env.ttl.float
var bestBitTarget: int
if x <= 1: # log() would return negative numbers or 0
bestBitTarget = 1
else:
bestBitTarget = ceil(log(x, 2)).int
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget)
else:
# If no target is set, we are certain of executed powTime
msg.env.expiry += powTime.uint32
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime)
msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash)
trace "Message PoW", pow = msg.pow
if msg.pow < powTarget:
return false
return true
proc queueMessage(node: EthereumNode, msg: Message): bool = proc queueMessage(node: EthereumNode, msg: Message): bool =
var whisperNet = node.protocolState(Whisper) var whisperNet = node.protocolState(Whisper)
@ -917,7 +936,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
if not msg.allowed(whisperNet.config): if not msg.allowed(whisperNet.config):
return false return false
debug "Adding message to queue" trace "Adding message to queue"
if whisperNet.queue.add(msg): if whisperNet.queue.add(msg):
# Also notify our own filters of the message we are sending, # Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp # e.g. msg from local Dapp to Dapp
@ -925,13 +944,18 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
return true return true
# Public EthereumNode calls ----------------------------------------------------
proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
symKey = none[SymKey](), src = none[PrivateKey](), symKey = none[SymKey](), src = none[PrivateKey](),
ttl: uint32, topic: Topic, payload: Bytes, ttl: uint32, topic: Topic, payload: Bytes,
padding = none[Bytes](), powTime = 1'f, padding = none[Bytes](), powTime = 1'f,
powTarget = defaultMinPow, powTarget = defaultMinPow,
targetPeer = none[NodeId]()): bool = targetPeer = none[NodeId]()): bool =
# NOTE: Allow a post without a key? Encryption is mandatory in v6? ## Post a message on the message queue which will be processed at the
## next `messageInterval`.
## NOTE: This call allows a post without encryption. If encryption is
## mandatory it should be enforced a layer up
let payload = encode(Payload(payload: payload, src: src, dst: pubKey, let payload = encode(Payload(payload: payload, src: src, dst: pubKey,
symKey: symKey, padding: padding)) symKey: symKey, padding: padding))
if payload.isSome(): if payload.isSome():
@ -962,7 +986,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
return node.queueMessage(msg) return node.queueMessage(msg)
else: else:
error "Light node not allowed to post messages" warn "Light node not allowed to post messages"
return false return false
else: else:
error "Encoding of payload failed" error "Encoding of payload failed"
@ -970,19 +994,30 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
proc subscribeFilter*(node: EthereumNode, filter: Filter, proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler:FilterMsgHandler = nil): string = handler:FilterMsgHandler = nil): string =
## Initiate a filter for incoming/outgoing messages. Messages can be
## retrieved with the `getFilterMessages` call or with a provided
## `FilterMsgHandler`.
## NOTE: This call allows for a filter without decryption. If encryption is
## mandatory it should be enforced a layer up
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler) return node.protocolState(Whisper).filters.subscribeFilter(filter, handler)
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
## Remove a previously subscribed filter.
var filter: Filter var filter: Filter
return node.protocolState(Whisper).filters.take(filterId, filter) return node.protocolState(Whisper).filters.take(filterId, filter)
proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] = proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
## Get all the messages currently in the filter queue. This will reset the
## filter message queue
return node.protocolState(Whisper).filters.getFilterMessages(filterId) return node.protocolState(Whisper).filters.getFilterMessages(filterId)
proc filtersToBloom*(node: EthereumNode): Bloom = proc filtersToBloom*(node: EthereumNode): Bloom =
## returns the bloom filter of all topics of all subscribed filters
return node.protocolState(Whisper).filters.toBloom() return node.protocolState(Whisper).filters.toBloom()
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
## Sets the PoW requirement for this node, will also send
## this new PoW requirement to all connected peers
# NOTE: do we need a tolerance of old PoW for some time? # NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Whisper).config.powRequirement = powReq node.protocolState(Whisper).config.powRequirement = powReq
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
@ -992,6 +1027,8 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
await all(futures) await all(futures)
proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
## Sets the bloom filter for this node, will also send
## this new bloom filter to all connected peers
# NOTE: do we need a tolerance of old bloom filter for some time? # NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Whisper).config.bloom = bloom node.protocolState(Whisper).config.bloom = bloom
var futures: seq[Future[void]] = @[] var futures: seq[Future[void]] = @[]
@ -1001,26 +1038,34 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
await all(futures) await all(futures)
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
## Set the maximum allowed message size
if size > defaultMaxMsgSize: if size > defaultMaxMsgSize:
error "size > maxMsgSize" warn "size > defaultMaxMsgSize"
return false return false
node.protocolState(Whisper).config.maxMsgSize = size node.protocolState(Whisper).config.maxMsgSize = size
return true return true
proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
## Set a connected peer as trusted
for peer in node.peers(Whisper): for peer in node.peers(Whisper):
if peer.remote.id == peerId: if peer.remote.id == peerId:
peer.state(Whisper).trusted = true peer.state(Whisper).trusted = true
return true return true
# NOTE: Should be run before connection is made with peers
proc setLightNode*(node: EthereumNode, isLightNode: bool) = proc setLightNode*(node: EthereumNode, isLightNode: bool) =
## Set this node as a Whisper light node
## NOTE: Should be run before connection is made with peers as this
## setting is only communicated at peer handshake
node.protocolState(Whisper).config.isLightNode = isLightNode node.protocolState(Whisper).config.isLightNode = isLightNode
# NOTE: Should be run before connection is made with peers
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) = proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
## Apply a Whisper configuration
## NOTE: Should be run before connection is made with peers as some
## of the settings are only communicated at peer handshake
node.protocolState(Whisper).config = config node.protocolState(Whisper).config = config
# Not something that should be run in normal circumstances
proc resetMessageQueue*(node: EthereumNode) = proc resetMessageQueue*(node: EthereumNode) =
## Full reset of the message queue
## NOTE: Not something that should be run in normal circumstances
node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity) node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)