From b2273fff9a98a8918e0e8b5012aee1754ba3e943 Mon Sep 17 00:00:00 2001 From: rshiv Date: Wed, 8 Dec 2021 18:35:47 +0000 Subject: [PATCH] Handles stale clients in FILTER protocol (#782) * Handles stale clients Signed-off-by: rshiv * adds test Signed-off-by: rshiv * removes failed client from subscriber list Signed-off-by: rshiv * adds filter timeout config Signed-off-by: rshiv * reverts peer removal logic Signed-off-by: rshiv * resolve ci issues Signed-off-by: rshiv * resolves review comment Signed-off-by: rshiv * solves review comments Signed-off-by: rshiv * ChangeLog update Signed-off-by: rshiv * resolves review comment Signed-off-by: rshiv * Update CHANGELOG.md Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> * fixes review comments Signed-off-by: rshiv * handles CI issues Signed-off-by: rshiv * tries to solve test CI Signed-off-by: rshiv * solves CI issue Signed-off-by: rshiv * resolves ci issue Signed-off-by: rshiv * resolves CI issue Signed-off-by: rshiv * resolves review comments Signed-off-by: rshiv Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> --- CHANGELOG.md | 12 ++ tests/v2/test_waku_filter.nim | 138 ++++++++++++++++++ waku/v2/node/config.nim | 7 +- waku/v2/node/wakunode2.nim | 9 +- waku/v2/protocol/waku_filter/waku_filter.nim | 44 +++++- .../waku_filter/waku_filter_types.nim | 3 + 6 files changed, 202 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff289f93f..e58eec8f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +## Next version + +This release contains the following: + +### Features +- Waku v2 node timeout for Filter nodes. +- Waku v2 node support for secure websockets. + +### Changes +- The WakuInfo Object field of `listenStr` is deprecated and is now replaced with `listenAddresses` +which is a sequence of string. + ## 2021-11-05 v0.6 Some useful features and fixes in this release, include: diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index e8968450d..68fb780f9 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -144,3 +144,141 @@ procSuite "Waku Filter": check: idOpt.isNone + + asyncTest "Handle failed clients": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.new(key) + contentTopic = ContentTopic("/waku/2/default-content/proto") + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseCompletionFuture = newFuture[bool]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + responseCompletionFuture.complete(true) + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds) + + listenSwitch.mount(proto2) + + let id = (await proto.subscribe(rpc)).get() + + await sleepAsync(2.seconds) + + await proto2.handleMessage(defaultTopic, post) + + check: + # Check that subscription works as expected + (await responseCompletionFuture.withTimeout(3.seconds)) == true + + # Stop switch to test unsubscribe + discard dialSwitch.stop() + + await sleepAsync(2.seconds) + + #First failure should not remove the subscription + await proto2.handleMessage(defaultTopic, post) + + await sleepAsync(2000.millis) + check: + proto2.subscribers.len() == 1 + + #Second failure should remove the subscription + await proto2.handleMessage(defaultTopic, post) + + check: + proto2.subscribers.len() == 0 + + asyncTest "Handles failed clients coming back up": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.new(key) + contentTopic = ContentTopic("/waku/2/default-content/proto") + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseCompletionFuture = newFuture[bool]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + responseCompletionFuture.complete(true) + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds) + + listenSwitch.mount(proto2) + + let id = (await proto.subscribe(rpc)).get() + + await sleepAsync(2.seconds) + + await proto2.handleMessage(defaultTopic, post) + + check: + # Check that subscription works as expected + (await responseCompletionFuture.withTimeout(3.seconds)) == true + + responseCompletionFuture = newFuture[bool]() + + # Stop switch to test unsubscribe + discard dialSwitch.stop() + + await sleepAsync(1.seconds) + + #First failure should add to failure list + await proto2.handleMessage(defaultTopic, post) + + check: + proto2.failedPeers.len() == 1 + + discard dialSwitch.start() + dialSwitch.mount(proto) + #Second failure should remove the subscription + await proto2.handleMessage(defaultTopic, post) + + check: + # Check that subscription works as expected + (await responseCompletionFuture.withTimeout(3.seconds)) == true + + check: + proto2.failedPeers.len() == 0 + + discard dialSwitch.stop() + discard listenSwitch.stop() diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 183e06972..2bd283e16 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -7,7 +7,7 @@ import nimcrypto/utils, eth/keys, ../protocol/waku_rln_relay/[waku_rln_relay_types] - + type WakuNodeConf* = object ## General node config @@ -129,6 +129,11 @@ type defaultValue: "" name: "filternode" }: string + filterTimeout* {. + desc: "Timeout for filter node in seconds.", + defaultValue: 14400 # 4 hours + name: "filter-timeout" }: int64 + ## Swap config swap* {. diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 2f936db01..67817cde0 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -53,6 +53,9 @@ const clientId* = "Nimbus Waku v2 node" # Default topic const defaultTopic = "/waku/2/default-waku/proto" +# Default Waku Filter Timeout +const WakuFilterTimeout: Duration = 1.days + # key and crypto modules different type @@ -430,7 +433,7 @@ proc info*(node: WakuNode): WakuInfo = let wakuInfo = WakuInfo(listenAddresses: listenStr) return wakuInfo -proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} = +proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} = info "mounting filter" proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe, raises: [Defect, KeyError].} = @@ -440,7 +443,7 @@ proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} = node.filters.notify(message, requestId) # Trigger filter handlers on a light node waku_node_messages.inc(labelValues = ["filter"]) - node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) + node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout) node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) # NOTE: If using the swap protocol, it must be mounted before store. This is @@ -1089,7 +1092,7 @@ when isMainModule: # Filter setup. NOTE Must be mounted after relay if (conf.filternode != "") or (conf.filter): - mountFilter(node) + mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) if conf.filternode != "": setFilterPeer(node, conf.filternode) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 71c05b854..b6fea730a 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -30,6 +30,7 @@ logScope: const WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + WakuFilterTimeout: Duration = 1.days # Error types (metric label values) const @@ -188,43 +189,72 @@ method init*(wf: WakuFilter) = wf.handler = handle wf.codec = WakuFilterCodec -proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = +proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler,timeout: Duration = WakuFilterTimeout): T = let rng = crypto.newRng() var wf = WakuFilter(rng: rng, peerManager: peerManager, - pushHandler: handler) + pushHandler: handler, + timeout: timeout) wf.init() - return wf proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = wf.peerManager.addPeer(peer, WakuFilterCodec) waku_filter_peers.inc() +#clear the failed peer table if subscriber was able to connect. +proc handleClientSuccess(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} = + for subscriber in subscribers: + var subKey: string = $(subscriber) + if wf.failedPeers.hasKey(subKey): + wf.failedPeers.del(subKey) + +# If we have already failed to send message to this peer, +# check for elapsed time and if it's been too long, remove the peer. +proc handleClientError(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} = + for subscriber in subscribers: + var subKey: string = $(subscriber) + if wf.failedPeers.hasKey(subKey): + var elapsedTime = Moment.now() - wf.failedPeers[subKey] + if(elapsedTime > wf.timeout): + trace "Remove peer if timeout has reached for", peer=subscriber + var index = wf.subscribers.find(subscriber) + wf.subscribers.delete(index) + wf.failedPeers.del(subKey) + else: + # add the peer to the failed peers table. + wf.failedPeers[subKey] = Moment.now() + return proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} = # Handle WakuMessage according to filter protocol trace "handle message in WakuFilter", topic=topic, msg=msg - + var handleMessageFailed = false + var failedSubscriber: seq[Subscriber] + var connectedSubscribers: seq[Subscriber] for subscriber in wf.subscribers: if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic continue - + for filter in subscriber.filter.contentFilters: if msg.contentTopic == filter.contentTopic: trace "Found matching contentTopic", filter=filter, msg=msg let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) - let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) - if connOpt.isSome: await connOpt.get().writeLP(push.encode().buffer) + connectedSubscribers.add(subscriber) else: # @TODO more sophisticated error handling here + handleMessageFailed = true + failedSubscriber.add(subscriber) error "failed to push messages to remote peer" waku_filter_errors.inc(labelValues = [dialFailure]) break + handleClientSuccess(wf, connectedSubscribers) + if handleMessageFailed: + handleClientError(wf, failedSubscriber) proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index 29c4179ec..e19bdb57e 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -1,5 +1,6 @@ import std/[tables], + chronos, bearssl, libp2p/protocols/protocol, ../../node/peer_manager/peer_manager, @@ -45,3 +46,5 @@ type peerManager*: PeerManager subscribers*: seq[Subscriber] pushHandler*: MessagePushHandler + failedPeers*: Table[string, chronos.Moment] + timeout*: chronos.Duration