deploy: b2273fff9a98a8918e0e8b5012aee1754ba3e943

This commit is contained in:
rshiv 2021-12-08 19:06:31 +00:00
parent eba5e332bb
commit 97e4f4fe27
7 changed files with 203 additions and 12 deletions

View File

@ -1,3 +1,15 @@
## Next version
This release contains the following:
### Features
- Waku v2 node timeout for Filter nodes.
- Waku v2 node support for secure websockets.
### Changes
- The WakuInfo Object field of `listenStr` is deprecated and is now replaced with `listenAddresses`
which is a sequence of string.
## 2021-11-05 v0.6 ## 2021-11-05 v0.6
Some useful features and fixes in this release, include: Some useful features and fixes in this release, include:

View File

@ -144,3 +144,141 @@ procSuite "Waku Filter":
check: check:
idOpt.isNone idOpt.isNone
asyncTest "Handle failed clients":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
# Stop switch to test unsubscribe
discard dialSwitch.stop()
await sleepAsync(2.seconds)
#First failure should not remove the subscription
await proto2.handleMessage(defaultTopic, post)
await sleepAsync(2000.millis)
check:
proto2.subscribers.len() == 1
#Second failure should remove the subscription
await proto2.handleMessage(defaultTopic, post)
check:
proto2.subscribers.len() == 0
asyncTest "Handles failed clients coming back up":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
responseCompletionFuture = newFuture[bool]()
# Stop switch to test unsubscribe
discard dialSwitch.stop()
await sleepAsync(1.seconds)
#First failure should add to failure list
await proto2.handleMessage(defaultTopic, post)
check:
proto2.failedPeers.len() == 1
discard dialSwitch.start()
dialSwitch.mount(proto)
#Second failure should remove the subscription
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
check:
proto2.failedPeers.len() == 0
discard dialSwitch.stop()
discard listenSwitch.stop()

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az196-786: # Libtool was configured on host fv-az132-44:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -7,7 +7,7 @@ import
nimcrypto/utils, nimcrypto/utils,
eth/keys, eth/keys,
../protocol/waku_rln_relay/[waku_rln_relay_types] ../protocol/waku_rln_relay/[waku_rln_relay_types]
type type
WakuNodeConf* = object WakuNodeConf* = object
## General node config ## General node config
@ -129,6 +129,11 @@ type
defaultValue: "" defaultValue: ""
name: "filternode" }: string name: "filternode" }: string
filterTimeout* {.
desc: "Timeout for filter node in seconds.",
defaultValue: 14400 # 4 hours
name: "filter-timeout" }: int64
## Swap config ## Swap config
swap* {. swap* {.

View File

@ -53,6 +53,9 @@ const clientId* = "Nimbus Waku v2 node"
# Default topic # Default topic
const defaultTopic = "/waku/2/default-waku/proto" const defaultTopic = "/waku/2/default-waku/proto"
# Default Waku Filter Timeout
const WakuFilterTimeout: Duration = 1.days
# key and crypto modules different # key and crypto modules different
type type
@ -430,7 +433,7 @@ proc info*(node: WakuNode): WakuInfo =
let wakuInfo = WakuInfo(listenAddresses: listenStr) let wakuInfo = WakuInfo(listenAddresses: listenStr)
return wakuInfo return wakuInfo
proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} = proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
info "mounting filter" info "mounting filter"
proc filterHandler(requestId: string, msg: MessagePush) proc filterHandler(requestId: string, msg: MessagePush)
{.gcsafe, raises: [Defect, KeyError].} = {.gcsafe, raises: [Defect, KeyError].} =
@ -440,7 +443,7 @@ proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} =
node.filters.notify(message, requestId) # Trigger filter handlers on a light node node.filters.notify(message, requestId) # Trigger filter handlers on a light node
waku_node_messages.inc(labelValues = ["filter"]) waku_node_messages.inc(labelValues = ["filter"])
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
# NOTE: If using the swap protocol, it must be mounted before store. This is # NOTE: If using the swap protocol, it must be mounted before store. This is
@ -1089,7 +1092,7 @@ when isMainModule:
# Filter setup. NOTE Must be mounted after relay # Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter): if (conf.filternode != "") or (conf.filter):
mountFilter(node) mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
if conf.filternode != "": if conf.filternode != "":
setFilterPeer(node, conf.filternode) setFilterPeer(node, conf.filternode)

View File

@ -30,6 +30,7 @@ logScope:
const const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 1.days
# Error types (metric label values) # Error types (metric label values)
const const
@ -188,43 +189,72 @@ method init*(wf: WakuFilter) =
wf.handler = handle wf.handler = handle
wf.codec = WakuFilterCodec wf.codec = WakuFilterCodec
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler,timeout: Duration = WakuFilterTimeout): T =
let rng = crypto.newRng() let rng = crypto.newRng()
var wf = WakuFilter(rng: rng, var wf = WakuFilter(rng: rng,
peerManager: peerManager, peerManager: peerManager,
pushHandler: handler) pushHandler: handler,
timeout: timeout)
wf.init() wf.init()
return wf return wf
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec) wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc() waku_filter_peers.inc()
#clear the failed peer table if subscriber was able to connect.
proc handleClientSuccess(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} =
for subscriber in subscribers:
var subKey: string = $(subscriber)
if wf.failedPeers.hasKey(subKey):
wf.failedPeers.del(subKey)
# If we have already failed to send message to this peer,
# check for elapsed time and if it's been too long, remove the peer.
proc handleClientError(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} =
for subscriber in subscribers:
var subKey: string = $(subscriber)
if wf.failedPeers.hasKey(subKey):
var elapsedTime = Moment.now() - wf.failedPeers[subKey]
if(elapsedTime > wf.timeout):
trace "Remove peer if timeout has reached for", peer=subscriber
var index = wf.subscribers.find(subscriber)
wf.subscribers.delete(index)
wf.failedPeers.del(subKey)
else:
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} = proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} =
# Handle WakuMessage according to filter protocol # Handle WakuMessage according to filter protocol
trace "handle message in WakuFilter", topic=topic, msg=msg trace "handle message in WakuFilter", topic=topic, msg=msg
var handleMessageFailed = false
var failedSubscriber: seq[Subscriber]
var connectedSubscribers: seq[Subscriber]
for subscriber in wf.subscribers: for subscriber in wf.subscribers:
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
continue continue
for filter in subscriber.filter.contentFilters: for filter in subscriber.filter.contentFilters:
if msg.contentTopic == filter.contentTopic: if msg.contentTopic == filter.contentTopic:
trace "Found matching contentTopic", filter=filter, msg=msg trace "Found matching contentTopic", filter=filter, msg=msg
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
if connOpt.isSome: if connOpt.isSome:
await connOpt.get().writeLP(push.encode().buffer) await connOpt.get().writeLP(push.encode().buffer)
connectedSubscribers.add(subscriber)
else: else:
# @TODO more sophisticated error handling here # @TODO more sophisticated error handling here
handleMessageFailed = true
failedSubscriber.add(subscriber)
error "failed to push messages to remote peer" error "failed to push messages to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure]) waku_filter_errors.inc(labelValues = [dialFailure])
break break
handleClientSuccess(wf, connectedSubscribers)
if handleMessageFailed:
handleClientError(wf, failedSubscriber)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)

View File

@ -1,5 +1,6 @@
import import
std/[tables], std/[tables],
chronos,
bearssl, bearssl,
libp2p/protocols/protocol, libp2p/protocols/protocol,
../../node/peer_manager/peer_manager, ../../node/peer_manager/peer_manager,
@ -45,3 +46,5 @@ type
peerManager*: PeerManager peerManager*: PeerManager
subscribers*: seq[Subscriber] subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler pushHandler*: MessagePushHandler
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration