Fix keepalive for connected peers (#588)

* Fix keepalive for connected peers

* Remove comment
This commit is contained in:
Hanno Cornelius 2021-06-02 09:53:34 +02:00 committed by GitHub
parent ee757308c6
commit 2b571e205b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 192 additions and 36 deletions

View File

@ -248,9 +248,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start() await node.start()
if conf.filternode != "": if conf.filternode != "":
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive) node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay)
else: else:
node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive) node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay)
node.mountKeepalive()
let nick = await readNick(transp) let nick = await readNick(transp)
echo "Welcome, " & nick & "!" echo "Welcome, " & nick & "!"
@ -377,6 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await chat.readWriteLoop() await chat.readWriteLoop()
if conf.keepAlive:
node.startKeepalive()
runForever() runForever()
#await allFuturesThrowing(libp2pFuts) #await allFuturesThrowing(libp2pFuts)

View File

@ -14,7 +14,8 @@ import
./v2/test_web3, # TODO remove it when rln-relay tests get finalized ./v2/test_web3, # TODO remove it when rln-relay tests get finalized
./v2/test_waku_rln_relay, ./v2/test_waku_rln_relay,
./v2/test_waku_bridge, ./v2/test_waku_bridge,
./v2/test_peer_storage ./v2/test_peer_storage,
./v2/test_waku_keepalive
# TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc # TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
# For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module # For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module

View File

@ -0,0 +1,52 @@
{.used.}
import
std/[options, tables, sets],
testutils/unittests, chronos, chronicles,
stew/shims/net as stewNet,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
../../waku/v2/node/wakunode2,
../../waku/v2/protocol/waku_keepalive/waku_keepalive,
../test_helpers, ./utils
procSuite "Waku Keepalive":
asyncTest "handle keepalive":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
await node1.start()
node1.mountRelay()
node1.mountKeepalive()
await node2.start()
node2.mountRelay()
node2.mountKeepalive()
await node1.connectToNodes(@[node2.peerInfo])
var completionFut = newFuture[bool]()
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
debug "WakuKeepalive message received"
check:
proto == waku_keepalive.WakuKeepaliveCodec
completionFut.complete(true)
node2.wakuKeepalive.handler = handle
node1.startKeepalive()
check:
(await completionFut.withTimeout(5.seconds)) == true
await allFutures([node1.stop(), node2.stop()])

View File

@ -27,7 +27,7 @@ let
# Helper functions # # Helper functions #
#################### ####################
proc toPeerInfo(storedInfo: StoredInfo): PeerInfo = proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
PeerInfo.init(peerId = storedInfo.peerId, PeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs), addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos)) protocols = toSeq(storedInfo.protos))

View File

@ -18,6 +18,7 @@ import
../protocol/waku_filter/waku_filter, ../protocol/waku_filter/waku_filter,
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils], ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
../protocol/waku_lightpush/waku_lightpush, ../protocol/waku_lightpush/waku_lightpush,
../protocol/waku_keepalive/waku_keepalive,
../utils/peers, ../utils/peers,
./storage/message/message_store, ./storage/message/message_store,
./storage/peer/peer_storage, ./storage/peer/peer_storage,
@ -62,6 +63,7 @@ type
wakuSwap*: WakuSwap wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush wakuLightPush*: WakuLightPush
wakuKeepalive*: WakuKeepalive
peerInfo*: PeerInfo peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage
@ -456,7 +458,6 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
proc mountRelay*(node: WakuNode, proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](), topics: seq[string] = newSeq[string](),
rlnRelayEnabled = false, rlnRelayEnabled = false,
keepAlive = false,
relayMessages = true, relayMessages = true,
triggerSelf = true) {.gcsafe.} = triggerSelf = true) {.gcsafe.} =
let wakuRelay = WakuRelay.init( let wakuRelay = WakuRelay.init(
@ -468,7 +469,7 @@ proc mountRelay*(node: WakuNode,
verifySignature = false verifySignature = false
) )
info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, keepAlive=keepAlive, relayMessages=relayMessages info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, relayMessages=relayMessages
node.switch.mount(wakuRelay) node.switch.mount(wakuRelay)
@ -482,7 +483,6 @@ proc mountRelay*(node: WakuNode,
return return
node.wakuRelay = wakuRelay node.wakuRelay = wakuRelay
wakuRelay.keepAlive = keepAlive
node.subscribe(defaultTopic, none(TopicHandler)) node.subscribe(defaultTopic, none(TopicHandler))
@ -522,6 +522,29 @@ proc mountLightPush*(node: WakuNode) =
node.switch.mount(node.wakuLightPush) node.switch.mount(node.wakuLightPush)
proc mountKeepalive*(node: WakuNode) =
info "mounting keepalive"
node.wakuKeepalive = WakuKeepalive.new(node.peerManager, node.rng)
node.switch.mount(node.wakuKeepalive)
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started:
# Keep all managed peers alive when idle
trace "Running keepalive"
await node.wakuKeepalive.keepAllAlive()
await sleepAsync(keepalive)
proc startKeepalive*(node: WakuNode) =
let defaultKeepalive = 5.minutes # 50% of the default chronosstream timeout duration
info "starting keepalive", keepalive=defaultKeepalive
asyncSpawn node.keepaliveLoop(defaultKeepalive)
## Helpers ## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} = proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address
@ -704,14 +727,15 @@ when isMainModule:
setStorePeer(node, conf.storenode) setStorePeer(node, conf.storenode)
# Relay setup # Relay setup
mountRelay(node, mountRelay(node,
conf.topics.split(" "), conf.topics.split(" "),
rlnRelayEnabled = conf.rlnRelay, rlnRelayEnabled = conf.rlnRelay,
keepAlive = conf.keepAlive,
relayMessages = conf.relay) # Indicates if node is capable to relay messages relayMessages = conf.relay) # Indicates if node is capable to relay messages
# Keepalive mounted on all nodes
mountKeepalive(node)
# Resume historical messages, this has to be called after the relay setup # Resume historical messages, this has to be called after the relay setup
if conf.store and conf.persistMessages: if conf.store and conf.persistMessages:
waitFor node.resume() waitFor node.resume()
@ -763,4 +787,8 @@ when isMainModule:
c_signal(SIGTERM, handleSigterm) c_signal(SIGTERM, handleSigterm)
# Start keepalive, if enabled
if conf.keepAlive:
node.startKeepalive()
runForever() runForever()

View File

@ -0,0 +1,85 @@
import
std/[tables, sequtils, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
../../utils/requests,
../../node/peer_manager/peer_manager,
../message_notifier,
../waku_relay,
waku_keepalive_types
export waku_keepalive_types
declarePublicGauge waku_keepalive_count, "number of keepalives received"
declarePublicGauge waku_keepalive_errors, "number of keepalive protocol errors", ["type"]
logScope:
topics = "wakukeepalive"
const
WakuKeepaliveCodec* = "/vac/waku/keepalive/2.0.0-alpha1"
# Error types (metric label values)
const
dialFailure = "dial_failure"
# Encoding and decoding -------------------------------------------------------
proc encode*(msg: KeepaliveMessage): ProtoBuffer =
var pb = initProtoBuffer()
# @TODO: Currently no fields defined for a KeepaliveMessage
return pb
proc init*(T: type KeepaliveMessage, buffer: seq[byte]): ProtoResult[T] =
var msg = KeepaliveMessage()
let pb = initProtoBuffer(buffer)
# @TODO: Currently no fields defined for a KeepaliveMessage
ok(msg)
# Protocol -------------------------------------------------------
proc new*(T: type WakuKeepalive, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T =
debug "new WakuKeepalive"
var wk: WakuKeepalive
new wk
wk.rng = crypto.newRng()
wk.peerManager = peerManager
wk.init()
return wk
method init*(wk: WakuKeepalive) =
debug "init WakuKeepalive"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
info "WakuKeepalive message received"
waku_keepalive_count.inc()
wk.handler = handle
wk.codec = WakuKeepaliveCodec
proc keepAllAlive*(wk: WakuKeepalive) {.async, gcsafe.} =
# Send keepalive message to all managed and connected peers
let peers = wk.peerManager.peers().filterIt(wk.peerManager.connectedness(it.peerId) == Connected).mapIt(it.toPeerInfo())
for peer in peers:
let connOpt = await wk.peerManager.dialPeer(peer, WakuKeepaliveCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_keepalive_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(KeepaliveMessage().encode().buffer) # Send keep-alive on connection

View File

@ -0,0 +1,12 @@
import
bearssl,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager
type
KeepaliveMessage* = object
# Currently no fields for a keepalive message
WakuKeepalive* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager

View File

@ -15,30 +15,9 @@ logScope:
const const
WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2" WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2"
DefaultKeepAlive = 5.minutes # 50% of the default chronosstream timeout duration
type type
WakuRelay* = ref object of GossipSub WakuRelay* = ref object of GossipSub
keepAlive*: bool
proc keepAlive*(w: WakuRelay) {.async.} =
while w.keepAlive:
# Keep all mesh peers alive when idle
trace "Running keepalive"
for topic in w.topics.keys:
trace "Keepalive on topic", topic=topic
let
# Mesh peers for topic
mpeers = toSeq(w.mesh.getOrDefault(topic))
# Peers we're backing off from on topic
backoffPeers = w.backingOff.getOrDefault(topic)
# Only keep peers alive that we're not backing off from
keepAlivePeers = mpeers.filterIt(not backoffPeers.hasKey(it.peerId))
w.broadcast(keepAlivePeers, RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))))
await sleepAsync(DefaultKeepAlive)
method init*(w: WakuRelay) = method init*(w: WakuRelay) =
debug "init" debug "init"
@ -105,13 +84,7 @@ method start*(w: WakuRelay) {.async.} =
debug "start" debug "start"
await procCall GossipSub(w).start() await procCall GossipSub(w).start()
if w.keepAlive:
# Keep connection to mesh peers alive over periods of idleness
asyncSpawn keepAlive(w)
method stop*(w: WakuRelay) {.async.} = method stop*(w: WakuRelay) {.async.} =
debug "stop" debug "stop"
w.keepAlive = false
await procCall GossipSub(w).stop() await procCall GossipSub(w).stop()