From 6f857b46fe61a4ba7324c8754cdfa7b42abc6c95 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Mon, 27 Nov 2023 08:08:58 -0500 Subject: [PATCH] chore: refactoring peer storage (#2243) --- tests/test_peer_storage.nim | 67 ++++---- waku/node/peer_manager/peer_manager.nim | 56 +++--- .../peer_manager/peer_store/peer_storage.nim | 16 +- .../peer_store/waku_peer_storage.nim | 161 +++++++++--------- 4 files changed, 154 insertions(+), 146 deletions(-) diff --git a/tests/test_peer_storage.nim b/tests/test_peer_storage.nim index f6d268b46..a9150b6ab 100644 --- a/tests/test_peer_storage.nim +++ b/tests/test_peer_storage.nim @@ -1,12 +1,15 @@ {.used.} import + std/options, testutils/unittests, + eth/p2p/discoveryv5/enr, libp2p/crypto/crypto import ../../waku/common/databases/db_sqlite, ../../waku/node/peer_manager/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, + ../../waku/waku_enr, ./testlib/wakucore @@ -19,14 +22,22 @@ suite "Peer Storage": # Test Peer peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() - peerKey = generateEcdsaKey() + peerKey = generateSecp256k1Key() peer = PeerInfo.new(peerKey, @[peerLoc]) peerProto = "/waku/2/default-waku/codec" connectedness = Connectedness.CanConnect disconn = 999999 - stored = RemotePeerInfo( + topics = @["/waku/2/rs/2/0", "/waku/2/rs/2/1"] + + # Create ENR + var enrBuilder = EnrBuilder.init(peerKey) + enrBuilder.withShardedTopics(topics).expect("Valid topics") + let record = enrBuilder.build().expect("Valid record") + + let stored = RemotePeerInfo( peerId: peer.peerId, addrs: @[peerLoc], + enr: some(record), protocols: @[peerProto], publicKey: peerKey.getPublicKey().tryGet(), connectedness: connectedness, @@ -36,72 +47,52 @@ suite "Peer Storage": # Test insert and retrieve - require storage.put(peer.peerId, stored, connectedness, disconn).isOk + require storage.put(stored).isOk var responseCount = 0 - # Fetched variables from callback - var resPeerId: PeerId + # Fetched variable from callback var resStoredInfo: RemotePeerInfo - var resConnectedness: Connectedness - var resDisconnect: int64 - proc data(peerId: PeerID, storedInfo: RemotePeerInfo, - connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} = + proc data(storedInfo: RemotePeerInfo) = responseCount += 1 - - # Note: cannot use `check` within `{.raises: [Defect].}` block - # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception - # These flags are checked outside this block. - resPeerId = peerId resStoredInfo = storedInfo - resConnectedness = connectedness - resDisconnect = disconnectTime let res = storage.getAll(data) check: res.isErr == false responseCount == 1 - resPeerId == peer.peerId resStoredInfo.peerId == peer.peerId resStoredInfo.addrs == @[peerLoc] resStoredInfo.protocols == @[peerProto] resStoredInfo.publicKey == peerKey.getPublicKey().tryGet() - # TODO: For compatibility, we don't store connectedness and disconnectTime - #resStoredInfo.connectedness == connectedness - #resStoredInfo.disconnectTime == disconn - resConnectedness == Connectedness.CanConnect - resDisconnect == disconn + resStoredInfo.connectedness == connectedness + resStoredInfo.disconnectTime == disconn + + assert resStoredInfo.enr.isSome(), "The ENR info wasn't properly stored" + check: resStoredInfo.enr.get() == record # Test replace and retrieve (update an existing entry) - require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk + stored.connectedness = CannotConnect + stored.disconnectTime = disconn + 10 + stored.enr = none(Record) + require storage.put(stored).isOk responseCount = 0 - proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo, - connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} = + proc replacedData(storedInfo: RemotePeerInfo) = responseCount += 1 - - # Note: cannot use `check` within `{.raises: [Defect].}` block - # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception - # These flags are checked outside this block. - resPeerId = peerId resStoredInfo = storedInfo - resConnectedness = connectedness - resDisconnect = disconnectTime let repRes = storage.getAll(replacedData) check: repRes.isErr == false responseCount == 1 - resPeerId == peer.peerId resStoredInfo.peerId == peer.peerId resStoredInfo.addrs == @[peerLoc] resStoredInfo.protocols == @[peerProto] resStoredInfo.publicKey == peerKey.getPublicKey().tryGet() - # TODO: For compatibility, we don't store connectedness and disconnectTime - #resStoredInfo.connectedness == connectedness - #resStoredInfo.disconnectTime == disconn - resConnectedness == Connectedness.CannotConnect - resDisconnect == disconn + 10 + resStoredInfo.connectedness == Connectedness.CannotConnect + resStoredInfo.disconnectTime == disconn + 10 + resStoredInfo.enr.isNone() diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 9bfd892cd..69db53eed 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -101,19 +101,15 @@ proc calculateBackoff(initialBackoffInSec: int, # Helper functions # #################### -proc insertOrReplace(ps: PeerStorage, - peerId: PeerID, - remotePeerInfo: RemotePeerInfo, - connectedness: Connectedness, - disconnectTime: int64 = 0) = - # Insert peer entry into persistent storage, or replace existing entry with updated info - let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime) - if res.isErr: - warn "failed to store peers", err = res.error +proc insertOrReplace(ps: PeerStorage, remotePeerInfo: RemotePeerInfo) = + ## Insert peer entry into persistent storage, or replace existing entry with updated info + ps.put(remotePeerInfo).isOkOr: + warn "failed to store peers", err = error waku_peers_errors.inc(labelValues = ["storage_failure"]) + return proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin) = - # Adds peer to manager for the specified protocol + ## Adds peer to manager for the specified protocol if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: # Do not attempt to manage our unmanageable self @@ -140,7 +136,9 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: - pm.storage.insertOrReplace(remotePeerInfo.peerId, remotePeerInfo, NotConnected) + remotePeerInfo.connectedness = NotConnected + + pm.storage.insertOrReplace(remotePeerInfo) # Connects to a given node. Note that this function uses `connect` and # does not provide a protocol. Streams for relay (gossipsub) are created @@ -231,16 +229,26 @@ proc dialPeer(pm: PeerManager, return none(Connection) proc loadFromStorage(pm: PeerManager) = + ## Load peers from storage, if available + debug "loading peers from storage" - # Load peers from storage, if available + var amount = 0 - proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) = - trace "loading peer", peerId=peerId, connectedness=connectedness - if peerId == pm.switch.peerInfo.peerId: + proc onData(remotePeerInfo: RemotePeerInfo) = + let peerId = remotePeerInfo.peerId + + if pm.switch.peerInfo.peerId == peerId: # Do not manage self return + trace "loading peer", + peerId = peerId, + address = remotePeerInfo.addrs, + protocols = remotePeerInfo.protocols, + agent = remotePeerInfo.agent, + version = remotePeerInfo.protoVersion + # nim-libp2p books pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols @@ -250,18 +258,20 @@ proc loadFromStorage(pm: PeerManager) = # custom books pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state - pm.peerStore[DisconnectBook][peerId] = disconnectTime + pm.peerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin + + if remotePeerInfo.enr.isSome(): + pm.peerStore[ENRBook][peerId] = remotePeerInfo.enr.get() amount.inc() - let res = pm.storage.getAll(onData) - if res.isErr: - warn "failed to load peers from storage", err = res.error + pm.storage.getAll(onData).isOkOr: + warn "loading peers from storage failed", err = error waku_peers_errors.inc(labelValues = ["storage_load_failure"]) return - debug "successfully queried peer storage", amount = amount + debug "recovered peers from storage", amount = amount proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = @@ -385,8 +395,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = pm.peerStore[ConnectionBook][peerId] = connectedness pm.peerStore[DirectionBook][peerId] = direction + if not pm.storage.isNil: - pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix) + var remotePeerInfo = pm.peerStore.get(peerId) + remotePeerInfo.disconnectTime = getTime().toUnix + + pm.storage.insertOrReplace(remotePeerInfo) proc new*(T: type PeerManager, switch: Switch, diff --git a/waku/node/peer_manager/peer_store/peer_storage.nim b/waku/node/peer_manager/peer_store/peer_storage.nim index dcb456b2f..5f5a37c12 100644 --- a/waku/node/peer_manager/peer_store/peer_storage.nim +++ b/waku/node/peer_manager/peer_store/peer_storage.nim @@ -18,14 +18,14 @@ type PeerStorageResult*[T] = Result[T, string] - DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo, - connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].} + DataProc* = proc(remotePeerInfo: RemotePeerInfo) {.closure, raises: [Defect].} # PeerStorage interface -method put*(db: PeerStorage, - peerId: PeerID, - remotePeerInfo: RemotePeerInfo, - connectedness: Connectedness, - disconnectTime: int64): PeerStorageResult[void] {.base.} = discard +method put*( + db: PeerStorage, + remotePeerInfo: RemotePeerInfo + ): PeerStorageResult[void] {.base.} = + return err("Unimplemented") -method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard +method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[void] {.base.} = + return err("Unimplemented") diff --git a/waku/node/peer_manager/peer_store/waku_peer_storage.nim b/waku/node/peer_manager/peer_store/waku_peer_storage.nim index 4f9777380..8609f165c 100644 --- a/waku/node/peer_manager/peer_store/waku_peer_storage.nim +++ b/waku/node/peer_manager/peer_store/waku_peer_storage.nim @@ -5,13 +5,13 @@ else: import - std/sets, + std/[sets, options], stew/results, sqlite3_abi, + eth/p2p/discoveryv5/enr, libp2p/protobuf/minprotobuf import ../../../common/databases/db_sqlite, - ../../../common/databases/common, ../../../waku_core, ../waku_peer_store, ./peer_storage @@ -21,17 +21,20 @@ export db_sqlite type WakuPeerStorage* = ref object of PeerStorage database*: SqliteDatabase - replaceStmt: SqliteStmt[(seq[byte], seq[byte], int32, int64), void] + replaceStmt: SqliteStmt[(seq[byte], seq[byte]), void] ########################## # Protobuf Serialisation # ########################## -proc init*(T: type RemotePeerInfo, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type RemotePeerInfo, buffer: seq[byte]): ProtoResult[T] = var multiaddrSeq: seq[MultiAddress] protoSeq: seq[string] storedInfo = RemotePeerInfo() + rlpBytes: seq[byte] + connectedness: uint32 + disconnectTime: uint64 var pb = initProtoBuffer(buffer) @@ -39,11 +42,20 @@ proc init*(T: type RemotePeerInfo, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(2, multiaddrSeq) discard ? pb.getRepeatedField(3, protoSeq) discard ? pb.getField(4, storedInfo.publicKey) - - # TODO: Store the rest of parameters such as connectedness and disconnectTime + discard ? pb.getField(5, connectedness) + discard ? pb.getField(6, disconnectTime) + let hasENR = ? pb.getField(7, rlpBytes) storedInfo.addrs = multiaddrSeq storedInfo.protocols = protoSeq + storedInfo.connectedness = Connectedness(connectedness) + storedInfo.disconnectTime = int64(disconnectTime) + + if hasENR: + var record: Record + + if record.fromBytes(rlpBytes): + storedInfo.enr = some(record) ok(storedInfo) @@ -58,113 +70,104 @@ proc encode*(remotePeerInfo: RemotePeerInfo): PeerStorageResult[ProtoBuffer] = for proto in remotePeerInfo.protocols.items: pb.write(3, proto) - try: - pb.write(4, remotePeerInfo.publicKey) - except ResultError[CryptoError] as e: - return err("Failed to encode public key") + let catchRes = catch: pb.write(4, remotePeerInfo.publicKey) + if catchRes.isErr(): + return err("Enncoding public key failed: " & catchRes.error.msg) - ok(pb) + pb.write(5, uint32(ord(remotePeerInfo.connectedness))) + + pb.write(6, uint64(remotePeerInfo.disconnectTime)) + + if remotePeerInfo.enr.isSome(): + pb.write(7, remotePeerInfo.enr.get().raw) + + return ok(pb) ########################## # Storage implementation # ########################## proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] = - ## Misconfiguration can lead to nil DB + # Misconfiguration can lead to nil DB if db.isNil(): return err("db not initialized") - ## Create the "Peer" table - ## It contains: - ## - peer id as primary key, stored as a blob - ## - stored info (serialised protobuf), stored as a blob - ## - last known enumerated connectedness state, stored as an integer - ## - disconnect time in epoch seconds, if applicable + # Create the "Peer" table + # It contains: + # - peer id as primary key, stored as a blob + # - stored info (serialised protobuf), stored as a blob + let createStmt = db.prepareStmt( + """ + CREATE TABLE IF NOT EXISTS Peer ( + peerId BLOB PRIMARY KEY, + storedInfo BLOB + ) WITHOUT ROWID; + """, + NoParams, + void + ).expect("Valid statement") - # TODO: connectedness and disconnectTime are now stored in the storedInfo type - let - createStmt = db.prepareStmt(""" - CREATE TABLE IF NOT EXISTS Peer ( - peerId BLOB PRIMARY KEY, - storedInfo BLOB, - connectedness INTEGER, - disconnectTime INTEGER - ) WITHOUT ROWID; - """, NoParams, void).expect("this is a valid statement") - - let res = createStmt.exec(()) - if res.isErr: + createStmt.exec(()).isOkOr: return err("failed to exec") # We dispose of this prepared statement here, as we never use it again createStmt.dispose() - ## Reusable prepared statements - let - replaceStmt = db.prepareStmt( - "REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);", - (seq[byte], seq[byte], int32, int64), - void - ).expect("this is a valid statement") + # Reusable prepared statements + let replaceStmt = db.prepareStmt( + "REPLACE INTO Peer (peerId, storedInfo) VALUES (?, ?);", + (seq[byte], seq[byte]), + void + ).expect("Valid statement") - ## General initialization + # General initialization + let ps = WakuPeerStorage(database: db, replaceStmt: replaceStmt) - ok(WakuPeerStorage(database: db, - replaceStmt: replaceStmt)) - - -method put*(db: WakuPeerStorage, - peerId: PeerID, - remotePeerInfo: RemotePeerInfo, - connectedness: Connectedness, - disconnectTime: int64): PeerStorageResult[void] = + return ok(ps) +method put*( + db: WakuPeerStorage, + remotePeerInfo: RemotePeerInfo + ): PeerStorageResult[void] = ## Adds a peer to storage or replaces existing entry if it already exists - let encoded = remotePeerInfo.encode() + + let encoded = remotePeerInfo.encode().valueOr: + return err("peer info encoding failed: " & error) - if encoded.isErr: - return err("failed to encode: " & encoded.error()) + db.replaceStmt.exec((remotePeerInfo.peerId.data, encoded.buffer)).isOkOr: + return err("DB operation failed: " & error) - let res = db.replaceStmt.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime)) - if res.isErr: - return err("failed") + return ok() - ok() - -method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageResult[bool] = +method getAll*( + db: WakuPeerStorage, + onData: peer_storage.DataProc + ): PeerStorageResult[void] = ## Retrieves all peers from storage - var gotPeers = false - - proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} = - gotPeers = true + + proc peer(s: ptr sqlite3_stmt) {.raises: [ResultError[ProtoError]].} = let - # Peer ID - pId = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 0)) - pIdL = sqlite3_column_bytes(s, 0) - peerId = PeerID.init(@(toOpenArray(pId, 0, pIdL - 1))).tryGet() # Stored Info sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) sToL = sqlite3_column_bytes(s, 1) - storedInfo = RemotePeerInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet() - # Connectedness - connectedness = Connectedness(sqlite3_column_int(s, 2)) - # DisconnectTime - disconnectTime = sqlite3_column_int64(s, 3) + storedInfo = RemotePeerInfo.decode(@(toOpenArray(sTo, 0, sToL - 1))).tryGet() - onData(peerId, storedInfo, connectedness, disconnectTime) + onData(storedInfo) - var queryResult: DatabaseResult[bool] - try: - queryResult = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer) - except LPError, ResultError[ProtoError]: - return err("failed to extract peer from query result") + let catchRes = catch: db.database.query("SELECT peerId, storedInfo FROM Peer", peer) - if queryResult.isErr: - return err("failed") + let queryRes = + if catchRes.isErr(): + return err("failed to extract peer from query result: " & catchRes.error.msg) + else: catchRes.get() - ok gotPeers + if queryRes.isErr(): + return err("peer storage query failed: " & queryRes.error) + + return ok() proc close*(db: WakuPeerStorage) = ## Closes the database. + db.replaceStmt.dispose() db.database.close()