diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 225cc9b50..a92ac0791 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -22,10 +22,11 @@ import ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_peer_exchange, ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, + ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/discv5/waku_discv5, ../../waku/v2/node/storage/migration, - ../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/sqlite_store, ../../waku/v2/node/storage/message/message_retention_policy, @@ -114,7 +115,7 @@ const PeerPersistenceDbUrl = "sqlite://peers.db" proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] = let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) - ?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath) + ?peer_store_sqlite_migrations.migrate(db.get()) let res = WakuPeerStorage.new(db.get()) if res.isErr(): diff --git a/apps/wakunode2/wakunode2_setup_rest.nim b/apps/wakunode2/wakunode2_setup_rest.nim index 41648996e..ebd4993e1 100644 --- a/apps/wakunode2/wakunode2_setup_rest.nim +++ b/apps/wakunode2/wakunode2_setup_rest.nim @@ -13,7 +13,7 @@ import logScope: - topics = "wakunode.setup.rest" + topics = "wakunode rest" proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) = diff --git a/apps/wakunode2/wakunode2_setup_rpc.nim b/apps/wakunode2/wakunode2_setup_rpc.nim index dbc1fe6bd..036a0f0d8 100644 --- a/apps/wakunode2/wakunode2_setup_rpc.nim +++ b/apps/wakunode2/wakunode2_setup_rpc.nim @@ -18,7 +18,7 @@ import ./config logScope: - topics = "wakunode.setup.rpc" + topics = "wakunode jsonrpc" proc startRpcServer*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index ccb4e32f1..4630cb19d 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -17,7 +17,7 @@ import import ../../waku/common/sqlite, ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/node/storage/peer/waku_peer_storage, + ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/waku_node, ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store, diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim index c7d2301cf..15d87e378 100644 --- a/tests/v2/test_peer_storage.nim +++ b/tests/v2/test_peer_storage.nim @@ -6,7 +6,7 @@ import import ../../waku/common/sqlite, ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/node/storage/peer/waku_peer_storage, + ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../test_helpers suite "Peer Storage": diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 7bfb0a6e4..a7384a6e9 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -1,19 +1,28 @@ -{.push raises: [Defect].} +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + import std/[options, sets, sequtils, times], - chronos, chronicles, metrics, - libp2p/multistream, - ./waku_peer_store, - ../storage/peer/peer_storage, - ../../utils/peers + chronos, + chronicles, + metrics, + libp2p/multistream +import + ../../utils/peers, + ./peer_store/peer_storage, + ./waku_peer_store export waku_peer_store, peer_storage, peers + declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] + logScope: topics = "waku node peer_manager" @@ -24,7 +33,7 @@ type storage: PeerStorage let - defaultDialTimeout = chronos.minutes(1) # @TODO should this be made configurable? + defaultDialTimeout = chronos.minutes(1) # TODO: should this be made configurable? #################### # Helper functions # @@ -60,7 +69,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, waku_peers_dials.inc(labelValues = ["successful"]) return some(dialFut.read()) else: - # @TODO any redial attempts? + # TODO: any redial attempts? debug "Dialing remote peer timed out" waku_peers_dials.inc(labelValues = ["timeout"]) @@ -70,7 +79,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, return none(Connection) except CatchableError as e: - # @TODO any redial attempts? + # TODO: any redial attempts? debug "Dialing remote peer failed", msg = e.msg waku_peers_dials.inc(labelValues = ["failed"]) @@ -162,8 +171,8 @@ proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] = proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness = # Return the connection state of the given, managed peer - # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. - # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts + # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. + # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts let storedInfo = pm.peerStore.get(peerId) @@ -217,7 +226,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = let peers = pm.peers.filterIt(it.protos.contains(proto)) if peers.len >= 1: - # @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned let peerStored = peers[0] return some(peerStored.toRemotePeerInfo()) @@ -267,7 +276,7 @@ proc reconnectPeers*(pm: PeerManager, proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers - # @TODO check peer validity and score before continuing. Limit number of peers to be managed. + # TODO: check peer validity and score before continuing. Limit number of peers to be managed. # First add dialed peer info to peer store, if it does not exist yet... if not pm.hasPeer(remotePeerInfo.peerId, proto): @@ -282,7 +291,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial an existing peer by looking up it's existing addrs in the switch's peerStore - # @TODO check peer validity and score before continuing. Limit number of peers to be managed. + # TODO: check peer validity and score before continuing. Limit number of peers to be managed. if peerId == pm.switch.peerInfo.peerId: # Do not attempt to dial self diff --git a/waku/v2/node/peer_manager/peer_store/migrations.nim b/waku/v2/node/peer_manager/peer_store/migrations.nim new file mode 100644 index 000000000..ae88e867e --- /dev/null +++ b/waku/v2/node/peer_manager/peer_store/migrations.nim @@ -0,0 +1,42 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +import + std/[tables, strutils, os], + stew/results, + chronicles +import + ../../../../common/sqlite, + ../../../../common/sqlite/migrations + + +logScope: + topics = "waku node peer_manager" + + +const SchemaVersion* = 1 # increase this when there is an update in the database schema + +template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".." +const PeerStoreMigrationPath: string = projectRoot / "migrations" / "peer_store" + + +proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = + ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then + ## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path + ## points to the directory holding the migrations scripts once the db is updated, it sets the + ## `user_version` to the `tragetVersion`. + ## + ## If not `targetVersion` is provided, it defaults to `SchemaVersion`. + ## + ## NOTE: Down migration it is not currently supported + debug "starting peer store's sqlite database migration" + + let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=PeerStoreMigrationPath) + if migrationRes.isErr(): + return err("failed to execute migration scripts: " & migrationRes.error) + + debug "finished peer store's sqlite database migration" + ok() \ No newline at end of file diff --git a/waku/v2/node/storage/peer/peer_storage.nim b/waku/v2/node/peer_manager/peer_store/peer_storage.nim similarity index 80% rename from waku/v2/node/storage/peer/peer_storage.nim rename to waku/v2/node/peer_manager/peer_store/peer_storage.nim index 5b1a4db63..25ad66036 100644 --- a/waku/v2/node/storage/peer/peer_storage.nim +++ b/waku/v2/node/peer_manager/peer_store/peer_storage.nim @@ -1,8 +1,13 @@ -{.push raises: [Defect].} +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + import - stew/results, - ../../peer_manager/waku_peer_store + stew/results +import + ../waku_peer_store ## This module defines a peer storage interface. Implementations of ## PeerStorage are used to store and retrieve peers diff --git a/waku/v2/node/storage/peer/waku_peer_storage.nim b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim similarity index 97% rename from waku/v2/node/storage/peer/waku_peer_storage.nim rename to waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim index 84c18673d..7426074ae 100644 --- a/waku/v2/node/storage/peer/waku_peer_storage.nim +++ b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim @@ -1,4 +1,8 @@ -{.push raises: [Defect].} +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + import std/sets, @@ -7,7 +11,7 @@ import libp2p/protobuf/minprotobuf import ../../../../common/sqlite, - ../../peer_manager/waku_peer_store, + ../waku_peer_store, ./peer_storage export sqlite