mirror of https://github.com/waku-org/nwaku.git
feat: store resume (#2919)
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
04027e58b2
commit
aed2a1130e
|
@ -1,3 +1,8 @@
|
|||
{.used.}
|
||||
|
||||
import ./test_client, ./test_rpc_codec, ./test_waku_store, ./test_wakunode_store
|
||||
import
|
||||
./test_client,
|
||||
./test_rpc_codec,
|
||||
./test_waku_store,
|
||||
./test_wakunode_store,
|
||||
./test_resume
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
{.used.}
|
||||
|
||||
import std/[options, net], testutils/unittests, chronos, results
|
||||
|
||||
import
|
||||
waku/[
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
waku_core,
|
||||
waku_store/resume,
|
||||
waku_store/common,
|
||||
waku_archive/driver,
|
||||
],
|
||||
../testlib/[wakucore, testasync, wakunode],
|
||||
./store_utils,
|
||||
../waku_archive/archive_utils
|
||||
|
||||
suite "Store Resume":
  var resume {.threadvar.}: StoreResume

  asyncSetup:
    # Only the sqlite-backed timestamp get/set path is exercised here, so the
    # peer manager, archive and store client collaborators can all be nil.
    let resumeRes: Result[StoreResume, string] =
      StoreResume.new(peerManager = nil, wakuArchive = nil, wakuStoreClient = nil)

    assert resumeRes.isOk(), $resumeRes.error

    resume = resumeRes.get()

  asyncTeardown:
    # Releases the prepared statement and closes the last-online DB.
    await resume.stopWait()

  asyncTest "get set roundtrip":
    let ts = getNowInNanosecondTime()

    let setRes = resume.setLastOnlineTimestamp(ts)
    assert setRes.isOk(), $setRes.error

    let getRes = resume.getLastOnlineTimestamp()
    assert getRes.isOk(), $getRes.error

    let getTs = getRes.get()

    # The value read back must be exactly the value written.
    assert getTs == ts, "wrong timestamp"
|
||||
|
||||
suite "Store Resume - End to End":
  var server {.threadvar.}: WakuNode
  var client {.threadvar.}: WakuNode

  var serverDriver {.threadvar.}: ArchiveDriver
  var clientDriver {.threadvar.}: ArchiveDriver

  asyncSetup:
    # Ten distinct payloads pre-seeded into the server's archive; the client
    # starts with an empty archive and must fetch them all via store resume.
    let messages =
      @[
        fakeWakuMessage(@[byte 00]),
        fakeWakuMessage(@[byte 01]),
        fakeWakuMessage(@[byte 02]),
        fakeWakuMessage(@[byte 03]),
        fakeWakuMessage(@[byte 04]),
        fakeWakuMessage(@[byte 05]),
        fakeWakuMessage(@[byte 06]),
        fakeWakuMessage(@[byte 07]),
        fakeWakuMessage(@[byte 08]),
        fakeWakuMessage(@[byte 09]),
      ]

    let
      serverKey = generateSecp256k1Key()
      clientKey = generateSecp256k1Key()

    server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
    client = newTestWakuNode(clientKey, IPv4_any(), Port(0))

    serverDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
    clientDriver = newSqliteArchiveDriver()

    let mountServerArchiveRes = server.mountArchive(serverDriver)
    let mountClientArchiveRes = client.mountArchive(clientDriver)

    assert mountServerArchiveRes.isOk()
    assert mountClientArchiveRes.isOk()

    await server.mountStore()
    await client.mountStore()

    client.mountStoreClient()
    server.mountStoreClient()

    # Resume is only set up on the client; it is triggered by client.start().
    client.setupStoreResume()

    # Deliberately start only the server here: the test asserts the client's
    # pre-start archive state before calling client.start() itself.
    await server.start()

    let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()

    # Register the server as the client's store service peer so resume's
    # peer selection can find it.
    client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)

  asyncTeardown:
    await allFutures(client.stop(), server.stop())

  asyncTest "10 messages resume":
    # Before starting, the client archive must be empty.
    var countRes = await clientDriver.getMessagesCount()
    assert countRes.isOk(), $countRes.error

    check:
      countRes.get() == 0

    # Starting the client node kicks off the store resume procedure.
    await client.start()

    # After resume, all ten server-side messages must be in the client archive.
    countRes = await clientDriver.getMessagesCount()
    assert countRes.isOk(), $countRes.error

    check:
      countRes.get() == 10
|
|
@ -379,6 +379,12 @@ type WakuNodeConf* = object
|
|||
name: "store-max-num-db-connections"
|
||||
.}: int
|
||||
|
||||
storeResume* {.
|
||||
desc: "Enable store resume functionality",
|
||||
defaultValue: false,
|
||||
name: "store-resume"
|
||||
.}: bool
|
||||
|
||||
## Filter config
|
||||
filter* {.
|
||||
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
|
||||
|
|
|
@ -296,6 +296,9 @@ proc setupProtocols(
|
|||
else:
|
||||
return err("failed to set node waku legacy store peer: " & storeNode.error)
|
||||
|
||||
if conf.store and conf.storeResume:
|
||||
node.setupStoreResume()
|
||||
|
||||
# NOTE Must be mounted after relay
|
||||
if conf.lightpush:
|
||||
try:
|
||||
|
|
|
@ -34,6 +34,7 @@ import
|
|||
../waku_store/protocol as store,
|
||||
../waku_store/client as store_client,
|
||||
../waku_store/common as store_common,
|
||||
../waku_store/resume,
|
||||
../waku_filter_v2,
|
||||
../waku_filter_v2/client as filter_client,
|
||||
../waku_filter_v2/subscriptions as filter_subscriptions,
|
||||
|
@ -94,6 +95,7 @@ type
|
|||
wakuLegacyStoreClient*: legacy_store_client.WakuStoreClient
|
||||
wakuStore*: store.WakuStore
|
||||
wakuStoreClient*: store_client.WakuStoreClient
|
||||
wakuStoreResume*: StoreResume
|
||||
wakuFilter*: waku_filter_v2.WakuFilter
|
||||
wakuFilterClient*: filter_client.WakuFilterClient
|
||||
wakuRlnRelay*: WakuRLNRelay
|
||||
|
@ -955,6 +957,13 @@ proc query*(
|
|||
|
||||
return ok(response)
|
||||
|
||||
proc setupStoreResume*(node: WakuNode) =
  ## Create the node's StoreResume instance from its peer manager, archive
  ## and store client. On failure this only logs an error:
  ## `node.wakuStoreResume` stays nil and the node's start/stop procs skip it.
  node.wakuStoreResume = StoreResume.new(
    node.peerManager, node.wakuArchive, node.wakuStoreClient
  ).valueOr:
    error "Failed to setup Store Resume", error = $error
    return
|
||||
|
||||
## Waku lightpush
|
||||
|
||||
proc mountLightPush*(
|
||||
|
@ -1280,6 +1289,9 @@ proc start*(node: WakuNode) {.async.} =
|
|||
if not node.wakuMetadata.isNil():
|
||||
node.wakuMetadata.start()
|
||||
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.start()
|
||||
|
||||
## The switch uses this mapper to update peer info addrs
|
||||
## with announced addrs after start
|
||||
let addressMapper = proc(
|
||||
|
@ -1315,6 +1327,9 @@ proc stop*(node: WakuNode) {.async.} =
|
|||
if not node.wakuArchive.isNil():
|
||||
await node.wakuArchive.stopWait()
|
||||
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.stopWait()
|
||||
|
||||
node.started = false
|
||||
|
||||
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
sqlite3_abi,
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
eth/p2p/discoveryv5/enr
|
||||
|
||||
import
|
||||
../common/databases/db_sqlite,
|
||||
../waku_core,
|
||||
../waku_archive,
|
||||
../common/nimchronos,
|
||||
../waku_store/[client, common],
|
||||
../node/peer_manager/peer_manager
|
||||
|
||||
logScope:
  topics = "waku store resume"

const
  # SQLite file holding the single last-online timestamp.
  OnlineDbUrl = "lastonline.db"
  # How often the last-online timestamp is refreshed while running.
  LastOnlineInterval = chronos.minutes(1)
  ResumeRangeLimit = 6 # hours; maximum history window fetched on resume

type
  # Transfers all messages stored by `peer` since `timestamp` into the
  # local archive; saved at construction and invoked on resume.
  TransferCallback* = proc(
    timestamp: Timestamp, peer: RemotePeerInfo
  ): Future[Result[void, string]] {.async: (raises: []), closure.}

  StoreResume* = ref object
    handle: Future[void] # periodic last-online task; nil until start()

    db: SqliteDatabase # last-online database (see OnlineDbUrl)
    replaceStmt: SqliteStmt[(Timestamp), void] # REPLACE INTO last_online

    transferCallBack: Option[TransferCallback] # none if setup guards failed

    peerManager: PeerManager # used to select a store peer for resume
|
||||
proc setupLastOnlineDB(): Result[SqliteDatabase, string] =
  ## Open (or create) the last-online SQLite database and ensure the
  ## `last_online` table exists.
  ## Returns the open database handle, or an error string on failure.
  let db = SqliteDatabase.new(OnlineDbUrl).valueOr:
    return err($error)

  let createStmt = db
    .prepareStmt(
      """CREATE TABLE IF NOT EXISTS last_online (timestamp BIGINT NOT NULL);""",
      NoParams, void,
    )
    .expect("Valid statement")

  # The CREATE TABLE statement is one-shot: dispose it on every exit path,
  # including the error return below (previously it leaked on exec failure).
  defer:
    createStmt.dispose()

  createStmt.exec(()).isOkOr:
    return err("failed to exec stmt")

  return ok(db)
|
||||
|
||||
proc initTransferHandler(
    self: StoreResume, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
) =
  ## Build and save the transfer callback that pages messages from a remote
  ## store peer into the local archive. If any collaborator is nil, the
  ## callback is left unset and an error is logged instead.
  # guard clauses to prevent faulty callback
  if self.peerManager.isNil():
    error "peer manager unavailable for store resume"
    return

  if wakuArchive.isNil():
    error "waku archive unavailable for store resume"
    return

  if wakuStoreClient.isNil():
    error "waku store client unavailable for store resume"
    return

  # tying archive, store client and resume into one callback and saving it for later
  self.transferCallBack = some(
    proc(
        timestamp: Timestamp, peer: RemotePeerInfo
    ): Future[Result[void, string]] {.async: (raises: []), closure.} =
      # Query window: from `timestamp` up to "now", 100 messages per page,
      # with full message payloads included.
      var req = StoreQueryRequest()
      req.includeData = true
      req.startTime = some(timestamp)
      req.endTime = some(getNowInNanosecondTime())
      req.paginationLimit = some(uint64(100))

      while true:
        # `query` can raise; `catch` folds that into a Result so this
        # closure can honor its {.raises: [].} contract.
        let catchable = catch:
          await wakuStoreClient.query(req, peer)

        if catchable.isErr():
          return err("store client error: " & catchable.error.msg)

        let res = catchable.get()
        let response = res.valueOr:
          return err("store client error: " & $error)

        # Carry the peer's cursor forward for the next page.
        req.paginationCursor = response.paginationCursor

        for kv in response.messages:
          # NOTE(review): assumes pubsubTopic and message are present on
          # every returned key-value pair; `.get()` on an empty Option
          # would raise here — confirm the store response guarantees this.
          let handleRes = catch:
            await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())

          if handleRes.isErr():
            # A single failed message is logged but does not abort the transfer.
            error "message transfer failed", error = handleRes.error.msg
            continue

        # No cursor in the response means the final page was consumed.
        if req.paginationCursor.isNone():
          break

      return ok()
  )
|
||||
|
||||
proc new*(
    T: type StoreResume,
    peerManager: PeerManager,
    wakuArchive: WakuArchive,
    wakuStoreClient: WakuStoreClient,
): Result[T, string] =
  ## Construct a StoreResume: opens the last-online database, prepares the
  ## timestamp REPLACE statement and wires the transfer callback.
  ## Returns an error string if the database cannot be set up.
  info "initializing store resume"

  let db = setupLastOnlineDB().valueOr:
    # Propagate the underlying cause (previously dropped from the message).
    return err("Failed to setup last online DB: " & $error)

  let replaceStmt = db
    .prepareStmt("REPLACE INTO last_online (timestamp) VALUES (?);", (Timestamp), void)
    .expect("Valid statement")

  let resume = StoreResume(db: db, replaceStmt: replaceStmt, peerManager: peerManager)

  # Leaves transferCallBack unset (with an error log) if any collaborator is nil.
  resume.initTransferHandler(wakuArchive, wakuStoreClient)

  return ok(resume)
|
||||
|
||||
proc getLastOnlineTimestamp*(self: StoreResume): Result[Timestamp, string] =
  ## Read the most recent persisted last-online timestamp.
  ## On an empty table `MAX(timestamp)` yields SQL NULL, for which
  ## sqlite3_column_int64 returns 0 — callers then receive Timestamp(0).
  var timestamp: Timestamp

  proc queryCallback(s: ptr sqlite3_stmt) =
    # Single row, single column: the maximum stored timestamp.
    timestamp = sqlite3_column_int64(s, 0)

  self.db.query("SELECT MAX(timestamp) FROM last_online", queryCallback).isOkOr:
    return err("failed to query: " & $error)

  return ok(timestamp)
|
||||
|
||||
proc setLastOnlineTimestamp*(
    self: StoreResume, timestamp: Timestamp
): Result[void, string] =
  ## Persist `timestamp` as the node's last-online time, replacing any
  ## previously stored value via the prepared REPLACE statement.
  self.replaceStmt.exec((timestamp)).isOkOr:
    # Separator added so the sqlite error is readable after the prefix
    # (previously the two strings were concatenated without one).
    return err("failed to execute replace stmt: " & $error)

  return ok()
|
||||
|
||||
proc startStoreResume*(
    self: StoreResume, time: Timestamp, peer: RemotePeerInfo
): Future[Result[void, string]] {.async.} =
  ## Run the transfer callback prepared at construction time, pulling all
  ## messages stored by `peer` since `time` into the local archive.
  ## Fails if the callback was never initialised or if the transfer errors.
  info "starting store resume", lastOnline = $time, peer = $peer

  if self.transferCallBack.isNone():
    return err("transfer callback uninitialised")

  let transfer = self.transferCallBack.get()

  let transferRes = await transfer(time, peer)
  if transferRes.isErr():
    return err("transfer callback failed: " & $transferRes.error)

  info "store resume completed"

  return ok()
|
||||
|
||||
proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.} =
  ## Select a store-capable peer and resume from the persisted last-online
  ## timestamp, clamped to at most `ResumeRangeLimit` hours in the past.
  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
    return err("no suitable peer found for store resume")

  let lastOnlineTs = self.getLastOnlineTimestamp().valueOr:
    return err("failed to get last online timestamp: " & $error)

  # Limit the resume time range
  let now = getNowInNanosecondTime()
  let maxTime = now - (ResumeRangeLimit * 3600 * 1_000_000_000) # hours -> ns
  # Also covers the empty-DB case, where lastOnlineTs is 0.
  let ts = max(lastOnlineTs, maxTime)

  return await self.startStoreResume(ts, peer)
|
||||
|
||||
proc periodicSetLastOnline(self: StoreResume) {.async.} =
  ## Persist the current time once per `LastOnlineInterval`, so that after a
  ## restart the node knows roughly when it was last online.
  while true:
    await sleepAsync(LastOnlineInterval)

    let now = getNowInNanosecondTime()
    let saveRes = self.setLastOnlineTimestamp(now)

    if saveRes.isErr():
      error "failed to set last online timestamp", error = saveRes.error, time = now
|
||||
|
||||
proc start*(self: StoreResume) {.async.} =
  ## Run the resume procedure (up to 3 attempts, 30 s apart), then start the
  ## periodic last-online task. The periodic task is started even when all
  ## resume attempts failed.
  # start resume process, will try thrice.
  var tries = 3
  while tries > 0:
    (await self.autoStoreResume()).isOkOr:
      tries -= 1
      error "store resume failed", triesLeft = tries, error = $error
      # Back off before retrying; note this also sleeps after the final failure.
      await sleepAsync(30.seconds)
      continue

    break

  # starting periodic storage of last online timestamp
  self.handle = self.periodicSetLastOnline()
|
||||
|
||||
proc stopWait*(self: StoreResume) {.async.} =
  ## Stop the periodic last-online task (if any), then release the prepared
  ## statement and close the database.
  let periodicTask = self.handle
  if not periodicTask.isNil():
    # noCancel shields the cancellation itself from outer cancellation,
    # so the DB cleanup below always runs.
    await noCancel(periodicTask.cancelAndWait())

  self.replaceStmt.dispose()
  self.db.close()
|
Loading…
Reference in New Issue