mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-12 15:06:38 +00:00
deploy: 1d9e3afaa4a5a0ccc41f566e946784e0f7e994e8
This commit is contained in:
parent
ccccd2732f
commit
5b017cbbf8
@ -1 +1 @@
|
||||
1612522151
|
||||
1612775840
|
@ -4,6 +4,8 @@
|
||||
|
||||
- Refactor: Split out `waku_types` types into right place; create utils folder.
|
||||
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
|
||||
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
|
||||
- Added a peer manager for `relay` and `filter` peers.
|
||||
|
||||
## 2021-01-05 v0.2
|
||||
|
||||
|
@ -8,7 +8,8 @@ import
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multistream,
|
||||
../../waku/v2/protocol/[message_notifier],
|
||||
../../waku/v2/node/peer_manager,
|
||||
../../waku/v2/protocol/message_notifier,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../test_helpers, ./utils
|
||||
|
||||
@ -37,7 +38,7 @@ procSuite "Waku Filter":
|
||||
responseRequestIdFuture.complete(requestId)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
@ -47,14 +48,14 @@ procSuite "Waku Filter":
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = await proto.subscribe(rpc)
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
@ -86,7 +87,7 @@ procSuite "Waku Filter":
|
||||
responseCompletionFuture.complete(true)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
@ -96,14 +97,14 @@ procSuite "Waku Filter":
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = await proto.subscribe(rpc)
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
@ -128,3 +129,27 @@ procSuite "Waku Filter":
|
||||
check:
|
||||
# Check that unsubscribe works as expected
|
||||
(await responseCompletionFuture.withTimeout(5.seconds)) == false
|
||||
|
||||
asyncTest "handle filter subscribe failures":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic(1)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
let idOpt = (await proto.subscribe(rpc))
|
||||
|
||||
check:
|
||||
idOpt.isNone
|
||||
|
@ -10,7 +10,7 @@ generated by GNU Autoconf 2.69. Invocation command line was
|
||||
## Platform. ##
|
||||
## --------- ##
|
||||
|
||||
hostname = fv-az54-298
|
||||
hostname = fv-az16-738
|
||||
uname -m = x86_64
|
||||
uname -r = 5.4.0-1039-azure
|
||||
uname -s = Linux
|
||||
@ -841,7 +841,7 @@ configure:12482: $? = 0
|
||||
configure:12482: result: yes
|
||||
configure:12499: checking for getexecname
|
||||
configure:12499: gcc -o conftest -g -O3 -std=gnu11 -pipe -Wall -Wextra -fPIC conftest.c >&5
|
||||
/tmp/ccxyCmdq.o: In function `main':
|
||||
/tmp/cc3MtpPn.o: In function `main':
|
||||
/home/runner/work/nim-waku/nim-waku/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/conftest.c:73: undefined reference to `getexecname'
|
||||
collect2: error: ld returned 1 exit status
|
||||
configure:12499: $? = 1
|
||||
@ -1134,7 +1134,7 @@ generated by GNU Autoconf 2.69. Invocation command line was
|
||||
CONFIG_COMMANDS =
|
||||
$ ./config.status
|
||||
|
||||
on fv-az54-298
|
||||
on fv-az16-738
|
||||
|
||||
config.status:1150: creating Makefile
|
||||
config.status:1150: creating backtrace-supported.h
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az54-298:
|
||||
# Libtool was configured on host fv-az16-738:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -1,7 +1,7 @@
|
||||
{.push raises: [Defect, Exception].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
std/[options, sets],
|
||||
chronos, chronicles, metrics,
|
||||
libp2p/standard_setup,
|
||||
libp2p/peerstore
|
||||
@ -26,16 +26,18 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager =
|
||||
peerStore: PeerStore.new())
|
||||
|
||||
####################
|
||||
# Dialer interface #
|
||||
# Helper functions #
|
||||
####################
|
||||
|
||||
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
|
||||
# Dial a given peer and add it to the list of known peers
|
||||
# @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed.
|
||||
|
||||
# First add dialed peer info to peer store...
|
||||
proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
|
||||
# Returns `true` if peer is included in manager for the specified protocol
|
||||
|
||||
debug "Adding dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
|
||||
pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
|
||||
|
||||
proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
|
||||
|
||||
# ...known addresses
|
||||
for multiaddr in peerInfo.addrs:
|
||||
@ -50,6 +52,20 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
|
||||
# ...associated protocols
|
||||
pm.peerStore.protoBook.add(peerInfo.peerId, proto)
|
||||
|
||||
|
||||
####################
|
||||
# Dialer interface #
|
||||
####################
|
||||
|
||||
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
|
||||
# Dial a given peer and add it to the list of known peers
|
||||
# @TODO check peer validity and score before continuing. Limit number of peers to be managed.
|
||||
|
||||
# First add dialed peer info to peer store, if it does not exist yet...
|
||||
if not pm.hasPeer(peerInfo, proto):
|
||||
trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(peerInfo, proto)
|
||||
|
||||
info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId
|
||||
|
||||
# Dial Peer
|
||||
|
@ -182,11 +182,20 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
|
||||
info "subscribe content", filter=request
|
||||
|
||||
var id = generateRequestId(node.rng)
|
||||
if node.wakuFilter.isNil == false:
|
||||
# @TODO: ERROR HANDLING
|
||||
id = await node.wakuFilter.subscribe(request)
|
||||
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
|
||||
|
||||
if node.wakuFilter.isNil == false:
|
||||
let idOpt = await node.wakuFilter.subscribe(request)
|
||||
|
||||
if idOpt.isSome():
|
||||
# Subscribed successfully.
|
||||
id = idOpt.get()
|
||||
else:
|
||||
# Failed to subscribe
|
||||
error "remote subscription to filter failed", filter = request
|
||||
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||
|
||||
# Register handler for filter, whether remote subscription succeeded or not
|
||||
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
|
||||
waku_node_filters.set(node.filters.len.int64)
|
||||
|
||||
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
||||
@ -275,7 +284,7 @@ proc mountFilter*(node: WakuNode) =
|
||||
node.filters.notify(message, requestId)
|
||||
waku_node_messages.inc(labelValues = ["filter"])
|
||||
|
||||
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
|
||||
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
|
||||
node.switch.mount(node.wakuFilter)
|
||||
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
import
|
||||
std/[tables, sequtils],
|
||||
std/[tables, sequtils, options],
|
||||
bearssl,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/protocols/pubsub/pubsubpeer,
|
||||
@ -9,10 +9,10 @@ import
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/switch,
|
||||
../message_notifier,
|
||||
waku_filter_types,
|
||||
../../utils/requests
|
||||
../../utils/requests,
|
||||
../../node/peer_manager
|
||||
|
||||
# NOTE This is just a start, the design of this protocol isn't done yet. It
|
||||
# should be direct payload exchange (a la req-resp), not be coupled with the
|
||||
@ -30,6 +30,12 @@ logScope:
|
||||
const
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
@ -166,7 +172,7 @@ method init*(wf: WakuFilter) =
|
||||
var res = FilterRPC.init(message)
|
||||
if res.isErr:
|
||||
error "failed to decode rpc"
|
||||
waku_filter_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
info "filter message received"
|
||||
@ -185,10 +191,10 @@ method init*(wf: WakuFilter) =
|
||||
wf.handler = handle
|
||||
wf.codec = WakuFilterCodec
|
||||
|
||||
proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
|
||||
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
|
||||
new result
|
||||
result.rng = crypto.newRng()
|
||||
result.switch = switch
|
||||
result.peerManager = peerManager
|
||||
result.pushHandler = handler
|
||||
result.init()
|
||||
|
||||
@ -208,26 +214,47 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||
for filter in subscriber.filter.contentFilters:
|
||||
if msg.contentTopic in filter.topics:
|
||||
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
|
||||
let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec)
|
||||
await conn.writeLP(push.encode().buffer)
|
||||
|
||||
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(push.encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to push messages to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
break
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
|
||||
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, gcsafe.} =
|
||||
let id = generateRequestId(wf.rng)
|
||||
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
|
||||
if wf.peers.len >= 1:
|
||||
let peer = wf.peers[0].peerInfo
|
||||
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
|
||||
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
|
||||
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
result = id
|
||||
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
# This is the only successful path to subscription
|
||||
let id = generateRequestId(wf.rng)
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
return some(id)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
return none(string)
|
||||
|
||||
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
|
||||
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
|
||||
let id = generateRequestId(wf.rng)
|
||||
if wf.peers.len >= 1:
|
||||
let peer = wf.peers[0].peerInfo
|
||||
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
|
||||
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
|
||||
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
|
@ -1,8 +1,9 @@
|
||||
import
|
||||
std/[tables],
|
||||
bearssl,
|
||||
libp2p/[switch, peerinfo],
|
||||
libp2p/peerinfo,
|
||||
libp2p/protocols/protocol,
|
||||
../../node/peer_manager,
|
||||
../waku_message
|
||||
|
||||
export waku_message
|
||||
@ -45,7 +46,7 @@ type
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
switch*: Switch
|
||||
peerManager*: PeerManager
|
||||
peers*: seq[FilterPeer]
|
||||
subscribers*: seq[Subscriber]
|
||||
pushHandler*: MessagePushHandler
|
||||
|
Loading…
x
Reference in New Issue
Block a user