diff --git a/CHANGELOG.md b/CHANGELOG.md index a4450e62d..6cc9008b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Refactor: Split out `waku_types` types into right place; create utils folder. - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. +- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) +- Added a peer manager for `relay` and `filter` peers. ## 2021-01-05 v0.2 diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index dfab9838e..68aaf4f90 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -8,7 +8,8 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/multistream, - ../../waku/v2/protocol/[message_notifier], + ../../waku/v2/node/peer_manager, + ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_filter/waku_filter, ../test_helpers, ./utils @@ -37,7 +38,7 @@ procSuite "Waku Filter": responseRequestIdFuture.complete(requestId) let - proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) dialSwitch.mount(proto) @@ -47,14 +48,14 @@ procSuite "Waku Filter": discard let - proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle) + proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) subscription = proto2.subscription() var subscriptions = newTable[string, MessageNotificationSubscription]() subscriptions["test"] = subscription listenSwitch.mount(proto2) - let id = await proto.subscribe(rpc) + let id = (await proto.subscribe(rpc)).get() await sleepAsync(2.seconds) @@ -86,7 +87,7 @@ procSuite "Waku Filter": responseCompletionFuture.complete(true) let - proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) dialSwitch.mount(proto) @@ -96,14 +97,14 @@ procSuite "Waku Filter": discard let - proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle) + proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) subscription = proto2.subscription() var subscriptions = newTable[string, MessageNotificationSubscription]() subscriptions["test"] = subscription listenSwitch.mount(proto2) - let id = await proto.subscribe(rpc) + let id = (await proto.subscribe(rpc)).get() await sleepAsync(2.seconds) @@ -128,3 +129,27 @@ procSuite "Waku Filter": check: # Check that unsubscribe works as expected (await responseCompletionFuture.withTimeout(5.seconds)) == false + + asyncTest "handle filter subscribe failures": + const defaultTopic = "/waku/2/default-waku/proto" + + let + contentTopic = ContentTopic(1) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var responseRequestIdFuture = newFuture[string]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + + let idOpt = (await proto.subscribe(rpc)) + + check: + idOpt.isNone diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager.nim index c77ef2982..9cf785c03 100644 --- a/waku/v2/node/peer_manager.nim +++ b/waku/v2/node/peer_manager.nim @@ -1,7 +1,7 @@ {.push raises: [Defect, Exception].} import - std/options, + std/[options, sets], chronos, chronicles, metrics, libp2p/standard_setup, libp2p/peerstore @@ -26,16 +26,18 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager = peerStore: PeerStore.new()) #################### -# Dialer interface # +# Helper functions # #################### -proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = - # Dial a given peer and add it to the list of known peers - # @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed. - - # First add dialed peer info to peer store... +proc hasPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = + # Returns `true` if peer is included in manager for the specified protocol - debug "Adding dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto + pm.peerStore.get(peerInfo.peerId).protos.contains(proto) + +proc addPeer(pm: PeerManager, peerInfo: PeerInfo, proto: string) = + # Adds peer to manager for the specified protocol + + debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto # ...known addresses for multiaddr in peerInfo.addrs: @@ -50,6 +52,20 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = # ...associated protocols pm.peerStore.protoBook.add(peerInfo.peerId, proto) + +#################### +# Dialer interface # +#################### + +proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = + # Dial a given peer and add it to the list of known peers + # @TODO check peer validity and score before continuing. Limit number of peers to be managed. + + # First add dialed peer info to peer store, if it does not exist yet... + if not pm.hasPeer(peerInfo, proto): + trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto + pm.addPeer(peerInfo, proto) + info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId # Dial Peer diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 9485c7a00..7c1e5fd8e 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -182,11 +182,20 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa info "subscribe content", filter=request var id = generateRequestId(node.rng) - if node.wakuFilter.isNil == false: - # @TODO: ERROR HANDLING - id = await node.wakuFilter.subscribe(request) - node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) + if node.wakuFilter.isNil == false: + let idOpt = await node.wakuFilter.subscribe(request) + + if idOpt.isSome(): + # Subscribed successfully. + id = idOpt.get() + else: + # Failed to subscribe + error "remote subscription to filter failed", filter = request + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + # Register handler for filter, whether remote subscription succeeded or not + node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) waku_node_filters.set(node.filters.len.int64) proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = @@ -275,7 +284,7 @@ proc mountFilter*(node: WakuNode) = node.filters.notify(message, requestId) waku_node_messages.inc(labelValues = ["filter"]) - node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler) + node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 24200cfd9..5433e6d04 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -1,5 +1,5 @@ import - std/[tables, sequtils], + std/[tables, sequtils, options], bearssl, chronos, chronicles, metrics, stew/results, libp2p/protocols/pubsub/pubsubpeer, @@ -9,10 +9,10 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, libp2p/crypto/crypto, - libp2p/switch, ../message_notifier, waku_filter_types, - ../../utils/requests + ../../utils/requests, + ../../node/peer_manager # NOTE This is just a start, the design of this protocol isn't done yet. It # should be direct payload exchange (a la req-resp), not be coupled with the @@ -30,6 +30,12 @@ logScope: const WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = for key in filters.keys: let filter = filters[key] @@ -166,7 +172,7 @@ method init*(wf: WakuFilter) = var res = FilterRPC.init(message) if res.isErr: error "failed to decode rpc" - waku_filter_errors.inc(labelValues = ["decode_rpc_failure"]) + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return info "filter message received" @@ -185,10 +191,10 @@ method init*(wf: WakuFilter) = wf.handler = handle wf.codec = WakuFilterCodec -proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = +proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = new result result.rng = crypto.newRng() - result.switch = switch + result.peerManager = peerManager result.pushHandler = handler result.init() @@ -208,26 +214,47 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = for filter in subscriber.filter.contentFilters: if msg.contentTopic in filter.topics: let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) - let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec) - await conn.writeLP(push.encode().buffer) + + let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) + + if connOpt.isSome: + await connOpt.get().writeLP(push.encode().buffer) + else: + # @TODO more sophisticated error handling here + error "failed to push messages to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) break MessageNotificationSubscription.init(@[], handle) -proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, gcsafe.} = - let id = generateRequestId(wf.rng) +proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = if wf.peers.len >= 1: - let peer = wf.peers[0].peerInfo - # @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC. - let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) - await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer) - result = id + let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set + + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + + if connOpt.isSome: + # This is the only successful path to subscription + let id = generateRequestId(wf.rng) + await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + return some(id) + else: + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) + return none(string) proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. let id = generateRequestId(wf.rng) if wf.peers.len >= 1: - let peer = wf.peers[0].peerInfo - # @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC. - let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) - await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + let peer = wf.peers[0].peerInfo # @TODO: select peer from manager rather than from local set + + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + + if connOpt.isSome: + await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + else: + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index d22064c84..b4fb25c9a 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -1,8 +1,9 @@ import std/[tables], bearssl, - libp2p/[switch, peerinfo], + libp2p/peerinfo, libp2p/protocols/protocol, + ../../node/peer_manager, ../waku_message export waku_message @@ -45,7 +46,7 @@ type WakuFilter* = ref object of LPProtocol rng*: ref BrHmacDrbgContext - switch*: Switch + peerManager*: PeerManager peers*: seq[FilterPeer] subscribers*: seq[Subscriber] pushHandler*: MessagePushHandler