diff --git a/.gitmodules b/.gitmodules index cdf466973..e70cc563b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -12,7 +12,7 @@ path = vendor/nim-libp2p url = https://github.com/status-im/nim-libp2p.git ignore = dirty - branch = master + branch = unstable [submodule "vendor/nim-stew"] path = vendor/nim-stew url = https://github.com/status-im/nim-stew.git diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index e8c2b3bb2..f495852a0 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -304,7 +304,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = rlnRelayEnabled = conf.rlnRelay, relayMessages = conf.relay) # Indicates if node is capable to relay messages - node.mountKeepalive() + node.mountLibp2pPing() let nick = await readNick(transp) echo "Welcome, " & nick & "!" diff --git a/examples/v2/matterbridge/chat2bridge.nim b/examples/v2/matterbridge/chat2bridge.nim index cb6ad21c3..92d60c436 100644 --- a/examples/v2/matterbridge/chat2bridge.nim +++ b/examples/v2/matterbridge/chat2bridge.nim @@ -253,7 +253,7 @@ when isMainModule: # Now load rest of config # Mount configured Waku v2 protocols - mountKeepalive(bridge.nodev2) + mountLibp2pPing(bridge.nodev2) if conf.store: mountStore(bridge.nodev2) diff --git a/tests/v2/test_waku_keepalive.nim b/tests/v2/test_waku_keepalive.nim index 4e2c7315c..4deea0ea5 100644 --- a/tests/v2/test_waku_keepalive.nim +++ b/tests/v2/test_waku_keepalive.nim @@ -6,44 +6,42 @@ import stew/shims/net as stewNet, libp2p/switch, libp2p/protobuf/minprotobuf, + libp2p/protocols/ping, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/multistream, ../../waku/v2/node/wakunode2, - ../../waku/v2/protocol/waku_keepalive/waku_keepalive, ../test_helpers, ./utils procSuite "Waku Keepalive": - asyncTest "handle keepalive": + asyncTest "handle ping keepalives": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + var completionFut = newFuture[bool]() + + proc pingHandler(peer: PeerInfo) {.async, gcsafe, raises: [Defect].} = + debug "Ping received" + + check: + peer.peerId == node1.switch.peerInfo.peerId + + completionFut.complete(true) + await node1.start() node1.mountRelay() - node1.mountKeepalive() + node1.mountLibp2pPing() await node2.start() node2.mountRelay() - node2.mountKeepalive() + node2.switch.mount(Ping.new(handler = pingHandler)) await node1.connectToNodes(@[node2.peerInfo]) - var completionFut = newFuture[bool]() - - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - debug "WakuKeepalive message received" - - check: - proto == waku_keepalive.WakuKeepaliveCodec - - completionFut.complete(true) - - node2.wakuKeepalive.handler = handle - node1.startKeepalive() check: diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 252e5d0d5..f7dd6b76c 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 252e5d0d502c51c7bb20eeebb7a3129220b12ff8 +Subproject commit f7dd6b76c2cbf58a8ed755251086067cb67f420e diff --git a/vendor/nim-eth b/vendor/nim-eth index 0ad571ab2..601fa7ff6 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 0ad571ab27c46a32256c8568a32ef1d6ac34b733 +Subproject commit 601fa7ff667431b05d18579af0e43bf4d8dafa61 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 3da656687..bd2e9a046 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 3da656687be63ccbf5d659af55d159130d325038 +Subproject commit bd2e9a04622d4dece2a7e23552050d6c6261f92d diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index d31e00cce..d4a539871 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -255,7 +255,7 @@ when isMainModule: elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest) # Mount configured Waku v2 protocols - mountKeepalive(bridge.nodev2) + mountLibp2pPing(bridge.nodev2) if conf.store: mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index e831cd0c8..58942b9a1 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -117,8 +117,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer peerStore: WakuPeerStore.new(), storage: storage) - proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = - onConnEvent(pm, peerId, event) + proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} = + onConnEvent(pm, peerInfo.peerId, event) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 017112ff5..1b992cc1d 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -1,6 +1,7 @@ {.push raises: [Defect].} import + std/[tables, sequtils, sets], libp2p/builders, libp2p/peerstore @@ -21,11 +22,38 @@ type DisconnectBook* = object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps - WakuPeerStore* = ref object of PeerStore + WakuPeerStore* = ref object + addressBook*: AddressBook + protoBook*: ProtoBook + keyBook*: KeyBook connectionBook*: ConnectionBook disconnectBook*: DisconnectBook proc new*(T: type WakuPeerStore): WakuPeerStore = var p: WakuPeerStore new(p) - return p \ No newline at end of file + return p + +################## +# Peer Store API # +################## + +proc get*(peerStore: WakuPeerStore, + peerId: PeerID): StoredInfo = + ## Get the stored information of a given peer. + + StoredInfo( + peerId: peerId, + addrs: peerStore.addressBook.get(peerId), + protos: peerStore.protoBook.get(peerId), + publicKey: peerStore.keyBook.get(peerId) + ) + +proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] = + ## Get all the stored information of every peer. + + let allKeys = concat(toSeq(keys(peerStore.addressBook.book)), + toSeq(keys(peerStore.protoBook.book)), + toSeq(keys(peerStore.keyBook.book))).toHashSet() + + return allKeys.mapIt(peerStore.get(it)) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 77fd4b8a4..a42d0fdd2 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -9,6 +9,7 @@ import libp2p/multiaddress, libp2p/crypto/crypto, libp2p/protocols/protocol, + libp2p/protocols/ping, # NOTE For TopicHandler, solve with exports? libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub, @@ -20,7 +21,6 @@ import ../protocol/waku_filter/waku_filter, ../protocol/waku_lightpush/waku_lightpush, ../protocol/waku_rln_relay/waku_rln_relay_types, - ../protocol/waku_keepalive/waku_keepalive, ../utils/peers, ./storage/message/message_store, ./storage/peer/peer_storage, @@ -68,8 +68,8 @@ type wakuSwap*: WakuSwap wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush - wakuKeepalive*: WakuKeepalive peerInfo*: PeerInfo + libp2pPing*: Ping libp2pTransportLoops*: seq[Future[void]] # TODO Revist messages field indexing as well as if this should be Message or WakuMessage messages*: seq[(Topic, WakuMessage)] @@ -530,19 +530,34 @@ proc mountLightPush*(node: WakuNode) = node.switch.mount(node.wakuLightPush) -proc mountKeepalive*(node: WakuNode) = - info "mounting keepalive" +proc mountLibp2pPing*(node: WakuNode) = + info "mounting libp2p ping protocol" - node.wakuKeepalive = WakuKeepalive.new(node.peerManager, node.rng) + node.libp2pPing = Ping.new(rng = node.rng) - node.switch.mount(node.wakuKeepalive) + node.switch.mount(node.libp2pPing) proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = while node.started: - # Keep all managed peers alive when idle + # Keep all connected peers alive while running trace "Running keepalive" - await node.wakuKeepalive.keepAllAlive() + # First get a list of connected peer infos + let peers = node.peerManager.peers() + .filterIt(node.peerManager.connectedness(it.peerId) == Connected) + .mapIt(it.toPeerInfo()) + + # Attempt to retrieve and ping the active outgoing connection for each peer + for peer in peers: + let connOpt = await node.peerManager.dialPeer(peer, PingCodec) + + if connOpt.isNone: + # @TODO more sophisticated error handling here + debug "failed to connect to remote peer", peer=peer + waku_node_errors.inc(labelValues = ["keep_alive_failure"]) + return + + discard await node.libp2pPing.ping(connOpt.get()) # Ping connection await sleepAsync(keepalive) @@ -752,7 +767,7 @@ when isMainModule: relayMessages = conf.relay) # Indicates if node is capable to relay messages # Keepalive mounted on all nodes - mountKeepalive(node) + mountLibp2pPing(node) # Resume historical messages, this has to be called after the relay setup if conf.store and conf.persistMessages: diff --git a/waku/v2/protocol/waku_keepalive/waku_keepalive.nim b/waku/v2/protocol/waku_keepalive/waku_keepalive.nim deleted file mode 100644 index e88d04af3..000000000 --- a/waku/v2/protocol/waku_keepalive/waku_keepalive.nim +++ /dev/null @@ -1,85 +0,0 @@ -import - std/[tables, sequtils, options], - bearssl, - chronos, chronicles, metrics, stew/results, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/protocol, - libp2p/protobuf/minprotobuf, - libp2p/stream/connection, - libp2p/crypto/crypto, - ../../utils/requests, - ../../node/peer_manager/peer_manager, - ../message_notifier, - ../waku_relay, - waku_keepalive_types - -export waku_keepalive_types - -declarePublicGauge waku_keepalive_count, "number of keepalives received" -declarePublicGauge waku_keepalive_errors, "number of keepalive protocol errors", ["type"] - -logScope: - topics = "wakukeepalive" - -const - WakuKeepaliveCodec* = "/vac/waku/keepalive/2.0.0-alpha1" - -# Error types (metric label values) -const - dialFailure = "dial_failure" - -# Encoding and decoding ------------------------------------------------------- -proc encode*(msg: KeepaliveMessage): ProtoBuffer = - var pb = initProtoBuffer() - - # @TODO: Currently no fields defined for a KeepaliveMessage - - return pb - -proc init*(T: type KeepaliveMessage, buffer: seq[byte]): ProtoResult[T] = - var msg = KeepaliveMessage() - let pb = initProtoBuffer(buffer) - - # @TODO: Currently no fields defined for a KeepaliveMessage - - ok(msg) - -# Protocol ------------------------------------------------------- -proc new*(T: type WakuKeepalive, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T = - debug "new WakuKeepalive" - var wk: WakuKeepalive - new wk - - wk.rng = crypto.newRng() - wk.peerManager = peerManager - - wk.init() - - return wk - -method init*(wk: WakuKeepalive) = - debug "init WakuKeepalive" - - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - info "WakuKeepalive message received" - waku_keepalive_count.inc() - - wk.handler = handle - wk.codec = WakuKeepaliveCodec - -proc keepAllAlive*(wk: WakuKeepalive) {.async, gcsafe.} = - # Send keepalive message to all managed and connected peers - let peers = wk.peerManager.peers().filterIt(wk.peerManager.connectedness(it.peerId) == Connected).mapIt(it.toPeerInfo()) - - for peer in peers: - let connOpt = await wk.peerManager.dialPeer(peer, WakuKeepaliveCodec) - - if connOpt.isNone(): - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_keepalive_errors.inc(labelValues = [dialFailure]) - return - - await connOpt.get().writeLP(KeepaliveMessage().encode().buffer) # Send keep-alive on connection diff --git a/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim b/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim deleted file mode 100644 index 8d6e9160a..000000000 --- a/waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim +++ /dev/null @@ -1,12 +0,0 @@ -import - bearssl, - libp2p/protocols/protocol, - ../../node/peer_manager/peer_manager - -type - KeepaliveMessage* = object - # Currently no fields for a keepalive message - - WakuKeepalive* = ref object of LPProtocol - rng*: ref BrHmacDrbgContext - peerManager*: PeerManager