refactor(store): protocol code refactoring and api reorganization

This commit is contained in:
Lorenzo Delgado 2022-09-20 11:39:52 +02:00 committed by GitHub
parent 269bb4944b
commit 4a9f6f3c5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 307 additions and 339 deletions

View File

@ -449,7 +449,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
await node.mountSwap()
if (conf.storenode != "") or (conf.store == true):
await node.mountStore(persistMessages = conf.persistMessages)
await node.mountStore()
var storenode: Option[RemotePeerInfo]

View File

@ -5,20 +5,22 @@ import
./v2/test_wakunode,
./v2/test_wakunode_relay,
./v2/test_wakunode_lightpush,
# Waku Store
./v2/test_utils_pagination,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_sqlite_query,
./v2/test_message_store_sqlite,
./v2/test_waku_store_rpc_codec,
./v2/test_waku_store,
./v2/test_wakunode_store,
# Waku Filter
./v2/test_waku_filter,
./v2/test_wakunode_filter,
./v2/test_waku_payload,
./v2/test_waku_swap,
./v2/test_utils_peers,
./v2/test_utils_pagination,
./v2/test_message_cache,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_sqlite_query,
./v2/test_message_store_sqlite,
./v2/test_jsonrpc_waku,
./v2/test_rest_serdes,
./v2/test_rest_debug_api_serdes,

View File

@ -11,8 +11,10 @@ import
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/message,
libp2p/protocols/pubsub/rpc/message
import
../../waku/v1/node/rpc/hexstrings,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/wakunode2,
../../waku/v2/node/jsonrpc/[store_api,
relay_api,
@ -232,7 +234,7 @@ procSuite "Waku v2 JSON-RPC API":
key = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
await node.mountStore(persistMessages = true)
await node.mountStore(store=StoreQueueRef.new())
var listenSwitch = newStandardSwitch(some(key))
waitFor listenSwitch.start()
@ -528,7 +530,7 @@ procSuite "Waku v2 JSON-RPC API":
await node.mountFilter()
await node.mountSwap()
await node.mountStore(persistMessages = true)
await node.mountStore(store=StoreQueueRef.new())
# Create and set some peers
let

View File

@ -104,7 +104,7 @@ procSuite "Peer Manager":
await node.mountFilter()
await node.mountSwap()
await node.mountStore(persistMessages = true)
await node.mountStore()
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())

View File

@ -617,7 +617,7 @@ procSuite "Waku Store - fault tolerant store":
check:
successResult.isOk()
successResult.value == 10
proto3.messages.len == 10
proto3.store.getMessagesCount().tryGet() == 10
## Cleanup
await allFutures(dialSwitch3.stop(), listenSwitch3.stop())
@ -681,9 +681,7 @@ procSuite "Waku Store - fault tolerant store":
let response = res.tryGet()
check:
response == 14
check:
proto3.messages.len == 14
proto3.store.getMessagesCount().tryGet() == 14
## Cleanup
await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop())

View File

@ -9,10 +9,12 @@ import
libp2p/stream/[bufferstream, connection],
libp2p/crypto/[crypto, secp],
libp2p/switch,
eth/keys,
eth/keys
import
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/wakunode2,
../../waku/v2/utils/peers,
../test_helpers, ./utils
@ -50,11 +52,11 @@ procSuite "Waku SWAP Accounting":
asyncTest "Update accounting state after store operations":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60001))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
@ -63,14 +65,14 @@ procSuite "Waku SWAP Accounting":
# Start nodes and mount protocols
await node1.start()
await node1.mountSwap()
await node1.mountStore(persistMessages = true)
await node1.mountStore(store=StoreQueueRef.new())
await node2.start()
await node2.mountSwap()
await node2.mountStore(persistMessages = true)
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis)
await sleepAsync(500.millis)
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
@ -97,11 +99,11 @@ procSuite "Waku SWAP Accounting":
asyncTest "Update accounting state after sending cheque":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60001))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
@ -113,14 +115,14 @@ procSuite "Waku SWAP Accounting":
# Start nodes and mount protocols
await node1.start()
await node1.mountSwap(swapConfig)
await node1.mountStore(persistMessages = true)
await node1.mountStore(store=StoreQueueRef.new())
await node2.start()
await node2.mountSwap(swapConfig)
await node2.mountStore(persistMessages = true)
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis)
await sleepAsync(500.millis)
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())

View File

@ -12,13 +12,13 @@ import
libp2p/switch,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
eth/keys
libp2p/protocols/pubsub/gossipsub
import
../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/message_store,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/node/peer_manager/peer_manager,
@ -27,29 +27,32 @@ import
../../waku/v2/utils/time,
../../waku/v2/node/wakunode2
from std/times import epochTime
from std/times import getTime, toUnixFloat
proc newTestMessageStore(): MessageStore =
let database = SqliteDatabase.init("", inMemory = true)[]
SqliteStore.init(database).tryGet()
procSuite "WakuNode - Store":
let rng = keys.newRng()
let rng = crypto.newRng()
asyncTest "Store protocol returns expected message":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]()
await node1.start()
await node1.mountStore(persistMessages = true)
await node1.mountStore(store=newTestMessageStore())
await node2.start()
await node2.mountStore(persistMessages = true)
await node2.mountStore(store=newTestMessageStore())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
@ -88,11 +91,11 @@ procSuite "WakuNode - Store":
storeComplFut = newFuture[bool]()
await node1.start()
await node1.mountStore(persistMessages = true)
await node1.mountStore(store=newTestMessageStore())
await node1.mountFilter()
await node2.start()
await node2.mountStore(persistMessages = true)
await node2.mountStore(store=newTestMessageStore())
await node2.mountFilter()
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
@ -142,9 +145,9 @@ procSuite "WakuNode - Store":
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
await node1.start()
await node1.mountStore(persistMessages = true)
await node1.mountStore(store=newTestMessageStore())
await node2.start()
await node2.mountStore(persistMessages = true)
await node2.mountStore(store=StoreQueueRef.new())
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
@ -156,61 +159,44 @@ procSuite "WakuNode - Store":
check:
# message is correctly stored
node1.wakuStore.messages.len == 1
node1.wakuStore.store.getMessagesCount().tryGet() == 1
await node1.stop()
await node2.stop()
asyncTest "Resume proc discards duplicate messages":
let timeOrigin = getNanosecondTime(getTime().toUnixFloat())
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2)
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 1)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 2)
msg3 = WakuMessage(payload: "hello world3".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 3)
# setup sqlite database for node1
let
database = SqliteDatabase.init("", inMemory = true)[]
store = SqliteStore.init(database).tryGet()
await allFutures(client.start(), server.start())
await client.mountStore(store=StoreQueueRef.new())
await server.mountStore(store=StoreQueueRef.new())
await node1.start()
await node1.mountStore(persistMessages = true, store = store)
await node2.start()
await node2.mountStore(persistMessages = true)
await server.wakuStore.handleMessage(DefaultTopic, msg1)
await server.wakuStore.handleMessage(DefaultTopic, msg2)
await node2.wakuStore.handleMessage(DefaultTopic, msg1)
await node2.wakuStore.handleMessage(DefaultTopic, msg2)
client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(500.millis)
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
# populate db with msg1 to be a duplicate
let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic)
let output1 = store.put(index1, msg1, DefaultTopic)
check output1.isOk
discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic)
# Insert the same message in both node's store
let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic)
require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
# now run the resume proc
await node1.resume()
# count the total number of retrieved messages from the database
let res = store.getAllMessages()
check:
res.isOk()
await client.resume()
check:
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
# check no duplicates is in the messages field
node1.wakuStore.messages.len == 2
# check no duplicates is in the db
res.value.len == 2
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
client.wakuStore.store.getMessagesCount().tryGet() == 3
await node1.stop()
await node2.stop()
await allFutures(client.stop(), server.stop())

View File

@ -430,7 +430,7 @@ when isMainModule:
waitFor mountLibp2pPing(bridge.nodev2)
if conf.store:
waitFor mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages
waitFor mountStore(bridge.nodev2) # Bridge does not persist messages
if conf.filter:
waitFor mountFilter(bridge.nodev2)

View File

@ -0,0 +1,89 @@
{.push raises: [Defect].}
import
std/options,
stew/results,
chronicles
import
../../../protocol/waku_message,
../../../utils/pagination,
../../../utils/time,
../sqlite,
./message_store,
./waku_store_queue,
./sqlite_store
logScope:
topics = "message_store.dual"
type DualMessageStore* = ref object of MessageStore
inmemory: StoreQueueRef
persistent: SqliteStore
proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity=StoreDefaultCapacity): MessageStoreResult[T] =
let
inmemory = StoreQueueRef.new(capacity)
persistent = ?SqliteStore.init(db)
info "loading messages from persistent storage to in-memory store"
let res = persistent.getAllMessages()
if res.isErr():
warn "failed to load messages from the persistent store", err = res.error
else:
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard inmemory.put(index, msg, pubsubTopic)
info "successfully loaded messages from the persistent store"
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
?s.inmemory.put(index, message, pubsubTopic)
?s.persistent.put(index, message, pubsubTopic)
ok()
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
s.inmemory.getAllMessages()
method getMessagesByHistoryQuery*(
s: DualMessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder)
method getMessagesCount*(s: DualMessageStore): MessageStoreResult[int64] =
s.inmemory.getMessagesCount()
method getOldestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
s.inmemory.getOldestMessageTimestamp()
method getNewestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
s.inmemory.getNewestMessageTimestamp()
method deleteMessagesOlderThanTimestamp*(s: DualMessageStore, ts: Timestamp): MessageStoreResult[void] =
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
# ?s.inmemory.deleteMessagesOlderThanTimestamp(ts)
?s.persistent.deleteMessagesOlderThanTimestamp(ts)
ok()
method deleteOldestMessagesNotWithinLimit*(s: DualMessageStore, limit: int): MessageStoreResult[void] =
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
# ?s.inmemory.deleteOldestMessagesNotWithinLimit(limit)
?s.persistent.deleteOldestMessagesNotWithinLimit(limit)
ok()

View File

@ -11,10 +11,12 @@ import
../../../utils/time,
./message_store
export pagination
logScope:
topics = "message_store.storequeue"
type
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message

View File

@ -395,11 +395,15 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node.wakuStore.isNil():
return
let retrievedMessages = await node.wakuStore.resume(peerList)
if retrievedMessages.isErr():
error "failed to resume store", error=retrievedMessages.error
return
if not node.wakuStore.isNil:
let retrievedMessages = await node.wakuStore.resume(peerList)
if retrievedMessages.isOk:
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
@ -452,24 +456,18 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, retentionTime = StoreDefaultRetentionTime, isSqliteOnly = false) {.async, raises: [Defect, LPError].} =
proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefaultCapacity, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
if node.wakuSwap.isNil():
info "mounting waku store protocol (no waku swap)"
else:
info "mounting waku store protocol with waku swap support"
let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime)
else: CapacityRetentionPolicy.init(capacity)
node.wakuStore = WakuStore.init(
node.peerManager,
node.rng,
store,
wakuSwap=node.wakuSwap,
persistMessages=persistMessages,
capacity=capacity,
isSqliteOnly=isSqliteOnly,
retentionPolicy=some(retentionPolicy)
retentionPolicy=retentionPolicy
)
if node.started:
@ -832,6 +830,8 @@ when isMainModule:
./wakunode2_setup_rpc,
./wakunode2_setup_sql_migrations,
./storage/sqlite,
./storage/message/message_store,
./storage/message/dual_message_store,
./storage/message/sqlite_store,
./storage/peer/waku_peer_storage
@ -844,7 +844,7 @@ when isMainModule:
# 1/7 Setup storage
proc setupStorage(conf: WakuNodeConf):
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]] =
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] =
## Setup a SQLite Database for a wakunode based on a supplied
## configuration file and perform all necessary migration.
@ -854,20 +854,20 @@ when isMainModule:
var
sqliteDatabase: SqliteDatabase
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]
# Setup DB
# Setup database connection
if conf.dbPath != "":
let dbRes = SqliteDatabase.init(conf.dbPath)
if dbRes.isErr:
warn "failed to init database", err = dbRes.error
if dbRes.isErr():
warn "failed to init database connection", err = dbRes.error
waku_node_errors.inc(labelValues = ["init_db_failure"])
return err("failed to init database")
return err("failed to init database connection")
else:
sqliteDatabase = dbRes.value
if not sqliteDatabase.isNil and (conf.persistPeers or conf.persistMessages):
if not sqliteDatabase.isNil():
## Database vacuuming
# TODO: Wrap and move this logic to the appropriate module
let
@ -887,28 +887,33 @@ when isMainModule:
debug "finished sqlite database vacuuming"
# Database initialized. Let's set it up
sqliteDatabase.runMigrations(conf) # First migrate what we have
sqliteDatabase.runMigrations(conf)
if conf.persistPeers:
# Peer persistence enable. Set up Peer table in storage
let res = WakuPeerStorage.new(sqliteDatabase)
if res.isErr:
warn "failed to init new WakuPeerStorage", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.pStorage = res.value
if conf.persistMessages:
# Historical message persistence enable. Set up Message table in storage
if conf.persistPeers:
let res = WakuPeerStorage.new(sqliteDatabase)
if res.isErr():
warn "failed to init peer store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.pStorage = res.value
if conf.persistMessages:
if conf.sqliteStore:
let res = SqliteStore.init(sqliteDatabase)
if res.isErr():
warn "failed to init SqliteStore", err = res.error
warn "failed to init message store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.mStorage = res.value
else:
let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity)
if res.isErr():
warn "failed to init message store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.mStorage = res.value
ok(storeTuple)
# 2/7 Retrieve dynamic bootstrap nodes
@ -1039,10 +1044,7 @@ when isMainModule:
ok(node)
# 4/7 Mount and initialize configured protocols
proc setupProtocols(node: WakuNode,
conf: WakuNodeConf,
mStorage: SqliteStore = nil): SetupResult[bool] =
proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): SetupResult[bool] =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
@ -1084,7 +1086,9 @@ when isMainModule:
# Store setup
if (conf.storenode != "") or (conf.store):
waitFor mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteRetentionTime, conf.sqliteStore)
let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
else: CapacityRetentionPolicy.init(conf.storeCapacity)
waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy))
if conf.storenode != "":
setStorePeer(node, conf.storenode)
@ -1188,7 +1192,7 @@ when isMainModule:
var
pStorage: WakuPeerStorage
mStorage: SqliteStore
mStorage: MessageStore
let setupStorageRes = setupStorage(conf)

View File

@ -58,6 +58,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64
# Error types (metric label values)
const
insertFailure = "insert_failure"
retPolicyFailure = "retpolicy_failure"
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure"
@ -69,17 +70,28 @@ type
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
messages*: StoreQueueRef # in-memory message store
store*: MessageStore # sqlite DB handle
store*: MessageStore
wakuSwap*: WakuSwap
persistMessages*: bool
#TODO: SqliteStore currenly also holds isSqliteOnly; put it in single place.
isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB
retentionPolicy: Option[MessageRetentionPolicy]
proc reportMessagesCountMetric(store: MessageStore) =
let resCount = store.getMessagesCount()
proc executeMessageRetentionPolicy*(w: WakuStore): WakuStoreResult[void] =
if w.retentionPolicy.isNone():
return ok()
let policy = w.retentionPolicy.get()
if w.store.isNil():
return err("no message store provided (nil)")
policy.execute(w.store)
proc reportStoredMessagesMetric*(w: WakuStore) =
if w.store.isNil():
return
let resCount = w.store.getMessagesCount()
if resCount.isErr():
return
@ -104,23 +116,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
qMaxPageSize = query.pagingInfo.pageSize
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD
let queryStartTime = getTime().toUnixFloat()
let queryRes = block:
# TODO: Move this logic, together with the insert message logic and load messages on boot
# into a "dual-store" message store implementation.
if w.isSqliteOnly:
w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize,
ascendingOrder = qAscendingOrder
)
else:
w.messages.getMessagesByHistoryQuery(
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
@ -147,8 +146,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
error: HistoryResponseError.NONE
)
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc initProtocolHandler*(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
@ -163,7 +162,9 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query
waku_store_queries.inc()
let resp = ws.findMessages(req.query)
let resp = if not ws.store.isNil(): ws.findMessages(req.query)
# TODO: Improve error reporting
else: HistoryResponse(error: HistoryResponseError.SERVICE_UNAVAILABLE)
if not ws.wakuSwap.isNil():
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
@ -181,89 +182,44 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
ws.handler = handler
ws.codec = WakuStoreCodec
ws.messages = StoreQueueRef.new(capacity)
if ws.isSqliteOnly:
if ws.store.isNil():
warn "store not provided (nil)"
return
# Execute retention policy on initialization
if not ws.retentionPolicy.isNone():
let policy = ws.retentionPolicy.get()
let resRetPolicy = policy.execute(ws.store)
if resRetPolicy.isErr():
warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error()
info "SQLite-only store initialized. Messages are *not* loaded into memory."
let numMessages = ws.store.getMessagesCount()
if numMessages.isOk():
debug "number of messages in persistent store", messageNum=numMessages.value
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
# TODO: Move this logic, together with the insert message logic
# into a "dual-store" message store implementation.
else:
if ws.store.isNil():
return
# Execute retention policy before loading any messages into in-memory store
if not ws.retentionPolicy.isNone():
let policy = ws.retentionPolicy.get()
let resRetPolicy = policy.execute(ws.store)
if resRetPolicy.isErr():
warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error()
info "loading messages from persistent storage"
let res = ws.store.getAllMessages()
if res.isOk():
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard ws.messages.put(index, msg, pubsubTopic)
info "successfully loaded messages from the persistent store"
else:
warn "failed to load messages from the persistent store", err = res.error()
let numMessages = ws.messages.getMessagesCount()
if numMessages.isOk():
debug "number of messages in in-memory store", messageNum=numMessages.value
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
proc init*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore = nil,
store: MessageStore,
wakuSwap: WakuSwap = nil,
persistMessages = true,
capacity = StoreDefaultCapacity,
isSqliteOnly = false,
retentionPolicy=none(MessageRetentionPolicy)): T =
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
store: store,
wakuSwap: wakuSwap,
persistMessages: persistMessages,
isSqliteOnly: isSqliteOnly,
retentionPolicy: retentionPolicy
)
ws.init(capacity)
ws.initProtocolHandler()
# TODO: Move to wakunode
# Execute retention policy on initialization
let retPolicyRes = ws.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
warn "an error occurred while applying the retention policy at init", error=retPolicyRes.error
ws.reportStoredMessagesMetric()
return ws
# TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc init*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
wakuSwap: WakuSwap = nil,
retentionPolicy=none(MessageRetentionPolicy)): T =
let store = StoreQueueRef.new(StoreDefaultCapacity)
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} =
if not w.persistMessages:
# Store is mounted but new messages should not be stored
if w.store.isNil():
# Messages should not be stored
return
if msg.ephemeral:
@ -277,62 +233,32 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async
trace "handling message", topic=pubsubTopic, index=index
block:
if w.isSqliteOnly:
# Add messages to persistent store, if present
if w.store.isNil():
return
# Add messages to persistent store, if present
let putStoreRes = w.store.put(index, msg, pubsubTopic)
if putStoreRes.isErr():
debug "failed to insert message to persistent store", index=index, err=putStoreRes.error
waku_store_errors.inc(labelValues = [insertFailure])
return
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
return
# Execute the retention policy after insertion
if not w.retentionPolicy.isNone():
let policy = w.retentionPolicy.get()
let resRetPolicy = policy.execute(w.store)
if resRetPolicy.isErr():
debug "message retention policy failure", error=resRetPolicy.error()
waku_store_errors.inc(labelValues = [insertFailure])
reportMessagesCountMetric(w.store)
# TODO: Move this logic, together with the load from persistent store on init
# into a "dual-store" message store implementation.
else:
# Add message to in-memory store
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
if resPutInmemory.isErr():
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
waku_store_errors.inc(labelValues = [insertFailure])
return
reportMessagesCountMetric(w.messages)
# Add messages to persistent store, if present
if w.store.isNil():
return
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
return
# Execute the retention policy after insertion
if not w.retentionPolicy.isNone():
let policy = w.retentionPolicy.get()
let resRetPolicy = policy.execute(w.store)
if resRetPolicy.isErr():
debug "message retention policy failure", error=resRetPolicy.error()
waku_store_errors.inc(labelValues = [insertFailure])
# Execute the retention policy after insertion
let retPolicyRes = w.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
debug "message retention policy failure", error=retPolicyRes.error
waku_store_errors.inc(labelValues = [retPolicyFailure])
w.reportStoredMessagesMetric()
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_store_insert_duration_seconds.observe(insertDuration)
## CLIENT
# TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
# TODO: Remove after converting the query method into a non-callback method
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
@ -376,6 +302,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp
## 21/WAKU2-FAULT-TOLERANT-STORE
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
## it retrieves the historical messages in pages.
@ -390,7 +318,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
while true:
let res = await w.query(req, peer)
if res.isErr():
return err(res.error())
return err(res.error)
let response = res.get()
@ -446,29 +374,27 @@ proc resume*(w: WakuStore,
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
# If store has not been provided, don't even try
if w.isSqliteOnly and w.store.isNil():
return err("store not provided")
if w.store.isNil():
return err("store not provided (nil)")
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
# of writing, the sqlite store implementation returns the last message's receiver
# timestamp.
# lastSeenTime = lastSeenItem.get().msg.timestamp
let
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
now = getNanosecondTime(getTime().toUnixFloat())
var lastSeenTime = Timestamp(0)
var currentTime = getNanosecondTime(epochTime())
let lastSeenItem = w.messages.last()
if lastSeenItem.isOk():
lastSeenTime = lastSeenItem.get().msg.timestamp
# adjust the time window with an offset of 20 seconds
let offset: Timestamp = getNanosecondTime(20)
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now
let
queryEndTime = now + StoreResumeTimeWindowOffset
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
let req = HistoryQuery(
pubsubTopic: pubsubTopic,
startTime: lastSeenTime,
endTime: currentTime,
startTime: queryStartTime,
endTime: queryEndTime,
pagingInfo: PagingInfo(
direction:PagingDirection.FORWARD,
pageSize: pageSize
@ -498,70 +424,26 @@ proc resume*(w: WakuStore,
# Save the retrieved messages in the store
var dismissed: uint = 0
var added: uint = 0
for msg in res.get():
let now = getNanosecondTime(getTime().toUnixFloat())
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
if w.isSqliteOnly:
# Add messages to persistent store
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
continue
# Execute the retention policy after insertion
if not w.retentionPolicy.isNone():
let policy = w.retentionPolicy.get()
let resRetPolicy = policy.execute(w.store)
if resRetPolicy.isErr():
debug "message retention policy failure", error=resRetPolicy.error()
waku_store_errors.inc(labelValues = [insertFailure])
let putStoreRes = w.store.put(index, msg, pubsubTopic)
if putStoreRes.isErr():
continue
# TODO: Move this logic, together with the load from persistent store on init
# into a "dual-store" message store implementation.
else:
# check for duplicate messages
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if w.messages.contains(index):
dismissed.inc()
continue
# Add message to in-memory store
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
if resPutInmemory.isErr():
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
waku_store_errors.inc(labelValues = [insertFailure])
continue
if w.store.isNil():
continue
# Add messages to persistent store
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
continue
# Execute the retention policy after insertion
if not w.retentionPolicy.isNone():
let policy = w.retentionPolicy.get()
let resRetPolicy = policy.execute(w.store)
if resRetPolicy.isErr():
debug "message retention policy failure", error=resRetPolicy.error()
waku_store_errors.inc(labelValues = [insertFailure])
added.inc()
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
debug "resume finished successfully", retrievedMessages=res.get().len, addedMessages=added
# Execute the retention policy after insertion
let retPolicyRes = w.executeMessageRetentionPolicy()
if retPolicyRes.isErr():
debug "message retention policy failure", error=retPolicyRes.error
waku_store_errors.inc(labelValues = [retPolicyFailure])
let store: MessageStore = if w.isSqliteOnly: w.store
else: w.messages
reportMessagesCountMetric(store)
w.reportStoredMessagesMetric()
return ok(added)
@ -578,7 +460,7 @@ proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreRes
let res = await ws.query(req, peerOpt.get())
if res.isErr():
return err(res.error())
return err(res.error)
let response = res.get()

View File

@ -23,6 +23,7 @@ type
## HistoryResponseError contains error message to inform the querying node about the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
SERVICE_UNAVAILABLE = uint32(503)
HistoryResponse* = object
messages*: seq[WakuMessage]