From daae9f3e1d4885a440b790fe0419cac302fcd98a Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Fri, 26 Mar 2021 10:49:51 +0200 Subject: [PATCH] Integrate persistent peer storage (#437) * Integrate persistent peer storage --- CHANGELOG.md | 2 +- tests/all_tests_v2.nim | 3 +- tests/v2/test_peer_manager.nim | 53 +++++- tests/v2/test_peer_storage.nim | 26 ++- tests/v2/test_waku_filter.nim | 2 +- tests/v2/test_waku_store.nim | 2 +- waku/v2/node/jsonrpc/admin_api.nim | 2 +- .../node/{ => peer_manager}/peer_manager.nim | 177 +++++++++++------- waku/v2/node/peer_manager/waku_peer_store.nim | 28 +++ waku/v2/node/storage/peer/peer_storage.nim | 23 +++ .../node/storage/peer/waku_peer_storage.nim | 27 ++- waku/v2/node/wakunode2.nim | 54 ++++-- waku/v2/protocol/waku_filter/waku_filter.nim | 2 +- .../waku_filter/waku_filter_types.nim | 2 +- waku/v2/protocol/waku_store/waku_store.nim | 2 +- .../protocol/waku_store/waku_store_types.nim | 2 +- waku/v2/protocol/waku_swap/waku_swap.nim | 2 +- .../v2/protocol/waku_swap/waku_swap_types.nim | 2 +- 18 files changed, 296 insertions(+), 115 deletions(-) rename waku/v2/node/{ => peer_manager}/peer_manager.nim (59%) create mode 100644 waku/v2/node/peer_manager/waku_peer_store.nim create mode 100644 waku/v2/node/storage/peer/peer_storage.nim diff --git a/CHANGELOG.md b/CHANGELOG.md index 179dc38c6..accdce6c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ - Added a peer manager for `relay`, `filter`, `store` and `swap` peers. - `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets. - Admin API now provides a `post` method to connect to peers on an ad-hoc basis -- Added persistent peer storage +- Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart. ## 2021-01-05 v0.2 diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 3363ace93..b1bf1d8c3 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -13,7 +13,8 @@ import ./v2/test_peer_manager, ./v2/test_web3, # TODO remove it when rln-relay tests get finalized ./v2/test_waku_rln_relay, - ./v2/test_waku_bridge + ./v2/test_waku_bridge, + ./v2/test_peer_storage # TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc # For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index fe9d49fb6..ad0516720 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -12,7 +12,8 @@ import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/rpc/message, ../../waku/v2/node/wakunode2, - ../../waku/v2/node/peer_manager, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_store/waku_store, @@ -162,3 +163,53 @@ procSuite "Peer Manager": check: # Not currently connected to node2, but had recent, successful connection. node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect + + await node2.stop() + + asyncTest "Peer manager can use persistent storage and survive restarts": + let + database = SqliteDatabase.init("", inMemory = true)[] + storage = WakuPeerStorage.new(database)[] + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000), peerStorage = storage) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + peerInfo2 = node2.peerInfo + + await node1.start() + await node2.start() + + node1.mountRelay() + node2.mountRelay() + + discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + check: + # Currently connected to node2 + node1.peerManager.peers().len == 1 + node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.connectedness(peerInfo2.peerId) == Connected + + # Simulate restart by initialising a new node using the same storage + let + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), + Port(60004), peerStorage = storage) + + await node3.start() + check: + # Node2 has been loaded after "restart", but we have not yet reconnected + node3.peerManager.peers().len == 1 + node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected + + node3.mountRelay() # This should trigger a reconnect + + check: + # Reconnected to node2 after "restart" + node3.peerManager.peers().len == 1 + node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.connectedness(peerInfo2.peerId) == Connected + + await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim index b41f3ac22..b43975781 100644 --- a/tests/v2/test_peer_storage.nim +++ b/tests/v2/test_peer_storage.nim @@ -4,15 +4,15 @@ import std/[unittest, sets], libp2p/crypto/crypto, ../test_helpers, - ../../waku/v2/node/peer_manager, + ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/storage/peer/waku_peer_storage suite "Peer Storage": - test "Store and retrieve from persistent peer storage": + test "Store, replace and retrieve from persistent peer storage": let database = SqliteDatabase.init("", inMemory = true)[] - storage = WakuPeerStorage.init(database)[] + storage = WakuPeerStorage.new(database)[] # Test Peer peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() @@ -24,6 +24,8 @@ suite "Peer Storage": defer: storage.close() + # Test insert and retrieve + discard storage.put(peer.peerId, stored, conn) var responseCount = 0 @@ -40,3 +42,21 @@ suite "Peer Storage": check: res.isErr == false responseCount == 1 + + # Test replace and retrieve (update an existing entry) + discard storage.put(peer.peerId, stored, Connectedness.CannotConnect) + + responseCount = 0 + proc replacedData(peerId: PeerID, storedInfo: StoredInfo, + connectedness: Connectedness) = + responseCount += 1 + check: + peerId == peer.peerId + storedInfo == stored + connectedness == CannotConnect + + let repRes = storage.getAll(replacedData) + + check: + repRes.isErr == false + responseCount == 1 diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 68aaf4f90..96e2fdd89 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -8,7 +8,7 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/multistream, - ../../waku/v2/node/peer_manager, + ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_filter/waku_filter, ../test_helpers, ./utils diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 2e19baa75..5f112a348 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -11,7 +11,7 @@ import ../../waku/v2/protocol/[waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/node/storage/message/waku_message_store, - ../../waku/v2/node/peer_manager, + ../../waku/v2/node/peer_manager/peer_manager, ../test_helpers, ./utils procSuite "Waku Store": diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 00ace480a..21686ec4a 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -9,7 +9,7 @@ import ../../protocol/waku_filter/[waku_filter_types, waku_filter], ../../protocol/waku_relay, ../wakunode2, - ../peer_manager, + ../peer_manager/peer_manager, ./jsonrpc_types export jsonrpc_types diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim similarity index 59% rename from waku/v2/node/peer_manager.nim rename to waku/v2/node/peer_manager/peer_manager.nim index 11ff6d393..de8ec5add 100644 --- a/waku/v2/node/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -3,66 +3,26 @@ import std/[options, sets, sequtils], chronos, chronicles, metrics, - libp2p/standard_setup, - libp2p/peerstore + ./waku_peer_store, + ../storage/peer/peer_storage -export peerstore, standard_setup +export waku_peer_store declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] +declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] logScope: topics = "wakupeers" type - Connectedness* = enum - # NotConnected: default state for a new peer. No connection and no further information on connectedness. - NotConnected, - # CannotConnect: attempted to connect to peer, but failed. - CannotConnect, - # CanConnect: was recently connected to peer and disconnected gracefully. - CanConnect, - # Connected: actively connected to peer. - Connected - - ConnectionBook* = object of PeerBook[Connectedness] - - WakuPeerStore* = ref object of PeerStore - connectionBook*: ConnectionBook - PeerManager* = ref object of RootObj switch*: Switch peerStore*: WakuPeerStore + storage: PeerStorage const defaultDialTimeout = 1.minutes # @TODO should this be made configurable? -proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = - case event.kind - of ConnEventKind.Connected: - pm.peerStore.connectionBook.set(peerId, Connected) - return - of ConnEventKind.Disconnected: - pm.peerStore.connectionBook.set(peerId, CanConnect) - return - -proc new*(T: type WakuPeerStore): WakuPeerStore = - var p: WakuPeerStore - new(p) - return p - -proc new*(T: type PeerManager, switch: Switch): PeerManager = - let pm = PeerManager(switch: switch, - peerStore: WakuPeerStore.new()) - - proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = - onConnEvent(pm, peerId, event) - - - pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) - pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) - - return pm - #################### # Helper functions # #################### @@ -72,6 +32,93 @@ proc toPeerInfo(storedInfo: StoredInfo): PeerInfo = addrs = toSeq(storedInfo.addrs), protocols = toSeq(storedInfo.protos)) +proc insertOrReplace(ps: PeerStorage, peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) = + # Insert peer entry into persistent storage, or replace existing entry with updated info + let res = ps.put(peerId, storedInfo, connectedness) + if res.isErr: + warn "failed to store peers", err = res.error + waku_peers_errors.inc(labelValues = ["storage_failure"]) + +proc dialPeer(pm: PeerManager, peerId: PeerID, + addrs: seq[MultiAddress], proto: string, + dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = + info "Dialing peer from manager", wireAddr = addrs[0], peerId = peerId + + # Dial Peer + let dialFut = pm.switch.dial(peerId, addrs, proto) + + try: + # Attempt to dial remote peer + if (await dialFut.withTimeout(dialTimeout)): + waku_peers_dials.inc(labelValues = ["successful"]) + return some(dialFut.read()) + else: + # @TODO any redial attempts? + debug "Dialing remote peer timed out" + waku_peers_dials.inc(labelValues = ["timeout"]) + + pm.peerStore.connectionBook.set(peerId, CannotConnect) + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) + + return none(Connection) + except CatchableError as e: + # @TODO any redial attempts? + debug "Dialing remote peer failed", msg = e.msg + waku_peers_dials.inc(labelValues = ["failed"]) + + pm.peerStore.connectionBook.set(peerId, CannotConnect) + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) + + return none(Connection) + +proc loadFromStorage(pm: PeerManager) = + # Load peers from storage, if available + proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) = + pm.peerStore.addressBook.set(peerId, storedInfo.addrs) + pm.peerStore.protoBook.set(peerId, storedInfo.protos) + pm.peerStore.keyBook.set(peerId, storedInfo.publicKey) + pm.peerStore.connectionBook.set(peerId, NotConnected) # Reset connectedness state + + let res = pm.storage.getAll(onData) + if res.isErr: + warn "failed to load peers from storage", err = res.error + waku_peers_errors.inc(labelValues = ["storage_load_failure"]) + +################## +# Initialisation # +################## + +proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = + case event.kind + of ConnEventKind.Connected: + pm.peerStore.connectionBook.set(peerId, Connected) + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected) + return + of ConnEventKind.Disconnected: + pm.peerStore.connectionBook.set(peerId, CanConnect) + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect) + return + +proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager = + let pm = PeerManager(switch: switch, + peerStore: WakuPeerStore.new(), + storage: storage) + + proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = + onConnEvent(pm, peerId, event) + + pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) + pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) + + if not storage.isNil: + pm.loadFromStorage() # Load previously managed peers. + + return pm + ##################### # Manager interface # ##################### @@ -120,6 +167,10 @@ proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) = # ...associated protocols pm.peerStore.protoBook.add(peerInfo.peerId, proto) + # Add peer to storage. Entry will subsequently be updated with connectedness information + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerInfo.peerId, pm.peerStore.get(peerInfo.peerId), NotConnected) + proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] = # Selects the best peer for a given protocol let peers = pm.peers.filterIt(it.protos.contains(proto)) @@ -132,6 +183,16 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] = else: return none(PeerInfo) +proc reconnectPeers*(pm: PeerManager, proto: string) {.async.} = + ## Reconnect to peers registered for this protocol. This will update connectedness. + ## Especially useful to resume connections from persistent storage after a restart. + + debug "Reconnecting peers", proto=proto + + for storedInfo in pm.peers(proto): + trace "Reconnecting to peer", peerId=storedInfo.peerId + discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) + #################### # Dialer interface # #################### @@ -145,28 +206,4 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto pm.addPeer(peerInfo, proto) - info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId - - # Dial Peer - # @TODO Keep track of conn and connected state in peer store - let dialFut = pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto) - - try: - # Attempt to dial remote peer - if (await dialFut.withTimeout(dialTimeout)): - waku_peers_dials.inc(labelValues = ["successful"]) - return some(dialFut.read()) - else: - # @TODO any redial attempts? - # @TODO indicate CannotConnect on peer metadata - debug "Dialing remote peer timed out" - waku_peers_dials.inc(labelValues = ["timeout"]) - pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect) - return none(Connection) - except CatchableError as e: - # @TODO any redial attempts? - # @TODO indicate CannotConnect on peer metadata - debug "Dialing remote peer failed", msg = e.msg - waku_peers_dials.inc(labelValues = ["failed"]) - pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect) - return none(Connection) + return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim new file mode 100644 index 000000000..fe05eed3c --- /dev/null +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -0,0 +1,28 @@ +{.push raises: [Defect].} + +import + libp2p/standard_setup, + libp2p/peerstore + +export peerstore, standard_setup + +type + Connectedness* = enum + # NotConnected: default state for a new peer. No connection and no further information on connectedness. + NotConnected, + # CannotConnect: attempted to connect to peer, but failed. + CannotConnect, + # CanConnect: was recently connected to peer and disconnected gracefully. + CanConnect, + # Connected: actively connected to peer. + Connected + + ConnectionBook* = object of PeerBook[Connectedness] + + WakuPeerStore* = ref object of PeerStore + connectionBook*: ConnectionBook + +proc new*(T: type WakuPeerStore): WakuPeerStore = + var p: WakuPeerStore + new(p) + return p \ No newline at end of file diff --git a/waku/v2/node/storage/peer/peer_storage.nim b/waku/v2/node/storage/peer/peer_storage.nim new file mode 100644 index 000000000..bc9476ed0 --- /dev/null +++ b/waku/v2/node/storage/peer/peer_storage.nim @@ -0,0 +1,23 @@ +import + stew/results, + chronos, + ../../peer_manager/waku_peer_store + +## This module defines a peer storage interface. Implementations of +## PeerStorage are used to store and retrieve peers + +type + PeerStorage* = ref object of RootObj + + PeerStorageResult*[T] = Result[T, string] + + DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo, + connectedness: Connectedness) {.closure.} + +# PeerStorage interface +method put*(db: PeerStorage, + peerId: PeerID, + storedInfo: StoredInfo, + connectedness: Connectedness): PeerStorageResult[void] {.base.} = discard + +method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard \ No newline at end of file diff --git a/waku/v2/node/storage/peer/waku_peer_storage.nim b/waku/v2/node/storage/peer/waku_peer_storage.nim index 428b58d9a..e523cb3c5 100644 --- a/waku/v2/node/storage/peer/waku_peer_storage.nim +++ b/waku/v2/node/storage/peer/waku_peer_storage.nim @@ -4,20 +4,15 @@ import chronos, metrics, libp2p/protobuf/minprotobuf, stew/results, + ./peer_storage, ../sqlite, - ../../peer_manager + ../../peer_manager/waku_peer_store export sqlite type - WakuPeerStorage* = ref object of RootObj + WakuPeerStorage* = ref object of PeerStorage database*: SqliteDatabase - - WakuPeerStorageResult*[T] = Result[T, string] - - DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo, - connectedness: Connectedness) {.closure.} - ########################## # Protobuf Serialisation # @@ -60,7 +55,7 @@ proc encode*(storedInfo: StoredInfo): ProtoBuffer = # Storage implementation # ########################## -proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T] = +proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] = ## Create the "Peers" table ## It contains: ## - peer id as primary key, stored as a blob @@ -84,14 +79,14 @@ proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T ok(WakuPeerStorage(database: db)) -proc put*(db: WakuPeerStorage, - peerId: PeerID, - storedInfo: StoredInfo, - connectedness: Connectedness): WakuPeerStorageResult[void] = +method put*(db: WakuPeerStorage, + peerId: PeerID, + storedInfo: StoredInfo, + connectedness: Connectedness): PeerStorageResult[void] = - ## Adds a peer to storage + ## Adds a peer to storage or replaces existing entry if it already exists let prepare = db.database.prepareStmt( - "INSERT INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);", + "REPLACE INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);", (seq[byte], seq[byte], int32), void ) @@ -105,7 +100,7 @@ proc put*(db: WakuPeerStorage, ok() -proc getAll*(db: WakuPeerStorage, onData: DataProc): WakuPeerStorageResult[bool] = +method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageResult[bool] = ## Retrieves all peers from storage var gotPeers = false diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index bb461caf5..009021eb7 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -18,8 +18,9 @@ import ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils], ../utils/peers, ./storage/message/message_store, + ./storage/peer/peer_storage, ../utils/requests, - ./peer_manager + ./peer_manager/peer_manager declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicGauge waku_node_filters, "number of content filter subscriptions" @@ -110,7 +111,8 @@ template tcpEndPoint(address, port): auto = proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, bindIp: ValidIpAddress, bindPort: Port, - extIp = none[ValidIpAddress](), extPort = none[Port]()): T = + extIp = none[ValidIpAddress](), extPort = none[Port](), + peerStorage: PeerStorage = nil): T = ## Creates a Waku Node. ## ## Status: Implemented. @@ -127,7 +129,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, peerInfo.addrs.add(hostAddress) # Index 0 for multiaddr in announcedAddresses: peerInfo.addrs.add(multiaddr) # Announced addresses in index > 0 - + var switch = newStandardSwitch(some(nodekey), hostAddress, transportFlags = {ServerFlags.ReuseAddr}, rng = rng) # TODO Untested - verify behavior after switch interface change @@ -138,7 +140,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, # triggerSelf = true, sign = false, # verifySignature = false).PubSub result = WakuNode( - peerManager: PeerManager.new(switch), + peerManager: PeerManager.new(switch, peerStorage), switch: switch, rng: rng, peerInfo: peerInfo, @@ -388,6 +390,9 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela node.wakuRelay = wakuRelay node.switch.mount(wakuRelay) + # Reonnect to previous relay peers + waitFor node.peerManager.reconnectPeers(WakuRelayCodec) + info "mounting relay" proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) @@ -476,6 +481,7 @@ when isMainModule: relay_api, store_api], ./storage/message/waku_message_store, + ./storage/peer/waku_peer_storage, ../../common/utils/nat proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) = @@ -524,9 +530,32 @@ when isMainModule: info "Node metrics", totalMessages discard setTimer(Moment.fromNow(2.seconds), logMetrics) discard setTimer(Moment.fromNow(2.seconds), logMetrics) - + let conf = WakuNodeConf.load() + + # Storage setup + var sqliteDatabase: SqliteDatabase + + if conf.dbpath != "": + let dbRes = SqliteDatabase.init(conf.dbpath) + if dbRes.isErr: + warn "failed to init database", err = dbRes.error + waku_node_errors.inc(labelValues = ["init_db_failure"]) + else: + sqliteDatabase = dbRes.value + + var pStorage: WakuPeerStorage + + if not sqliteDatabase.isNil: + let res = WakuPeerStorage.new(sqliteDatabase) + if res.isErr: + warn "failed to init new WakuPeerStorage", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) + else: + pStorage = res.value + + let (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId, Port(uint16(conf.tcpPort) + conf.portsShift), Port(uint16(conf.udpPort) + conf.portsShift)) @@ -535,8 +564,10 @@ when isMainModule: ## config, the external port is the same as the bind port. extPort = if extIp.isSome() and extTcpPort.isNone(): some(Port(uint16(conf.tcpPort) + conf.portsShift)) else: extTcpPort - node = WakuNode.init(conf.nodeKey, conf.listenAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extPort) + node = WakuNode.init(conf.nodeKey, + conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, extPort, + pStorage) waitFor node.start() @@ -548,13 +579,8 @@ when isMainModule: if conf.store: var store: WakuMessageStore - if conf.dbpath != "": - let dbRes = SqliteDatabase.init(conf.dbpath) - if dbRes.isErr: - warn "failed to init database", err = dbRes.error - waku_node_errors.inc(labelValues = ["init_db_failure"]) - - let res = WakuMessageStore.init(dbRes.value) + if not sqliteDatabase.isNil: + let res = WakuMessageStore.init(sqliteDatabase) if res.isErr: warn "failed to init WakuMessageStore", err = res.error waku_node_errors.inc(labelValues = ["init_store_failure"]) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 0f760db64..d872dc782 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -12,7 +12,7 @@ import ../message_notifier, waku_filter_types, ../../utils/requests, - ../../node/peer_manager + ../../node/peer_manager/peer_manager # NOTE This is just a start, the design of this protocol isn't done yet. It # should be direct payload exchange (a la req-resp), not be coupled with the diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index 548406b1c..5e737ce91 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -3,7 +3,7 @@ import bearssl, libp2p/peerinfo, libp2p/protocols/protocol, - ../../node/peer_manager, + ../../node/peer_manager/peer_manager, ../waku_message export waku_message diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index a68605ed7..876664fd8 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -15,7 +15,7 @@ import ../waku_swap/waku_swap, ./waku_store_types, ../../utils/requests, - ../../node/peer_manager + ../../node/peer_manager/peer_manager export waku_store_types diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index e14480ec0..6b9c0934c 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -8,7 +8,7 @@ import ../waku_message, ../../node/storage/message/message_store, ../../utils/pagination, - ../../node/peer_manager + ../../node/peer_manager/peer_manager export waku_message export pagination diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index 2036f4cb9..5ae18b66f 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -29,7 +29,7 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - ../../node/peer_manager, + ../../node/peer_manager/peer_manager, ../message_notifier, ./waku_swap_types, ../../waku/v2/protocol/waku_swap/waku_swap_contracts diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 25d44d3b7..23cf96bef 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -3,7 +3,7 @@ import bearssl, libp2p/protocols/protocol, libp2p/peerinfo, - ../../node/peer_manager + ../../node/peer_manager/peer_manager type Beneficiary* = seq[byte]