mirror of https://github.com/waku-org/nwaku.git
Added persistent storage for peers (#435)
This commit is contained in:
parent
29d69b98cb
commit
7a732e7cc6
|
@ -8,6 +8,7 @@
|
||||||
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
|
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
|
||||||
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
|
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
|
||||||
- Admin API now provides a `post` method to connect to peers on an ad-hoc basis
|
- Admin API now provides a `post` method to connect to peers on an ad-hoc basis
|
||||||
|
- Added persistent peer storage
|
||||||
|
|
||||||
## 2021-01-05 v0.2
|
## 2021-01-05 v0.2
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
import
|
import
|
||||||
std/[unittest, options, tables, sets],
|
std/[unittest, options, tables, sets],
|
||||||
chronos, chronicles,
|
chronos, chronicles,
|
||||||
../../waku/v2/node/message_store/waku_message_store,
|
../../waku/v2/node/storage/message/waku_message_store,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
./utils
|
./utils
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[unittest, sets],
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
../test_helpers,
|
||||||
|
../../waku/v2/node/peer_manager,
|
||||||
|
../../waku/v2/node/storage/peer/waku_peer_storage
|
||||||
|
|
||||||
|
suite "Peer Storage":
|
||||||
|
|
||||||
|
test "Store and retrieve from persistent peer storage":
|
||||||
|
let
|
||||||
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
|
storage = WakuPeerStorage.init(database)[]
|
||||||
|
|
||||||
|
# Test Peer
|
||||||
|
peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
|
||||||
|
peerKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
|
||||||
|
peer = PeerInfo.init(peerKey, @[peerLoc])
|
||||||
|
peerProto = "/waku/2/default-waku/codec"
|
||||||
|
stored = StoredInfo(peerId: peer.peerId, addrs: toHashSet([peerLoc]), protos: toHashSet([peerProto]), publicKey: peerKey.getKey().tryGet())
|
||||||
|
conn = Connectedness.CanConnect
|
||||||
|
|
||||||
|
defer: storage.close()
|
||||||
|
|
||||||
|
discard storage.put(peer.peerId, stored, conn)
|
||||||
|
|
||||||
|
var responseCount = 0
|
||||||
|
proc data(peerId: PeerID, storedInfo: StoredInfo,
|
||||||
|
connectedness: Connectedness) =
|
||||||
|
responseCount += 1
|
||||||
|
check:
|
||||||
|
peerId == peer.peerId
|
||||||
|
storedInfo == stored
|
||||||
|
connectedness == conn
|
||||||
|
|
||||||
|
let res = storage.getAll(data)
|
||||||
|
|
||||||
|
check:
|
||||||
|
res.isErr == false
|
||||||
|
responseCount == 1
|
|
@ -10,7 +10,7 @@ import
|
||||||
libp2p/protocols/pubsub/rpc/message,
|
libp2p/protocols/pubsub/rpc/message,
|
||||||
../../waku/v2/protocol/[waku_message, message_notifier],
|
../../waku/v2/protocol/[waku_message, message_notifier],
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
../../waku/v2/node/message_store/waku_message_store,
|
../../waku/v2/node/storage/message/waku_message_store,
|
||||||
../../waku/v2/node/peer_manager,
|
../../waku/v2/node/peer_manager,
|
||||||
../test_helpers, ./utils
|
../test_helpers, ./utils
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import
|
import
|
||||||
stew/results,
|
stew/results,
|
||||||
../../protocol/waku_message,
|
../../../protocol/waku_message,
|
||||||
../../utils/pagination
|
../../../utils/pagination
|
||||||
|
|
||||||
## This module defines a message store interface. Implementations of
|
## This module defines a message store interface. Implementations of
|
||||||
## MessageStore are used by the `WakuStore` protocol to store and re-
|
## MessageStore are used by the `WakuStore` protocol to store and re-
|
|
@ -5,11 +5,11 @@ import
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
stew/results, metrics,
|
stew/results,
|
||||||
./sqlite,
|
|
||||||
./message_store,
|
./message_store,
|
||||||
../../protocol/waku_message,
|
../sqlite,
|
||||||
../../utils/pagination
|
../../../protocol/waku_message,
|
||||||
|
../../../utils/pagination
|
||||||
|
|
||||||
export sqlite
|
export sqlite
|
||||||
|
|
|
@ -0,0 +1,136 @@
|
||||||
|
import
|
||||||
|
std/sets,
|
||||||
|
sqlite3_abi,
|
||||||
|
chronos, metrics,
|
||||||
|
libp2p/protobuf/minprotobuf,
|
||||||
|
stew/results,
|
||||||
|
../sqlite,
|
||||||
|
../../peer_manager
|
||||||
|
|
||||||
|
export sqlite
|
||||||
|
|
||||||
|
type
|
||||||
|
WakuPeerStorage* = ref object of RootObj
|
||||||
|
database*: SqliteDatabase
|
||||||
|
|
||||||
|
WakuPeerStorageResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
|
||||||
|
connectedness: Connectedness) {.closure.}
|
||||||
|
|
||||||
|
|
||||||
|
##########################
|
||||||
|
# Protobuf Serialisation #
|
||||||
|
##########################
|
||||||
|
|
||||||
|
proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] =
|
||||||
|
var
|
||||||
|
multiaddrSeq: seq[MultiAddress]
|
||||||
|
protoSeq: seq[string]
|
||||||
|
storedInfo = StoredInfo()
|
||||||
|
|
||||||
|
var pb = initProtoBuffer(buffer)
|
||||||
|
|
||||||
|
discard ? pb.getField(1, storedInfo.peerId)
|
||||||
|
discard ? pb.getRepeatedField(2, multiaddrSeq)
|
||||||
|
discard ? pb.getRepeatedField(3, protoSeq)
|
||||||
|
discard ? pb.getField(4, storedInfo.publicKey)
|
||||||
|
|
||||||
|
storedInfo.addrs = toHashSet(multiaddrSeq)
|
||||||
|
storedInfo.protos = toHashSet(protoSeq)
|
||||||
|
|
||||||
|
ok(storedInfo)
|
||||||
|
|
||||||
|
proc encode*(storedInfo: StoredInfo): ProtoBuffer =
|
||||||
|
var pb = initProtoBuffer()
|
||||||
|
|
||||||
|
pb.write(1, storedInfo.peerId)
|
||||||
|
|
||||||
|
for multiaddr in storedInfo.addrs.items:
|
||||||
|
pb.write(2, multiaddr)
|
||||||
|
|
||||||
|
for proto in storedInfo.protos.items:
|
||||||
|
pb.write(3, proto)
|
||||||
|
|
||||||
|
pb.write(4, storedInfo.publicKey)
|
||||||
|
|
||||||
|
return pb
|
||||||
|
|
||||||
|
##########################
|
||||||
|
# Storage implementation #
|
||||||
|
##########################
|
||||||
|
|
||||||
|
proc init*(T: type WakuPeerStorage, db: SqliteDatabase): WakuPeerStorageResult[T] =
|
||||||
|
## Create the "Peers" table
|
||||||
|
## It contains:
|
||||||
|
## - peer id as primary key, stored as a blob
|
||||||
|
## - stored info (serialised protobuf), stored as a blob
|
||||||
|
## - last known enumerated connectedness state, stored as an integer
|
||||||
|
let prepare = db.prepareStmt("""
|
||||||
|
CREATE TABLE IF NOT EXISTS Peers (
|
||||||
|
peerId BLOB PRIMARY KEY,
|
||||||
|
storedInfo BLOB,
|
||||||
|
connectedness INTEGER
|
||||||
|
) WITHOUT ROWID;
|
||||||
|
""", NoParams, void)
|
||||||
|
|
||||||
|
if prepare.isErr:
|
||||||
|
return err("failed to prepare")
|
||||||
|
|
||||||
|
let res = prepare.value.exec(())
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed to exec")
|
||||||
|
|
||||||
|
ok(WakuPeerStorage(database: db))
|
||||||
|
|
||||||
|
|
||||||
|
proc put*(db: WakuPeerStorage,
|
||||||
|
peerId: PeerID,
|
||||||
|
storedInfo: StoredInfo,
|
||||||
|
connectedness: Connectedness): WakuPeerStorageResult[void] =
|
||||||
|
|
||||||
|
## Adds a peer to storage
|
||||||
|
let prepare = db.database.prepareStmt(
|
||||||
|
"INSERT INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);",
|
||||||
|
(seq[byte], seq[byte], int32),
|
||||||
|
void
|
||||||
|
)
|
||||||
|
|
||||||
|
if prepare.isErr:
|
||||||
|
return err("failed to prepare")
|
||||||
|
|
||||||
|
let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness))))
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed")
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc getAll*(db: WakuPeerStorage, onData: DataProc): WakuPeerStorageResult[bool] =
|
||||||
|
## Retrieves all peers from storage
|
||||||
|
var gotPeers = false
|
||||||
|
|
||||||
|
proc peer(s: ptr sqlite3_stmt) =
|
||||||
|
gotPeers = true
|
||||||
|
let
|
||||||
|
# Peer ID
|
||||||
|
pId = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 0))
|
||||||
|
pIdL = sqlite3_column_bytes(s, 0)
|
||||||
|
peerId = PeerID.init(@(toOpenArray(pId, 0, pIdL - 1))).tryGet()
|
||||||
|
# Stored Info
|
||||||
|
sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
|
||||||
|
sToL = sqlite3_column_bytes(s, 1)
|
||||||
|
storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet()
|
||||||
|
# Connectedness
|
||||||
|
connectedness = Connectedness(sqlite3_column_int(s, 2))
|
||||||
|
|
||||||
|
onData(peerId, storedInfo, connectedness)
|
||||||
|
|
||||||
|
let res = db.database.query("SELECT peerId, storedInfo, connectedness FROM Peers", peer)
|
||||||
|
if res.isErr:
|
||||||
|
return err("failed")
|
||||||
|
|
||||||
|
ok gotPeers
|
||||||
|
|
||||||
|
proc close*(db: WakuPeerStorage) =
|
||||||
|
## Closes the database.
|
||||||
|
db.database.close()
|
|
@ -1,5 +1,5 @@
|
||||||
import
|
import
|
||||||
os,
|
os,
|
||||||
sqlite3_abi,
|
sqlite3_abi,
|
||||||
chronos, chronicles, metrics, stew/results,
|
chronos, chronicles, metrics, stew/results,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
|
@ -139,8 +139,11 @@ proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
||||||
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
||||||
elif val is int64:
|
elif val is int64:
|
||||||
sqlite3_bind_int64(s, n.cint, val)
|
sqlite3_bind_int64(s, n.cint, val)
|
||||||
|
# Note: bind_text not yet supported in sqlite3_abi wrapper
|
||||||
|
# elif val is string:
|
||||||
|
# sqlite3_bind_text(s, n.cint, val.cstring, -1, nil) # `-1` implies string length is the number of bytes up to the first null-terminator
|
||||||
else:
|
else:
|
||||||
{.fatal: "Please add support for the 'kek' type".}
|
{.fatal: "Please add support for the '" & $typeof(val) & "' type".}
|
||||||
|
|
||||||
template bindParams(s: RawStmtPtr, params: auto) =
|
template bindParams(s: RawStmtPtr, params: auto) =
|
||||||
when params is tuple:
|
when params is tuple:
|
|
@ -17,7 +17,7 @@ import
|
||||||
../protocol/waku_filter/waku_filter,
|
../protocol/waku_filter/waku_filter,
|
||||||
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
|
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
|
||||||
../utils/peers,
|
../utils/peers,
|
||||||
./message_store/message_store,
|
./storage/message/message_store,
|
||||||
../utils/requests,
|
../utils/requests,
|
||||||
./peer_manager
|
./peer_manager
|
||||||
|
|
||||||
|
@ -475,7 +475,7 @@ when isMainModule:
|
||||||
private_api,
|
private_api,
|
||||||
relay_api,
|
relay_api,
|
||||||
store_api],
|
store_api],
|
||||||
./message_store/waku_message_store,
|
./storage/message/waku_message_store,
|
||||||
../../common/utils/nat
|
../../common/utils/nat
|
||||||
|
|
||||||
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
|
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
|
||||||
|
|
|
@ -11,7 +11,7 @@ import
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
../message_notifier,
|
../message_notifier,
|
||||||
../../node/message_store/message_store,
|
../../node/storage/message/message_store,
|
||||||
../waku_swap/waku_swap,
|
../waku_swap/waku_swap,
|
||||||
./waku_store_types,
|
./waku_store_types,
|
||||||
../../utils/requests,
|
../../utils/requests,
|
||||||
|
|
|
@ -6,7 +6,7 @@ import
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
../waku_swap/waku_swap_types,
|
../waku_swap/waku_swap_types,
|
||||||
../waku_message,
|
../waku_message,
|
||||||
../../node/message_store/message_store,
|
../../node/storage/message/message_store,
|
||||||
../../utils/pagination,
|
../../utils/pagination,
|
||||||
../../node/peer_manager
|
../../node/peer_manager
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue