mirror of https://github.com/waku-org/nwaku.git
Add ping protocol support and use for keep-alive (#621)
* Add ping protocol support and use for keep-alive * Separate WakuPeerStore from libp2p PeerStore * Revert silly test timeout
This commit is contained in:
parent
e2ea94dc71
commit
a044c6a82c
|
@ -12,7 +12,7 @@
|
||||||
path = vendor/nim-libp2p
|
path = vendor/nim-libp2p
|
||||||
url = https://github.com/status-im/nim-libp2p.git
|
url = https://github.com/status-im/nim-libp2p.git
|
||||||
ignore = dirty
|
ignore = dirty
|
||||||
branch = master
|
branch = unstable
|
||||||
[submodule "vendor/nim-stew"]
|
[submodule "vendor/nim-stew"]
|
||||||
path = vendor/nim-stew
|
path = vendor/nim-stew
|
||||||
url = https://github.com/status-im/nim-stew.git
|
url = https://github.com/status-im/nim-stew.git
|
||||||
|
|
|
@ -304,7 +304,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||||
rlnRelayEnabled = conf.rlnRelay,
|
rlnRelayEnabled = conf.rlnRelay,
|
||||||
relayMessages = conf.relay) # Indicates if node is capable to relay messages
|
relayMessages = conf.relay) # Indicates if node is capable to relay messages
|
||||||
|
|
||||||
node.mountKeepalive()
|
node.mountLibp2pPing()
|
||||||
|
|
||||||
let nick = await readNick(transp)
|
let nick = await readNick(transp)
|
||||||
echo "Welcome, " & nick & "!"
|
echo "Welcome, " & nick & "!"
|
||||||
|
|
|
@ -253,7 +253,7 @@ when isMainModule:
|
||||||
|
|
||||||
# Now load rest of config
|
# Now load rest of config
|
||||||
# Mount configured Waku v2 protocols
|
# Mount configured Waku v2 protocols
|
||||||
mountKeepalive(bridge.nodev2)
|
mountLibp2pPing(bridge.nodev2)
|
||||||
|
|
||||||
if conf.store:
|
if conf.store:
|
||||||
mountStore(bridge.nodev2)
|
mountStore(bridge.nodev2)
|
||||||
|
|
|
@ -6,44 +6,42 @@ import
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
|
libp2p/protocols/ping,
|
||||||
libp2p/stream/[bufferstream, connection],
|
libp2p/stream/[bufferstream, connection],
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/multistream,
|
libp2p/multistream,
|
||||||
../../waku/v2/node/wakunode2,
|
../../waku/v2/node/wakunode2,
|
||||||
../../waku/v2/protocol/waku_keepalive/waku_keepalive,
|
|
||||||
../test_helpers, ./utils
|
../test_helpers, ./utils
|
||||||
|
|
||||||
procSuite "Waku Keepalive":
|
procSuite "Waku Keepalive":
|
||||||
|
|
||||||
asyncTest "handle keepalive":
|
asyncTest "handle ping keepalives":
|
||||||
let
|
let
|
||||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||||
|
|
||||||
|
var completionFut = newFuture[bool]()
|
||||||
|
|
||||||
|
proc pingHandler(peer: PeerInfo) {.async, gcsafe, raises: [Defect].} =
|
||||||
|
debug "Ping received"
|
||||||
|
|
||||||
|
check:
|
||||||
|
peer.peerId == node1.switch.peerInfo.peerId
|
||||||
|
|
||||||
|
completionFut.complete(true)
|
||||||
|
|
||||||
await node1.start()
|
await node1.start()
|
||||||
node1.mountRelay()
|
node1.mountRelay()
|
||||||
node1.mountKeepalive()
|
node1.mountLibp2pPing()
|
||||||
|
|
||||||
await node2.start()
|
await node2.start()
|
||||||
node2.mountRelay()
|
node2.mountRelay()
|
||||||
node2.mountKeepalive()
|
node2.switch.mount(Ping.new(handler = pingHandler))
|
||||||
|
|
||||||
await node1.connectToNodes(@[node2.peerInfo])
|
await node1.connectToNodes(@[node2.peerInfo])
|
||||||
|
|
||||||
var completionFut = newFuture[bool]()
|
|
||||||
|
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
|
||||||
debug "WakuKeepalive message received"
|
|
||||||
|
|
||||||
check:
|
|
||||||
proto == waku_keepalive.WakuKeepaliveCodec
|
|
||||||
|
|
||||||
completionFut.complete(true)
|
|
||||||
|
|
||||||
node2.wakuKeepalive.handler = handle
|
|
||||||
|
|
||||||
node1.startKeepalive()
|
node1.startKeepalive()
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 252e5d0d502c51c7bb20eeebb7a3129220b12ff8
|
Subproject commit f7dd6b76c2cbf58a8ed755251086067cb67f420e
|
|
@ -1 +1 @@
|
||||||
Subproject commit 0ad571ab27c46a32256c8568a32ef1d6ac34b733
|
Subproject commit 601fa7ff667431b05d18579af0e43bf4d8dafa61
|
|
@ -1 +1 @@
|
||||||
Subproject commit 3da656687be63ccbf5d659af55d159130d325038
|
Subproject commit bd2e9a04622d4dece2a7e23552050d6c6261f92d
|
|
@ -255,7 +255,7 @@ when isMainModule:
|
||||||
elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)
|
elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)
|
||||||
|
|
||||||
# Mount configured Waku v2 protocols
|
# Mount configured Waku v2 protocols
|
||||||
mountKeepalive(bridge.nodev2)
|
mountLibp2pPing(bridge.nodev2)
|
||||||
|
|
||||||
if conf.store:
|
if conf.store:
|
||||||
mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages
|
mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages
|
||||||
|
|
|
@ -117,8 +117,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
|
||||||
peerStore: WakuPeerStore.new(),
|
peerStore: WakuPeerStore.new(),
|
||||||
storage: storage)
|
storage: storage)
|
||||||
|
|
||||||
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||||
onConnEvent(pm, peerId, event)
|
onConnEvent(pm, peerInfo.peerId, event)
|
||||||
|
|
||||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
|
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
|
||||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
|
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
|
std/[tables, sequtils, sets],
|
||||||
libp2p/builders,
|
libp2p/builders,
|
||||||
libp2p/peerstore
|
libp2p/peerstore
|
||||||
|
|
||||||
|
@ -21,11 +22,38 @@ type
|
||||||
|
|
||||||
DisconnectBook* = object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
|
DisconnectBook* = object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
|
||||||
|
|
||||||
WakuPeerStore* = ref object of PeerStore
|
WakuPeerStore* = ref object
|
||||||
|
addressBook*: AddressBook
|
||||||
|
protoBook*: ProtoBook
|
||||||
|
keyBook*: KeyBook
|
||||||
connectionBook*: ConnectionBook
|
connectionBook*: ConnectionBook
|
||||||
disconnectBook*: DisconnectBook
|
disconnectBook*: DisconnectBook
|
||||||
|
|
||||||
proc new*(T: type WakuPeerStore): WakuPeerStore =
|
proc new*(T: type WakuPeerStore): WakuPeerStore =
|
||||||
var p: WakuPeerStore
|
var p: WakuPeerStore
|
||||||
new(p)
|
new(p)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
##################
|
||||||
|
# Peer Store API #
|
||||||
|
##################
|
||||||
|
|
||||||
|
proc get*(peerStore: WakuPeerStore,
|
||||||
|
peerId: PeerID): StoredInfo =
|
||||||
|
## Get the stored information of a given peer.
|
||||||
|
|
||||||
|
StoredInfo(
|
||||||
|
peerId: peerId,
|
||||||
|
addrs: peerStore.addressBook.get(peerId),
|
||||||
|
protos: peerStore.protoBook.get(peerId),
|
||||||
|
publicKey: peerStore.keyBook.get(peerId)
|
||||||
|
)
|
||||||
|
|
||||||
|
proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] =
|
||||||
|
## Get all the stored information of every peer.
|
||||||
|
|
||||||
|
let allKeys = concat(toSeq(keys(peerStore.addressBook.book)),
|
||||||
|
toSeq(keys(peerStore.protoBook.book)),
|
||||||
|
toSeq(keys(peerStore.keyBook.book))).toHashSet()
|
||||||
|
|
||||||
|
return allKeys.mapIt(peerStore.get(it))
|
||||||
|
|
|
@ -9,6 +9,7 @@ import
|
||||||
libp2p/multiaddress,
|
libp2p/multiaddress,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
|
libp2p/protocols/ping,
|
||||||
# NOTE For TopicHandler, solve with exports?
|
# NOTE For TopicHandler, solve with exports?
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
libp2p/protocols/pubsub/pubsub,
|
libp2p/protocols/pubsub/pubsub,
|
||||||
|
@ -20,7 +21,6 @@ import
|
||||||
../protocol/waku_filter/waku_filter,
|
../protocol/waku_filter/waku_filter,
|
||||||
../protocol/waku_lightpush/waku_lightpush,
|
../protocol/waku_lightpush/waku_lightpush,
|
||||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
../protocol/waku_keepalive/waku_keepalive,
|
|
||||||
../utils/peers,
|
../utils/peers,
|
||||||
./storage/message/message_store,
|
./storage/message/message_store,
|
||||||
./storage/peer/peer_storage,
|
./storage/peer/peer_storage,
|
||||||
|
@ -68,8 +68,8 @@ type
|
||||||
wakuSwap*: WakuSwap
|
wakuSwap*: WakuSwap
|
||||||
wakuRlnRelay*: WakuRLNRelay
|
wakuRlnRelay*: WakuRLNRelay
|
||||||
wakuLightPush*: WakuLightPush
|
wakuLightPush*: WakuLightPush
|
||||||
wakuKeepalive*: WakuKeepalive
|
|
||||||
peerInfo*: PeerInfo
|
peerInfo*: PeerInfo
|
||||||
|
libp2pPing*: Ping
|
||||||
libp2pTransportLoops*: seq[Future[void]]
|
libp2pTransportLoops*: seq[Future[void]]
|
||||||
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
|
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
|
||||||
messages*: seq[(Topic, WakuMessage)]
|
messages*: seq[(Topic, WakuMessage)]
|
||||||
|
@ -530,19 +530,34 @@ proc mountLightPush*(node: WakuNode) =
|
||||||
|
|
||||||
node.switch.mount(node.wakuLightPush)
|
node.switch.mount(node.wakuLightPush)
|
||||||
|
|
||||||
proc mountKeepalive*(node: WakuNode) =
|
proc mountLibp2pPing*(node: WakuNode) =
|
||||||
info "mounting keepalive"
|
info "mounting libp2p ping protocol"
|
||||||
|
|
||||||
node.wakuKeepalive = WakuKeepalive.new(node.peerManager, node.rng)
|
node.libp2pPing = Ping.new(rng = node.rng)
|
||||||
|
|
||||||
node.switch.mount(node.wakuKeepalive)
|
node.switch.mount(node.libp2pPing)
|
||||||
|
|
||||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||||
while node.started:
|
while node.started:
|
||||||
# Keep all managed peers alive when idle
|
# Keep all connected peers alive while running
|
||||||
trace "Running keepalive"
|
trace "Running keepalive"
|
||||||
|
|
||||||
await node.wakuKeepalive.keepAllAlive()
|
# First get a list of connected peer infos
|
||||||
|
let peers = node.peerManager.peers()
|
||||||
|
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
|
||||||
|
.mapIt(it.toPeerInfo())
|
||||||
|
|
||||||
|
# Attempt to retrieve and ping the active outgoing connection for each peer
|
||||||
|
for peer in peers:
|
||||||
|
let connOpt = await node.peerManager.dialPeer(peer, PingCodec)
|
||||||
|
|
||||||
|
if connOpt.isNone:
|
||||||
|
# @TODO more sophisticated error handling here
|
||||||
|
debug "failed to connect to remote peer", peer=peer
|
||||||
|
waku_node_errors.inc(labelValues = ["keep_alive_failure"])
|
||||||
|
return
|
||||||
|
|
||||||
|
discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
|
||||||
|
|
||||||
await sleepAsync(keepalive)
|
await sleepAsync(keepalive)
|
||||||
|
|
||||||
|
@ -752,7 +767,7 @@ when isMainModule:
|
||||||
relayMessages = conf.relay) # Indicates if node is capable to relay messages
|
relayMessages = conf.relay) # Indicates if node is capable to relay messages
|
||||||
|
|
||||||
# Keepalive mounted on all nodes
|
# Keepalive mounted on all nodes
|
||||||
mountKeepalive(node)
|
mountLibp2pPing(node)
|
||||||
|
|
||||||
# Resume historical messages, this has to be called after the relay setup
|
# Resume historical messages, this has to be called after the relay setup
|
||||||
if conf.store and conf.persistMessages:
|
if conf.store and conf.persistMessages:
|
||||||
|
|
|
@ -1,85 +0,0 @@
|
||||||
import
|
|
||||||
std/[tables, sequtils, options],
|
|
||||||
bearssl,
|
|
||||||
chronos, chronicles, metrics, stew/results,
|
|
||||||
libp2p/protocols/pubsub/pubsubpeer,
|
|
||||||
libp2p/protocols/pubsub/floodsub,
|
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
|
||||||
libp2p/protocols/protocol,
|
|
||||||
libp2p/protobuf/minprotobuf,
|
|
||||||
libp2p/stream/connection,
|
|
||||||
libp2p/crypto/crypto,
|
|
||||||
../../utils/requests,
|
|
||||||
../../node/peer_manager/peer_manager,
|
|
||||||
../message_notifier,
|
|
||||||
../waku_relay,
|
|
||||||
waku_keepalive_types
|
|
||||||
|
|
||||||
export waku_keepalive_types
|
|
||||||
|
|
||||||
declarePublicGauge waku_keepalive_count, "number of keepalives received"
|
|
||||||
declarePublicGauge waku_keepalive_errors, "number of keepalive protocol errors", ["type"]
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "wakukeepalive"
|
|
||||||
|
|
||||||
const
|
|
||||||
WakuKeepaliveCodec* = "/vac/waku/keepalive/2.0.0-alpha1"
|
|
||||||
|
|
||||||
# Error types (metric label values)
|
|
||||||
const
|
|
||||||
dialFailure = "dial_failure"
|
|
||||||
|
|
||||||
# Encoding and decoding -------------------------------------------------------
|
|
||||||
proc encode*(msg: KeepaliveMessage): ProtoBuffer =
|
|
||||||
var pb = initProtoBuffer()
|
|
||||||
|
|
||||||
# @TODO: Currently no fields defined for a KeepaliveMessage
|
|
||||||
|
|
||||||
return pb
|
|
||||||
|
|
||||||
proc init*(T: type KeepaliveMessage, buffer: seq[byte]): ProtoResult[T] =
|
|
||||||
var msg = KeepaliveMessage()
|
|
||||||
let pb = initProtoBuffer(buffer)
|
|
||||||
|
|
||||||
# @TODO: Currently no fields defined for a KeepaliveMessage
|
|
||||||
|
|
||||||
ok(msg)
|
|
||||||
|
|
||||||
# Protocol -------------------------------------------------------
|
|
||||||
proc new*(T: type WakuKeepalive, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T =
|
|
||||||
debug "new WakuKeepalive"
|
|
||||||
var wk: WakuKeepalive
|
|
||||||
new wk
|
|
||||||
|
|
||||||
wk.rng = crypto.newRng()
|
|
||||||
wk.peerManager = peerManager
|
|
||||||
|
|
||||||
wk.init()
|
|
||||||
|
|
||||||
return wk
|
|
||||||
|
|
||||||
method init*(wk: WakuKeepalive) =
|
|
||||||
debug "init WakuKeepalive"
|
|
||||||
|
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
|
||||||
info "WakuKeepalive message received"
|
|
||||||
waku_keepalive_count.inc()
|
|
||||||
|
|
||||||
wk.handler = handle
|
|
||||||
wk.codec = WakuKeepaliveCodec
|
|
||||||
|
|
||||||
proc keepAllAlive*(wk: WakuKeepalive) {.async, gcsafe.} =
|
|
||||||
# Send keepalive message to all managed and connected peers
|
|
||||||
let peers = wk.peerManager.peers().filterIt(wk.peerManager.connectedness(it.peerId) == Connected).mapIt(it.toPeerInfo())
|
|
||||||
|
|
||||||
for peer in peers:
|
|
||||||
let connOpt = await wk.peerManager.dialPeer(peer, WakuKeepaliveCodec)
|
|
||||||
|
|
||||||
if connOpt.isNone():
|
|
||||||
# @TODO more sophisticated error handling here
|
|
||||||
error "failed to connect to remote peer"
|
|
||||||
waku_keepalive_errors.inc(labelValues = [dialFailure])
|
|
||||||
return
|
|
||||||
|
|
||||||
await connOpt.get().writeLP(KeepaliveMessage().encode().buffer) # Send keep-alive on connection
|
|
|
@ -1,12 +0,0 @@
|
||||||
import
|
|
||||||
bearssl,
|
|
||||||
libp2p/protocols/protocol,
|
|
||||||
../../node/peer_manager/peer_manager
|
|
||||||
|
|
||||||
type
|
|
||||||
KeepaliveMessage* = object
|
|
||||||
# Currently no fields for a keepalive message
|
|
||||||
|
|
||||||
WakuKeepalive* = ref object of LPProtocol
|
|
||||||
rng*: ref BrHmacDrbgContext
|
|
||||||
peerManager*: PeerManager
|
|
Loading…
Reference in New Issue