diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e2b2829d..179dc38c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Added a peer manager for `relay`, `filter`, `store` and `swap` peers. - `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets. - Admin API now provides a `post` method to connect to peers on an ad-hoc basis +- Added persistent peer storage ## 2021-01-05 v0.2 diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index ce51147ca..8a2f4bb33 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -3,7 +3,7 @@ import std/[unittest, options, tables, sets], chronos, chronicles, - ../../waku/v2/node/message_store/waku_message_store, + ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/protocol/waku_store/waku_store, ./utils diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim new file mode 100644 index 000000000..b41f3ac22 --- /dev/null +++ b/tests/v2/test_peer_storage.nim @@ -0,0 +1,42 @@ +{.used.} + +import + std/[unittest, sets], + libp2p/crypto/crypto, + ../test_helpers, + ../../waku/v2/node/peer_manager, + ../../waku/v2/node/storage/peer/waku_peer_storage + +suite "Peer Storage": + + test "Store and retrieve from persistent peer storage": + let + database = SqliteDatabase.init("", inMemory = true)[] + storage = WakuPeerStorage.init(database)[] + + # Test Peer + peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() + peerKey = crypto.PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(peerKey, @[peerLoc]) + peerProto = "/waku/2/default-waku/codec" + stored = StoredInfo(peerId: peer.peerId, addrs: toHashSet([peerLoc]), protos: toHashSet([peerProto]), publicKey: peerKey.getKey().tryGet()) + conn = Connectedness.CanConnect + + defer: storage.close() + + discard storage.put(peer.peerId, stored, conn) + + var responseCount = 0 + proc data(peerId: PeerID, storedInfo: StoredInfo, + connectedness: Connectedness) = + responseCount += 1 + check: + peerId == peer.peerId + storedInfo == stored + connectedness == conn + + let res = storage.getAll(data) + + check: + res.isErr == false + responseCount == 1 diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index a0c734725..2e19baa75 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -10,7 +10,7 @@ import libp2p/protocols/pubsub/rpc/message, ../../waku/v2/protocol/[waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, - ../../waku/v2/node/message_store/waku_message_store, + ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/peer_manager, ../test_helpers, ./utils diff --git a/waku/v2/node/message_store/message_store.nim b/waku/v2/node/storage/message/message_store.nim similarity index 90% rename from waku/v2/node/message_store/message_store.nim rename to waku/v2/node/storage/message/message_store.nim index af7ff1a23..6de59d2b3 100644 --- a/waku/v2/node/message_store/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -1,7 +1,7 @@ import stew/results, - ../../protocol/waku_message, - ../../utils/pagination + ../../../protocol/waku_message, + ../../../utils/pagination ## This module defines a message store interface. Implementations of ## MessageStore are used by the `WakuStore` protocol to store and re- diff --git a/waku/v2/node/message_store/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim similarity index 96% rename from waku/v2/node/message_store/waku_message_store.nim rename to waku/v2/node/storage/message/waku_message_store.nim index 26ac42157..a4fa09299 100644 --- a/waku/v2/node/message_store/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -5,11 +5,11 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - stew/results, metrics, - ./sqlite, + stew/results, ./message_store, - ../../protocol/waku_message, - ../../utils/pagination + ../sqlite, + ../../../protocol/waku_message, + ../../../utils/pagination export sqlite diff --git a/waku/v2/node/storage/peer/waku_peer_storage.nim b/waku/v2/node/storage/peer/waku_peer_storage.nim new file mode 100644 index 000000000..428b58d9a --- /dev/null +++ b/waku/v2/node/storage/peer/waku_peer_storage.nim @@ -0,0 +1,136 @@ +import + std/sets, + sqlite3_abi, + chronos, metrics, + libp2p/protobuf/minprotobuf, + stew/results, + ../sqlite, + ../../peer_manager + +export sqlite + +type + WakuPeerStorage* = ref object of RootObj + database*: SqliteDatabase + + WakuPeerStorageResult*[T] = Result[T, string] + + DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo, + connectedness: Connectedness) {.closure.} + + +########################## +# Protobuf Serialisation # +########################## + +proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] = + var + multiaddrSeq: seq[MultiAddress] + protoSeq: seq[string] + storedInfo = StoredInfo() + + var pb = initProtoBuffer(buffer) + + discard ? pb.getField(1, storedInfo.peerId) + discard ? pb.getRepeatedField(2, multiaddrSeq) + discard ? pb.getRepeatedField(3, protoSeq) + discard ? pb.getField(4, storedInfo.publicKey) + + storedInfo.addrs = toHashSet(multiaddrSeq) + storedInfo.protos = toHashSet(protoSeq) + + ok(storedInfo) + +proc encode*(storedInfo: StoredInfo): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write(1, storedInfo.peerId) + + for multiaddr in storedInfo.addrs.items: + pb.write(2, multiaddr) + + for proto in storedInfo.protos.items: + pb.write(3, proto) + + pb.write(4, storedInfo.publicKey) + + return pb + +########################## +# Storage implementation # +########################## + +proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T] = + ## Create the "Peers" table + ## It contains: + ## - peer id as primary key, stored as a blob + ## - stored info (serialised protobuf), stored as a blob + ## - last known enumerated connectedness state, stored as an integer + let prepare = db.prepareStmt(""" + CREATE TABLE IF NOT EXISTS Peers ( + peerId BLOB PRIMARY KEY, + storedInfo BLOB, + connectedness INTEGER + ) WITHOUT ROWID; + """, NoParams, void) + + if prepare.isErr: + return err("failed to prepare") + + let res = prepare.value.exec(()) + if res.isErr: + return err("failed to exec") + + ok(WakuPeerStorage(database: db)) + + +proc put*(db: WakuPeerStorage, + peerId: PeerID, + storedInfo: StoredInfo, + connectedness: Connectedness): WakuPeerStorageResult[void] = + + ## Adds a peer to storage + let prepare = db.database.prepareStmt( + "INSERT INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);", + (seq[byte], seq[byte], int32), + void + ) + + if prepare.isErr: + return err("failed to prepare") + + let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness)))) + if res.isErr: + return err("failed") + + ok() + +proc getAll*(db: WakuPeerStorage, onData: DataProc): WakuPeerStorageResult[bool] = + ## Retrieves all peers from storage + var gotPeers = false + + proc peer(s: ptr sqlite3_stmt) = + gotPeers = true + let + # Peer ID + pId = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 0)) + pIdL = sqlite3_column_bytes(s, 0) + peerId = PeerID.init(@(toOpenArray(pId, 0, pIdL - 1))).tryGet() + # Stored Info + sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) + sToL = sqlite3_column_bytes(s, 1) + storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet() + # Connectedness + connectedness = Connectedness(sqlite3_column_int(s, 2)) + + onData(peerId, storedInfo, connectedness) + + let res = db.database.query("SELECT peerId, storedInfo, connectedness FROM Peers", peer) + if res.isErr: + return err("failed") + + ok gotPeers + +proc close*(db: WakuPeerStorage) = + ## Closes the database. + db.database.close() \ No newline at end of file diff --git a/waku/v2/node/message_store/sqlite.nim b/waku/v2/node/storage/sqlite.nim similarity index 94% rename from waku/v2/node/message_store/sqlite.nim rename to waku/v2/node/storage/sqlite.nim index 1d2127e36..930e3957c 100644 --- a/waku/v2/node/message_store/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -1,5 +1,5 @@ import - os, + os, sqlite3_abi, chronos, chronicles, metrics, stew/results, libp2p/crypto/crypto, @@ -139,8 +139,11 @@ proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint = sqlite3_bind_int(s, int(n).cint, int(val).cint) elif val is int64: sqlite3_bind_int64(s, n.cint, val) + # Note: bind_text not yet supported in sqlite3_abi wrapper + # elif val is string: + # sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator else: - {.fatal: "Please add support for the 'kek' type".} + {.fatal: "Please add support for the '" & $typeof(val) & "' type".} template bindParams(s: RawStmtPtr, params: auto) = when params is tuple: diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 379ab38c6..bb461caf5 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -17,7 +17,7 @@ import ../protocol/waku_filter/waku_filter, ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils], ../utils/peers, - ./message_store/message_store, + ./storage/message/message_store, ../utils/requests, ./peer_manager @@ -475,7 +475,7 @@ when isMainModule: private_api, relay_api, store_api], - ./message_store/waku_message_store, + ./storage/message/waku_message_store, ../../common/utils/nat proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) = diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 9e3a258d7..a68605ed7 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -11,7 +11,7 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, ../message_notifier, - ../../node/message_store/message_store, + ../../node/storage/message/message_store, ../waku_swap/waku_swap, ./waku_store_types, ../../utils/requests, diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 09157bece..e14480ec0 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -6,7 +6,7 @@ import libp2p/protocols/protocol, ../waku_swap/waku_swap_types, ../waku_message, - ../../node/message_store/message_store, + ../../node/storage/message/message_store, ../../utils/pagination, ../../node/peer_manager