Add powTarget and other changes for Whisper RPC implementation

This commit is contained in:
kdeme 2019-03-23 21:53:03 +01:00 committed by zah
parent 9e5cf2086c
commit c599f7649d
3 changed files with 89 additions and 44 deletions

View File

@ -23,6 +23,7 @@ const
defaultQueueCapacity = 256 defaultQueueCapacity = 256
defaultFilterQueueCapacity = 64 defaultFilterQueueCapacity = 64
whisperVersion* = 6 whisperVersion* = 6
whisperVersionStr* = "6.0"
defaultMinPow* = 0.001'f64 defaultMinPow* = 0.001'f64
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
messageInterval* = 300 ## Interval at which messages are send to peers, in ms messageInterval* = 300 ## Interval at which messages are send to peers, in ms
@ -79,6 +80,7 @@ type
topic*: Topic topic*: Topic
pow*: float64 pow*: float64
hash*: Hash hash*: Hash
dst*: Option[PublicKey]
Queue* = object Queue* = object
## Bounded message repository ## Bounded message repository
@ -103,12 +105,12 @@ type
FilterMsgHandler* = proc(msg: ReceivedMessage) {.gcsafe, closure.} FilterMsgHandler* = proc(msg: ReceivedMessage) {.gcsafe, closure.}
Filter* = object Filter* = object
src: Option[PublicKey] src*: Option[PublicKey]
privateKey: Option[PrivateKey] privateKey*: Option[PrivateKey]
symKey: Option[SymKey] symKey*: Option[SymKey]
topics: seq[Topic] topics*: seq[Topic]
powReq: float64 powReq*: float64
allowP2P: bool allowP2P*: bool
bloom: Bloom # cached bloom filter of all topics of filter bloom: Bloom # cached bloom filter of all topics of filter
handler: FilterMsgHandler handler: FilterMsgHandler
@ -170,7 +172,7 @@ proc topicBloom*(topic: Topic): Bloom =
doAssert idx <= 511 doAssert idx <= 511
result[idx div 8] = result[idx div 8] or byte(1 shl (idx and 7'u16)) result[idx div 8] = result[idx div 8] or byte(1 shl (idx and 7'u16))
proc generateRandomID(): string = proc generateRandomID*(): string =
var bytes: array[256 div 8, byte] var bytes: array[256 div 8, byte]
while true: # XXX: error instead of looping? while true: # XXX: error instead of looping?
if randomBytes(bytes) == 256 div 8: if randomBytes(bytes) == 256 div 8:
@ -424,6 +426,8 @@ proc valid*(self: Envelope, now = epochTime()): bool =
return true return true
proc len(self: Envelope): int = 20 + self.data.len
proc toShortRlp(self: Envelope): Bytes = proc toShortRlp(self: Envelope): Bytes =
## RLP-encoded message without nonce is used during proof-of-work calculations ## RLP-encoded message without nonce is used during proof-of-work calculations
rlp.encodeList(self.expiry, self.ttl, self.topic, self.data) rlp.encodeList(self.expiry, self.ttl, self.topic, self.data)
@ -432,11 +436,7 @@ proc toRlp(self: Envelope): Bytes =
## What gets sent out over the wire includes the nonce ## What gets sent out over the wire includes the nonce
rlp.encode(self) rlp.encode(self)
# NOTE: minePow and calcPowHash are different from go-ethereum implementation. proc minePow*(self: Envelope, seconds: float, bestBitTarget: int = 0): (uint64, Hash) =
# Is correct however with EIP-627, but perhaps this is not up to date.
# Follow-up here: https://github.com/ethereum/go-ethereum/issues/18070
proc minePow*(self: Envelope, seconds: float): uint64 =
## For the given envelope, spend millis milliseconds to find the ## For the given envelope, spend millis milliseconds to find the
## best proof-of-work and return the nonce ## best proof-of-work and return the nonce
let bytes = self.toShortRlp() let bytes = self.toShortRlp()
@ -445,19 +445,22 @@ proc minePow*(self: Envelope, seconds: float): uint64 =
ctx.init() ctx.init()
ctx.update(bytes) ctx.update(bytes)
var bestPow: float64 = 0.0 var bestBit: int = 0
let mineEnd = epochTime() + seconds let mineEnd = epochTime() + seconds
var i: uint64 var i: uint64
while epochTime() < mineEnd or bestPow == 0: # At least one round while epochTime() < mineEnd or bestBit == 0: # At least one round
var tmp = ctx # copy hash calculated so far - we'll reuse that for each iter var tmp = ctx # copy hash calculated so far - we'll reuse that for each iter
tmp.update(i.toBE()) tmp.update(i.toBE())
# XXX:a random nonce here would not leak number of iters # XXX:a random nonce here would not leak number of iters
let pow = calcPow(1, 1, tmp.finish()) let hash = tmp.finish()
if pow > bestPow: # XXX: could also compare hashes as numbers instead let zeroBits = leadingZeroBits(hash) + 1
bestPow = pow if zeroBits > bestBit: # XXX: could also compare hashes as numbers instead
result = i.uint64 bestBit = zeroBits
result = (i, hash)
if bestBitTarget > 0 and bestBit >= bestBitTarget:
break
i.inc i.inc
@ -481,12 +484,14 @@ proc cmpPow(a, b: Message): int =
elif a.pow == b.pow: 0 elif a.pow == b.pow: 0
else: -1 else: -1
proc initMessage*(env: Envelope): Message = proc initMessage*(env: Envelope, powCalc = true): Message =
result.env = env result.env = env
result.hash = env.calcPowHash()
result.size = env.toRlp().len().uint32 # XXX: calc len without creating RLP result.size = env.toRlp().len().uint32 # XXX: calc len without creating RLP
result.pow = calcPow(result.size, result.env.ttl, result.hash)
result.bloom = topicBloom(env.topic) result.bloom = topicBloom(env.topic)
if powCalc:
result.hash = env.calcPowHash()
result.pow = calcPow(result.env.len.uint32, result.env.ttl, result.hash)
trace "Message PoW", pow = result.pow
proc hash*(msg: Message): hashes.Hash = hash(msg.hash.data) proc hash*(msg: Message): hashes.Hash = hash(msg.hash.data)
@ -584,6 +589,7 @@ proc subscribeFilter*(filters: var Filters, filter: Filter,
proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
var decoded: Option[DecodedPayload] var decoded: Option[DecodedPayload]
var keyHash: Hash var keyHash: Hash
var dst: Option[PublicKey]
for filter in filters.mvalues: for filter in filters.mvalues:
if not filter.allowP2P and msg.isP2P: if not filter.allowP2P and msg.isP2P:
@ -603,6 +609,8 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
symKey = filter.symKey) symKey = filter.symKey)
if filter.privateKey.isSome(): if filter.privateKey.isSome():
keyHash = keccak256.digest(filter.privateKey.get().data) keyHash = keccak256.digest(filter.privateKey.get().data)
# TODO: Get rid of the hash and just use pubkey to compare?
dst = some(getPublicKey(filter.privateKey.get()))
elif filter.symKey.isSome(): elif filter.symKey.isSome():
keyHash = keccak256.digest(filter.symKey.get()) keyHash = keccak256.digest(filter.symKey.get())
# else: # else:
@ -632,7 +640,8 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
ttl: msg.env.ttl, ttl: msg.env.ttl,
topic: msg.env.topic, topic: msg.env.topic,
pow: msg.pow, pow: msg.pow,
hash: msg.hash) hash: msg.hash,
dst: dst)
# Either run callback or add to queue # Either run callback or add to queue
if filter.handler.isNil(): if filter.handler.isNil():
filter.queue.insert(receivedMsg) filter.queue.insert(receivedMsg)
@ -868,14 +877,43 @@ proc sendP2PMessage*(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
asyncCheck peer.p2pMessage(env) asyncCheck peer.p2pMessage(env)
return true return true
proc sendMessage*(node: EthereumNode, env: var Envelope): bool = # NOTE: PoW calculations are different from go-ethereum implementation,
if not env.valid(): # actually just ttl !=0 is sufficient # which is not conform EIP-627.
return false # See here: https://github.com/ethereum/go-ethereum/issues/18070
# However, this implementation is also not conform EIP-627 as we do not use the
# size of the RLP-encoded envelope, but the size of the envelope object itself.
# This is done to be able to correctly calculate the bestBitTarget.
# Other options would be:
# - work directly with powTarget in minePow, but this requires recalculation of
# rlp size + calcPow
# - Use worst case size of envelope nonce
proc sealEnvelope*(msg: var Message, powTime: float, powTarget: float): bool =
let size = msg.env.len
if powTarget > 0:
let x = powTarget * size.float * msg.env.ttl.float
var bestBitTarget: int
if x <= 1: # log() would return negative numbers or 0
bestBitTarget = 1
else:
bestBitTarget = ceil(log(x, 2)).int
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime, bestBitTarget)
else:
# If no target is set, we are certain of executed powTime
msg.env.expiry += powTime.uint32
(msg.env.nonce, msg.hash) = msg.env.minePow(powTime)
msg.pow = calcPow(size.uint32, msg.env.ttl, msg.hash)
trace "Message PoW", pow = msg.pow
if msg.pow < powTarget:
return false
return true
proc queueMessage(node: EthereumNode, msg: Message): bool =
var whisperNet = node.protocolState(Whisper) var whisperNet = node.protocolState(Whisper)
# We have to do the same checks here as in the messages proc not to leak # We have to do the same checks here as in the messages proc not to leak
# any information that the message originates from this node. # any information that the message originates from this node.
let msg = initMessage(env)
if not msg.allowed(whisperNet.config): if not msg.allowed(whisperNet.config):
return false return false
@ -891,18 +929,23 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
symKey = none[SymKey](), src = none[PrivateKey](), symKey = none[SymKey](), src = none[PrivateKey](),
ttl: uint32, topic: Topic, payload: Bytes, ttl: uint32, topic: Topic, payload: Bytes,
padding = none[Bytes](), powTime = 1'f, padding = none[Bytes](), powTime = 1'f,
powTarget = defaultMinPow,
targetPeer = none[NodeId]()): bool = targetPeer = none[NodeId]()): bool =
# NOTE: Allow a post without a key? Encryption is mandatory in v6? # NOTE: Allow a post without a key? Encryption is mandatory in v6?
let payload = encode(Payload(payload: payload, src: src, dst: pubKey, let payload = encode(Payload(payload: payload, src: src, dst: pubKey,
symKey: symKey, padding: padding)) symKey: symKey, padding: padding))
if payload.isSome(): if payload.isSome():
var env = Envelope(expiry:epochTime().uint32 + ttl + powTime.uint32, var env = Envelope(expiry:epochTime().uint32 + ttl,
ttl: ttl, topic: topic, data: payload.get(), nonce: 0) ttl: ttl, topic: topic, data: payload.get(), nonce: 0)
# Allow lightnode to post only direct p2p messages # Allow lightnode to post only direct p2p messages
if targetPeer.isSome(): if targetPeer.isSome():
return node.sendP2PMessage(targetPeer.get(), env) return node.sendP2PMessage(targetPeer.get(), env)
elif not node.protocolState(Whisper).config.isLightNode: elif not node.protocolState(Whisper).config.isLightNode:
# non direct p2p message can not have ttl of 0
if env.ttl == 0:
return false
var msg = initMessage(env, powCalc = false)
# XXX: make this non blocking or not? # XXX: make this non blocking or not?
# In its current blocking state, it could be noticed by a peer that no # In its current blocking state, it could be noticed by a peer that no
# messages are send for a while, and thus that mining PoW is done, and # messages are send for a while, and thus that mining PoW is done, and
@ -910,8 +953,14 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
# zah: It would be hard to execute this in a background thread at the # zah: It would be hard to execute this in a background thread at the
# moment. We'll need a way to send custom "tasks" to the async message # moment. We'll need a way to send custom "tasks" to the async message
# loop (e.g. AD2 support for AsyncChannels). # loop (e.g. AD2 support for AsyncChannels).
env.nonce = env.minePow(powTime) if not msg.sealEnvelope(powTime, powTarget):
return node.sendMessage(env) return false
# need to check expiry after mining PoW
if not msg.env.valid():
return false
return node.queueMessage(msg)
else: else:
error "Light node not allowed to post messages" error "Light node not allowed to post messages"
return false return false

View File

@ -336,14 +336,16 @@ suite "Whisper filter":
let padding = some(repeat(byte 0, 251)) let padding = some(repeat(byte 0, 251))
# this message has a PoW of 0.02962962962962963, number should be updated # this message has a PoW of 0.02962962962962963, number should be updated
# in case PoW algorithm changes or contents of padding, payload, topic, etc. # in case PoW algorithm changes or contents of padding, payload, topic, etc.
# update: now with NON rlp encoded envelope size the PoW of this message is
# 0.02898550724637681
let msg = prepFilterTestMsg(topic = topic, padding = padding) let msg = prepFilterTestMsg(topic = topic, padding = padding)
var filters = initTable[string, Filter]() var filters = initTable[string, Filter]()
let let
filterId1 = filters.subscribeFilter( filterId1 = filters.subscribeFilter(
newFilter(topics = @[topic], powReq = 0.02962962962962963)) newFilter(topics = @[topic], powReq = 0.02898550724637681))
filterId2 = filters.subscribeFilter( filterId2 = filters.subscribeFilter(
newFilter(topics = @[topic], powReq = 0.02962962962962964)) newFilter(topics = @[topic], powReq = 0.02898550724637682))
notify(filters, msg) notify(filters, msg)

View File

@ -17,6 +17,7 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) =
node.resetMessageQueue() node.resetMessageQueue()
let bootENode = waitFor setupBootNode() let bootENode = waitFor setupBootNode()
let safeTTL = 5'u32
var node1 = setupTestNode(Whisper) var node1 = setupTestNode(Whisper)
var node2 = setupTestNode(Whisper) var node2 = setupTestNode(Whisper)
@ -75,7 +76,6 @@ suite "Whisper connections":
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
symKey = some(symKey), symKey = some(symKey),
topics = @[topic]), handler4)) topics = @[topic]), handler4))
var safeTTL = 5'u32
# Messages # Messages
check: check:
# encrypted asym # encrypted asym
@ -122,7 +122,6 @@ suite "Whisper connections":
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)
var safeTTL = 3'u32
check: check:
node2.postMessage(ttl = safeTTL + 1, topic = topic1, node2.postMessage(ttl = safeTTL + 1, topic = topic1,
payload = payloads[0]) == true payload = payloads[0]) == true
@ -157,7 +156,6 @@ suite "Whisper connections":
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], var filter2 = node1.subscribeFilter(newFilter(topics = @[topic],
powReq = 1_000_000), handler2) powReq = 1_000_000), handler2)
let safeTTL = 2'u32
check: check:
node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
@ -179,8 +177,8 @@ suite "Whisper connections":
var filter = node1.subscribeFilter(newFilter(topics = @[topic])) var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
for i in countdown(10, 1): for i in countdown(10, 1):
check node2.postMessage(ttl = i.uint32, topic = topic, check node2.postMessage(ttl = safeTTL, topic = topic,
payload = payload) == true payload = payload) == true
await sleepAsync(messageInterval) await sleepAsync(messageInterval)
check: check:
@ -194,7 +192,6 @@ suite "Whisper connections":
let topic = [byte 0, 0, 0, 0] let topic = [byte 0, 0, 0, 0]
var filter = node1.subscribeFilter(newFilter(topics = @[topic])) var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
let safeTTL = 2'u32
check: check:
node1.postMessage(ttl = safeTTL, topic = topic, node1.postMessage(ttl = safeTTL, topic = topic,
payload = repeat(byte 4, 10)) == true payload = repeat(byte 4, 10)) == true
@ -216,7 +213,6 @@ suite "Whisper connections":
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
await node1.setBloomFilter(node1.filtersToBloom()) await node1.setBloomFilter(node1.filtersToBloom())
let safeTTL = 2'u32
check: check:
node2.postMessage(ttl = safeTTL, topic = sendTopic1, node2.postMessage(ttl = safeTTL, topic = sendTopic1,
payload = payload) == true payload = payload) == true
@ -251,7 +247,6 @@ suite "Whisper connections":
asyncTest "PoW blocking": asyncTest "PoW blocking":
let topic = [byte 0, 0, 0, 0] let topic = [byte 0, 0, 0, 0]
let payload = repeat(byte 0, 10) let payload = repeat(byte 0, 10)
let safeTTL = 2'u32
await node1.setPowRequirement(1_000_000) await node1.setPowRequirement(1_000_000)
check: check:
@ -279,15 +274,15 @@ suite "Whisper connections":
# We need a minimum TTL of 2 as when set to 1 there is a small chance that # We need a minimum TTL of 2 as when set to 1 there is a small chance that
# it is already expired after messageInterval due to rounding down of float # it is already expired after messageInterval due to rounding down of float
# to uint32 in postMessage() # to uint32 in postMessage()
let minTTL = 2'u32 let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire
for i in countdown(minTTL + 9, minTTL): for i in countdown(10, 1):
check node2.postMessage(ttl = i, topic = topic, payload = payload) == true check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true
check node2.protocolState(Whisper).queue.items.len == 10 check node2.protocolState(Whisper).queue.items.len == 10
await sleepAsync(messageInterval) await sleepAsync(messageInterval)
check node1.protocolState(Whisper).queue.items.len == 10 check node1.protocolState(Whisper).queue.items.len == 10
await sleepAsync(int(minTTL*1000)) await sleepAsync(int((lowerTTL+1)*1000))
check node1.protocolState(Whisper).queue.items.len == 0 check node1.protocolState(Whisper).queue.items.len == 0
check node2.protocolState(Whisper).queue.items.len == 0 check node2.protocolState(Whisper).queue.items.len == 0
@ -326,7 +321,6 @@ suite "Whisper connections":
let topic = [byte 0, 0, 0, 0] let topic = [byte 0, 0, 0, 0]
let safeTTL = 2'u32
check: check:
# normal post # normal post
ln1.postMessage(ttl = safeTTL, topic = topic, ln1.postMessage(ttl = safeTTL, topic = topic,