diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim
index 5860500d9..ed83943ec 100644
--- a/examples/v2/chat2.nim
+++ b/examples/v2/chat2.nim
@@ -248,9 +248,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
   await node.start()
 
   if conf.filternode != "":
-    node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive)
+    node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay)
   else:
-    node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive)
+    node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay)
+
+  node.mountKeepalive()
 
   let nick = await readNick(transp)
   echo "Welcome, " & nick & "!"
@@ -377,6 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
 
   await chat.readWriteLoop()
 
+  if conf.keepAlive:
+    node.startKeepalive()
+
   runForever()
   #await allFuturesThrowing(libp2pFuts)
 
diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim
index b1bf1d8c3..c2452e3ba 100644
--- a/tests/all_tests_v2.nim
+++ b/tests/all_tests_v2.nim
@@ -14,7 +14,8 @@ import
   ./v2/test_web3, # TODO remove it when rln-relay tests get finalized
   ./v2/test_waku_rln_relay,
   ./v2/test_waku_bridge,
-  ./v2/test_peer_storage
+  ./v2/test_peer_storage,
+  ./v2/test_waku_keepalive
 
 # TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
 # For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module
diff --git a/tests/v2/test_waku_keepalive.nim b/tests/v2/test_waku_keepalive.nim
new file mode 100644
index 000000000..4e2c7315c
--- /dev/null
+++ b/tests/v2/test_waku_keepalive.nim
@@ -0,0 +1,52 @@
+{.used.}
+
+import
+  std/[options, tables, sets],
+  testutils/unittests, chronos, chronicles,
+  stew/shims/net as stewNet,
+  libp2p/switch,
+  libp2p/protobuf/minprotobuf,
+  libp2p/stream/[bufferstream, connection],
+  libp2p/crypto/crypto,
+  libp2p/multistream,
+  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/protocol/waku_keepalive/waku_keepalive,
+  ../test_helpers, ./utils
+
+procSuite "Waku Keepalive":
+
+  asyncTest "handle keepalive":
+    let
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
+
+    await node1.start()
+    node1.mountRelay()
+    node1.mountKeepalive()
+
+    await node2.start()
+    node2.mountRelay()
+    node2.mountKeepalive()
+
+    await node1.connectToNodes(@[node2.peerInfo])
+
+    var completionFut = newFuture[bool]()
+
+    proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
+      debug "WakuKeepalive message received"
+
+      check:
+        proto == waku_keepalive.WakuKeepaliveCodec
+
+      completionFut.complete(true)
+
+    node2.wakuKeepalive.handler = handle
+
+    node1.startKeepalive()
+
+    check:
+      (await completionFut.withTimeout(5.seconds)) == true
+
+    await allFutures([node1.stop(), node2.stop()])
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index c1140d529..c8d0df636 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -27,7 +27,7 @@ let
 # Helper functions #
 ####################
 
-proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
+proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
   PeerInfo.init(peerId = storedInfo.peerId,
                 addrs = toSeq(storedInfo.addrs),
                 protocols = toSeq(storedInfo.protos))
diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim
index 97fd503fc..63ccbcc96 100644
--- a/waku/v2/node/wakunode2.nim
+++ b/waku/v2/node/wakunode2.nim
@@ -18,6 +18,7 @@ import
   ../protocol/waku_filter/waku_filter,
   ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
   ../protocol/waku_lightpush/waku_lightpush,
+  ../protocol/waku_keepalive/waku_keepalive,
   ../utils/peers,
   ./storage/message/message_store,
   ./storage/peer/peer_storage,
@@ -62,6 +63,7 @@ type
     wakuSwap*: WakuSwap
     wakuRlnRelay*: WakuRLNRelay
     wakuLightPush*: WakuLightPush
+    wakuKeepalive*: WakuKeepalive
     peerInfo*: PeerInfo
     libp2pTransportLoops*: seq[Future[void]]
   # TODO Revist messages field indexing as well as if this should be Message or WakuMessage
@@ -456,7 +458,6 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
 proc mountRelay*(node: WakuNode,
                  topics: seq[string] = newSeq[string](),
                  rlnRelayEnabled = false,
-                 keepAlive = false,
                  relayMessages = true,
                  triggerSelf = true) {.gcsafe.} =
   let wakuRelay = WakuRelay.init(
@@ -468,7 +469,7 @@ proc mountRelay*(node: WakuNode,
     verifySignature = false
   )
 
-  info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, keepAlive=keepAlive, relayMessages=relayMessages
+  info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, relayMessages=relayMessages
 
   node.switch.mount(wakuRelay)
 
@@ -482,7 +483,6 @@ proc mountRelay*(node: WakuNode,
     return
 
   node.wakuRelay = wakuRelay
-  wakuRelay.keepAlive = keepAlive
 
   node.subscribe(defaultTopic, none(TopicHandler))
 
@@ -522,6 +522,29 @@ proc mountLightPush*(node: WakuNode) =
 
   node.switch.mount(node.wakuLightPush)
 
+proc mountKeepalive*(node: WakuNode) =
+  info "mounting keepalive"
+
+  node.wakuKeepalive = WakuKeepalive.new(node.peerManager, node.rng)
+
+  node.switch.mount(node.wakuKeepalive)
+
+proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
+  while node.started:
+    # Keep all managed peers alive when idle
+    trace "Running keepalive"
+
+    await node.wakuKeepalive.keepAllAlive()
+
+    await sleepAsync(keepalive)
+
+proc startKeepalive*(node: WakuNode) =
+  let defaultKeepalive = 5.minutes # 50% of the default chronosstream timeout duration
+
+  info "starting keepalive", keepalive=defaultKeepalive
+
+  asyncSpawn node.keepaliveLoop(defaultKeepalive)
+
 ## Helpers
 proc dialPeer*(n: WakuNode, address: string) {.async.} =
   info "dialPeer", address = address
@@ -704,14 +727,15 @@ when isMainModule:
       setStorePeer(node, conf.storenode)
 
 
-
   # Relay setup
   mountRelay(node,
              conf.topics.split(" "),
              rlnRelayEnabled = conf.rlnRelay,
-             keepAlive = conf.keepAlive,
              relayMessages = conf.relay) # Indicates if node is capable to relay messages
 
+  # Keepalive mounted on all nodes
+  mountKeepalive(node)
+
   # Resume historical messages, this has to be called after the relay setup
   if conf.store and conf.persistMessages:
     waitFor node.resume()
@@ -762,5 +786,9 @@ when isMainModule:
       quit(QuitSuccess)
 
     c_signal(SIGTERM, handleSigterm)
+
+  # Start keepalive, if enabled
+  if conf.keepAlive:
+    node.startKeepalive()
 
   runForever()
diff --git a/waku/v2/protocol/waku_keepalive/waku_keepalive.nim b/waku/v2/protocol/waku_keepalive/waku_keepalive.nim
new file mode 100644
index 000000000..e88d04af3
--- /dev/null
+++ b/waku/v2/protocol/waku_keepalive/waku_keepalive.nim
@@ -0,0 +1,109 @@
+import
+  std/[tables, sequtils, options],
+  bearssl,
+  chronos, chronicles, metrics, stew/results,
+  libp2p/protocols/pubsub/pubsubpeer,
+  libp2p/protocols/pubsub/floodsub,
+  libp2p/protocols/pubsub/gossipsub,
+  libp2p/protocols/protocol,
+  libp2p/protobuf/minprotobuf,
+  libp2p/stream/connection,
+  libp2p/crypto/crypto,
+  ../../utils/requests,
+  ../../node/peer_manager/peer_manager,
+  ../message_notifier,
+  ../waku_relay,
+  waku_keepalive_types
+
+export waku_keepalive_types
+
+declarePublicGauge waku_keepalive_count, "number of keepalives received"
+declarePublicGauge waku_keepalive_errors, "number of keepalive protocol errors", ["type"]
+
+logScope:
+  topics = "wakukeepalive"
+
+const
+  WakuKeepaliveCodec* = "/vac/waku/keepalive/2.0.0-alpha1"
+
+# Error types (metric label values)
+const
+  dialFailure = "dial_failure"
+  writeFailure = "write_failure"
+
+# Encoding and decoding -------------------------------------------------------
+proc encode*(msg: KeepaliveMessage): ProtoBuffer =
+  ## Serialises a keepalive message. No fields are defined yet, so nothing is
+  ## written into the returned buffer.
+  var pb = initProtoBuffer()
+
+  # @TODO: Currently no fields defined for a KeepaliveMessage
+
+  return pb
+
+proc init*(T: type KeepaliveMessage, buffer: seq[byte]): ProtoResult[T] =
+  ## Deserialises a keepalive message. No fields are defined yet, so nothing
+  ## is read from `buffer` and the result is always `ok`.
+  var msg = KeepaliveMessage()
+  let pb = initProtoBuffer(buffer)
+
+  # @TODO: Currently no fields defined for a KeepaliveMessage
+
+  ok(msg)
+
+# Protocol -------------------------------------------------------
+proc new*(T: type WakuKeepalive, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T =
+  ## Creates a keepalive protocol instance bound to `peerManager` and runs
+  ## `init` on it.
+  debug "new WakuKeepalive"
+  var wk: WakuKeepalive
+  new wk
+
+  # Honour the caller's RNG (it used to be ignored); only allocate a fresh
+  # one when none was supplied.
+  wk.rng = if rng.isNil: crypto.newRng() else: rng
+  wk.peerManager = peerManager
+
+  wk.init()
+
+  return wk
+
+method init*(wk: WakuKeepalive) =
+  ## Installs the handler that counts incoming keepalives and sets the codec.
+  debug "init WakuKeepalive"
+
+  proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
+    info "WakuKeepalive message received"
+    waku_keepalive_count.inc()
+
+  wk.handler = handle
+  wk.codec = WakuKeepaliveCodec
+
+proc keepAllAlive*(wk: WakuKeepalive) {.async, gcsafe.} =
+  ## Sends a keepalive message to all managed and connected peers.
+  ##
+  ## A dial or write failure for one peer is logged and counted; the
+  ## remaining peers are still served.
+  let peers = wk.peerManager.peers().filterIt(wk.peerManager.connectedness(it.peerId) == Connected).mapIt(it.toPeerInfo())
+
+  for peer in peers:
+    let connOpt = await wk.peerManager.dialPeer(peer, WakuKeepaliveCodec)
+
+    if connOpt.isNone():
+      # @TODO more sophisticated error handling here
+      error "failed to connect to remote peer"
+      waku_keepalive_errors.inc(labelValues = [dialFailure])
+      continue # Keep going: one unreachable peer must not starve the rest
+
+    let conn = connOpt.get()
+
+    try:
+      await conn.writeLP(KeepaliveMessage().encode().buffer) # Send keep-alive on connection
+    except CancelledError as exc:
+      raise exc # Never swallow cancellation
+    except CatchableError as exc:
+      error "failed to send keepalive to remote peer", msg = exc.msg
+      waku_keepalive_errors.inc(labelValues = [writeFailure])
+    finally:
+      # Release the dialled stream so streams do not pile up round after round
+      await conn.close()
diff --git a/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim b/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim
new file mode 100644
index 000000000..8d6e9160a
--- /dev/null
+++ b/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim
@@ -0,0 +1,12 @@
+import
+  bearssl,
+  libp2p/protocols/protocol,
+  ../../node/peer_manager/peer_manager
+
+type
+  KeepaliveMessage* = object
+    # Currently no fields for a keepalive message
+
+  WakuKeepalive* = ref object of LPProtocol
+    rng*: ref BrHmacDrbgContext
+    peerManager*: PeerManager
diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim
index f5d3ff256..283a30df6 100644
--- a/waku/v2/protocol/waku_relay.nim
+++ b/waku/v2/protocol/waku_relay.nim
@@ -15,30 +15,9 @@ logScope:
 
 const
   WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2"
-  DefaultKeepAlive = 5.minutes # 50% of the default chronosstream timeout duration
 
 type
   WakuRelay* = ref object of GossipSub
-    keepAlive*: bool
-
-proc keepAlive*(w: WakuRelay) {.async.} =
-  while w.keepAlive:
-    # Keep all mesh peers alive when idle
-    trace "Running keepalive"
-
-    for topic in w.topics.keys:
-      trace "Keepalive on topic", topic=topic
-      let
-        # Mesh peers for topic
-        mpeers = toSeq(w.mesh.getOrDefault(topic))
-        # Peers we're backing off from on topic
-        backoffPeers = w.backingOff.getOrDefault(topic)
-        # Only keep peers alive that we're not backing off from
-        keepAlivePeers = mpeers.filterIt(not backoffPeers.hasKey(it.peerId))
-
-      w.broadcast(keepAlivePeers, RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))))
-
-    await sleepAsync(DefaultKeepAlive)
 
 method init*(w: WakuRelay) =
   debug "init"
@@ -104,14 +83,8 @@ method unsubscribeAll*(w: WakuRelay,
 method start*(w: WakuRelay) {.async.} =
   debug "start"
   await procCall GossipSub(w).start()
-
-  if w.keepAlive:
-    # Keep connection to mesh peers alive over periods of idleness
-    asyncSpawn keepAlive(w)
 
 method stop*(w: WakuRelay) {.async.} =
   debug "stop"
 
-  w.keepAlive = false
-
   await procCall GossipSub(w).stop()