From c599f7649d41f9300d6ae5f4f3f86c8a55125109 Mon Sep 17 00:00:00 2001 From: kdeme Date: Sat, 23 Mar 2019 21:53:03 +0100 Subject: [PATCH] Add powTarget and other changes for Whisper RPC implementation --- eth/p2p/rlpx_protocols/whisper_protocol.nim | 107 ++++++++++++++------ tests/p2p/test_shh.nim | 6 +- tests/p2p/test_shh_connect.nim | 20 ++-- 3 files changed, 89 insertions(+), 44 deletions(-) diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim index e2964e4..292396f 100644 --- a/eth/p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim @@ -23,6 +23,7 @@ const defaultQueueCapacity = 256 defaultFilterQueueCapacity = 64 whisperVersion* = 6 + whisperVersionStr* = "6.0" defaultMinPow* = 0.001'f64 defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size messageInterval* = 300 ## Interval at which messages are send to peers, in ms @@ -79,6 +80,7 @@ type topic*: Topic pow*: float64 hash*: Hash + dst*: Option[PublicKey] Queue* = object ## Bounded message repository @@ -103,12 +105,12 @@ type FilterMsgHandler* = proc(msg: ReceivedMessage) {.gcsafe, closure.} Filter* = object - src: Option[PublicKey] - privateKey: Option[PrivateKey] - symKey: Option[SymKey] - topics: seq[Topic] - powReq: float64 - allowP2P: bool + src*: Option[PublicKey] + privateKey*: Option[PrivateKey] + symKey*: Option[SymKey] + topics*: seq[Topic] + powReq*: float64 + allowP2P*: bool bloom: Bloom # cached bloom filter of all topics of filter handler: FilterMsgHandler @@ -170,7 +172,7 @@ proc topicBloom*(topic: Topic): Bloom = doAssert idx <= 511 result[idx div 8] = result[idx div 8] or byte(1 shl (idx and 7'u16)) -proc generateRandomID(): string = +proc generateRandomID*(): string = var bytes: array[256 div 8, byte] while true: # XXX: error instead of looping? if randomBytes(bytes) == 256 div 8: @@ -424,6 +426,8 @@ proc valid*(self: Envelope, now = epochTime()): bool = return true +proc len(self: Envelope): int = 20 + self.data.len + proc toShortRlp(self: Envelope): Bytes = ## RLP-encoded message without nonce is used during proof-of-work calculations rlp.encodeList(self.expiry, self.ttl, self.topic, self.data) @@ -432,11 +436,7 @@ proc toRlp(self: Envelope): Bytes = ## What gets sent out over the wire includes the nonce rlp.encode(self) -# NOTE: minePow and calcPowHash are different from go-ethereum implementation. -# Is correct however with EIP-627, but perhaps this is not up to date. -# Follow-up here: https://github.com/ethereum/go-ethereum/issues/18070 - -proc minePow*(self: Envelope, seconds: float): uint64 = +proc minePow*(self: Envelope, seconds: float, bestBitTarget: int = 0): (uint64, Hash) = ## For the given envelope, spend millis milliseconds to find the ## best proof-of-work and return the nonce let bytes = self.toShortRlp() @@ -445,19 +445,22 @@ proc minePow*(self: Envelope, seconds: float): uint64 = ctx.init() ctx.update(bytes) - var bestPow: float64 = 0.0 + var bestBit: int = 0 let mineEnd = epochTime() + seconds var i: uint64 - while epochTime() < mineEnd or bestPow == 0: # At least one round + while epochTime() < mineEnd or bestBit == 0: # At least one round var tmp = ctx # copy hash calculated so far - we'll reuse that for each iter tmp.update(i.toBE()) # XXX:a random nonce here would not leak number of iters - let pow = calcPow(1, 1, tmp.finish()) - if pow > bestPow: # XXX: could also compare hashes as numbers instead - bestPow = pow - result = i.uint64 + let hash = tmp.finish() + let zeroBits = leadingZeroBits(hash) + 1 + if zeroBits > bestBit: # XXX: could also compare hashes as numbers instead + bestBit = zeroBits + result = (i, hash) + if bestBitTarget > 0 and bestBit >= bestBitTarget: + break i.inc @@ -481,12 +484,14 @@ proc cmpPow(a, b: Message): int = elif a.pow == b.pow: 0 else: -1 -proc initMessage*(env: Envelope): Message = +proc initMessage*(env: Envelope, powCalc = true): Message = result.env = env - result.hash = env.calcPowHash() result.size = env.toRlp().len().uint32 # XXX: calc len without creating RLP - result.pow = calcPow(result.size, result.env.ttl, result.hash) result.bloom = topicBloom(env.topic) + if powCalc: + result.hash = env.calcPowHash() + result.pow = calcPow(result.env.len.uint32, result.env.ttl, result.hash) + trace "Message PoW", pow = result.pow proc hash*(msg: Message): hashes.Hash = hash(msg.hash.data) @@ -584,6 +589,7 @@ proc subscribeFilter*(filters: var Filters, filter: Filter, proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = var decoded: Option[DecodedPayload] var keyHash: Hash + var dst: Option[PublicKey] for filter in filters.mvalues: if not filter.allowP2P and msg.isP2P: @@ -603,6 +609,8 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = symKey = filter.symKey) if filter.privateKey.isSome(): keyHash = keccak256.digest(filter.privateKey.get().data) + # TODO: Get rid of the hash and just use pubkey to compare? + dst = some(getPublicKey(filter.privateKey.get())) elif filter.symKey.isSome(): keyHash = keccak256.digest(filter.symKey.get()) # else: @@ -632,7 +640,8 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = ttl: msg.env.ttl, topic: msg.env.topic, pow: msg.pow, - hash: msg.hash) + hash: msg.hash, + dst: dst) # Either run callback or add to queue if filter.handler.isNil(): filter.queue.insert(receivedMsg) @@ -868,14 +877,43 @@ proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool = asyncCheck peer.p2pMessage(env) return true -proc sendMessage*(node: EthereumNode, env: var Envelope): bool = - if not env.valid(): # actually just ttl !=0 is sufficient - return false +# NOTE: PoW calculations are different from go-ethereum implementation, +# which is not conform EIP-627. +# See here: https://github.com/ethereum/go-ethereum/issues/18070 +# However, this implementation is also not conform EIP-627 as we do not use the +# size of the RLP-encoded envelope, but the size of the envelope object itself. +# This is done to be able to correctly calculate the bestBitTarget. +# Other options would be: +# - work directly with powTarget in minePow, but this requires recalculation of +# rlp size + calcPow +# - Use worst case size of envelope nonce +proc sealEnvelope*(msg: var Message, powTime: float, powTarget: float): bool = + let size = msg.env.len + if powTarget > 0: + let x = powTarget * size.float * msg.env.ttl.float + var bestBitTarget: int + if x <= 1: # log() would return negative numbers or 0 + bestBitTarget = 1 + else: + bestBitTarget = ceil(log(x, 2)).int + (msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget) + else: + # If no target is set, we are certain of executed powTime + msg.env.expiry += powTime.uint32 + (msg.env.nonce, msg.hash) = msg.env.minePow(powTime) + + msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash) + trace "Message PoW", pow = msg.pow + if msg.pow < powTarget: + return false + + return true + +proc queueMessage(node: EthereumNode, msg: Message): bool = var whisperNet = node.protocolState(Whisper) # We have to do the same checks here as in the messages proc not to leak # any information that the message originates from this node. - let msg = initMessage(env) if not msg.allowed(whisperNet.config): return false @@ -891,18 +929,23 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), symKey = none[SymKey](), src = none[PrivateKey](), ttl: uint32, topic: Topic, payload: Bytes, padding = none[Bytes](), powTime = 1'f, + powTarget = defaultMinPow, targetPeer = none[NodeId]()): bool = # NOTE: Allow a post without a key? Encryption is mandatory in v6? let payload = encode(Payload(payload: payload, src: src, dst: pubKey, symKey: symKey, padding: padding)) if payload.isSome(): - var env = Envelope(expiry:epochTime().uint32 + ttl + powTime.uint32, + var env = Envelope(expiry:epochTime().uint32 + ttl, ttl: ttl, topic: topic, data: payload.get(), nonce: 0) # Allow lightnode to post only direct p2p messages if targetPeer.isSome(): return node.sendP2PMessage(targetPeer.get(), env) elif not node.protocolState(Whisper).config.isLightNode: + # non direct p2p message can not have ttl of 0 + if env.ttl == 0: + return false + var msg = initMessage(env, powCalc = false) # XXX: make this non blocking or not? # In its current blocking state, it could be noticed by a peer that no # messages are send for a while, and thus that mining PoW is done, and @@ -910,8 +953,14 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), # zah: It would be hard to execute this in a background thread at the # moment. We'll need a way to send custom "tasks" to the async message # loop (e.g. AD2 support for AsyncChannels). - env.nonce = env.minePow(powTime) - return node.sendMessage(env) + if not msg.sealEnvelope(powTime, powTarget): + return false + + # need to check expiry after mining PoW + if not msg.env.valid(): + return false + + return node.queueMessage(msg) else: error "Light node not allowed to post messages" return false diff --git a/tests/p2p/test_shh.nim b/tests/p2p/test_shh.nim index 2fb1d4a..8bbd4a3 100644 --- a/tests/p2p/test_shh.nim +++ b/tests/p2p/test_shh.nim @@ -336,14 +336,16 @@ suite "Whisper filter": let padding = some(repeat(byte 0, 251)) # this message has a PoW of 0.02962962962962963, number should be updated # in case PoW algorithm changes or contents of padding, payload, topic, etc. + # update: now with NON rlp encoded envelope size the PoW of this message is + # 0.02898550724637681 let msg = prepFilterTestMsg(topic = topic, padding = padding) var filters = initTable[string, Filter]() let filterId1 = filters.subscribeFilter( - newFilter(topics = @[topic], powReq = 0.02962962962962963)) + newFilter(topics = @[topic], powReq = 0.02898550724637681)) filterId2 = filters.subscribeFilter( - newFilter(topics = @[topic], powReq = 0.02962962962962964)) + newFilter(topics = @[topic], powReq = 0.02898550724637682)) notify(filters, msg) diff --git a/tests/p2p/test_shh_connect.nim b/tests/p2p/test_shh_connect.nim index bad506e..c5cf279 100644 --- a/tests/p2p/test_shh_connect.nim +++ b/tests/p2p/test_shh_connect.nim @@ -17,6 +17,7 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) = node.resetMessageQueue() let bootENode = waitFor setupBootNode() +let safeTTL = 5'u32 var node1 = setupTestNode(Whisper) var node2 = setupTestNode(Whisper) @@ -75,7 +76,6 @@ suite "Whisper connections": filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), symKey = some(symKey), topics = @[topic]), handler4)) - var safeTTL = 5'u32 # Messages check: # encrypted asym @@ -122,7 +122,6 @@ suite "Whisper connections": var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) - var safeTTL = 3'u32 check: node2.postMessage(ttl = safeTTL + 1, topic = topic1, payload = payloads[0]) == true @@ -157,7 +156,6 @@ suite "Whisper connections": var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 1_000_000), handler2) - let safeTTL = 2'u32 check: node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true @@ -179,8 +177,8 @@ suite "Whisper connections": var filter = node1.subscribeFilter(newFilter(topics = @[topic])) for i in countdown(10, 1): - check node2.postMessage(ttl = i.uint32, topic = topic, - payload = payload) == true + check node2.postMessage(ttl = safeTTL, topic = topic, + payload = payload) == true await sleepAsync(messageInterval) check: @@ -194,7 +192,6 @@ suite "Whisper connections": let topic = [byte 0, 0, 0, 0] var filter = node1.subscribeFilter(newFilter(topics = @[topic])) - let safeTTL = 2'u32 check: node1.postMessage(ttl = safeTTL, topic = topic, payload = repeat(byte 4, 10)) == true @@ -216,7 +213,6 @@ suite "Whisper connections": var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) await node1.setBloomFilter(node1.filtersToBloom()) - let safeTTL = 2'u32 check: node2.postMessage(ttl = safeTTL, topic = sendTopic1, payload = payload) == true @@ -251,7 +247,6 @@ suite "Whisper connections": asyncTest "PoW blocking": let topic = [byte 0, 0, 0, 0] let payload = repeat(byte 0, 10) - let safeTTL = 2'u32 await node1.setPowRequirement(1_000_000) check: @@ -279,15 +274,15 @@ suite "Whisper connections": # We need a minimum TTL of 2 as when set to 1 there is a small chance that # it is already expired after messageInterval due to rounding down of float # to uint32 in postMessage() - let minTTL = 2'u32 - for i in countdown(minTTL + 9, minTTL): - check node2.postMessage(ttl = i, topic = topic, payload = payload) == true + let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire + for i in countdown(10, 1): + check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true check node2.protocolState(Whisper).queue.items.len == 10 await sleepAsync(messageInterval) check node1.protocolState(Whisper).queue.items.len == 10 - await sleepAsync(int(minTTL*1000)) + await sleepAsync(int((lowerTTL+1)*1000)) check node1.protocolState(Whisper).queue.items.len == 0 check node2.protocolState(Whisper).queue.items.len == 0 @@ -326,7 +321,6 @@ suite "Whisper connections": let topic = [byte 0, 0, 0, 0] - let safeTTL = 2'u32 check: # normal post ln1.postMessage(ttl = safeTTL, topic = topic,