From f457db9e0a09232b9331d5fd0951816dca401ed7 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Fri, 30 Apr 2021 14:07:46 +0200 Subject: [PATCH] Add optional keepAlive for relay peers (#523) --- waku/v2/node/config.nim | 5 +++++ waku/v2/node/wakunode2.nim | 6 ++++-- waku/v2/protocol/waku_relay.nim | 26 +++++++++++++++++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 29eff6d43..548ac77df 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -72,6 +72,11 @@ type desc: "Enable spam protection through rln-relay: true|false", defaultValue: false name: "rlnrelay" }: bool + + keepAlive* {. + desc: "Enable keep-alive for idle connections: true|false", + defaultValue: false + name: "keep-alive" }: bool swap* {. desc: "Enable swap protocol: true|false", diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 4677b192b..9bab38f35 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -454,7 +454,7 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) = let pb = PubSub(node.wakuRelay) pb.addValidator(pubsubTopic, validator) -proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled = false) {.gcsafe.} = +proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled = false, keepAlive = false) {.gcsafe.} = let wakuRelay = WakuRelay.init( switch = node.switch, # Use default @@ -464,6 +464,8 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela verifySignature = false ) + wakuRelay.keepAlive = keepAlive + node.wakuRelay = wakuRelay node.switch.mount(wakuRelay) @@ -691,7 +693,7 @@ when isMainModule: # Relay setup if conf.relay: # True by default - mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) + mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay, keepAlive = conf.keepAlive) if conf.staticnodes.len > 0: waitFor connectToNodes(node, conf.staticnodes) diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 55bba1f17..650e57499 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -4,6 +4,7 @@ ## for spec. import + std/[tables, sequtils, sets], chronos, chronicles, metrics, libp2p/protocols/pubsub/[pubsub, gossipsub], libp2p/protocols/pubsub/rpc/messages, @@ -12,10 +13,26 @@ import logScope: topics = "wakurelay" -const WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2" +const + WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2" + DefaultKeepAlive = 5.minutes # 50% of the default chronosstream timeout duration type WakuRelay* = ref object of GossipSub + keepAlive*: bool + +proc keepAlive*(w: WakuRelay) {.async.} = + while w.keepAlive: + # Keep all mesh peers alive when idle + trace "Running keepalive" + + for topic in w.topics.keys: + trace "Keepalive on topic", topic=topic + let mpeers = w.mesh.getOrDefault(topic) + + w.broadcast(toSeq(mpeers), RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))) + + await sleepAsync(DefaultKeepAlive) method init*(w: WakuRelay) = debug "init" @@ -81,7 +98,14 @@ method unsubscribeAll*(w: WakuRelay, method start*(w: WakuRelay) {.async.} = debug "start" await procCall GossipSub(w).start() + + if w.keepAlive: + # Keep connection to mesh peers alive over periods of idleness + asyncSpawn keepAlive(w) method stop*(w: WakuRelay) {.async.} = debug "stop" + + w.keepAlive = false + await procCall GossipSub(w).stop()