mirror of https://github.com/waku-org/nwaku.git
221 lines
6.0 KiB
Nim
221 lines
6.0 KiB
Nim
|
{.push raises: [].}
|
||
|
|
||
|
import
|
||
|
std/options,
|
||
|
sqlite3_abi,
|
||
|
chronicles,
|
||
|
chronos,
|
||
|
metrics,
|
||
|
libp2p/protocols/protocol,
|
||
|
libp2p/stream/connection,
|
||
|
libp2p/crypto/crypto,
|
||
|
eth/p2p/discoveryv5/enr
|
||
|
|
||
|
import
|
||
|
../common/databases/db_sqlite,
|
||
|
../waku_core,
|
||
|
../waku_archive,
|
||
|
../common/nimchronos,
|
||
|
../waku_store/[client, common],
|
||
|
../node/peer_manager/peer_manager
|
||
|
|
||
|
logScope:
|
||
|
topics = "waku store resume"
|
||
|
|
||
|
const
|
||
|
OnlineDbUrl = "lastonline.db"
|
||
|
LastOnlineInterval = chronos.minutes(1)
|
||
|
ResumeRangeLimit = 6 # hours
|
||
|
|
||
|
type
|
||
|
TransferCallback* = proc(
|
||
|
timestamp: Timestamp, peer: RemotePeerInfo
|
||
|
): Future[Result[void, string]] {.async: (raises: []), closure.}
|
||
|
|
||
|
StoreResume* = ref object
|
||
|
handle: Future[void]
|
||
|
|
||
|
db: SqliteDatabase
|
||
|
replaceStmt: SqliteStmt[(Timestamp), void]
|
||
|
|
||
|
transferCallBack: Option[TransferCallback]
|
||
|
|
||
|
peerManager: PeerManager
|
||
|
|
||
|
proc setupLastOnlineDB(): Result[SqliteDatabase, string] =
|
||
|
let db = SqliteDatabase.new(OnlineDbUrl).valueOr:
|
||
|
return err($error)
|
||
|
|
||
|
let createStmt = db
|
||
|
.prepareStmt(
|
||
|
"""CREATE TABLE IF NOT EXISTS last_online (timestamp BIGINT NOT NULL);""",
|
||
|
NoParams, void,
|
||
|
)
|
||
|
.expect("Valid statement")
|
||
|
|
||
|
createStmt.exec(()).isOkOr:
|
||
|
return err("failed to exec stmt")
|
||
|
|
||
|
# We dispose of this prepared statement here, as we never use it again
|
||
|
createStmt.dispose()
|
||
|
|
||
|
return ok(db)
|
||
|
|
||
|
proc initTransferHandler(
|
||
|
self: StoreResume, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
|
||
|
) =
|
||
|
# guard clauses to prevent faulty callback
|
||
|
if self.peerManager.isNil():
|
||
|
error "peer manager unavailable for store resume"
|
||
|
return
|
||
|
|
||
|
if wakuArchive.isNil():
|
||
|
error "waku archive unavailable for store resume"
|
||
|
return
|
||
|
|
||
|
if wakuStoreClient.isNil():
|
||
|
error "waku store client unavailable for store resume"
|
||
|
return
|
||
|
|
||
|
# tying archive, store client and resume into one callback and saving it for later
|
||
|
self.transferCallBack = some(
|
||
|
proc(
|
||
|
timestamp: Timestamp, peer: RemotePeerInfo
|
||
|
): Future[Result[void, string]] {.async: (raises: []), closure.} =
|
||
|
var req = StoreQueryRequest()
|
||
|
req.includeData = true
|
||
|
req.startTime = some(timestamp)
|
||
|
req.endTime = some(getNowInNanosecondTime())
|
||
|
req.paginationLimit = some(uint64(100))
|
||
|
|
||
|
while true:
|
||
|
let catchable = catch:
|
||
|
await wakuStoreClient.query(req, peer)
|
||
|
|
||
|
if catchable.isErr():
|
||
|
return err("store client error: " & catchable.error.msg)
|
||
|
|
||
|
let res = catchable.get()
|
||
|
let response = res.valueOr:
|
||
|
return err("store client error: " & $error)
|
||
|
|
||
|
req.paginationCursor = response.paginationCursor
|
||
|
|
||
|
for kv in response.messages:
|
||
|
let handleRes = catch:
|
||
|
await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
|
||
|
|
||
|
if handleRes.isErr():
|
||
|
error "message transfer failed", error = handleRes.error.msg
|
||
|
continue
|
||
|
|
||
|
if req.paginationCursor.isNone():
|
||
|
break
|
||
|
|
||
|
return ok()
|
||
|
)
|
||
|
|
||
|
proc new*(
|
||
|
T: type StoreResume,
|
||
|
peerManager: PeerManager,
|
||
|
wakuArchive: WakuArchive,
|
||
|
wakuStoreClient: WakuStoreClient,
|
||
|
): Result[T, string] =
|
||
|
info "initializing store resume"
|
||
|
|
||
|
let db = setupLastOnlineDB().valueOr:
|
||
|
return err("Failed to setup last online DB")
|
||
|
|
||
|
let replaceStmt = db
|
||
|
.prepareStmt("REPLACE INTO last_online (timestamp) VALUES (?);", (Timestamp), void)
|
||
|
.expect("Valid statement")
|
||
|
|
||
|
let resume = StoreResume(db: db, replaceStmt: replaceStmt, peerManager: peerManager)
|
||
|
|
||
|
resume.initTransferHandler(wakuArchive, wakuStoreClient)
|
||
|
|
||
|
return ok(resume)
|
||
|
|
||
|
proc getLastOnlineTimestamp*(self: StoreResume): Result[Timestamp, string] =
|
||
|
var timestamp: Timestamp
|
||
|
|
||
|
proc queryCallback(s: ptr sqlite3_stmt) =
|
||
|
timestamp = sqlite3_column_int64(s, 0)
|
||
|
|
||
|
self.db.query("SELECT MAX(timestamp) FROM last_online", queryCallback).isOkOr:
|
||
|
return err("failed to query: " & $error)
|
||
|
|
||
|
return ok(timestamp)
|
||
|
|
||
|
proc setLastOnlineTimestamp*(
|
||
|
self: StoreResume, timestamp: Timestamp
|
||
|
): Result[void, string] =
|
||
|
self.replaceStmt.exec((timestamp)).isOkOr:
|
||
|
return err("failed to execute replace stmt" & $error)
|
||
|
|
||
|
return ok()
|
||
|
|
||
|
proc startStoreResume*(
|
||
|
self: StoreResume, time: Timestamp, peer: RemotePeerInfo
|
||
|
): Future[Result[void, string]] {.async.} =
|
||
|
info "starting store resume", lastOnline = $time, peer = $peer
|
||
|
|
||
|
# get the callback we saved if possible
|
||
|
let callback = self.transferCallBack.valueOr:
|
||
|
return err("transfer callback uninitialised")
|
||
|
|
||
|
# run the callback
|
||
|
(await callback(time, peer)).isOkOr:
|
||
|
return err("transfer callback failed: " & $error)
|
||
|
|
||
|
info "store resume completed"
|
||
|
|
||
|
return ok()
|
||
|
|
||
|
proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.} =
|
||
|
let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
|
||
|
return err("no suitable peer found for store resume")
|
||
|
|
||
|
let lastOnlineTs = self.getLastOnlineTimestamp().valueOr:
|
||
|
return err("failed to get last online timestamp: " & $error)
|
||
|
|
||
|
# Limit the resume time range
|
||
|
let now = getNowInNanosecondTime()
|
||
|
let maxTime = now - (ResumeRangeLimit * 3600 * 1_000_000_000)
|
||
|
let ts = max(lastOnlineTs, maxTime)
|
||
|
|
||
|
return await self.startStoreResume(ts, peer)
|
||
|
|
||
|
proc periodicSetLastOnline(self: StoreResume) {.async.} =
|
||
|
## Save a timestamp periodically
|
||
|
## so that a node can know when it was last online
|
||
|
while true:
|
||
|
await sleepAsync(LastOnlineInterval)
|
||
|
|
||
|
let ts = getNowInNanosecondTime()
|
||
|
|
||
|
self.setLastOnlineTimestamp(ts).isOkOr:
|
||
|
error "failed to set last online timestamp", error, time = ts
|
||
|
|
||
|
proc start*(self: StoreResume) {.async.} =
|
||
|
# start resume process, will try thrice.
|
||
|
var tries = 3
|
||
|
while tries > 0:
|
||
|
(await self.autoStoreResume()).isOkOr:
|
||
|
tries -= 1
|
||
|
error "store resume failed", triesLeft = tries, error = $error
|
||
|
await sleepAsync(30.seconds)
|
||
|
continue
|
||
|
|
||
|
break
|
||
|
|
||
|
# starting periodic storage of last online timestamp
|
||
|
self.handle = self.periodicSetLastOnline()
|
||
|
|
||
|
proc stopWait*(self: StoreResume) {.async.} =
|
||
|
if not self.handle.isNil():
|
||
|
await noCancel(self.handle.cancelAndWait())
|
||
|
|
||
|
self.replaceStmt.dispose()
|
||
|
self.db.close()
|