Handles stale clients in FILTER protocol (#782)

* Handles stale clients

Signed-off-by: rshiv <reeshav96@gmail.com>

* adds test

Signed-off-by: rshiv <reeshav96@gmail.com>

* removes failed client from subscriber list

Signed-off-by: rshiv <reeshav96@gmail.com>

* adds filter timeout config

Signed-off-by: rshiv <reeshav96@gmail.com>

* reverts peer removal logic

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolve ci issues

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolves review comment

Signed-off-by: rshiv <reeshav96@gmail.com>

* solves review comments

Signed-off-by: rshiv <reeshav96@gmail.com>

* ChangeLog update

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolves review comment

Signed-off-by: rshiv <reeshav96@gmail.com>

* Update CHANGELOG.md

Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>

* fixes review comments

Signed-off-by: rshiv <reeshav96@gmail.com>

* handles CI issues

Signed-off-by: rshiv <reeshav96@gmail.com>

* tries to solve test CI

Signed-off-by: rshiv <reeshav96@gmail.com>

* solves CI issue

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolves ci issue

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolves CI issue

Signed-off-by: rshiv <reeshav96@gmail.com>

* resolves review comments

Signed-off-by: rshiv <reeshav96@gmail.com>

Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
Committed by: rshiv, 2021-12-08 18:35:47 +00:00 (via GitHub)
Parent: ce0607b2eb
Commit: b2273fff9a
6 changed files with 202 additions and 11 deletions

File: CHANGELOG.md

@@ -1,3 +1,15 @@
## Next version
This release contains the following:
### Features
- Waku v2 node timeout for unresponsive Filter peers, configurable via `filter-timeout`.
- Waku v2 node support for secure websockets.
### Changes
- The `listenStr` field of the WakuInfo object is deprecated and replaced by `listenAddresses`,
a sequence of strings (illustrated in the sketch below).
## 2021-11-05 v0.6
Some useful features and fixes in this release include:
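As an illustration of the `listenAddresses` change above (a minimal sketch, not part of the diff; assumes an initialized WakuNode `node` and the `info()` proc shown later in this diff):

```nim
# `info()` now returns a WakuInfo whose `listenAddresses` is a seq[string];
# the deprecated `listenStr` field should no longer be used.
let nodeInfo = node.info()
for address in nodeInfo.listenAddresses:
  echo "listening on: ", address
```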

File: Waku Filter test suite

@@ -144,3 +144,141 @@ procSuite "Waku Filter":
    check:
      idOpt.isNone

  asyncTest "Handle failed clients":
    const defaultTopic = "/waku/2/default-waku/proto"

    let
      key = PrivateKey.random(ECDSA, rng[]).get()
      peer = PeerInfo.new(key)
      contentTopic = ContentTopic("/waku/2/default-content/proto")
      post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)

    var dialSwitch = newStandardSwitch()
    discard await dialSwitch.start()

    var listenSwitch = newStandardSwitch(some(key))
    discard await listenSwitch.start()

    var responseCompletionFuture = newFuture[bool]()
    proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
      check:
        msg.messages.len() == 1
        msg.messages[0] == post
      responseCompletionFuture.complete(true)

    let
      proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
      rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)

    dialSwitch.mount(proto)
    proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

    proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
      discard

    let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
    listenSwitch.mount(proto2)

    let id = (await proto.subscribe(rpc)).get()
    await sleepAsync(2.seconds)

    await proto2.handleMessage(defaultTopic, post)
    check:
      # Check that the subscription works as expected
      (await responseCompletionFuture.withTimeout(3.seconds)) == true

    # Stop the dialing switch so that pushes to this client start failing
    discard dialSwitch.stop()
    await sleepAsync(2.seconds)

    # First failure should not remove the subscription
    await proto2.handleMessage(defaultTopic, post)
    await sleepAsync(2000.millis)
    check:
      proto2.subscribers.len() == 1

    # Second failure, now past the 1-second timeout, should remove the subscription
    await proto2.handleMessage(defaultTopic, post)
    check:
      proto2.subscribers.len() == 0
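Why the sleeps matter: the filter node above is initialized with a 1-second timeout, so the 2-second pause between the two failed pushes guarantees the second failure lands outside the grace period. An illustrative timeline (not part of the source; timings approximate):

```nim
# t ≈ 0s: dialSwitch stopped; first handleMessage fails to dial the client
#         -> client recorded in failedPeers, subscription kept
# t ≈ 2s: second handleMessage fails again; elapsed ≈ 2s > 1s timeout
#         -> subscription dropped and the failedPeers entry cleared
```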
asyncTest "Handles failed clients coming back up":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
responseCompletionFuture = newFuture[bool]()
# Stop switch to test unsubscribe
discard dialSwitch.stop()
await sleepAsync(1.seconds)
#First failure should add to failure list
await proto2.handleMessage(defaultTopic, post)
check:
proto2.failedPeers.len() == 1
discard dialSwitch.start()
dialSwitch.mount(proto)
#Second failure should remove the subscription
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
check:
proto2.failedPeers.len() == 0
discard dialSwitch.stop()
discard listenSwitch.stop()

File: node configuration (WakuNodeConf)

@@ -7,7 +7,7 @@ import
  nimcrypto/utils,
  eth/keys,
  ../protocol/waku_rln_relay/[waku_rln_relay_types]

type
  WakuNodeConf* = object
    ## General node config

@@ -129,6 +129,11 @@ type
      defaultValue: ""
      name: "filternode" }: string

    filterTimeout* {.
      desc: "Timeout for filter node in seconds.",
      defaultValue: 14400 # 4 hours
      name: "filter-timeout" }: int64

    ## Swap config
    swap* {.

File: node setup (wakunode2)

@@ -53,6 +53,9 @@ const clientId* = "Nimbus Waku v2 node"

# Default topic
const defaultTopic = "/waku/2/default-waku/proto"

# Default Waku Filter Timeout
const WakuFilterTimeout: Duration = 1.days

# key and crypto modules different
type

@@ -430,7 +433,7 @@ proc info*(node: WakuNode): WakuInfo =
  let wakuInfo = WakuInfo(listenAddresses: listenStr)
  return wakuInfo

-proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} =
+proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
  info "mounting filter"
  proc filterHandler(requestId: string, msg: MessagePush)
    {.gcsafe, raises: [Defect, KeyError].} =

@@ -440,7 +443,7 @@ proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} =
    node.filters.notify(message, requestId) # Trigger filter handlers on a light node
    waku_node_messages.inc(labelValues = ["filter"])

-  node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
+  node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
  node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))

# NOTE: If using the swap protocol, it must be mounted before store. This is

@@ -1089,7 +1092,7 @@ when isMainModule:
    # Filter setup. NOTE Must be mounted after relay
    if (conf.filternode != "") or (conf.filter):
-      mountFilter(node)
+      mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
      if conf.filternode != "":
        setFilterPeer(node, conf.filternode)
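For embedders, the new parameter can also be overridden directly when mounting the protocol; a minimal sketch (assumes an initialized WakuNode `node` and the chronos import already present in this module):

```nim
# Evict stale filter subscribers after 10 minutes instead of the 1-day default.
mountFilter(node, filterTimeout = chronos.minutes(10))
```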

File: Waku Filter protocol implementation

@@ -30,6 +30,7 @@ logScope:

const
  WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
  WakuFilterTimeout: Duration = 1.days

# Error types (metric label values)
const

@@ -188,43 +189,72 @@ method init*(wf: WakuFilter) =
  wf.handler = handle
  wf.codec = WakuFilterCodec
-proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
+proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler, timeout: Duration = WakuFilterTimeout): T =
  let rng = crypto.newRng()
  var wf = WakuFilter(rng: rng,
                      peerManager: peerManager,
-                     pushHandler: handler)
+                     pushHandler: handler,
+                     timeout: timeout)
  wf.init()
  return wf
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
  wf.peerManager.addPeer(peer, WakuFilterCodec)
  waku_filter_peers.inc()

# Clear the failed peers table entry if the subscriber was reachable again.
proc handleClientSuccess(wf: WakuFilter, subscribers: seq[Subscriber]) {.raises: [Defect, KeyError].} =
  for subscriber in subscribers:
    let subKey: string = $(subscriber)
    if wf.failedPeers.hasKey(subKey):
      wf.failedPeers.del(subKey)

# If we have already failed to send a message to this peer, check the
# elapsed time; if it exceeds the timeout, remove the peer entirely.
proc handleClientError(wf: WakuFilter, subscribers: seq[Subscriber]) {.raises: [Defect, KeyError].} =
  for subscriber in subscribers:
    let subKey: string = $(subscriber)
    if wf.failedPeers.hasKey(subKey):
      let elapsedTime = Moment.now() - wf.failedPeers[subKey]
      if elapsedTime > wf.timeout:
        trace "Removing peer, filter timeout reached", peer=subscriber
        let index = wf.subscribers.find(subscriber)
        wf.subscribers.delete(index)
        wf.failedPeers.del(subKey)
    else:
      # First failure: add the peer to the failed peers table.
      wf.failedPeers[subKey] = Moment.now()
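Taken together, these two procs give each subscriber a per-peer grace period rather than dropping it on the first failed dial. The state transitions, summarized (illustrative comments derived from the code above, not from the source):

```nim
# For a subscriber S with key $S:
#   push succeeds                      -> failedPeers.del($S)        (failure forgiven)
#   push fails, $S not in failedPeers  -> failedPeers[$S] = Moment.now()
#   push fails, elapsed <= wf.timeout  -> no change (still within the grace period)
#   push fails, elapsed >  wf.timeout  -> S removed from subscribers; $S deleted
```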
proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} =
  # Handle a WakuMessage according to the filter protocol
  trace "handle message in WakuFilter", topic=topic, msg=msg

  var handleMessageFailed = false
  var failedSubscribers: seq[Subscriber]
  var connectedSubscribers: seq[Subscriber]

  for subscriber in wf.subscribers:
    if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
      trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
      continue

    for filter in subscriber.filter.contentFilters:
      if msg.contentTopic == filter.contentTopic:
        trace "Found matching contentTopic", filter=filter, msg=msg
        let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
        let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
        if connOpt.isSome:
          await connOpt.get().writeLP(push.encode().buffer)
          connectedSubscribers.add(subscriber)
        else:
          # @TODO more sophisticated error handling here
          handleMessageFailed = true
          failedSubscribers.add(subscriber)
          error "failed to push messages to remote peer"
          waku_filter_errors.inc(labelValues = [dialFailure])
        break

  handleClientSuccess(wf, connectedSubscribers)
  if handleMessageFailed:
    handleClientError(wf, failedSubscribers)

proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
  let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)

File: Waku Filter protocol types

@@ -1,5 +1,6 @@
import
  std/[tables],
  chronos,
  bearssl,
  libp2p/protocols/protocol,
  ../../node/peer_manager/peer_manager,

@@ -45,3 +46,5 @@ type
    peerManager*: PeerManager
    subscribers*: seq[Subscriber]
    pushHandler*: MessagePushHandler
    failedPeers*: Table[string, chronos.Moment]
    timeout*: chronos.Duration
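As a self-contained illustration of how these two new fields are used (hypothetical standalone code, not from the repo; in the real implementation the keys come from `$(subscriber)`):

```nim
import std/tables
import chronos

var failedPeers: Table[string, Moment]

proc recordFailure(subKey: string) =
  # Only the first failure is timestamped; later failures are measured against it.
  if not failedPeers.hasKey(subKey):
    failedPeers[subKey] = Moment.now()

proc timedOut(subKey: string, timeout: Duration): bool =
  # True once the peer has stayed in the failed state longer than the timeout.
  failedPeers.hasKey(subKey) and (Moment.now() - failedPeers[subKey]) > timeout
```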