Address PR #52 review comments

This commit is contained in:
kdeme 2018-11-28 15:11:03 +01:00 committed by zah
parent ccea6dc7b6
commit 420298dca0
5 changed files with 55 additions and 47 deletions

View File

@ -109,7 +109,7 @@ type
allowP2P: bool
bloom: Bloom # cached bloom filter of all topics of filter
handler: Option[FilterMsgHandler]
handler: FilterMsgHandler
queue: seq[ReceivedMessage]
Filters* = Table[string, Filter]
@ -545,8 +545,6 @@ proc add*(self: var Queue, msg: Message): bool =
self.itemHashes.excl(last)
# check for duplicate
# NOTE: Could also track if duplicates come from the same peer and disconnect
# from that peer. Is this tracking overhead worth it though?
if self.itemHashes.containsOrIncl(msg):
return false
else:
@ -561,15 +559,15 @@ proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](),
powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics))
proc subscribeFilter*(filters: var Filters, filter: Filter,
handler = none[FilterMsgHandler]()): string =
handler:FilterMsgHandler = nil): string =
# NOTE: Should we allow a filter without a key? Encryption is mandatory in v6?
# Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence.
let id = generateRandomID()
var filter = filter
if handler.isSome():
filter.handler = handler
if handler.isNil():
filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
else:
filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
filter.handler = handler
filters.add(id, filter)
debug "Filter added", filter = id
@ -628,16 +626,16 @@ proc notify*(filters: var Filters, msg: Message) =
pow: msg.pow,
hash: msg.hash)
# Either run callback or add to queue
if filter.handler.isSome():
filter.handler.get()(receivedMsg)
else:
if filter.handler.isNil():
filter.queue.insert(receivedMsg)
else:
filter.handler(receivedMsg)
proc getFilterMessages*(filters: var Filters, filterId: string): seq[ReceivedMessage] =
result = @[]
if filters.contains(filterId):
if filters[filterId].handler.isNone():
result = filters[filterId].queue
if filters[filterId].handler.isNil():
shallowCopy(result, filters[filterId].queue)
filters[filterId].queue =
newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
@ -715,7 +713,9 @@ p2pProtocol Whisper(version = whisperVersion,
whisperPeer.trusted = false
whisperPeer.initialized = true
asyncCheck peer.run()
if not whisperNet.config.isLightNode:
asyncCheck peer.run()
debug "Whisper peer initialized"
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
@ -747,9 +747,18 @@ p2pProtocol Whisper(version = whisperVersion,
# await peer.disconnect(SubprotocolReason)
continue
# This peer send it thus should not receive it again
peer.state.received.incl(msg)
# This peer send this message thus should not receive it again.
# If this peer has the message in the `received` set already, this means
# it was either already received here from this peer or send to this peer.
# Either way it will be in our queue already (and the peer should know
# this) and this peer is sending duplicates.
if peer.state.received.containsOrIncl(msg):
warn "Peer sending duplicate messages"
# await peer.disconnect(SubprotocolReason)
continue
# This can still be a duplicate message, but from another peer than
# the peer who send the message.
if peer.networkState.queue.add(msg):
# notify filters of this message
peer.networkState.filters.notify(msg)
@ -817,10 +826,7 @@ proc run(peer: Peer) {.async.} =
whisperPeer.running = true
while whisperPeer.running:
# XXX: shouldn't this be outside of the loop?
# In case we are runinng a light node, we have nothing to do here?
if not whisperNet.config.isLightNode:
peer.processQueue()
peer.processQueue()
await sleepAsync(300)
proc pruneReceived(node: EthereumNode) =
@ -906,7 +912,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
return false
proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler = none[FilterMsgHandler]()): string =
handler:FilterMsgHandler = nil): string =
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler)
proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
@ -922,16 +928,20 @@ proc filtersToBloom*(node: EthereumNode): Bloom =
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
# NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Whisper).config.powRequirement = powReq
var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper):
# asyncCheck peer.powRequirement(cast[uint](powReq))
await peer.powRequirement(cast[uint](powReq))
futures.add(peer.powRequirement(cast[uint](powReq)))
await all(futures)
proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
# NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Whisper).config.bloom = bloom
var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper):
# asyncCheck peer.bloomFilterExchange(@bloom)
await peer.bloomFilterExchange(@bloom)
futures.add(peer.bloomFilterExchange(@bloom))
await all(futures)
proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
if size > defaultMaxMsgSize:
@ -946,12 +956,11 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
peer.state(Whisper).trusted = true
return true
# XXX: should probably only be allowed before connection is made,
# as there exists no message to communicate to peers that it is a light node
# How to arrange that?
# NOTE: Should be run before connection is made with peers
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
node.protocolState(Whisper).config.isLightNode = isLightNode
# NOTE: Should be run before connection is made with peers
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
node.protocolState(Whisper).config = config

View File

@ -9,7 +9,7 @@
import
sequtils, options, strutils, parseopt, asyncdispatch2,
eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol],
eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol],
eth_p2p/[discovery, enode, peer_pool]
const
@ -153,21 +153,21 @@ if config.watch:
# filter encrypted asym
discard node.subscribeFilter(newFilter(privateKey = some(encPrivateKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted asym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
privateKey = some(encPrivateKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted sym
discard node.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted sym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
symKey = some(symKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
if config.post:
# encrypted asym

View File

@ -11,7 +11,7 @@ import
sequtils, options, unittest, times, tables,
nimcrypto/hash,
eth_keys, rlp,
eth_p2p/rlpx_protocols/shh_protocol as whisper
eth_p2p/rlpx_protocols/whisper_protocol as whisper
suite "Whisper payload":
test "should roundtrip without keys":

View File

@ -9,7 +9,7 @@
import
sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys,
eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol], eth_p2p/[discovery, enode]
eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode]
proc localAddress(port: int): Address =
let port = Port(port)
@ -100,18 +100,18 @@ asyncTest "Filters with encryption and signing":
# Filters
# filter for encrypted asym
filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), some(handler1)))
topics = @[topic]), handler1))
# filter for encrypted asym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), some(handler2)))
topics = @[topic]), handler2))
# filter for encrypted sym
filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]), some(handler3)))
topics = @[topic]), handler3))
# filter for encrypted sym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
symKey = some(symKey),
topics = @[topic]), some(handler4)))
topics = @[topic]), handler4))
# Messages
# encrypted asym
check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5,
@ -153,8 +153,8 @@ asyncTest "Filters with topics":
check msg.decoded.payload == payloads[1]
futures[1].complete(1)
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), some(handler1))
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), some(handler2))
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)
check:
true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0])
@ -183,9 +183,9 @@ asyncTest "Filters with PoW":
futures[1].complete(1)
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
some(handler1))
handler1)
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10),
some(handler2))
handler2)
check:
true == node2.postMessage(ttl = 2, topic = topic, payload = payload)
@ -228,7 +228,7 @@ asyncTest "Bloomfilter blocking":
proc handler(msg: ReceivedMessage) =
check msg.decoded.payload == payload
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), some(handler))
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
await node1.setBloomFilter(node1.filtersToBloom())
check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload)
@ -298,10 +298,9 @@ asyncTest "Queue pruning":
asyncTest "Light node posting":
let topic = [byte 0, 0, 0, 0]
node1.setLightNode(true)
var result = node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10))
check:
result == false
node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) == false
node1.protocolState(Whisper).queue.items.len == 0
node1.setLightNode(false)
@ -314,7 +313,7 @@ asyncTest "P2P":
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true),
some(handler))
handler)
check:
true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
true == node2.postMessage(ttl = 2, topic = topic,

View File

@ -9,7 +9,7 @@
import
options, unittest, asyncdispatch2, rlp, eth_keys,
eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[shh_protocol]
eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[whisper_protocol]
proc localAddress(port: int): Address =
let port = Port(port)