diff --git a/tests/waku_store/test_all.nim b/tests/waku_store/test_all.nim
index da990f412..d0a426ec5 100644
--- a/tests/waku_store/test_all.nim
+++ b/tests/waku_store/test_all.nim
@@ -1,3 +1,8 @@
 {.used.}
 
-import ./test_client, ./test_rpc_codec, ./test_waku_store, ./test_wakunode_store
+import
+  ./test_client,
+  ./test_rpc_codec,
+  ./test_waku_store,
+  ./test_wakunode_store,
+  ./test_resume
diff --git a/tests/waku_store/test_resume.nim b/tests/waku_store/test_resume.nim
new file mode 100644
index 000000000..93e07ec0e
--- /dev/null
+++ b/tests/waku_store/test_resume.nim
@@ -0,0 +1,113 @@
+{.used.}
+
+import std/[options, net], testutils/unittests, chronos, results
+
+import
+  waku/[
+    node/peer_manager,
+    node/waku_node,
+    waku_core,
+    waku_store/resume,
+    waku_store/common,
+    waku_archive/driver,
+  ],
+  ../testlib/[wakucore, testasync, wakunode],
+  ./store_utils,
+  ../waku_archive/archive_utils
+
+suite "Store Resume":
+  var resume {.threadvar.}: StoreResume
+
+  asyncSetup:
+    let resumeRes: Result[StoreResume, string] =
+      StoreResume.new(peerManager = nil, wakuArchive = nil, wakuStoreClient = nil)
+
+    assert resumeRes.isOk(), $resumeRes.error
+
+    resume = resumeRes.get()
+
+  asyncTeardown:
+    await resume.stopWait()
+
+  asyncTest "get set roundtrip":
+    let ts = getNowInNanosecondTime()
+
+    let setRes = resume.setLastOnlineTimestamp(ts)
+    assert setRes.isOk(), $setRes.error
+
+    let getRes = resume.getLastOnlineTimestamp()
+    assert getRes.isOk(), $getRes.error
+
+    let getTs = getRes.get()
+
+    assert getTs == ts, "wrong timestamp"
+
+suite "Store Resume - End to End":
+  var server {.threadvar.}: WakuNode
+  var client {.threadvar.}: WakuNode
+
+  var serverDriver {.threadvar.}: ArchiveDriver
+  var clientDriver {.threadvar.}: ArchiveDriver
+
+  asyncSetup:
+    let messages =
+      @[
+        fakeWakuMessage(@[byte 00]),
+        fakeWakuMessage(@[byte 01]),
+        fakeWakuMessage(@[byte 02]),
+        fakeWakuMessage(@[byte 03]),
+        fakeWakuMessage(@[byte 04]),
+        fakeWakuMessage(@[byte 05]),
+        fakeWakuMessage(@[byte 06]),
+        fakeWakuMessage(@[byte 07]),
+        fakeWakuMessage(@[byte 08]),
+        fakeWakuMessage(@[byte 09]),
+      ]
+
+    let
+      serverKey = generateSecp256k1Key()
+      clientKey = generateSecp256k1Key()
+
+    server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
+    client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
+
+    serverDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
+    clientDriver = newSqliteArchiveDriver()
+
+    let mountServerArchiveRes = server.mountArchive(serverDriver)
+    let mountClientArchiveRes = client.mountArchive(clientDriver)
+
+    assert mountServerArchiveRes.isOk()
+    assert mountClientArchiveRes.isOk()
+
+    await server.mountStore()
+    await client.mountStore()
+
+    client.mountStoreClient()
+    server.mountStoreClient()
+
+    client.setupStoreResume()
+
+    await server.start()
+
+    let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
+
+    client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
+
+  asyncTeardown:
+    await allFutures(client.stop(), server.stop())
+
+  asyncTest "10 messages resume":
+    var countRes = await clientDriver.getMessagesCount()
+    assert countRes.isOk(), $countRes.error
+
+    check:
+      countRes.get() == 0
+
+    await client.start()
+
+    countRes = await clientDriver.getMessagesCount()
+    assert countRes.isOk(), $countRes.error
+
+    check:
+      countRes.get() == 10
diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim
index 71786ceb8..9435d4e27 100644
--- a/waku/factory/external_config.nim
+++ b/waku/factory/external_config.nim
@@ -379,6 +379,12 @@ type WakuNodeConf* = object
     name: "store-max-num-db-connections"
   .}: int
 
+  storeResume* {.
+    desc: "Enable store resume functionality",
+    defaultValue: false,
+    name: "store-resume"
+  .}: bool
+
   ## Filter config
   filter* {.
     desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index c13c1dd12..2b9524782 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -296,6 +296,9 @@ proc setupProtocols(
     else:
       return err("failed to set node waku legacy store peer: " & storeNode.error)
 
+  if conf.store and conf.storeResume:
+    node.setupStoreResume()
+
   # NOTE Must be mounted after relay
   if conf.lightpush:
     try:
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index bf1ae74e2..796c958cf 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -34,6 +34,7 @@ import
   ../waku_store/protocol as store,
   ../waku_store/client as store_client,
   ../waku_store/common as store_common,
+  ../waku_store/resume,
   ../waku_filter_v2,
   ../waku_filter_v2/client as filter_client,
   ../waku_filter_v2/subscriptions as filter_subscriptions,
@@ -94,6 +95,7 @@ type
     wakuLegacyStoreClient*: legacy_store_client.WakuStoreClient
     wakuStore*: store.WakuStore
     wakuStoreClient*: store_client.WakuStoreClient
+    wakuStoreResume*: StoreResume
    wakuFilter*: waku_filter_v2.WakuFilter
    wakuFilterClient*: filter_client.WakuFilterClient
    wakuRlnRelay*: WakuRLNRelay
@@ -955,6 +957,13 @@ proc query*(
 
   return ok(response)
 
+proc setupStoreResume*(node: WakuNode) =
+  node.wakuStoreResume = StoreResume.new(
+    node.peerManager, node.wakuArchive, node.wakuStoreClient
+  ).valueOr:
+    error "Failed to setup Store Resume", error = $error
+    return
+
 ## Waku lightpush
 
 proc mountLightPush*(
@@ -1280,6 +1289,9 @@ proc start*(node: WakuNode) {.async.} =
   if not node.wakuMetadata.isNil():
     node.wakuMetadata.start()
 
+  if not node.wakuStoreResume.isNil():
+    await node.wakuStoreResume.start()
+
   ## The switch uses this mapper to update peer info addrs
   ## with announced addrs after start
   let addressMapper = proc(
@@ -1315,6 +1327,9 @@ proc stop*(node: WakuNode) {.async.} =
   if not node.wakuArchive.isNil():
     await node.wakuArchive.stopWait()
 
+  if not node.wakuStoreResume.isNil():
+    await node.wakuStoreResume.stopWait()
+
   node.started = false
 
 proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
diff --git a/waku/waku_store/resume.nim b/waku/waku_store/resume.nim
new file mode 100644
index 000000000..208ba0aa6
--- /dev/null
+++ b/waku/waku_store/resume.nim
@@ -0,0 +1,220 @@
+{.push raises: [].}
+
+import
+  std/options,
+  sqlite3_abi,
+  chronicles,
+  chronos,
+  metrics,
+  libp2p/protocols/protocol,
+  libp2p/stream/connection,
+  libp2p/crypto/crypto,
+  eth/p2p/discoveryv5/enr
+
+import
+  ../common/databases/db_sqlite,
+  ../waku_core,
+  ../waku_archive,
+  ../common/nimchronos,
+  ../waku_store/[client, common],
+  ../node/peer_manager/peer_manager
+
+logScope:
+  topics = "waku store resume"
+
+const
+  OnlineDbUrl = "lastonline.db"
+  LastOnlineInterval = chronos.minutes(1)
+  ResumeRangeLimit = 6 # hours
+
+type
+  TransferCallback* = proc(
+    timestamp: Timestamp, peer: RemotePeerInfo
+  ): Future[Result[void, string]] {.async: (raises: []), closure.}
+
+  StoreResume* = ref object
+    handle: Future[void]
+
+    db: SqliteDatabase
+    replaceStmt: SqliteStmt[(Timestamp), void]
+
+    transferCallBack: Option[TransferCallback]
+
+    peerManager: PeerManager
+
+proc setupLastOnlineDB(): Result[SqliteDatabase, string] =
+  let db = SqliteDatabase.new(OnlineDbUrl).valueOr:
+    return err($error)
+
+  let createStmt = db
+    .prepareStmt(
+      """CREATE TABLE IF NOT EXISTS last_online (timestamp BIGINT NOT NULL);""",
+      NoParams, void,
+    )
+    .expect("Valid statement")
+
+  createStmt.exec(()).isOkOr:
+    return err("failed to exec stmt: " & $error)
+
+  # We dispose of this prepared statement here, as we never use it again
+  createStmt.dispose()
+
+  return ok(db)
+
+proc initTransferHandler(
+    self: StoreResume, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
+) =
+  # guard clauses to prevent faulty callback
+  if self.peerManager.isNil():
+    error "peer manager unavailable for store resume"
+    return
+
+  if wakuArchive.isNil():
+    error "waku archive unavailable for store resume"
+    return
+
+  if wakuStoreClient.isNil():
+    error "waku store client unavailable for store resume"
+    return
+
+  # tying archive, store client and resume into one callback and saving it for later
+  self.transferCallBack = some(
+    proc(
+        timestamp: Timestamp, peer: RemotePeerInfo
+    ): Future[Result[void, string]] {.async: (raises: []), closure.} =
+      var req = StoreQueryRequest()
+      req.includeData = true
+      req.startTime = some(timestamp)
+      req.endTime = some(getNowInNanosecondTime())
+      req.paginationLimit = some(uint64(100))
+
+      while true:
+        let catchable = catch:
+          await wakuStoreClient.query(req, peer)
+
+        if catchable.isErr():
+          return err("store client error: " & catchable.error.msg)
+
+        let res = catchable.get()
+        let response = res.valueOr:
+          return err("store client error: " & $error)
+
+        req.paginationCursor = response.paginationCursor
+
+        for kv in response.messages:
+          let handleRes = catch:
+            await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
+
+          if handleRes.isErr():
+            error "message transfer failed", error = handleRes.error.msg
+            continue
+
+        if req.paginationCursor.isNone():
+          break
+
+      return ok()
+  )
+
+proc new*(
+    T: type StoreResume,
+    peerManager: PeerManager,
+    wakuArchive: WakuArchive,
+    wakuStoreClient: WakuStoreClient,
+): Result[T, string] =
+  info "initializing store resume"
+
+  let db = setupLastOnlineDB().valueOr:
+    return err("Failed to setup last online DB: " & $error)
+
+  let replaceStmt = db
+    .prepareStmt("REPLACE INTO last_online (timestamp) VALUES (?);", (Timestamp), void)
+    .expect("Valid statement")
+
+  let resume = StoreResume(db: db, replaceStmt: replaceStmt, peerManager: peerManager)
+
+  resume.initTransferHandler(wakuArchive, wakuStoreClient)
+
+  return ok(resume)
+
+proc getLastOnlineTimestamp*(self: StoreResume): Result[Timestamp, string] =
+  var timestamp: Timestamp
+
+  proc queryCallback(s: ptr sqlite3_stmt) =
+    timestamp = sqlite3_column_int64(s, 0)
+
+  self.db.query("SELECT MAX(timestamp) FROM last_online", queryCallback).isOkOr:
+    return err("failed to query: " & $error)
+
+  return ok(timestamp)
+
+proc setLastOnlineTimestamp*(
+    self: StoreResume, timestamp: Timestamp
+): Result[void, string] =
+  self.replaceStmt.exec((timestamp)).isOkOr:
+    return err("failed to execute replace stmt: " & $error)
+
+  return ok()
+
+proc startStoreResume*(
+    self: StoreResume, time: Timestamp, peer: RemotePeerInfo
+): Future[Result[void, string]] {.async.} =
+  info "starting store resume", lastOnline = $time, peer = $peer
+
+  # get the callback we saved if possible
+  let callback = self.transferCallBack.valueOr:
+    return err("transfer callback uninitialised")
+
+  # run the callback
+  (await callback(time, peer)).isOkOr:
+    return err("transfer callback failed: " & $error)
+
+  info "store resume completed"
+
+  return ok()
+
+proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.} =
+  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
+    return err("no suitable peer found for store resume")
+
+  let lastOnlineTs = self.getLastOnlineTimestamp().valueOr:
+    return err("failed to get last online timestamp: " & $error)
+
+  # Limit the resume time range
+  let now = getNowInNanosecondTime()
+  let maxTime = now - (ResumeRangeLimit * 3600 * 1_000_000_000)
+  let ts = max(lastOnlineTs, maxTime)
+
+  return await self.startStoreResume(ts, peer)
+
+proc periodicSetLastOnline(self: StoreResume) {.async.} =
+  ## Save a timestamp periodically
+  ## so that a node can know when it was last online
+  while true:
+    await sleepAsync(LastOnlineInterval)
+
+    let ts = getNowInNanosecondTime()
+
+    self.setLastOnlineTimestamp(ts).isOkOr:
+      error "failed to set last online timestamp", error, time = ts
+
+proc start*(self: StoreResume) {.async.} =
+  # start resume process, will try thrice.
+  var tries = 3
+  while tries > 0:
+    (await self.autoStoreResume()).isOkOr:
+      tries -= 1
+      error "store resume failed", triesLeft = tries, error = $error
+      await sleepAsync(30.seconds)
+      continue
+
+    break
+
+  # starting periodic storage of last online timestamp
+  self.handle = self.periodicSetLastOnline()
+
+proc stopWait*(self: StoreResume) {.async.} =
+  if not self.handle.isNil():
+    await noCancel(self.handle.cancelAndWait())
+
+  self.replaceStmt.dispose()
+  self.db.close()