diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index cc06fc2..0fcda1a 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -70,6 +70,7 @@ type # protocol to a peer that doesn't support the protocol. MalformedMessageError* = object of CatchableError + UnsupportedMessageError* = object of CatchableError PeerDisconnected* = object of CatchableError reason*: DisconnectionReason diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index ec6d1fa..49c2de1 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -255,7 +255,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] = template invalidIdError: untyped = - raise newException(ValueError, + raise newException(UnsupportedMessageError, "RLPx message with an invalid id " & $msgId & " on a connection supporting " & peer.dispatcher.describeProtocols) diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim index 339fa0e..7e58124 100644 --- a/eth/p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim @@ -12,6 +12,9 @@ import hashes, byteutils, nimcrypto/[bcmode, hash, keccak, rijndael, sysrand], eth/p2p, ../ecies +logScope: + topics = "whisper" + const flagsLen = 1 ## payload flags field length, bytes gcmIVLen = 12 ## Length of IV (seed) used for AES @@ -23,7 +26,7 @@ const defaultQueueCapacity = 256 defaultFilterQueueCapacity = 64 whisperVersion* = 6 - whisperVersionStr* = "6.0" + whisperVersionStr* = $whisperVersion defaultMinPow* = 0.2'f64 defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size messageInterval* = 300 ## Interval at which messages are send to peers, in ms @@ -512,6 +515,38 @@ proc allowed*(msg: Message, config: WhisperConfig): bool = return true +# NOTE: PoW calculations are different from go-ethereum implementation, +# which is not conform EIP-627. +# See here: https://github.com/ethereum/go-ethereum/issues/18070 +# However, this implementation is also not conform EIP-627 as we do not use the +# size of the RLP-encoded envelope, but the size of the envelope object itself. +# This is done to be able to correctly calculate the bestBitTarget. +# Other options would be: +# - work directly with powTarget in minePow, but this requires recalculation of +# rlp size + calcPow +# - Use worst case size of envelope nonce +proc sealEnvelope(msg: var Message, powTime: float, powTarget: float): bool = + let size = msg.env.len + if powTarget > 0: + let x = powTarget * size.float * msg.env.ttl.float + var bestBitTarget: int + if x <= 1: # log() would return negative numbers or 0 + bestBitTarget = 1 + else: + bestBitTarget = ceil(log(x, 2)).int + (msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget) + else: + # If no target is set, we are certain of executed powTime + msg.env.expiry += powTime.uint32 + (msg.env.nonce, msg.hash) = msg.env.minePow(powTime) + + msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash) + trace "Message PoW", pow = msg.pow + if msg.pow < powTarget: + return false + + return true + # Queues ----------------------------------------------------------------------- proc initQueue*(capacity: int): Queue = @@ -616,7 +651,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = elif filter.symKey.isSome(): keyHash = keccak256.digest(filter.symKey.get()) # else: - # NOTE: should we error on messages without encryption? + # NOTE: In this case the message was not encrypted else: if filter.privateKey.isSome(): if keyHash != keccak256.digest(filter.privateKey.get().data): @@ -625,7 +660,7 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = if keyHash != keccak256.digest(filter.symKey.get()): continue # else: - # NOTE: should we error on messages without encryption? + # NOTE: In this case the message was not encrypted # When decoding is done we can check the src (signature) if filter.src.isSome(): @@ -663,13 +698,13 @@ proc toBloom*(filters: Filters): Bloom = type WhisperPeer = ref object - initialized*: bool # when successfully completed the handshake + initialized: bool # when successfully completed the handshake powRequirement*: float64 bloom*: Bloom isLightNode*: bool trusted*: bool received: HashSet[Message] - running*: bool + running: bool WhisperNetwork = ref object queue*: Queue @@ -694,7 +729,7 @@ p2pProtocol Whisper(version = whisperVersion, networkState = WhisperNetwork): onPeerConnected do (peer: Peer): - debug "onPeerConnected Whisper" + trace "onPeerConnected Whisper" let whisperNet = peer.networkState whisperPeer = peer.state @@ -753,7 +788,7 @@ p2pProtocol Whisper(version = whisperVersion, for envelope in envelopes: # check if expired or in future, or ttl not 0 if not envelope.valid(): - warn "Expired or future timed envelope" + warn "Expired or future timed envelope", peer # disconnect from peers sending bad envelopes # await peer.disconnect(SubprotocolReason) continue @@ -769,8 +804,12 @@ p2pProtocol Whisper(version = whisperVersion, # it was either already received here from this peer or send to this peer. # Either way it will be in our queue already (and the peer should know # this) and this peer is sending duplicates. + # Note: geth does not check if a peer has send a message to them before + # broadcasting this message. This too is seen here as a duplicate message + # (see above comment). If we want to seperate these cases (e.g. when peer + # rating), then we have to add a "peer.state.send" HashSet. if peer.state.received.containsOrIncl(msg): - warn "Peer sending duplicate messages" + debug "Peer sending duplicate messages", peer, hash = msg.hash # await peer.disconnect(SubprotocolReason) continue @@ -807,6 +846,17 @@ p2pProtocol Whisper(version = whisperVersion, let msg = Message(env: envelope, isP2P: true) peer.networkState.filters.notify(msg) + # Following message IDs are not part of EIP-627, but are added and used by + # the Status application, we ignore them for now. + nextID 11 + proc batchAcknowledged(peer: Peer) = discard + proc messageResponse(peer: Peer) = discard + nextID 123 + requestResponse: + proc p2pSyncRequest(peer: Peer) = discard + proc p2pSyncResponse(peer: Peer) = discard + proc p2pRequestComplete(peer: Peer) = discard + # 'Runner' calls --------------------------------------------------------------- proc processQueue(peer: Peer) = @@ -821,18 +871,19 @@ proc processQueue(peer: Peer) = continue if message.pow < whisperPeer.powRequirement: - debug "Message PoW too low for peer" + debug "Message PoW too low for peer", pow = message.pow, + powReq = whisperPeer.powRequirement continue if not bloomFilterMatch(whisperPeer.bloom, message.bloom): debug "Message does not match peer bloom filter" continue - debug "Adding envelope" + trace "Adding envelope" envelopes.add(message.env) whisperPeer.received.incl(message) - debug "Sending envelopes", amount=envelopes.len + trace "Sending envelopes", amount=envelopes.len # await peer.messages(envelopes) asyncCheck peer.messages(envelopes) @@ -869,46 +920,14 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} = node.pruneReceived() await sleepAsync(pruneInterval) -# Public EthereumNode calls ---------------------------------------------------- +# Private EthereumNode calls --------------------------------------------------- -proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool = +proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool = for peer in node.peers(Whisper): if peer.remote.id == peerId: asyncCheck peer.p2pMessage(env) return true -# NOTE: PoW calculations are different from go-ethereum implementation, -# which is not conform EIP-627. -# See here: https://github.com/ethereum/go-ethereum/issues/18070 -# However, this implementation is also not conform EIP-627 as we do not use the -# size of the RLP-encoded envelope, but the size of the envelope object itself. -# This is done to be able to correctly calculate the bestBitTarget. -# Other options would be: -# - work directly with powTarget in minePow, but this requires recalculation of -# rlp size + calcPow -# - Use worst case size of envelope nonce -proc sealEnvelope*(msg: var Message, powTime: float, powTarget: float): bool = - let size = msg.env.len - if powTarget > 0: - let x = powTarget * size.float * msg.env.ttl.float - var bestBitTarget: int - if x <= 1: # log() would return negative numbers or 0 - bestBitTarget = 1 - else: - bestBitTarget = ceil(log(x, 2)).int - (msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget) - else: - # If no target is set, we are certain of executed powTime - msg.env.expiry += powTime.uint32 - (msg.env.nonce, msg.hash) = msg.env.minePow(powTime) - - msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash) - trace "Message PoW", pow = msg.pow - if msg.pow < powTarget: - return false - - return true - proc queueMessage(node: EthereumNode, msg: Message): bool = var whisperNet = node.protocolState(Whisper) @@ -917,7 +936,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = if not msg.allowed(whisperNet.config): return false - debug "Adding message to queue" + trace "Adding message to queue" if whisperNet.queue.add(msg): # Also notify our own filters of the message we are sending, # e.g. msg from local Dapp to Dapp @@ -925,13 +944,18 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = return true +# Public EthereumNode calls ---------------------------------------------------- + proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), symKey = none[SymKey](), src = none[PrivateKey](), ttl: uint32, topic: Topic, payload: Bytes, padding = none[Bytes](), powTime = 1'f, powTarget = defaultMinPow, targetPeer = none[NodeId]()): bool = - # NOTE: Allow a post without a key? Encryption is mandatory in v6? + ## Post a message on the message queue which will be processed at the + ## next `messageInterval`. + ## NOTE: This call allows a post without encryption. If encryption is + ## mandatory it should be enforced a layer up let payload = encode(Payload(payload: payload, src: src, dst: pubKey, symKey: symKey, padding: padding)) if payload.isSome(): @@ -962,7 +986,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), return node.queueMessage(msg) else: - error "Light node not allowed to post messages" + warn "Light node not allowed to post messages" return false else: error "Encoding of payload failed" @@ -970,19 +994,30 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), proc subscribeFilter*(node: EthereumNode, filter: Filter, handler:FilterMsgHandler = nil): string = + ## Initiate a filter for incoming/outgoing messages. Messages can be + ## retrieved with the `getFilterMessages` call or with a provided + ## `FilterMsgHandler`. + ## NOTE: This call allows for a filter without decryption. If encryption is + ## mandatory it should be enforced a layer up return node.protocolState(Whisper).filters.subscribeFilter(filter, handler) proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = + ## Remove a previously subscribed filter. var filter: Filter return node.protocolState(Whisper).filters.take(filterId, filter) proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] = + ## Get all the messages currently in the filter queue. This will reset the + ## filter message queue return node.protocolState(Whisper).filters.getFilterMessages(filterId) proc filtersToBloom*(node: EthereumNode): Bloom = + ## returns the bloom filter of all topics of all subscribed filters return node.protocolState(Whisper).filters.toBloom() proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = + ## Sets the PoW requirement for this node, will also send + ## this new PoW requirement to all connected peers # NOTE: do we need a tolerance of old PoW for some time? node.protocolState(Whisper).config.powRequirement = powReq var futures: seq[Future[void]] = @[] @@ -992,6 +1027,8 @@ proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = await all(futures) proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = + ## Sets the bloom filter for this node, will also send + ## this new bloom filter to all connected peers # NOTE: do we need a tolerance of old bloom filter for some time? node.protocolState(Whisper).config.bloom = bloom var futures: seq[Future[void]] = @[] @@ -1001,26 +1038,34 @@ proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = await all(futures) proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = + ## Set the maximum allowed message size if size > defaultMaxMsgSize: - error "size > maxMsgSize" + warn "size > defaultMaxMsgSize" return false node.protocolState(Whisper).config.maxMsgSize = size return true proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = + ## Set a connected peer as trusted for peer in node.peers(Whisper): if peer.remote.id == peerId: peer.state(Whisper).trusted = true return true -# NOTE: Should be run before connection is made with peers proc setLightNode*(node: EthereumNode, isLightNode: bool) = + ## Set this node as a Whisper light node + ## NOTE: Should be run before connection is made with peers as this + ## setting is only communicated at peer handshake node.protocolState(Whisper).config.isLightNode = isLightNode -# NOTE: Should be run before connection is made with peers + proc configureWhisper*(node: EthereumNode, config: WhisperConfig) = + ## Apply a Whisper configuration + ## NOTE: Should be run before connection is made with peers as some + ## of the settings are only communicated at peer handshake node.protocolState(Whisper).config = config -# Not something that should be run in normal circumstances proc resetMessageQueue*(node: EthereumNode) = + ## Full reset of the message queue + ## NOTE: Not something that should be run in normal circumstances node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)