refactor(peer_manager): move peer_store under peer_manager module

This commit is contained in:
Lorenzo Delgado 2022-11-04 09:40:13 +01:00 committed by GitHub
parent 53e8979aa9
commit 0569beadbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 86 additions and 25 deletions

View File

@ -22,10 +22,11 @@ import
../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_peer_exchange, ../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/node/discv5/waku_discv5, ../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/storage/migration, ../../waku/v2/node/storage/migration,
../../waku/v2/node/storage/peer/waku_peer_storage,
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/message/sqlite_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/message_retention_policy, ../../waku/v2/node/storage/message/message_retention_policy,
@ -114,7 +115,7 @@ const PeerPersistenceDbUrl = "sqlite://peers.db"
proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] = proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] =
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath) ?peer_store_sqlite_migrations.migrate(db.get())
let res = WakuPeerStorage.new(db.get()) let res = WakuPeerStorage.new(db.get())
if res.isErr(): if res.isErr():

View File

@ -13,7 +13,7 @@ import
logScope: logScope:
topics = "wakunode.setup.rest" topics = "wakunode rest"
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) = proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) =

View File

@ -18,7 +18,7 @@ import
./config ./config
logScope: logScope:
topics = "wakunode.setup.rpc" topics = "wakunode jsonrpc"
proc startRpcServer*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) proc startRpcServer*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf)

View File

@ -17,7 +17,7 @@ import
import import
../../waku/common/sqlite, ../../waku/common/sqlite,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/node/waku_node, ../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,

View File

@ -6,7 +6,7 @@ import
import import
../../waku/common/sqlite, ../../waku/common/sqlite,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../test_helpers ../test_helpers
suite "Peer Storage": suite "Peer Storage":

View File

@ -1,19 +1,28 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].} {.push raises: [Defect].}
else:
{.push raises: [].}
import import
std/[options, sets, sequtils, times], std/[options, sets, sequtils, times],
chronos, chronicles, metrics, chronos,
libp2p/multistream, chronicles,
./waku_peer_store, metrics,
../storage/peer/peer_storage, libp2p/multistream
../../utils/peers import
../../utils/peers,
./peer_store/peer_storage,
./waku_peer_store
export waku_peer_store, peer_storage, peers export waku_peer_store, peer_storage, peers
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
logScope: logScope:
topics = "waku node peer_manager" topics = "waku node peer_manager"
@ -24,7 +33,7 @@ type
storage: PeerStorage storage: PeerStorage
let let
defaultDialTimeout = chronos.minutes(1) # @TODO should this be made configurable? defaultDialTimeout = chronos.minutes(1) # TODO: should this be made configurable?
#################### ####################
# Helper functions # # Helper functions #
@ -60,7 +69,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
waku_peers_dials.inc(labelValues = ["successful"]) waku_peers_dials.inc(labelValues = ["successful"])
return some(dialFut.read()) return some(dialFut.read())
else: else:
# @TODO any redial attempts? # TODO: any redial attempts?
debug "Dialing remote peer timed out" debug "Dialing remote peer timed out"
waku_peers_dials.inc(labelValues = ["timeout"]) waku_peers_dials.inc(labelValues = ["timeout"])
@ -70,7 +79,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
return none(Connection) return none(Connection)
except CatchableError as e: except CatchableError as e:
# @TODO any redial attempts? # TODO: any redial attempts?
debug "Dialing remote peer failed", msg = e.msg debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"]) waku_peers_dials.inc(labelValues = ["failed"])
@ -162,8 +171,8 @@ proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness = proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
# Return the connection state of the given, managed peer # Return the connection state of the given, managed peer
# @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId) let storedInfo = pm.peerStore.get(peerId)
@ -217,7 +226,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
let peers = pm.peers.filterIt(it.protos.contains(proto)) let peers = pm.peers.filterIt(it.protos.contains(proto))
if peers.len >= 1: if peers.len >= 1:
# @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0] let peerStored = peers[0]
return some(peerStored.toRemotePeerInfo()) return some(peerStored.toRemotePeerInfo())
@ -267,7 +276,7 @@ proc reconnectPeers*(pm: PeerManager,
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers # Dial a given peer and add it to the list of known peers
# @TODO check peer validity and score before continuing. Limit number of peers to be managed. # TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet... # First add dialed peer info to peer store, if it does not exist yet...
if not pm.hasPeer(remotePeerInfo.peerId, proto): if not pm.hasPeer(remotePeerInfo.peerId, proto):
@ -282,7 +291,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up it's existing addrs in the switch's peerStore # Dial an existing peer by looking up it's existing addrs in the switch's peerStore
# @TODO check peer validity and score before continuing. Limit number of peers to be managed. # TODO: check peer validity and score before continuing. Limit number of peers to be managed.
if peerId == pm.switch.peerInfo.peerId: if peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self # Do not attempt to dial self

View File

@ -0,0 +1,42 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables, strutils, os],
stew/results,
chronicles
import
../../../../common/sqlite,
../../../../common/sqlite/migrations
logScope:
topics = "waku node peer_manager"
const SchemaVersion* = 1 # increase this when there is an update in the database schema
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".."
const PeerStoreMigrationPath: string = projectRoot / "migrations" / "peer_store"
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the
## `user_version` to the `tragetVersion`.
##
## If not `targetVersion` is provided, it defaults to `SchemaVersion`.
##
## NOTE: Down migration it is not currently supported
debug "starting peer store's sqlite database migration"
let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=PeerStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished peer store's sqlite database migration"
ok()

View File

@ -1,8 +1,13 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].} {.push raises: [Defect].}
else:
{.push raises: [].}
import import
stew/results, stew/results
../../peer_manager/waku_peer_store import
../waku_peer_store
## This module defines a peer storage interface. Implementations of ## This module defines a peer storage interface. Implementations of
## PeerStorage are used to store and retrieve peers ## PeerStorage are used to store and retrieve peers

View File

@ -1,4 +1,8 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].} {.push raises: [Defect].}
else:
{.push raises: [].}
import import
std/sets, std/sets,
@ -7,7 +11,7 @@ import
libp2p/protobuf/minprotobuf libp2p/protobuf/minprotobuf
import import
../../../../common/sqlite, ../../../../common/sqlite,
../../peer_manager/waku_peer_store, ../waku_peer_store,
./peer_storage ./peer_storage
export sqlite export sqlite