mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-16 00:47:24 +00:00
deploy: 4a9f6f3c5eab69af6b78e6d69449c00e82489d6f
This commit is contained in:
parent
13dba198fc
commit
e65568da92
@ -449,7 +449,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
|
||||
await node.mountSwap()
|
||||
|
||||
if (conf.storenode != "") or (conf.store == true):
|
||||
await node.mountStore(persistMessages = conf.persistMessages)
|
||||
await node.mountStore()
|
||||
|
||||
var storenode: Option[RemotePeerInfo]
|
||||
|
||||
|
@ -5,20 +5,22 @@ import
|
||||
./v2/test_wakunode,
|
||||
./v2/test_wakunode_relay,
|
||||
./v2/test_wakunode_lightpush,
|
||||
# Waku Store
|
||||
./v2/test_utils_pagination,
|
||||
./v2/test_message_store_queue,
|
||||
./v2/test_message_store_queue_pagination,
|
||||
./v2/test_message_store_sqlite_query,
|
||||
./v2/test_message_store_sqlite,
|
||||
./v2/test_waku_store_rpc_codec,
|
||||
./v2/test_waku_store,
|
||||
./v2/test_wakunode_store,
|
||||
# Waku Filter
|
||||
./v2/test_waku_filter,
|
||||
./v2/test_wakunode_filter,
|
||||
./v2/test_waku_payload,
|
||||
./v2/test_waku_swap,
|
||||
./v2/test_utils_peers,
|
||||
./v2/test_utils_pagination,
|
||||
./v2/test_message_cache,
|
||||
./v2/test_message_store_queue,
|
||||
./v2/test_message_store_queue_pagination,
|
||||
./v2/test_message_store_sqlite_query,
|
||||
./v2/test_message_store_sqlite,
|
||||
./v2/test_jsonrpc_waku,
|
||||
./v2/test_rest_serdes,
|
||||
./v2/test_rest_debug_api_serdes,
|
||||
|
@ -11,8 +11,10 @@ import
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/rpc/message,
|
||||
libp2p/protocols/pubsub/rpc/message
|
||||
import
|
||||
../../waku/v1/node/rpc/hexstrings,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../../waku/v2/node/jsonrpc/[store_api,
|
||||
relay_api,
|
||||
@ -232,7 +234,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
key = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.new(key)
|
||||
|
||||
await node.mountStore(persistMessages = true)
|
||||
await node.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
waitFor listenSwitch.start()
|
||||
@ -528,7 +530,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
|
||||
await node.mountFilter()
|
||||
await node.mountSwap()
|
||||
await node.mountStore(persistMessages = true)
|
||||
await node.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
# Create and set some peers
|
||||
let
|
||||
|
@ -104,7 +104,7 @@ procSuite "Peer Manager":
|
||||
|
||||
await node.mountFilter()
|
||||
await node.mountSwap()
|
||||
await node.mountStore(persistMessages = true)
|
||||
await node.mountStore()
|
||||
|
||||
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
|
||||
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
|
||||
|
@ -617,7 +617,7 @@ procSuite "Waku Store - fault tolerant store":
|
||||
check:
|
||||
successResult.isOk()
|
||||
successResult.value == 10
|
||||
proto3.messages.len == 10
|
||||
proto3.store.getMessagesCount().tryGet() == 10
|
||||
|
||||
## Cleanup
|
||||
await allFutures(dialSwitch3.stop(), listenSwitch3.stop())
|
||||
@ -681,9 +681,7 @@ procSuite "Waku Store - fault tolerant store":
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response == 14
|
||||
|
||||
check:
|
||||
proto3.messages.len == 14
|
||||
proto3.store.getMessagesCount().tryGet() == 14
|
||||
|
||||
## Cleanup
|
||||
await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop())
|
||||
|
@ -9,10 +9,12 @@ import
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/[crypto, secp],
|
||||
libp2p/switch,
|
||||
eth/keys,
|
||||
eth/keys
|
||||
import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../../waku/v2/utils/peers,
|
||||
../test_helpers, ./utils
|
||||
@ -50,11 +52,11 @@ procSuite "Waku SWAP Accounting":
|
||||
asyncTest "Update accounting state after store operations":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001))
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
@ -63,14 +65,14 @@ procSuite "Waku SWAP Accounting":
|
||||
# Start nodes and mount protocols
|
||||
await node1.start()
|
||||
await node1.mountSwap()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountStore(store=StoreQueueRef.new())
|
||||
await node2.start()
|
||||
await node2.mountSwap()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
@ -97,11 +99,11 @@ procSuite "Waku SWAP Accounting":
|
||||
asyncTest "Update accounting state after sending cheque":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001))
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
@ -113,14 +115,14 @@ procSuite "Waku SWAP Accounting":
|
||||
# Start nodes and mount protocols
|
||||
await node1.start()
|
||||
await node1.mountSwap(swapConfig)
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountStore(store=StoreQueueRef.new())
|
||||
await node2.start()
|
||||
await node2.mountSwap(swapConfig)
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
@ -12,13 +12,13 @@ import
|
||||
libp2p/switch,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
eth/keys
|
||||
libp2p/protocols/pubsub/gossipsub
|
||||
import
|
||||
../../waku/v2/node/storage/sqlite,
|
||||
../../waku/v2/node/storage/message/message_store,
|
||||
../../waku/v2/node/storage/message/sqlite_store,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/protocol/[waku_relay, waku_message],
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
@ -27,29 +27,32 @@ import
|
||||
../../waku/v2/utils/time,
|
||||
../../waku/v2/node/wakunode2
|
||||
|
||||
from std/times import epochTime
|
||||
from std/times import getTime, toUnixFloat
|
||||
|
||||
proc newTestMessageStore(): MessageStore =
|
||||
let database = SqliteDatabase.init("", inMemory = true)[]
|
||||
SqliteStore.init(database).tryGet()
|
||||
|
||||
|
||||
procSuite "WakuNode - Store":
|
||||
let rng = keys.newRng()
|
||||
let rng = crypto.newRng()
|
||||
|
||||
asyncTest "Store protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountStore(store=newTestMessageStore())
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountStore(store=newTestMessageStore())
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
@ -88,11 +91,11 @@ procSuite "WakuNode - Store":
|
||||
storeComplFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountStore(store=newTestMessageStore())
|
||||
await node1.mountFilter()
|
||||
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountStore(store=newTestMessageStore())
|
||||
await node2.mountFilter()
|
||||
|
||||
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
|
||||
@ -142,9 +145,9 @@ procSuite "WakuNode - Store":
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true)
|
||||
await node1.mountStore(store=newTestMessageStore())
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await node2.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
|
||||
|
||||
@ -156,61 +159,44 @@ procSuite "WakuNode - Store":
|
||||
|
||||
check:
|
||||
# message is correctly stored
|
||||
node1.wakuStore.messages.len == 1
|
||||
node1.wakuStore.store.getMessagesCount().tryGet() == 1
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Resume proc discards duplicate messages":
|
||||
let timeOrigin = getNanosecondTime(getTime().toUnixFloat())
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
client = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
server = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
|
||||
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2)
|
||||
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 1)
|
||||
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 2)
|
||||
msg3 = WakuMessage(payload: "hello world3".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 3)
|
||||
|
||||
# setup sqlite database for node1
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = SqliteStore.init(database).tryGet()
|
||||
await allFutures(client.start(), server.start())
|
||||
await client.mountStore(store=StoreQueueRef.new())
|
||||
await server.mountStore(store=StoreQueueRef.new())
|
||||
|
||||
await node1.start()
|
||||
await node1.mountStore(persistMessages = true, store = store)
|
||||
await node2.start()
|
||||
await node2.mountStore(persistMessages = true)
|
||||
await server.wakuStore.handleMessage(DefaultTopic, msg1)
|
||||
await server.wakuStore.handleMessage(DefaultTopic, msg2)
|
||||
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg1)
|
||||
await node2.wakuStore.handleMessage(DefaultTopic, msg2)
|
||||
client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
|
||||
# populate db with msg1 to be a duplicate
|
||||
let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic)
|
||||
let output1 = store.put(index1, msg1, DefaultTopic)
|
||||
check output1.isOk
|
||||
discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic)
|
||||
# Insert the same message in both node's store
|
||||
let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic)
|
||||
require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
|
||||
require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
|
||||
|
||||
# now run the resume proc
|
||||
await node1.resume()
|
||||
|
||||
# count the total number of retrieved messages from the database
|
||||
let res = store.getAllMessages()
|
||||
check:
|
||||
res.isOk()
|
||||
await client.resume()
|
||||
|
||||
check:
|
||||
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
|
||||
# check no duplicates is in the messages field
|
||||
node1.wakuStore.messages.len == 2
|
||||
# check no duplicates is in the db
|
||||
res.value.len == 2
|
||||
# If the duplicates are discarded properly, then the total number of messages after resume should be 3
|
||||
client.wakuStore.store.getMessagesCount().tryGet() == 3
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await allFutures(client.stop(), server.stop())
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az77-583:
|
||||
# Libtool was configured on host fv-az198-457:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -430,7 +430,7 @@ when isMainModule:
|
||||
waitFor mountLibp2pPing(bridge.nodev2)
|
||||
|
||||
if conf.store:
|
||||
waitFor mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages
|
||||
waitFor mountStore(bridge.nodev2) # Bridge does not persist messages
|
||||
|
||||
if conf.filter:
|
||||
waitFor mountFilter(bridge.nodev2)
|
||||
|
89
waku/v2/node/storage/message/dual_message_store.nim
Normal file
89
waku/v2/node/storage/message/dual_message_store.nim
Normal file
@ -0,0 +1,89 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/results,
|
||||
chronicles
|
||||
import
|
||||
../../../protocol/waku_message,
|
||||
../../../utils/pagination,
|
||||
../../../utils/time,
|
||||
../sqlite,
|
||||
./message_store,
|
||||
./waku_store_queue,
|
||||
./sqlite_store
|
||||
|
||||
logScope:
|
||||
topics = "message_store.dual"
|
||||
|
||||
|
||||
type DualMessageStore* = ref object of MessageStore
|
||||
inmemory: StoreQueueRef
|
||||
persistent: SqliteStore
|
||||
|
||||
|
||||
proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity=StoreDefaultCapacity): MessageStoreResult[T] =
|
||||
let
|
||||
inmemory = StoreQueueRef.new(capacity)
|
||||
persistent = ?SqliteStore.init(db)
|
||||
|
||||
info "loading messages from persistent storage to in-memory store"
|
||||
|
||||
let res = persistent.getAllMessages()
|
||||
if res.isErr():
|
||||
warn "failed to load messages from the persistent store", err = res.error
|
||||
else:
|
||||
for (receiverTime, msg, pubsubTopic) in res.value:
|
||||
let index = Index.compute(msg, receiverTime, pubsubTopic)
|
||||
discard inmemory.put(index, msg, pubsubTopic)
|
||||
|
||||
info "successfully loaded messages from the persistent store"
|
||||
|
||||
|
||||
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
|
||||
|
||||
|
||||
method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
||||
?s.inmemory.put(index, message, pubsubTopic)
|
||||
?s.persistent.put(index, message, pubsubTopic)
|
||||
ok()
|
||||
|
||||
|
||||
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
|
||||
s.inmemory.getAllMessages()
|
||||
|
||||
|
||||
method getMessagesByHistoryQuery*(
|
||||
s: DualMessageStore,
|
||||
contentTopic = none(seq[ContentTopic]),
|
||||
pubsubTopic = none(string),
|
||||
cursor = none(Index),
|
||||
startTime = none(Timestamp),
|
||||
endTime = none(Timestamp),
|
||||
maxPageSize = StoreMaxPageSize,
|
||||
ascendingOrder = true
|
||||
): MessageStoreResult[MessageStorePage] =
|
||||
s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder)
|
||||
|
||||
|
||||
method getMessagesCount*(s: DualMessageStore): MessageStoreResult[int64] =
|
||||
s.inmemory.getMessagesCount()
|
||||
|
||||
method getOldestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
|
||||
s.inmemory.getOldestMessageTimestamp()
|
||||
|
||||
method getNewestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
|
||||
s.inmemory.getNewestMessageTimestamp()
|
||||
|
||||
|
||||
method deleteMessagesOlderThanTimestamp*(s: DualMessageStore, ts: Timestamp): MessageStoreResult[void] =
|
||||
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
|
||||
# ?s.inmemory.deleteMessagesOlderThanTimestamp(ts)
|
||||
?s.persistent.deleteMessagesOlderThanTimestamp(ts)
|
||||
ok()
|
||||
|
||||
method deleteOldestMessagesNotWithinLimit*(s: DualMessageStore, limit: int): MessageStoreResult[void] =
|
||||
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
|
||||
# ?s.inmemory.deleteOldestMessagesNotWithinLimit(limit)
|
||||
?s.persistent.deleteOldestMessagesNotWithinLimit(limit)
|
||||
ok()
|
@ -11,10 +11,12 @@ import
|
||||
../../../utils/time,
|
||||
./message_store
|
||||
|
||||
export pagination
|
||||
|
||||
logScope:
|
||||
topics = "message_store.storequeue"
|
||||
|
||||
|
||||
type
|
||||
IndexedWakuMessage* = object
|
||||
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
|
||||
|
@ -395,11 +395,15 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
|
||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
if node.wakuStore.isNil():
|
||||
return
|
||||
|
||||
let retrievedMessages = await node.wakuStore.resume(peerList)
|
||||
if retrievedMessages.isErr():
|
||||
error "failed to resume store", error=retrievedMessages.error
|
||||
return
|
||||
|
||||
if not node.wakuStore.isNil:
|
||||
let retrievedMessages = await node.wakuStore.resume(peerList)
|
||||
if retrievedMessages.isOk:
|
||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||
|
||||
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||
proc info*(node: WakuNode): WakuInfo =
|
||||
@ -452,24 +456,18 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
|
||||
|
||||
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
|
||||
|
||||
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, retentionTime = StoreDefaultRetentionTime, isSqliteOnly = false) {.async, raises: [Defect, LPError].} =
|
||||
proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefaultCapacity, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
|
||||
if node.wakuSwap.isNil():
|
||||
info "mounting waku store protocol (no waku swap)"
|
||||
else:
|
||||
info "mounting waku store protocol with waku swap support"
|
||||
|
||||
let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime)
|
||||
else: CapacityRetentionPolicy.init(capacity)
|
||||
|
||||
node.wakuStore = WakuStore.init(
|
||||
node.peerManager,
|
||||
node.rng,
|
||||
store,
|
||||
wakuSwap=node.wakuSwap,
|
||||
persistMessages=persistMessages,
|
||||
capacity=capacity,
|
||||
isSqliteOnly=isSqliteOnly,
|
||||
retentionPolicy=some(retentionPolicy)
|
||||
retentionPolicy=retentionPolicy
|
||||
)
|
||||
|
||||
if node.started:
|
||||
@ -832,6 +830,8 @@ when isMainModule:
|
||||
./wakunode2_setup_rpc,
|
||||
./wakunode2_setup_sql_migrations,
|
||||
./storage/sqlite,
|
||||
./storage/message/message_store,
|
||||
./storage/message/dual_message_store,
|
||||
./storage/message/sqlite_store,
|
||||
./storage/peer/waku_peer_storage
|
||||
|
||||
@ -844,7 +844,7 @@ when isMainModule:
|
||||
|
||||
# 1/7 Setup storage
|
||||
proc setupStorage(conf: WakuNodeConf):
|
||||
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]] =
|
||||
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] =
|
||||
|
||||
## Setup a SQLite Database for a wakunode based on a supplied
|
||||
## configuration file and perform all necessary migration.
|
||||
@ -854,20 +854,20 @@ when isMainModule:
|
||||
|
||||
var
|
||||
sqliteDatabase: SqliteDatabase
|
||||
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]
|
||||
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]
|
||||
|
||||
# Setup DB
|
||||
# Setup database connection
|
||||
if conf.dbPath != "":
|
||||
let dbRes = SqliteDatabase.init(conf.dbPath)
|
||||
if dbRes.isErr:
|
||||
warn "failed to init database", err = dbRes.error
|
||||
if dbRes.isErr():
|
||||
warn "failed to init database connection", err = dbRes.error
|
||||
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
||||
return err("failed to init database")
|
||||
return err("failed to init database connection")
|
||||
else:
|
||||
sqliteDatabase = dbRes.value
|
||||
|
||||
if not sqliteDatabase.isNil and (conf.persistPeers or conf.persistMessages):
|
||||
|
||||
|
||||
if not sqliteDatabase.isNil():
|
||||
## Database vacuuming
|
||||
# TODO: Wrap and move this logic to the appropriate module
|
||||
let
|
||||
@ -887,28 +887,33 @@ when isMainModule:
|
||||
|
||||
debug "finished sqlite database vacuuming"
|
||||
|
||||
# Database initialized. Let's set it up
|
||||
sqliteDatabase.runMigrations(conf) # First migrate what we have
|
||||
sqliteDatabase.runMigrations(conf)
|
||||
|
||||
if conf.persistPeers:
|
||||
# Peer persistence enable. Set up Peer table in storage
|
||||
let res = WakuPeerStorage.new(sqliteDatabase)
|
||||
|
||||
if res.isErr:
|
||||
warn "failed to init new WakuPeerStorage", err = res.error
|
||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||
else:
|
||||
storeTuple.pStorage = res.value
|
||||
|
||||
if conf.persistMessages:
|
||||
# Historical message persistence enable. Set up Message table in storage
|
||||
if conf.persistPeers:
|
||||
let res = WakuPeerStorage.new(sqliteDatabase)
|
||||
if res.isErr():
|
||||
warn "failed to init peer store", err = res.error
|
||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||
else:
|
||||
storeTuple.pStorage = res.value
|
||||
|
||||
if conf.persistMessages:
|
||||
if conf.sqliteStore:
|
||||
let res = SqliteStore.init(sqliteDatabase)
|
||||
if res.isErr():
|
||||
warn "failed to init SqliteStore", err = res.error
|
||||
warn "failed to init message store", err = res.error
|
||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||
else:
|
||||
storeTuple.mStorage = res.value
|
||||
|
||||
else:
|
||||
let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity)
|
||||
if res.isErr():
|
||||
warn "failed to init message store", err = res.error
|
||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||
else:
|
||||
storeTuple.mStorage = res.value
|
||||
|
||||
ok(storeTuple)
|
||||
|
||||
# 2/7 Retrieve dynamic bootstrap nodes
|
||||
@ -1039,10 +1044,7 @@ when isMainModule:
|
||||
ok(node)
|
||||
|
||||
# 4/7 Mount and initialize configured protocols
|
||||
proc setupProtocols(node: WakuNode,
|
||||
conf: WakuNodeConf,
|
||||
mStorage: SqliteStore = nil): SetupResult[bool] =
|
||||
|
||||
proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): SetupResult[bool] =
|
||||
## Setup configured protocols on an existing Waku v2 node.
|
||||
## Optionally include persistent message storage.
|
||||
## No protocols are started yet.
|
||||
@ -1084,7 +1086,9 @@ when isMainModule:
|
||||
|
||||
# Store setup
|
||||
if (conf.storenode != "") or (conf.store):
|
||||
waitFor mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteRetentionTime, conf.sqliteStore)
|
||||
let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
|
||||
else: CapacityRetentionPolicy.init(conf.storeCapacity)
|
||||
waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy))
|
||||
|
||||
if conf.storenode != "":
|
||||
setStorePeer(node, conf.storenode)
|
||||
@ -1188,7 +1192,7 @@ when isMainModule:
|
||||
|
||||
var
|
||||
pStorage: WakuPeerStorage
|
||||
mStorage: SqliteStore
|
||||
mStorage: MessageStore
|
||||
|
||||
let setupStorageRes = setupStorage(conf)
|
||||
|
||||
|
@ -58,6 +58,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64
|
||||
# Error types (metric label values)
|
||||
const
|
||||
insertFailure = "insert_failure"
|
||||
retPolicyFailure = "retpolicy_failure"
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
peerNotFoundFailure = "peer_not_found_failure"
|
||||
@ -69,17 +70,28 @@ type
|
||||
WakuStore* = ref object of LPProtocol
|
||||
peerManager*: PeerManager
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
messages*: StoreQueueRef # in-memory message store
|
||||
store*: MessageStore # sqlite DB handle
|
||||
store*: MessageStore
|
||||
wakuSwap*: WakuSwap
|
||||
persistMessages*: bool
|
||||
#TODO: SqliteStore currenly also holds isSqliteOnly; put it in single place.
|
||||
isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB
|
||||
retentionPolicy: Option[MessageRetentionPolicy]
|
||||
|
||||
|
||||
proc reportMessagesCountMetric(store: MessageStore) =
|
||||
let resCount = store.getMessagesCount()
|
||||
proc executeMessageRetentionPolicy*(w: WakuStore): WakuStoreResult[void] =
|
||||
if w.retentionPolicy.isNone():
|
||||
return ok()
|
||||
|
||||
let policy = w.retentionPolicy.get()
|
||||
|
||||
if w.store.isNil():
|
||||
return err("no message store provided (nil)")
|
||||
|
||||
policy.execute(w.store)
|
||||
|
||||
|
||||
proc reportStoredMessagesMetric*(w: WakuStore) =
|
||||
if w.store.isNil():
|
||||
return
|
||||
|
||||
let resCount = w.store.getMessagesCount()
|
||||
if resCount.isErr():
|
||||
return
|
||||
|
||||
@ -104,23 +116,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
|
||||
qMaxPageSize = query.pagingInfo.pageSize
|
||||
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD
|
||||
|
||||
let queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
let queryRes = block:
|
||||
# TODO: Move this logic, together with the insert message logic and load messages on boot
|
||||
# into a "dual-store" message store implementation.
|
||||
if w.isSqliteOnly:
|
||||
w.store.getMessagesByHistoryQuery(
|
||||
contentTopic = qContentTopics,
|
||||
pubsubTopic = qPubSubTopic,
|
||||
cursor = qCursor,
|
||||
startTime = qStartTime,
|
||||
endTime = qEndTime,
|
||||
maxPageSize = qMaxPageSize,
|
||||
ascendingOrder = qAscendingOrder
|
||||
)
|
||||
else:
|
||||
w.messages.getMessagesByHistoryQuery(
|
||||
let queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
let queryRes = w.store.getMessagesByHistoryQuery(
|
||||
contentTopic = qContentTopics,
|
||||
pubsubTopic = qPubSubTopic,
|
||||
cursor = qCursor,
|
||||
@ -147,8 +146,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
|
||||
error: HistoryResponseError.NONE
|
||||
)
|
||||
|
||||
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
||||
|
||||
proc initProtocolHandler*(ws: WakuStore) =
|
||||
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
let buf = await conn.readLp(MaxRpcSize.int)
|
||||
|
||||
@ -163,7 +162,9 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
||||
info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query
|
||||
waku_store_queries.inc()
|
||||
|
||||
let resp = ws.findMessages(req.query)
|
||||
let resp = if not ws.store.isNil(): ws.findMessages(req.query)
|
||||
# TODO: Improve error reporting
|
||||
else: HistoryResponse(error: HistoryResponseError.SERVICE_UNAVAILABLE)
|
||||
|
||||
if not ws.wakuSwap.isNil():
|
||||
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
|
||||
@ -181,89 +182,44 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
||||
|
||||
ws.handler = handler
|
||||
ws.codec = WakuStoreCodec
|
||||
ws.messages = StoreQueueRef.new(capacity)
|
||||
|
||||
if ws.isSqliteOnly:
|
||||
if ws.store.isNil():
|
||||
warn "store not provided (nil)"
|
||||
return
|
||||
|
||||
# Execute retention policy on initialization
|
||||
if not ws.retentionPolicy.isNone():
|
||||
let policy = ws.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(ws.store)
|
||||
if resRetPolicy.isErr():
|
||||
warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error()
|
||||
|
||||
info "SQLite-only store initialized. Messages are *not* loaded into memory."
|
||||
|
||||
let numMessages = ws.store.getMessagesCount()
|
||||
if numMessages.isOk():
|
||||
debug "number of messages in persistent store", messageNum=numMessages.value
|
||||
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
|
||||
|
||||
# TODO: Move this logic, together with the insert message logic
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
if ws.store.isNil():
|
||||
return
|
||||
|
||||
# Execute retention policy before loading any messages into in-memory store
|
||||
if not ws.retentionPolicy.isNone():
|
||||
let policy = ws.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(ws.store)
|
||||
if resRetPolicy.isErr():
|
||||
warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error()
|
||||
|
||||
info "loading messages from persistent storage"
|
||||
|
||||
let res = ws.store.getAllMessages()
|
||||
if res.isOk():
|
||||
for (receiverTime, msg, pubsubTopic) in res.value:
|
||||
let index = Index.compute(msg, receiverTime, pubsubTopic)
|
||||
discard ws.messages.put(index, msg, pubsubTopic)
|
||||
|
||||
info "successfully loaded messages from the persistent store"
|
||||
else:
|
||||
warn "failed to load messages from the persistent store", err = res.error()
|
||||
|
||||
let numMessages = ws.messages.getMessagesCount()
|
||||
if numMessages.isOk():
|
||||
debug "number of messages in in-memory store", messageNum=numMessages.value
|
||||
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
|
||||
|
||||
|
||||
proc init*(T: type WakuStore,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
store: MessageStore = nil,
|
||||
store: MessageStore,
|
||||
wakuSwap: WakuSwap = nil,
|
||||
persistMessages = true,
|
||||
capacity = StoreDefaultCapacity,
|
||||
isSqliteOnly = false,
|
||||
retentionPolicy=none(MessageRetentionPolicy)): T =
|
||||
let ws = WakuStore(
|
||||
rng: rng,
|
||||
peerManager: peerManager,
|
||||
store: store,
|
||||
wakuSwap: wakuSwap,
|
||||
persistMessages: persistMessages,
|
||||
isSqliteOnly: isSqliteOnly,
|
||||
retentionPolicy: retentionPolicy
|
||||
)
|
||||
ws.init(capacity)
|
||||
ws.initProtocolHandler()
|
||||
|
||||
# TODO: Move to wakunode
|
||||
# Execute retention policy on initialization
|
||||
let retPolicyRes = ws.executeMessageRetentionPolicy()
|
||||
if retPolicyRes.isErr():
|
||||
warn "an error occurred while applying the retention policy at init", error=retPolicyRes.error
|
||||
|
||||
ws.reportStoredMessagesMetric()
|
||||
|
||||
return ws
|
||||
|
||||
|
||||
# TODO: This should probably be an add function and append the peer to an array
|
||||
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
|
||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||
waku_store_peers.inc()
|
||||
proc init*(T: type WakuStore,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
wakuSwap: WakuSwap = nil,
|
||||
retentionPolicy=none(MessageRetentionPolicy)): T =
|
||||
let store = StoreQueueRef.new(StoreDefaultCapacity)
|
||||
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
|
||||
|
||||
|
||||
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} =
|
||||
if not w.persistMessages:
|
||||
# Store is mounted but new messages should not be stored
|
||||
if w.store.isNil():
|
||||
# Messages should not be stored
|
||||
return
|
||||
|
||||
if msg.ephemeral:
|
||||
@ -277,62 +233,32 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async
|
||||
|
||||
trace "handling message", topic=pubsubTopic, index=index
|
||||
|
||||
block:
|
||||
if w.isSqliteOnly:
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil():
|
||||
return
|
||||
# Add messages to persistent store, if present
|
||||
let putStoreRes = w.store.put(index, msg, pubsubTopic)
|
||||
if putStoreRes.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=putStoreRes.error
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
# Execute the retention policy after insertion
|
||||
if not w.retentionPolicy.isNone():
|
||||
let policy = w.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(w.store)
|
||||
if resRetPolicy.isErr():
|
||||
debug "message retention policy failure", error=resRetPolicy.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
|
||||
reportMessagesCountMetric(w.store)
|
||||
|
||||
# TODO: Move this logic, together with the load from persistent store on init
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
# Add message to in-memory store
|
||||
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
|
||||
if resPutInmemory.isErr():
|
||||
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
reportMessagesCountMetric(w.messages)
|
||||
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil():
|
||||
return
|
||||
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
# Execute the retention policy after insertion
|
||||
if not w.retentionPolicy.isNone():
|
||||
let policy = w.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(w.store)
|
||||
if resRetPolicy.isErr():
|
||||
debug "message retention policy failure", error=resRetPolicy.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
# Execute the retention policy after insertion
|
||||
let retPolicyRes = w.executeMessageRetentionPolicy()
|
||||
if retPolicyRes.isErr():
|
||||
debug "message retention policy failure", error=retPolicyRes.error
|
||||
waku_store_errors.inc(labelValues = [retPolicyFailure])
|
||||
|
||||
w.reportStoredMessagesMetric()
|
||||
|
||||
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||
waku_store_insert_duration_seconds.observe(insertDuration)
|
||||
|
||||
|
||||
## CLIENT
|
||||
|
||||
# TODO: This should probably be an add function and append the peer to an array
|
||||
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
|
||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||
waku_store_peers.inc()
|
||||
|
||||
# TODO: Remove after converting the query method into a non-callback method
|
||||
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
@ -376,6 +302,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp
|
||||
|
||||
## 21/WAKU2-FAULT-TOLERANT-STORE
|
||||
|
||||
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
|
||||
|
||||
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||||
## it retrieves the historical messages in pages.
|
||||
@ -390,7 +318,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
|
||||
while true:
|
||||
let res = await w.query(req, peer)
|
||||
if res.isErr():
|
||||
return err(res.error())
|
||||
return err(res.error)
|
||||
|
||||
let response = res.get()
|
||||
|
||||
@ -446,29 +374,27 @@ proc resume*(w: WakuStore,
|
||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
|
||||
# If store has not been provided, don't even try
|
||||
if w.isSqliteOnly and w.store.isNil():
|
||||
return err("store not provided")
|
||||
if w.store.isNil():
|
||||
return err("store not provided (nil)")
|
||||
|
||||
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
|
||||
# of writing, the sqlite store implementation returns the last message's receiver
|
||||
# timestamp.
|
||||
# lastSeenTime = lastSeenItem.get().msg.timestamp
|
||||
let
|
||||
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
|
||||
now = getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
var lastSeenTime = Timestamp(0)
|
||||
var currentTime = getNanosecondTime(epochTime())
|
||||
|
||||
let lastSeenItem = w.messages.last()
|
||||
if lastSeenItem.isOk():
|
||||
lastSeenTime = lastSeenItem.get().msg.timestamp
|
||||
|
||||
# adjust the time window with an offset of 20 seconds
|
||||
let offset: Timestamp = getNanosecondTime(20)
|
||||
currentTime = currentTime + offset
|
||||
lastSeenTime = max(lastSeenTime - offset, 0)
|
||||
|
||||
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
|
||||
debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now
|
||||
|
||||
let
|
||||
queryEndTime = now + StoreResumeTimeWindowOffset
|
||||
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
|
||||
|
||||
let req = HistoryQuery(
|
||||
pubsubTopic: pubsubTopic,
|
||||
startTime: lastSeenTime,
|
||||
endTime: currentTime,
|
||||
startTime: queryStartTime,
|
||||
endTime: queryEndTime,
|
||||
pagingInfo: PagingInfo(
|
||||
direction:PagingDirection.FORWARD,
|
||||
pageSize: pageSize
|
||||
@ -498,70 +424,26 @@ proc resume*(w: WakuStore,
|
||||
|
||||
|
||||
# Save the retrieved messages in the store
|
||||
var dismissed: uint = 0
|
||||
var added: uint = 0
|
||||
|
||||
for msg in res.get():
|
||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
||||
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
|
||||
|
||||
if w.isSqliteOnly:
|
||||
# Add messages to persistent store
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
# Execute the retention policy after insertion
|
||||
if not w.retentionPolicy.isNone():
|
||||
let policy = w.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(w.store)
|
||||
if resRetPolicy.isErr():
|
||||
debug "message retention policy failure", error=resRetPolicy.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
let putStoreRes = w.store.put(index, msg, pubsubTopic)
|
||||
if putStoreRes.isErr():
|
||||
continue
|
||||
|
||||
# TODO: Move this logic, together with the load from persistent store on init
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
# check for duplicate messages
|
||||
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
|
||||
if w.messages.contains(index):
|
||||
dismissed.inc()
|
||||
continue
|
||||
|
||||
# Add message to in-memory store
|
||||
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
|
||||
if resPutInmemory.isErr():
|
||||
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
if w.store.isNil():
|
||||
continue
|
||||
|
||||
# Add messages to persistent store
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
# Execute the retention policy after insertion
|
||||
if not w.retentionPolicy.isNone():
|
||||
let policy = w.retentionPolicy.get()
|
||||
let resRetPolicy = policy.execute(w.store)
|
||||
if resRetPolicy.isErr():
|
||||
debug "message retention policy failure", error=resRetPolicy.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
|
||||
added.inc()
|
||||
|
||||
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
|
||||
debug "resume finished successfully", retrievedMessages=res.get().len, addedMessages=added
|
||||
|
||||
# Execute the retention policy after insertion
|
||||
let retPolicyRes = w.executeMessageRetentionPolicy()
|
||||
if retPolicyRes.isErr():
|
||||
debug "message retention policy failure", error=retPolicyRes.error
|
||||
waku_store_errors.inc(labelValues = [retPolicyFailure])
|
||||
|
||||
let store: MessageStore = if w.isSqliteOnly: w.store
|
||||
else: w.messages
|
||||
reportMessagesCountMetric(store)
|
||||
w.reportStoredMessagesMetric()
|
||||
|
||||
return ok(added)
|
||||
|
||||
@ -578,7 +460,7 @@ proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreRes
|
||||
|
||||
let res = await ws.query(req, peerOpt.get())
|
||||
if res.isErr():
|
||||
return err(res.error())
|
||||
return err(res.error)
|
||||
|
||||
let response = res.get()
|
||||
|
||||
|
@ -23,6 +23,7 @@ type
|
||||
## HistoryResponseError contains error message to inform the querying node about the state of its request
|
||||
NONE = uint32(0)
|
||||
INVALID_CURSOR = uint32(1)
|
||||
SERVICE_UNAVAILABLE = uint32(503)
|
||||
|
||||
HistoryResponse* = object
|
||||
messages*: seq[WakuMessage]
|
||||
|
Loading…
x
Reference in New Issue
Block a user