2023-03-08 15:04:54 +00:00
|
|
|
## Nim-Codex
|
|
|
|
## Copyright (c) 2023 Status Research & Development GmbH
|
|
|
|
## Licensed under either of
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
## at your option.
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
## those terms.
|
|
|
|
|
|
|
|
## Store maintenance module
|
|
|
|
## Looks for and removes expired blocks from blockstores.
|
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
import std/sequtils
|
|
|
|
|
2023-03-08 15:04:54 +00:00
|
|
|
import pkg/chronos
|
2024-06-05 17:09:04 +00:00
|
|
|
import pkg/chronicles
|
|
|
|
import pkg/libp2p/cid
|
|
|
|
import pkg/serde/json
|
|
|
|
import pkg/datastore
|
|
|
|
import pkg/datastore/typedds
|
2023-03-08 15:04:54 +00:00
|
|
|
import pkg/questionable
|
|
|
|
import pkg/questionable/results
|
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
import ./blockstore
|
|
|
|
import ./keyutils
|
|
|
|
import ./queryiterhelper
|
2023-03-08 15:04:54 +00:00
|
|
|
import ../utils/timer
|
2023-11-14 12:02:17 +00:00
|
|
|
import ../utils/asynciter
|
2024-06-05 17:09:04 +00:00
|
|
|
import ../utils/json
|
2023-03-08 15:04:54 +00:00
|
|
|
import ../clock
|
feat: create logging proxy (#663)
* implement a logging proxy
The logging proxy:
- prevents the need to import chronicles (as well as export except toJson),
- prevents the need to override `writeValue` or use or import nim-json-seralization elsewhere in the codebase, allowing for sole use of utils/json for de/serialization,
- and handles json formatting correctly in chronicles json sinks
* Rename logging -> logutils to avoid ambiguity with common names
* clean up
* add setProperty for JsonRecord, remove nim-json-serialization conflict
* Allow specifying textlines and json format separately
Not specifying a LogFormat will apply the formatting to both textlines and json sinks.
Specifying a LogFormat will apply the formatting to only that sink.
* remove unneeded usages of std/json
We only need to import utils/json instead of std/json
* move serialization from rest/json to utils/json so it can be shared
* fix NoColors ambiguity
Was causing unit tests to fail on Windows.
* Remove nre usage to fix Windows error
Windows was erroring with `could not load: pcre64.dll`. Instead of fixing that error, remove the pcre usage :)
* Add logutils module doc
* Shorten logutils.formatIt for `NBytes`
Both json and textlines formatIt were not needed, and could be combined into one formatIt
* remove debug integration test config
debug output and logformat of json for integration test logs
* Use ## module doc to support docgen
* bump nim-poseidon2 to export fromBytes
Before the changes in this branch, fromBytes was likely being resolved by nim-stew, or other dependency. With the changes in this branch, that dependency was removed and fromBytes could no longer be resolved. By exporting fromBytes from nim-poseidon, the correct resolution is now happening.
* fixes to get compiling after rebasing master
* Add support for Result types being logged using formatIt
2024-01-23 07:35:03 +00:00
|
|
|
import ../logutils
|
2023-03-08 15:04:54 +00:00
|
|
|
import ../systemclock
|
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
|
|
|
|
logScope:
|
|
|
|
topics = "codex maintenance"
|
|
|
|
|
2023-03-08 15:04:54 +00:00
|
|
|
const
|
2024-06-05 17:09:04 +00:00
|
|
|
DefaultDefaultExpiry* = 24.hours
|
|
|
|
DefaultMaintenanceInterval* = 5.minutes
|
|
|
|
DefaultBatchSize* = 1000
|
|
|
|
|
|
|
|
# if no progress was observed for this amount of time
|
|
|
|
# we're going to retry deletion of the dataset
|
|
|
|
DefaultRetryDelay* = 15.minutes
|
2023-03-08 15:04:54 +00:00
|
|
|
|
|
|
|
type
|
2024-06-05 17:09:04 +00:00
|
|
|
DatasetMaintainer* = ref object of RootObj
|
|
|
|
blockStore: BlockStore
|
|
|
|
metaDs: TypedDatastore
|
2023-03-08 15:04:54 +00:00
|
|
|
interval: Duration
|
2024-06-05 17:09:04 +00:00
|
|
|
defaultExpiry: Duration
|
|
|
|
batchSize: int
|
|
|
|
retryDelay: Duration
|
2023-03-08 15:04:54 +00:00
|
|
|
timer: Timer
|
|
|
|
clock: Clock
|
2024-06-05 17:09:04 +00:00
|
|
|
|
|
|
|
Checkpoint* {.serialize.} = object
|
|
|
|
timestamp*: SecondsSince1970
|
|
|
|
progress*: Natural
|
|
|
|
|
|
|
|
DatasetMetadata* {.serialize.} = object
|
|
|
|
expiry*: SecondsSince1970
|
|
|
|
leavesCount*: Natural
|
|
|
|
manifestsCids*: seq[Cid]
|
|
|
|
checkpoint*: Checkpoint
|
|
|
|
|
|
|
|
MissingKey* = object of CodexError
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2023-06-22 15:11:18 +00:00
|
|
|
proc new*(
|
2024-06-05 17:09:04 +00:00
|
|
|
T: type DatasetMaintainer,
|
|
|
|
blockStore: BlockStore,
|
|
|
|
metaDs: Datastore,
|
|
|
|
defaultExpiry = DefaultDefaultExpiry,
|
|
|
|
interval = DefaultMaintenanceInterval,
|
|
|
|
batchSize = DefaultBatchSize,
|
|
|
|
retryDelay = DefaultRetryDelay,
|
2023-03-08 15:04:54 +00:00
|
|
|
timer = Timer.new(),
|
|
|
|
clock: Clock = SystemClock.new()
|
2024-06-05 17:09:04 +00:00
|
|
|
): DatasetMaintainer =
|
|
|
|
## Create new DatasetMaintainer instance
|
2023-09-04 09:12:14 +00:00
|
|
|
##
|
2023-06-22 15:11:18 +00:00
|
|
|
## Call `start` to begin looking for for expired blocks
|
2023-09-04 09:12:14 +00:00
|
|
|
##
|
2024-06-05 17:09:04 +00:00
|
|
|
DatasetMaintainer(
|
|
|
|
blockStore: blockStore,
|
|
|
|
metaDs: TypedDatastore.init(metaDs),
|
|
|
|
defaultExpiry: defaultExpiry,
|
2023-03-08 15:04:54 +00:00
|
|
|
interval: interval,
|
2024-06-05 17:09:04 +00:00
|
|
|
batchSize: batchSize,
|
|
|
|
retryDelay: retryDelay,
|
2023-03-08 15:04:54 +00:00
|
|
|
timer: timer,
|
2024-06-05 17:09:04 +00:00
|
|
|
clock: clock)
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
proc encode(t: Checkpoint): seq[byte] = serializer.toJson(t).toBytes()
|
|
|
|
proc decode(T: type Checkpoint, bytes: seq[byte]): ?!T = T.fromJson(bytes)
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
proc encode(t: DatasetMetadata): seq[byte] = serializer.toJson(t).toBytes()
|
|
|
|
proc decode(T: type DatasetMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
|
|
|
|
|
|
|
|
proc trackExpiry*(
|
|
|
|
self: DatasetMaintainer,
|
|
|
|
treeCid: Cid,
|
|
|
|
leavesCount: Natural,
|
|
|
|
expiry: SecondsSince1970,
|
|
|
|
manifestsCids: seq[Cid] = @[]
|
|
|
|
): Future[?!void] {.async.} =
|
|
|
|
# Starts tracking expiry of a given dataset
|
|
|
|
#
|
|
|
|
|
|
|
|
trace "Tracking an expiry of a dataset", treeCid, leavesCount, expiry
|
|
|
|
|
|
|
|
without key =? createDatasetMetadataKey(treeCid), err:
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
await modify[DatasetMetadata](self.metaDs, key,
|
|
|
|
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
|
|
|
|
var md: DatasetMetadata
|
|
|
|
|
|
|
|
if currDatasetMd =? maybeCurrDatasetMd:
|
|
|
|
md.expiry = max(currDatasetMd.expiry, expiry)
|
|
|
|
|
|
|
|
if currDatasetMd.leavesCount != leavesCount:
|
|
|
|
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " is already stored with leavesCount " &
|
|
|
|
$currDatasetMd.leavesCount & ", cannot override it with leavesCount " & $leavesCount)
|
|
|
|
|
|
|
|
md.leavesCount = leavesCount
|
|
|
|
md.manifestsCids = (currDatasetMd.manifestsCids & manifestsCids).deduplicate
|
|
|
|
md.checkpoint = Checkpoint(progress: 0, timestamp: 0)
|
|
|
|
else:
|
|
|
|
md.expiry = expiry
|
|
|
|
md.leavesCount = leavesCount
|
|
|
|
md.manifestsCids = manifestsCids
|
|
|
|
md.checkpoint = Checkpoint(progress: 0, timestamp: 0)
|
|
|
|
|
|
|
|
md.some
|
|
|
|
)
|
|
|
|
|
|
|
|
proc trackExpiry*(
|
|
|
|
self: DatasetMaintainer,
|
|
|
|
treeCid: Cid,
|
|
|
|
leavesCount: Natural,
|
|
|
|
manifestsCids: seq[Cid] = @[]
|
|
|
|
): Future[?!void] {.async.} =
|
|
|
|
await self.trackExpiry(treeCid, leavesCount, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
|
|
|
|
|
|
|
|
proc trackExpiry*(
|
|
|
|
self: DatasetMaintainer,
|
|
|
|
cid: Cid,
|
|
|
|
manifestsCids: seq[Cid] = @[]
|
|
|
|
): Future[?!void] {.async.} =
|
|
|
|
await self.trackExpiry(cid, 0, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
|
|
|
|
|
|
|
|
proc ensureExpiry*(
|
|
|
|
self: DatasetMaintainer,
|
|
|
|
treeCid: Cid,
|
|
|
|
minExpiry: SecondsSince1970): Future[?!void] {.async.} =
|
|
|
|
## Sets the dataset expiry to a max of two values: current expiry and `minExpiry`,
|
|
|
|
## if a dataset for given `treeCid` is not currently tracked a CatchableError is thrown
|
|
|
|
##
|
|
|
|
|
|
|
|
trace "Updating a dataset expiry", treeCid, minExpiry
|
|
|
|
|
|
|
|
without key =? createDatasetMetadataKey(treeCid), err:
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
await modify[DatasetMetadata](self.metaDs, key,
|
|
|
|
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
|
|
|
|
if currDatasetMd =? maybeCurrDatasetMd:
|
|
|
|
let datasetMd = DatasetMetadata(
|
|
|
|
expiry: max(currDatasetMd.expiry, minExpiry),
|
|
|
|
leavesCount: currDatasetMd.leavesCount,
|
|
|
|
manifestsCids: currDatasetMd.manifestsCids,
|
|
|
|
checkpoint: currDatasetMd.checkpoint
|
|
|
|
)
|
|
|
|
return datasetMd.some
|
|
|
|
else:
|
|
|
|
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found")
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
proc recordCheckpoint*(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
|
|
|
|
# Saves progress or deletes dataset metadata if progress > leavesCount
|
|
|
|
#
|
|
|
|
|
|
|
|
without key =? createDatasetMetadataKey(treeCid), err:
|
|
|
|
return failure(err)
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
await self.metaDs.modify(key,
|
|
|
|
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
|
|
|
|
if currDatasetMd =? maybeCurrDatasetMd:
|
|
|
|
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
|
|
|
|
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)
|
|
|
|
|
|
|
|
if currDatasetMd.checkpoint.progress > datasetMd.checkpoint.progress:
|
|
|
|
raise newException(CatchableError, "Progress should be increasing only, treeCid " & $treeCid)
|
|
|
|
|
|
|
|
if currDatasetMd.leavesCount <= datasetMd.checkpoint.progress:
|
|
|
|
DatasetMetadata.none
|
|
|
|
else:
|
|
|
|
datasetMd.some
|
|
|
|
else:
|
|
|
|
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
|
2023-03-08 15:04:54 +00:00
|
|
|
)
|
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
proc deleteBatch(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
|
|
|
|
var datasetMd = datasetMd
|
|
|
|
|
|
|
|
datasetMd.checkpoint.timestamp = self.clock.now
|
|
|
|
|
|
|
|
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
logScope:
|
|
|
|
treeCid = treeCid
|
|
|
|
manifestsCids = datasetMd.manifestsCids
|
|
|
|
startingIndex = datasetMd.checkpoint.progress
|
|
|
|
|
|
|
|
trace "Deleting a batch of blocks", size = self.batchSize
|
|
|
|
|
|
|
|
var index = datasetMd.checkpoint.progress
|
|
|
|
while (index < datasetMd.checkpoint.progress + self.batchSize) and
|
|
|
|
(index < datasetMd.leavesCount):
|
|
|
|
if err =? (await self.blockStore.delBlock(treeCid, index)).errorOption:
|
|
|
|
error "Error deleting a block", msg = err.msg, index
|
|
|
|
|
|
|
|
index.inc
|
|
|
|
|
|
|
|
await sleepAsync(50.millis) # cooperative scheduling
|
|
|
|
|
|
|
|
if index >= datasetMd.leavesCount:
|
|
|
|
trace "All blocks deleted from a dataset", leavesCount = datasetMd.leavesCount
|
|
|
|
|
|
|
|
for manifestCid in datasetMd.manifestsCids:
|
|
|
|
if err =? (await self.blockStore.delBlock(manifestCid)).errorOption:
|
|
|
|
error "Error deleting manifest", cid = manifestCid
|
|
|
|
|
|
|
|
if err =? (await self.blockStore.delBlock(treeCid)).errorOption:
|
|
|
|
error "Error deleting block", cid = treeCid
|
|
|
|
|
|
|
|
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
return success()
|
|
|
|
else:
|
|
|
|
datasetMd.checkpoint.progress = index
|
|
|
|
return await self.deleteBatch(treeCid, datasetMd)
|
|
|
|
|
|
|
|
proc superviseDatasetDeletion(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[void] {.async.} =
|
|
|
|
logScope:
|
|
|
|
treeCid = treeCid
|
|
|
|
manifestsCids = datasetMd.manifestsCids
|
|
|
|
expiry = datasetMd.expiry
|
|
|
|
leavesCount = datasetMd.leavesCount
|
|
|
|
|
|
|
|
try:
|
|
|
|
if datasetMd.checkpoint.progress == 0 and datasetMd.checkpoint.timestamp == 0:
|
|
|
|
info "Initiating deletion of a dataset"
|
|
|
|
else:
|
|
|
|
info "Retrying deletion of a dataset", progress = datasetMd.checkpoint.progress, timestamp = datasetMd.checkpoint.timestamp
|
|
|
|
|
|
|
|
if err =? (await self.deleteBatch(treeCid, datasetMd)).errorOption:
|
|
|
|
error "Error occurred during deletion of a dataset", msg = err.msg
|
|
|
|
else:
|
|
|
|
info "Dataset deletion complete"
|
|
|
|
except CatchableError as err:
|
|
|
|
error "Unexpected error during dataset deletion", msg = err.msg, treeCid = treeCid
|
|
|
|
|
|
|
|
proc listDatasetMetadata*(
|
|
|
|
self: DatasetMaintainer
|
|
|
|
): Future[?!AsyncIter[(Cid, DatasetMetadata)]] {.async.} =
|
|
|
|
without queryKey =? createDatasetMetadataQueryKey(), err:
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
without queryIter =? await query[DatasetMetadata](self.metaDs, Query.init(queryKey)), err:
|
|
|
|
error "Unable to execute block expirations query", err = err.msg
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
|
|
|
|
error "Unable to convert QueryIter to AsyncIter", err = err.msg
|
|
|
|
return failure(err)
|
|
|
|
|
|
|
|
let
|
|
|
|
filteredIter = await asyncQueryIter.filterSuccess()
|
|
|
|
|
|
|
|
datasetMdIter = await mapFilter[KeyVal[DatasetMetadata], (Cid, DatasetMetadata)](filteredIter,
|
|
|
|
proc (kv: KeyVal[DatasetMetadata]): Future[?(Cid, DatasetMetadata)] {.async.} =
|
|
|
|
without cid =? Cid.init(kv.key.value).mapFailure, err:
|
|
|
|
error "Failed decoding cid", err = err.msg
|
|
|
|
return (Cid, DatasetMetadata).none
|
|
|
|
|
|
|
|
|
|
|
|
(cid, kv.value).some
|
|
|
|
)
|
|
|
|
|
|
|
|
success(datasetMdIter)
|
|
|
|
|
|
|
|
|
|
|
|
proc checkDatasets(self: DatasetMaintainer): Future[?!void] {.async.} =
|
|
|
|
without iter =? await self.listDatasetMetadata(), err:
|
|
|
|
return failure(err)
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
for fut in iter:
|
|
|
|
let (treeCid, datasetMd) = await fut
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
if (datasetMd.expiry < self.clock.now) and
|
|
|
|
(datasetMd.checkpoint.timestamp + self.retryDelay.seconds < self.clock.now):
|
|
|
|
asyncSpawn self.superviseDatasetDeletion(treeCid, datasetMd)
|
|
|
|
else:
|
|
|
|
trace "Item either not expired or expired but already in maintenance", treeCid, expiry = datasetMd.expiry, timestamp = datasetMd.checkpoint.timestamp
|
|
|
|
success()
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
proc start*(self: DatasetMaintainer) =
|
2023-03-08 15:04:54 +00:00
|
|
|
proc onTimer(): Future[void] {.async.} =
|
|
|
|
try:
|
2024-06-05 17:09:04 +00:00
|
|
|
if err =? (await self.checkDatasets()).errorOption:
|
|
|
|
error "Error when checking datasets", msg = err.msg
|
|
|
|
except Exception as exc:
|
|
|
|
error "Unexpected error during maintenance", msg = exc.msg
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
if self.interval.seconds > 0:
|
|
|
|
self.timer.start(onTimer, self.interval)
|
2023-03-08 15:04:54 +00:00
|
|
|
|
2024-06-05 17:09:04 +00:00
|
|
|
proc stop*(self: DatasetMaintainer): Future[void] {.async.} =
|
2023-03-08 15:04:54 +00:00
|
|
|
await self.timer.stop()
|