From 420298dca0d1c3fdf8940f27cb02daababfa8ff7 Mon Sep 17 00:00:00 2001 From: kdeme Date: Wed, 28 Nov 2018 15:11:03 +0100 Subject: [PATCH] Address PR #52 review comments --- ...{shh_protocol.nim => whisper_protocol.nim} | 63 +++++++++++-------- tests/shh_basic_client.nim | 10 +-- tests/tshh.nim | 2 +- tests/tshh_connect.nim | 25 ++++---- tests/tshh_connect_mocked.nim | 2 +- 5 files changed, 55 insertions(+), 47 deletions(-) rename eth_p2p/rlpx_protocols/{shh_protocol.nim => whisper_protocol.nim} (95%) diff --git a/eth_p2p/rlpx_protocols/shh_protocol.nim b/eth_p2p/rlpx_protocols/whisper_protocol.nim similarity index 95% rename from eth_p2p/rlpx_protocols/shh_protocol.nim rename to eth_p2p/rlpx_protocols/whisper_protocol.nim index 8a30f7f..43b09d9 100644 --- a/eth_p2p/rlpx_protocols/shh_protocol.nim +++ b/eth_p2p/rlpx_protocols/whisper_protocol.nim @@ -109,7 +109,7 @@ type allowP2P: bool bloom: Bloom # cached bloom filter of all topics of filter - handler: Option[FilterMsgHandler] + handler: FilterMsgHandler queue: seq[ReceivedMessage] Filters* = Table[string, Filter] @@ -545,8 +545,6 @@ proc add*(self: var Queue, msg: Message): bool = self.itemHashes.excl(last) # check for duplicate - # NOTE: Could also track if duplicates come from the same peer and disconnect - # from that peer. Is this tracking overhead worth it though? if self.itemHashes.containsOrIncl(msg): return false else: @@ -561,15 +559,15 @@ proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](), powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics)) proc subscribeFilter*(filters: var Filters, filter: Filter, - handler = none[FilterMsgHandler]()): string = + handler:FilterMsgHandler = nil): string = # NOTE: Should we allow a filter without a key? Encryption is mandatory in v6? # Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence. let id = generateRandomID() var filter = filter - if handler.isSome(): - filter.handler = handler + if handler.isNil(): + filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) else: - filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) + filter.handler = handler filters.add(id, filter) debug "Filter added", filter = id @@ -628,16 +626,16 @@ proc notify*(filters: var Filters, msg: Message) = pow: msg.pow, hash: msg.hash) # Either run callback or add to queue - if filter.handler.isSome(): - filter.handler.get()(receivedMsg) - else: + if filter.handler.isNil(): filter.queue.insert(receivedMsg) + else: + filter.handler(receivedMsg) proc getFilterMessages*(filters: var Filters, filterId: string): seq[ReceivedMessage] = result = @[] if filters.contains(filterId): - if filters[filterId].handler.isNone(): - result = filters[filterId].queue + if filters[filterId].handler.isNil(): + shallowCopy(result, filters[filterId].queue) filters[filterId].queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity) @@ -715,7 +713,9 @@ p2pProtocol Whisper(version = whisperVersion, whisperPeer.trusted = false whisperPeer.initialized = true - asyncCheck peer.run() + if not whisperNet.config.isLightNode: + asyncCheck peer.run() + debug "Whisper peer initialized" onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}: @@ -747,9 +747,18 @@ p2pProtocol Whisper(version = whisperVersion, # await peer.disconnect(SubprotocolReason) continue - # This peer send it thus should not receive it again - peer.state.received.incl(msg) + # This peer send this message thus should not receive it again. + # If this peer has the message in the `received` set already, this means + # it was either already received here from this peer or send to this peer. + # Either way it will be in our queue already (and the peer should know + # this) and this peer is sending duplicates. + if peer.state.received.containsOrIncl(msg): + warn "Peer sending duplicate messages" + # await peer.disconnect(SubprotocolReason) + continue + # This can still be a duplicate message, but from another peer than + # the peer who send the message. if peer.networkState.queue.add(msg): # notify filters of this message peer.networkState.filters.notify(msg) @@ -817,10 +826,7 @@ proc run(peer: Peer) {.async.} = whisperPeer.running = true while whisperPeer.running: - # XXX: shouldn't this be outside of the loop? - # In case we are runinng a light node, we have nothing to do here? - if not whisperNet.config.isLightNode: - peer.processQueue() + peer.processQueue() await sleepAsync(300) proc pruneReceived(node: EthereumNode) = @@ -906,7 +912,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), return false proc subscribeFilter*(node: EthereumNode, filter: Filter, - handler = none[FilterMsgHandler]()): string = + handler:FilterMsgHandler = nil): string = return node.protocolState(Whisper).filters.subscribeFilter(filter, handler) proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool = @@ -922,16 +928,20 @@ proc filtersToBloom*(node: EthereumNode): Bloom = proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} = # NOTE: do we need a tolerance of old PoW for some time? node.protocolState(Whisper).config.powRequirement = powReq + var futures: seq[Future[void]] = @[] for peer in node.peers(Whisper): - # asyncCheck peer.powRequirement(cast[uint](powReq)) - await peer.powRequirement(cast[uint](powReq)) + futures.add(peer.powRequirement(cast[uint](powReq))) + + await all(futures) proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} = # NOTE: do we need a tolerance of old bloom filter for some time? node.protocolState(Whisper).config.bloom = bloom + var futures: seq[Future[void]] = @[] for peer in node.peers(Whisper): - # asyncCheck peer.bloomFilterExchange(@bloom) - await peer.bloomFilterExchange(@bloom) + futures.add(peer.bloomFilterExchange(@bloom)) + + await all(futures) proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool = if size > defaultMaxMsgSize: @@ -946,12 +956,11 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool = peer.state(Whisper).trusted = true return true -# XXX: should probably only be allowed before connection is made, -# as there exists no message to communicate to peers that it is a light node -# How to arrange that? +# NOTE: Should be run before connection is made with peers proc setLightNode*(node: EthereumNode, isLightNode: bool) = node.protocolState(Whisper).config.isLightNode = isLightNode +# NOTE: Should be run before connection is made with peers proc configureWhisper*(node: EthereumNode, config: WhisperConfig) = node.protocolState(Whisper).config = config diff --git a/tests/shh_basic_client.nim b/tests/shh_basic_client.nim index b8a1be1..b9fae98 100644 --- a/tests/shh_basic_client.nim +++ b/tests/shh_basic_client.nim @@ -9,7 +9,7 @@ import sequtils, options, strutils, parseopt, asyncdispatch2, - eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol], + eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode, peer_pool] const @@ -153,21 +153,21 @@ if config.watch: # filter encrypted asym discard node.subscribeFilter(newFilter(privateKey = some(encPrivateKey), topics = @[topic]), - some((FilterMsgHandler)handler)) + handler) # filter encrypted asym + signed discard node.subscribeFilter(newFilter(some(signPublicKey), privateKey = some(encPrivateKey), topics = @[topic]), - some((FilterMsgHandler)handler)) + handler) # filter encrypted sym discard node.subscribeFilter(newFilter(symKey = some(symKey), topics = @[topic]), - some((FilterMsgHandler)handler)) + handler) # filter encrypted sym + signed discard node.subscribeFilter(newFilter(some(signPublicKey), symKey = some(symKey), topics = @[topic]), - some((FilterMsgHandler)handler)) + handler) if config.post: # encrypted asym diff --git a/tests/tshh.nim b/tests/tshh.nim index 6622a82..be05ad3 100644 --- a/tests/tshh.nim +++ b/tests/tshh.nim @@ -11,7 +11,7 @@ import sequtils, options, unittest, times, tables, nimcrypto/hash, eth_keys, rlp, - eth_p2p/rlpx_protocols/shh_protocol as whisper + eth_p2p/rlpx_protocols/whisper_protocol as whisper suite "Whisper payload": test "should roundtrip without keys": diff --git a/tests/tshh_connect.nim b/tests/tshh_connect.nim index 2031686..4cd6287 100644 --- a/tests/tshh_connect.nim +++ b/tests/tshh_connect.nim @@ -9,7 +9,7 @@ import sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys, - eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol], eth_p2p/[discovery, enode] + eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode] proc localAddress(port: int): Address = let port = Port(port) @@ -100,18 +100,18 @@ asyncTest "Filters with encryption and signing": # Filters # filter for encrypted asym filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), some(handler1))) + topics = @[topic]), handler1)) # filter for encrypted asym + signed filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), some(handler2))) + topics = @[topic]), handler2)) # filter for encrypted sym filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey), - topics = @[topic]), some(handler3))) + topics = @[topic]), handler3)) # filter for encrypted sym + signed filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey), symKey = some(symKey), - topics = @[topic]), some(handler4))) + topics = @[topic]), handler4)) # Messages # encrypted asym check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5, @@ -153,8 +153,8 @@ asyncTest "Filters with topics": check msg.decoded.payload == payloads[1] futures[1].complete(1) - var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), some(handler1)) - var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), some(handler2)) + var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1) + var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2) check: true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0]) @@ -183,9 +183,9 @@ asyncTest "Filters with PoW": futures[1].complete(1) var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0), - some(handler1)) + handler1) var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10), - some(handler2)) + handler2) check: true == node2.postMessage(ttl = 2, topic = topic, payload = payload) @@ -228,7 +228,7 @@ asyncTest "Bloomfilter blocking": proc handler(msg: ReceivedMessage) = check msg.decoded.payload == payload f.complete(1) - var filter = node1.subscribeFilter(newFilter(topics = filterTopics), some(handler)) + var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler) await node1.setBloomFilter(node1.filtersToBloom()) check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload) @@ -298,10 +298,9 @@ asyncTest "Queue pruning": asyncTest "Light node posting": let topic = [byte 0, 0, 0, 0] node1.setLightNode(true) - var result = node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) check: - result == false + node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) == false node1.protocolState(Whisper).queue.items.len == 0 node1.setLightNode(false) @@ -314,7 +313,7 @@ asyncTest "P2P": f.complete(1) var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true), - some(handler)) + handler) check: true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) true == node2.postMessage(ttl = 2, topic = topic, diff --git a/tests/tshh_connect_mocked.nim b/tests/tshh_connect_mocked.nim index 624a918..cd8982c 100644 --- a/tests/tshh_connect_mocked.nim +++ b/tests/tshh_connect_mocked.nim @@ -9,7 +9,7 @@ import options, unittest, asyncdispatch2, rlp, eth_keys, - eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[shh_protocol] + eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[whisper_protocol] proc localAddress(port: int): Address = let port = Port(port)