diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 41e3062fe..26590df5c 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -449,7 +449,7 @@ proc processInput(rfd: AsyncFD) {.async.} = await node.mountSwap() if (conf.storenode != "") or (conf.store == true): - await node.mountStore(persistMessages = conf.persistMessages) + await node.mountStore() var storenode: Option[RemotePeerInfo] diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 981ec02d2..9a8404001 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -5,20 +5,22 @@ import ./v2/test_wakunode, ./v2/test_wakunode_relay, ./v2/test_wakunode_lightpush, + # Waku Store + ./v2/test_utils_pagination, + ./v2/test_message_store_queue, + ./v2/test_message_store_queue_pagination, + ./v2/test_message_store_sqlite_query, + ./v2/test_message_store_sqlite, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, ./v2/test_wakunode_store, + # Waku Filter ./v2/test_waku_filter, ./v2/test_wakunode_filter, ./v2/test_waku_payload, ./v2/test_waku_swap, ./v2/test_utils_peers, - ./v2/test_utils_pagination, ./v2/test_message_cache, - ./v2/test_message_store_queue, - ./v2/test_message_store_queue_pagination, - ./v2/test_message_store_sqlite_query, - ./v2/test_message_store_sqlite, ./v2/test_jsonrpc_waku, ./v2/test_rest_serdes, ./v2/test_rest_debug_api_serdes, diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index b8806c5e6..016d6a20c 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -11,8 +11,10 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/rpc/message, + libp2p/protocols/pubsub/rpc/message +import ../../waku/v1/node/rpc/hexstrings, + ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/wakunode2, ../../waku/v2/node/jsonrpc/[store_api, relay_api, @@ -232,7 +234,7 @@ procSuite "Waku v2 JSON-RPC API": key = wakunode2.PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.new(key) - await node.mountStore(persistMessages = true) + await node.mountStore(store=StoreQueueRef.new()) var listenSwitch = newStandardSwitch(some(key)) waitFor listenSwitch.start() @@ -528,7 +530,7 @@ procSuite "Waku v2 JSON-RPC API": await node.mountFilter() await node.mountSwap() - await node.mountStore(persistMessages = true) + await node.mountStore(store=StoreQueueRef.new()) # Create and set some peers let diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 66f5aeb3b..d38fd25db 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -104,7 +104,7 @@ procSuite "Peer Manager": await node.mountFilter() await node.mountSwap() - await node.mountStore(persistMessages = true) + await node.mountStore() node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 31490dd52..da98272b8 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -617,7 +617,7 @@ procSuite "Waku Store - fault tolerant store": check: successResult.isOk() successResult.value == 10 - proto3.messages.len == 10 + proto3.store.getMessagesCount().tryGet() == 10 ## Cleanup await allFutures(dialSwitch3.stop(), listenSwitch3.stop()) @@ -681,9 +681,7 @@ procSuite "Waku Store - fault tolerant store": let response = res.tryGet() check: response == 14 - - check: - proto3.messages.len == 14 + proto3.store.getMessagesCount().tryGet() == 14 ## Cleanup await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop()) diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 2385c95c9..890d1db3b 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -9,10 +9,12 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/[crypto, secp], libp2p/switch, - eth/keys, + eth/keys +import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, + ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/wakunode2, ../../waku/v2/utils/peers, ../test_helpers, ./utils @@ -50,11 +52,11 @@ procSuite "Waku SWAP Accounting": asyncTest "Update accounting state after store operations": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60001)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) + + let contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -63,14 +65,14 @@ procSuite "Waku SWAP Accounting": # Start nodes and mount protocols await node1.start() await node1.mountSwap() - await node1.mountStore(persistMessages = true) + await node1.mountStore(store=StoreQueueRef.new()) await node2.start() await node2.mountSwap() - await node2.mountStore(persistMessages = true) + await node2.mountStore(store=StoreQueueRef.new()) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) - await sleepAsync(2000.millis) + await sleepAsync(500.millis) node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) @@ -97,11 +99,11 @@ procSuite "Waku SWAP Accounting": asyncTest "Update accounting state after sending cheque": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60001)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60001)) + + let contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -113,14 +115,14 @@ procSuite "Waku SWAP Accounting": # Start nodes and mount protocols await node1.start() await node1.mountSwap(swapConfig) - await node1.mountStore(persistMessages = true) + await node1.mountStore(store=StoreQueueRef.new()) await node2.start() await node2.mountSwap(swapConfig) - await node2.mountStore(persistMessages = true) + await node2.mountStore(store=StoreQueueRef.new()) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) - await sleepAsync(2000.millis) + await sleepAsync(500.millis) node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) node1.wakuSwap.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index 77529567f..b5c3b7c3c 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -12,13 +12,13 @@ import libp2p/switch, libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/gossipsub, - eth/keys + libp2p/protocols/pubsub/gossipsub import ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/message/message_store, ../../waku/v2/node/storage/message/sqlite_store, ../../waku/v2/node/storage/message/waku_store_queue, - ../../waku/v2/protocol/[waku_relay, waku_message], + ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/peer_manager/peer_manager, @@ -27,29 +27,32 @@ import ../../waku/v2/utils/time, ../../waku/v2/node/wakunode2 -from std/times import epochTime +from std/times import getTime, toUnixFloat + +proc newTestMessageStore(): MessageStore = + let database = SqliteDatabase.init("", inMemory = true)[] + SqliteStore.init(database).tryGet() procSuite "WakuNode - Store": - let rng = keys.newRng() + let rng = crypto.newRng() asyncTest "Store protocol returns expected message": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + let contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) var completionFut = newFuture[bool]() await node1.start() - await node1.mountStore(persistMessages = true) + await node1.mountStore(store=newTestMessageStore()) await node2.start() - await node2.mountStore(persistMessages = true) + await node2.mountStore(store=newTestMessageStore()) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) @@ -88,11 +91,11 @@ procSuite "WakuNode - Store": storeComplFut = newFuture[bool]() await node1.start() - await node1.mountStore(persistMessages = true) + await node1.mountStore(store=newTestMessageStore()) await node1.mountFilter() await node2.start() - await node2.mountStore(persistMessages = true) + await node2.mountStore(store=newTestMessageStore()) await node2.mountFilter() node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) @@ -142,9 +145,9 @@ procSuite "WakuNode - Store": message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) await node1.start() - await node1.mountStore(persistMessages = true) + await node1.mountStore(store=newTestMessageStore()) await node2.start() - await node2.mountStore(persistMessages = true) + await node2.mountStore(store=StoreQueueRef.new()) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message) @@ -156,61 +159,44 @@ procSuite "WakuNode - Store": check: # message is correctly stored - node1.wakuStore.messages.len == 1 + node1.wakuStore.store.getMessagesCount().tryGet() == 1 await node1.stop() await node2.stop() asyncTest "Resume proc discards duplicate messages": + let timeOrigin = getNanosecondTime(getTime().toUnixFloat()) let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) let contentTopic = ContentTopic("/waku/2/default-content/proto") - msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1) - msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2) + msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 1) + msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 2) + msg3 = WakuMessage(payload: "hello world3".toBytes(), contentTopic: contentTopic, timestamp: timeOrigin + 3) - # setup sqlite database for node1 - let - database = SqliteDatabase.init("", inMemory = true)[] - store = SqliteStore.init(database).tryGet() + await allFutures(client.start(), server.start()) + await client.mountStore(store=StoreQueueRef.new()) + await server.mountStore(store=StoreQueueRef.new()) - await node1.start() - await node1.mountStore(persistMessages = true, store = store) - await node2.start() - await node2.mountStore(persistMessages = true) + await server.wakuStore.handleMessage(DefaultTopic, msg1) + await server.wakuStore.handleMessage(DefaultTopic, msg2) - await node2.wakuStore.handleMessage(DefaultTopic, msg1) - await node2.wakuStore.handleMessage(DefaultTopic, msg2) + client.wakuStore.setPeer(server.switch.peerInfo.toRemotePeerInfo()) - await sleepAsync(500.millis) - - node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - - # populate db with msg1 to be a duplicate - let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic) - let output1 = store.put(index1, msg1, DefaultTopic) - check output1.isOk - discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic) + # Insert the same message in both node's store + let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic) + require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() + require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() # now run the resume proc - await node1.resume() - - # count the total number of retrieved messages from the database - let res = store.getAllMessages() - check: - res.isOk() + await client.resume() check: - # if the duplicates are discarded properly, then the total number of messages after resume should be 2 - # check no duplicates is in the messages field - node1.wakuStore.messages.len == 2 - # check no duplicates is in the db - res.value.len == 2 + # If the duplicates are discarded properly, then the total number of messages after resume should be 3 + client.wakuStore.store.getMessagesCount().tryGet() == 3 - await node1.stop() - await node2.stop() \ No newline at end of file + await allFutures(client.stop(), server.stop()) \ No newline at end of file diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index 83b6acb8c..bd1cbabdc 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -430,7 +430,7 @@ when isMainModule: waitFor mountLibp2pPing(bridge.nodev2) if conf.store: - waitFor mountStore(bridge.nodev2, persistMessages = false) # Bridge does not persist messages + waitFor mountStore(bridge.nodev2) # Bridge does not persist messages if conf.filter: waitFor mountFilter(bridge.nodev2) diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim new file mode 100644 index 000000000..d5abe90fe --- /dev/null +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -0,0 +1,89 @@ +{.push raises: [Defect].} + +import + std/options, + stew/results, + chronicles +import + ../../../protocol/waku_message, + ../../../utils/pagination, + ../../../utils/time, + ../sqlite, + ./message_store, + ./waku_store_queue, + ./sqlite_store + +logScope: + topics = "message_store.dual" + + +type DualMessageStore* = ref object of MessageStore + inmemory: StoreQueueRef + persistent: SqliteStore + + +proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity=StoreDefaultCapacity): MessageStoreResult[T] = + let + inmemory = StoreQueueRef.new(capacity) + persistent = ?SqliteStore.init(db) + + info "loading messages from persistent storage to in-memory store" + + let res = persistent.getAllMessages() + if res.isErr(): + warn "failed to load messages from the persistent store", err = res.error + else: + for (receiverTime, msg, pubsubTopic) in res.value: + let index = Index.compute(msg, receiverTime, pubsubTopic) + discard inmemory.put(index, msg, pubsubTopic) + + info "successfully loaded messages from the persistent store" + + + return ok(DualMessageStore(inmemory: inmemory, persistent: persistent)) + + +method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = + ?s.inmemory.put(index, message, pubsubTopic) + ?s.persistent.put(index, message, pubsubTopic) + ok() + + +method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] = + s.inmemory.getAllMessages() + + +method getMessagesByHistoryQuery*( + s: DualMessageStore, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(Index), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = StoreMaxPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] = + s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder) + + +method getMessagesCount*(s: DualMessageStore): MessageStoreResult[int64] = + s.inmemory.getMessagesCount() + +method getOldestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] = + s.inmemory.getOldestMessageTimestamp() + +method getNewestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] = + s.inmemory.getNewestMessageTimestamp() + + +method deleteMessagesOlderThanTimestamp*(s: DualMessageStore, ts: Timestamp): MessageStoreResult[void] = + # NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error + # ?s.inmemory.deleteMessagesOlderThanTimestamp(ts) + ?s.persistent.deleteMessagesOlderThanTimestamp(ts) + ok() + +method deleteOldestMessagesNotWithinLimit*(s: DualMessageStore, limit: int): MessageStoreResult[void] = + # NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error + # ?s.inmemory.deleteOldestMessagesNotWithinLimit(limit) + ?s.persistent.deleteOldestMessagesNotWithinLimit(limit) + ok() \ No newline at end of file diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 88e79d767..ca2452717 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -11,10 +11,12 @@ import ../../../utils/time, ./message_store +export pagination logScope: topics = "message_store.storequeue" + type IndexedWakuMessage* = object # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 13747c6ec..60d979db3 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -395,11 +395,15 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + if node.wakuStore.isNil(): + return + + let retrievedMessages = await node.wakuStore.resume(peerList) + if retrievedMessages.isErr(): + error "failed to resume store", error=retrievedMessages.error + return - if not node.wakuStore.isNil: - let retrievedMessages = await node.wakuStore.resume(peerList) - if retrievedMessages.isOk: - info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value + info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = @@ -452,24 +456,18 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) -proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, retentionTime = StoreDefaultRetentionTime, isSqliteOnly = false) {.async, raises: [Defect, LPError].} = +proc mountStore*(node: WakuNode, store: MessageStore = nil, capacity = StoreDefaultCapacity, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = if node.wakuSwap.isNil(): info "mounting waku store protocol (no waku swap)" else: info "mounting waku store protocol with waku swap support" - let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime) - else: CapacityRetentionPolicy.init(capacity) - node.wakuStore = WakuStore.init( node.peerManager, node.rng, store, wakuSwap=node.wakuSwap, - persistMessages=persistMessages, - capacity=capacity, - isSqliteOnly=isSqliteOnly, - retentionPolicy=some(retentionPolicy) + retentionPolicy=retentionPolicy ) if node.started: @@ -832,6 +830,8 @@ when isMainModule: ./wakunode2_setup_rpc, ./wakunode2_setup_sql_migrations, ./storage/sqlite, + ./storage/message/message_store, + ./storage/message/dual_message_store, ./storage/message/sqlite_store, ./storage/peer/waku_peer_storage @@ -844,7 +844,7 @@ when isMainModule: # 1/7 Setup storage proc setupStorage(conf: WakuNodeConf): - SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]] = + SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] = ## Setup a SQLite Database for a wakunode based on a supplied ## configuration file and perform all necessary migration. @@ -854,20 +854,20 @@ when isMainModule: var sqliteDatabase: SqliteDatabase - storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore] + storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore] - # Setup DB + # Setup database connection if conf.dbPath != "": let dbRes = SqliteDatabase.init(conf.dbPath) - if dbRes.isErr: - warn "failed to init database", err = dbRes.error + if dbRes.isErr(): + warn "failed to init database connection", err = dbRes.error waku_node_errors.inc(labelValues = ["init_db_failure"]) - return err("failed to init database") + return err("failed to init database connection") else: sqliteDatabase = dbRes.value - if not sqliteDatabase.isNil and (conf.persistPeers or conf.persistMessages): - + + if not sqliteDatabase.isNil(): ## Database vacuuming # TODO: Wrap and move this logic to the appropriate module let @@ -887,28 +887,33 @@ when isMainModule: debug "finished sqlite database vacuuming" - # Database initialized. Let's set it up - sqliteDatabase.runMigrations(conf) # First migrate what we have + sqliteDatabase.runMigrations(conf) - if conf.persistPeers: - # Peer persistence enable. Set up Peer table in storage - let res = WakuPeerStorage.new(sqliteDatabase) - if res.isErr: - warn "failed to init new WakuPeerStorage", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - storeTuple.pStorage = res.value - - if conf.persistMessages: - # Historical message persistence enable. Set up Message table in storage + if conf.persistPeers: + let res = WakuPeerStorage.new(sqliteDatabase) + if res.isErr(): + warn "failed to init peer store", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) + else: + storeTuple.pStorage = res.value + + if conf.persistMessages: + if conf.sqliteStore: let res = SqliteStore.init(sqliteDatabase) if res.isErr(): - warn "failed to init SqliteStore", err = res.error + warn "failed to init message store", err = res.error waku_node_errors.inc(labelValues = ["init_store_failure"]) else: storeTuple.mStorage = res.value - + else: + let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity) + if res.isErr(): + warn "failed to init message store", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) + else: + storeTuple.mStorage = res.value + ok(storeTuple) # 2/7 Retrieve dynamic bootstrap nodes @@ -1039,10 +1044,7 @@ when isMainModule: ok(node) # 4/7 Mount and initialize configured protocols - proc setupProtocols(node: WakuNode, - conf: WakuNodeConf, - mStorage: SqliteStore = nil): SetupResult[bool] = - + proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): SetupResult[bool] = ## Setup configured protocols on an existing Waku v2 node. ## Optionally include persistent message storage. ## No protocols are started yet. @@ -1084,7 +1086,9 @@ when isMainModule: # Store setup if (conf.storenode != "") or (conf.store): - waitFor mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteRetentionTime, conf.sqliteStore) + let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime) + else: CapacityRetentionPolicy.init(conf.storeCapacity) + waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy)) if conf.storenode != "": setStorePeer(node, conf.storenode) @@ -1188,7 +1192,7 @@ when isMainModule: var pStorage: WakuPeerStorage - mStorage: SqliteStore + mStorage: MessageStore let setupStorageRes = setupStorage(conf) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 8a9e321a4..6ebc04fc9 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -58,6 +58,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64 # Error types (metric label values) const insertFailure = "insert_failure" + retPolicyFailure = "retpolicy_failure" dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" peerNotFoundFailure = "peer_not_found_failure" @@ -69,17 +70,28 @@ type WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref rand.HmacDrbgContext - messages*: StoreQueueRef # in-memory message store - store*: MessageStore # sqlite DB handle + store*: MessageStore wakuSwap*: WakuSwap - persistMessages*: bool - #TODO: SqliteStore currenly also holds isSqliteOnly; put it in single place. - isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB retentionPolicy: Option[MessageRetentionPolicy] -proc reportMessagesCountMetric(store: MessageStore) = - let resCount = store.getMessagesCount() +proc executeMessageRetentionPolicy*(w: WakuStore): WakuStoreResult[void] = + if w.retentionPolicy.isNone(): + return ok() + + let policy = w.retentionPolicy.get() + + if w.store.isNil(): + return err("no message store provided (nil)") + + policy.execute(w.store) + + +proc reportStoredMessagesMetric*(w: WakuStore) = + if w.store.isNil(): + return + + let resCount = w.store.getMessagesCount() if resCount.isErr(): return @@ -104,23 +116,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} qMaxPageSize = query.pagingInfo.pageSize qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD - let queryStartTime = getTime().toUnixFloat() - let queryRes = block: - # TODO: Move this logic, together with the insert message logic and load messages on boot - # into a "dual-store" message store implementation. - if w.isSqliteOnly: - w.store.getMessagesByHistoryQuery( - contentTopic = qContentTopics, - pubsubTopic = qPubSubTopic, - cursor = qCursor, - startTime = qStartTime, - endTime = qEndTime, - maxPageSize = qMaxPageSize, - ascendingOrder = qAscendingOrder - ) - else: - w.messages.getMessagesByHistoryQuery( + let queryStartTime = getTime().toUnixFloat() + + let queryRes = w.store.getMessagesByHistoryQuery( contentTopic = qContentTopics, pubsubTopic = qPubSubTopic, cursor = qCursor, @@ -147,8 +146,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} error: HistoryResponseError.NONE ) -proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = - +proc initProtocolHandler*(ws: WakuStore) = + proc handler(conn: Connection, proto: string) {.async.} = let buf = await conn.readLp(MaxRpcSize.int) @@ -163,7 +162,9 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query waku_store_queries.inc() - let resp = ws.findMessages(req.query) + let resp = if not ws.store.isNil(): ws.findMessages(req.query) + # TODO: Improve error reporting + else: HistoryResponse(error: HistoryResponseError.SERVICE_UNAVAILABLE) if not ws.wakuSwap.isNil(): info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text @@ -181,89 +182,44 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = ws.handler = handler ws.codec = WakuStoreCodec - ws.messages = StoreQueueRef.new(capacity) - - if ws.isSqliteOnly: - if ws.store.isNil(): - warn "store not provided (nil)" - return - - # Execute retention policy on initialization - if not ws.retentionPolicy.isNone(): - let policy = ws.retentionPolicy.get() - let resRetPolicy = policy.execute(ws.store) - if resRetPolicy.isErr(): - warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error() - - info "SQLite-only store initialized. Messages are *not* loaded into memory." - - let numMessages = ws.store.getMessagesCount() - if numMessages.isOk(): - debug "number of messages in persistent store", messageNum=numMessages.value - waku_store_messages.set(numMessages.value, labelValues = ["stored"]) - - # TODO: Move this logic, together with the insert message logic - # into a "dual-store" message store implementation. - else: - if ws.store.isNil(): - return - - # Execute retention policy before loading any messages into in-memory store - if not ws.retentionPolicy.isNone(): - let policy = ws.retentionPolicy.get() - let resRetPolicy = policy.execute(ws.store) - if resRetPolicy.isErr(): - warn "an error occurred while applying the retention policy at init", error=resRetPolicy.error() - - info "loading messages from persistent storage" - - let res = ws.store.getAllMessages() - if res.isOk(): - for (receiverTime, msg, pubsubTopic) in res.value: - let index = Index.compute(msg, receiverTime, pubsubTopic) - discard ws.messages.put(index, msg, pubsubTopic) - - info "successfully loaded messages from the persistent store" - else: - warn "failed to load messages from the persistent store", err = res.error() - - let numMessages = ws.messages.getMessagesCount() - if numMessages.isOk(): - debug "number of messages in in-memory store", messageNum=numMessages.value - waku_store_messages.set(numMessages.value, labelValues = ["stored"]) - proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - store: MessageStore = nil, + store: MessageStore, wakuSwap: WakuSwap = nil, - persistMessages = true, - capacity = StoreDefaultCapacity, - isSqliteOnly = false, retentionPolicy=none(MessageRetentionPolicy)): T = let ws = WakuStore( rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, - persistMessages: persistMessages, - isSqliteOnly: isSqliteOnly, retentionPolicy: retentionPolicy ) - ws.init(capacity) + ws.initProtocolHandler() + + # TODO: Move to wakunode + # Execute retention policy on initialization + let retPolicyRes = ws.executeMessageRetentionPolicy() + if retPolicyRes.isErr(): + warn "an error occurred while applying the retention policy at init", error=retPolicyRes.error + + ws.reportStoredMessagesMetric() + return ws - -# TODO: This should probably be an add function and append the peer to an array -proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = - ws.peerManager.addPeer(peer, WakuStoreCodec) - waku_store_peers.inc() +proc init*(T: type WakuStore, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + wakuSwap: WakuSwap = nil, + retentionPolicy=none(MessageRetentionPolicy)): T = + let store = StoreQueueRef.new(StoreDefaultCapacity) + WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} = - if not w.persistMessages: - # Store is mounted but new messages should not be stored + if w.store.isNil(): + # Messages should not be stored return if msg.ephemeral: @@ -277,62 +233,32 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async trace "handling message", topic=pubsubTopic, index=index - block: - if w.isSqliteOnly: - # Add messages to persistent store, if present - if w.store.isNil(): - return + # Add messages to persistent store, if present + let putStoreRes = w.store.put(index, msg, pubsubTopic) + if putStoreRes.isErr(): + debug "failed to insert message to persistent store", index=index, err=putStoreRes.error + waku_store_errors.inc(labelValues = [insertFailure]) + return - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) - return - - # Execute the retention policy after insertion - if not w.retentionPolicy.isNone(): - let policy = w.retentionPolicy.get() - let resRetPolicy = policy.execute(w.store) - if resRetPolicy.isErr(): - debug "message retention policy failure", error=resRetPolicy.error() - waku_store_errors.inc(labelValues = [insertFailure]) - - reportMessagesCountMetric(w.store) - - # TODO: Move this logic, together with the load from persistent store on init - # into a "dual-store" message store implementation. - else: - # Add message to in-memory store - let resPutInmemory = w.messages.put(index, msg, pubsubTopic) - if resPutInmemory.isErr(): - debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() - waku_store_errors.inc(labelValues = [insertFailure]) - return - - reportMessagesCountMetric(w.messages) - - # Add messages to persistent store, if present - if w.store.isNil(): - return - - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) - return - - # Execute the retention policy after insertion - if not w.retentionPolicy.isNone(): - let policy = w.retentionPolicy.get() - let resRetPolicy = policy.execute(w.store) - if resRetPolicy.isErr(): - debug "message retention policy failure", error=resRetPolicy.error() - waku_store_errors.inc(labelValues = [insertFailure]) + # Execute the retention policy after insertion + let retPolicyRes = w.executeMessageRetentionPolicy() + if retPolicyRes.isErr(): + debug "message retention policy failure", error=retPolicyRes.error + waku_store_errors.inc(labelValues = [retPolicyFailure]) + + w.reportStoredMessagesMetric() let insertDuration = getTime().toUnixFloat() - insertStartTime waku_store_insert_duration_seconds.observe(insertDuration) +## CLIENT + +# TODO: This should probably be an add function and append the peer to an array +proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = + ws.peerManager.addPeer(peer, WakuStoreCodec) + waku_store_peers.inc() + # TODO: Remove after converting the query method into a non-callback method type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} @@ -376,6 +302,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp ## 21/WAKU2-FAULT-TOLERANT-STORE +const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds + proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## it retrieves the historical messages in pages. @@ -390,7 +318,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf while true: let res = await w.query(req, peer) if res.isErr(): - return err(res.error()) + return err(res.error) let response = res.get() @@ -446,29 +374,27 @@ proc resume*(w: WakuStore, ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string # If store has not been provided, don't even try - if w.isSqliteOnly and w.store.isNil(): - return err("store not provided") + if w.store.isNil(): + return err("store not provided (nil)") + # NOTE: Original implementation is based on the message's sender timestamp. At the moment + # of writing, the sqlite store implementation returns the last message's receiver + # timestamp. + # lastSeenTime = lastSeenItem.get().msg.timestamp + let + lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0)) + now = getNanosecondTime(getTime().toUnixFloat()) - var lastSeenTime = Timestamp(0) - var currentTime = getNanosecondTime(epochTime()) - - let lastSeenItem = w.messages.last() - if lastSeenItem.isOk(): - lastSeenTime = lastSeenItem.get().msg.timestamp - - # adjust the time window with an offset of 20 seconds - let offset: Timestamp = getNanosecondTime(20) - currentTime = currentTime + offset - lastSeenTime = max(lastSeenTime - offset, 0) - - debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime + debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now + let + queryEndTime = now + StoreResumeTimeWindowOffset + queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0) let req = HistoryQuery( pubsubTopic: pubsubTopic, - startTime: lastSeenTime, - endTime: currentTime, + startTime: queryStartTime, + endTime: queryEndTime, pagingInfo: PagingInfo( direction:PagingDirection.FORWARD, pageSize: pageSize @@ -498,70 +424,26 @@ proc resume*(w: WakuStore, # Save the retrieved messages in the store - var dismissed: uint = 0 var added: uint = 0 - for msg in res.get(): let now = getNanosecondTime(getTime().toUnixFloat()) let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - if w.isSqliteOnly: - # Add messages to persistent store - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) - continue - - # Execute the retention policy after insertion - if not w.retentionPolicy.isNone(): - let policy = w.retentionPolicy.get() - let resRetPolicy = policy.execute(w.store) - if resRetPolicy.isErr(): - debug "message retention policy failure", error=resRetPolicy.error() - waku_store_errors.inc(labelValues = [insertFailure]) + let putStoreRes = w.store.put(index, msg, pubsubTopic) + if putStoreRes.isErr(): + continue - # TODO: Move this logic, together with the load from persistent store on init - # into a "dual-store" message store implementation. - else: - # check for duplicate messages - # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic - if w.messages.contains(index): - dismissed.inc() - continue - - # Add message to in-memory store - let resPutInmemory = w.messages.put(index, msg, pubsubTopic) - if resPutInmemory.isErr(): - debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() - waku_store_errors.inc(labelValues = [insertFailure]) - continue - - if w.store.isNil(): - continue - - # Add messages to persistent store - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) - continue - - # Execute the retention policy after insertion - if not w.retentionPolicy.isNone(): - let policy = w.retentionPolicy.get() - let resRetPolicy = policy.execute(w.store) - if resRetPolicy.isErr(): - debug "message retention policy failure", error=resRetPolicy.error() - waku_store_errors.inc(labelValues = [insertFailure]) - added.inc() - debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed + debug "resume finished successfully", retrievedMessages=res.get().len, addedMessages=added + + # Execute the retention policy after insertion + let retPolicyRes = w.executeMessageRetentionPolicy() + if retPolicyRes.isErr(): + debug "message retention policy failure", error=retPolicyRes.error + waku_store_errors.inc(labelValues = [retPolicyFailure]) - let store: MessageStore = if w.isSqliteOnly: w.store - else: w.messages - reportMessagesCountMetric(store) + w.reportStoredMessagesMetric() return ok(added) @@ -578,7 +460,7 @@ proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreRes let res = await ws.query(req, peerOpt.get()) if res.isErr(): - return err(res.error()) + return err(res.error) let response = res.get() diff --git a/waku/v2/protocol/waku_store/rpc.nim b/waku/v2/protocol/waku_store/rpc.nim index 02215895f..2216bde1e 100644 --- a/waku/v2/protocol/waku_store/rpc.nim +++ b/waku/v2/protocol/waku_store/rpc.nim @@ -23,6 +23,7 @@ type ## HistoryResponseError contains error message to inform the querying node about the state of its request NONE = uint32(0) INVALID_CURSOR = uint32(1) + SERVICE_UNAVAILABLE = uint32(503) HistoryResponse* = object messages*: seq[WakuMessage]