Peer manager: improvements; waku_filter integration (#368)

* Integrate peer manager with waku_filter

* Changelog and misc PR suggestions
This commit is contained in:
Hanno Cornelius 2021-02-08 11:17:20 +02:00 committed by GitHub
parent 1f5c3cc621
commit 1d9e3afaa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 41 deletions

View File

@ -4,6 +4,8 @@
- Refactor: Split out `waku_types` types into right place; create utils folder.
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
- Added a peer manager for `relay` and `filter` peers.
## 2021-01-05 v0.2

View File

@ -8,7 +8,8 @@ import
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
../../waku/v2/protocol/[message_notifier],
../../waku/v2/node/peer_manager,
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_filter/waku_filter,
../test_helpers, ./utils
@ -37,7 +38,7 @@ procSuite "Waku Filter":
responseRequestIdFuture.complete(requestId)
let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
@ -47,14 +48,14 @@ procSuite "Waku Filter":
discard
let
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
subscription = proto2.subscription()
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto2)
let id = await proto.subscribe(rpc)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
@ -86,7 +87,7 @@ procSuite "Waku Filter":
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
@ -96,14 +97,14 @@ procSuite "Waku Filter":
discard
let
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
subscription = proto2.subscription()
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto2)
let id = await proto.subscribe(rpc)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
@ -128,3 +129,27 @@ procSuite "Waku Filter":
check:
# Check that unsubscribe works as expected
(await responseCompletionFuture.withTimeout(5.seconds)) == false
asyncTest "handle filter subscribe failures":
const defaultTopic = "/waku/2/default-waku/proto"
let
contentTopic = ContentTopic(1)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
let idOpt = (await proto.subscribe(rpc))
check:
idOpt.isNone

View File

@ -1,7 +1,7 @@
{.push raises: [Defect, Exception].}
import
std/options,
std/[options, sets],
chronos, chronicles, metrics,
libp2p/standard_setup,
libp2p/peerstore
@ -26,16 +26,18 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager =
peerStore: PeerStore.new())
####################
# Dialer interface #
# Helper functions #
####################
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store...
proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
debug "Adding dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# Adds peer to manager for the specified protocol
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
# ...known addresses
for multiaddr in peerInfo.addrs:
@ -50,6 +52,20 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
# ...associated protocols
pm.peerStore.protoBook.add(peerInfo.peerId, proto)
####################
# Dialer interface #
####################
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# @TODO check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet...
if not pm.hasPeer(peerInfo, proto):
trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
pm.addPeer(peerInfo, proto)
info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId
# Dial Peer

View File

@ -182,11 +182,20 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
info "subscribe content", filter=request
var id = generateRequestId(node.rng)
if node.wakuFilter.isNil == false:
# @TODO: ERROR HANDLING
id = await node.wakuFilter.subscribe(request)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
if node.wakuFilter.isNil == false:
let idOpt = await node.wakuFilter.subscribe(request)
if idOpt.isSome():
# Subscribed successfully.
id = idOpt.get()
else:
# Failed to subscribe
error "remote subscription to filter failed", filter = request
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
@ -275,7 +284,7 @@ proc mountFilter*(node: WakuNode) =
node.filters.notify(message, requestId)
waku_node_messages.inc(labelValues = ["filter"])
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())

View File

@ -1,5 +1,5 @@
import
std/[tables, sequtils],
std/[tables, sequtils, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
@ -9,10 +9,10 @@ import
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
libp2p/switch,
../message_notifier,
waku_filter_types,
../../utils/requests
../../utils/requests,
../../node/peer_manager
# NOTE This is just a start, the design of this protocol isn't done yet. It
# should be direct payload exchange (a la req-resp), not be coupled with the
@ -30,6 +30,12 @@ logScope:
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
for key in filters.keys:
let filter = filters[key]
@ -166,7 +172,7 @@ method init*(wf: WakuFilter) =
var res = FilterRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_filter_errors.inc(labelValues = ["decode_rpc_failure"])
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
info "filter message received"
@ -185,10 +191,10 @@ method init*(wf: WakuFilter) =
wf.handler = handle
wf.codec = WakuFilterCodec
proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
new result
result.rng = crypto.newRng()
result.switch = switch
result.peerManager = peerManager
result.pushHandler = handler
result.init()
@ -208,26 +214,47 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
for filter in subscriber.filter.contentFilters:
if msg.contentTopic in filter.topics:
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec)
await conn.writeLP(push.encode().buffer)
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
if connOpt.isSome:
await connOpt.get().writeLP(push.encode().buffer)
else:
# @TODO more sophisticated error handling here
error "failed to push messages to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])
break
MessageNotificationSubscription.init(@[], handle)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
result = id
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isSome:
# This is the only successful path to subscription
let id = generateRequestId(wf.rng)
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
return some(id)
else:
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])
return none(string)
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
let id = generateRequestId(wf.rng)
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isSome:
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
else:
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])

View File

@ -1,8 +1,9 @@
import
std/[tables],
bearssl,
libp2p/[switch, peerinfo],
libp2p/peerinfo,
libp2p/protocols/protocol,
../../node/peer_manager,
../waku_message
export waku_message
@ -45,7 +46,7 @@ type
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
switch*: Switch
peerManager*: PeerManager
peers*: seq[FilterPeer]
subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler