Add force prune for statically set radius (#1896)
This commit is contained in:
parent
610e2d338d
commit
a7bb52e5b5
|
@ -231,6 +231,14 @@ type
|
|||
defaultValue: none(TrustedDigest)
|
||||
name: "trusted-block-root" .}: Option[TrustedDigest]
|
||||
|
||||
forcePrune* {.
|
||||
hidden
|
||||
desc: "Force the pruning of the database. This should be used when the " &
|
||||
"database is decreased in size, e.g. when a lower static radius " &
|
||||
"is set. Only supported for statically set radius."
|
||||
defaultValue: false
|
||||
name: "force-prune" .}: bool
|
||||
|
||||
disablePoke* {.
|
||||
hidden
|
||||
desc: "Disable POKE functionality for gossip mechanisms testing"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# Nimbus
|
||||
# Fluffy
|
||||
# Copyright (c) 2021-2023 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
|
@ -10,12 +10,13 @@
|
|||
import
|
||||
chronicles,
|
||||
metrics,
|
||||
eth/db/kvstore,
|
||||
eth/db/kvstore_sqlite3,
|
||||
stint,
|
||||
stew/results,
|
||||
eth/db/kvstore,
|
||||
eth/db/kvstore_sqlite3,
|
||||
./network/state/state_content,
|
||||
"."/network/wire/[portal_protocol, portal_protocol_config]
|
||||
"."/network/wire/[portal_protocol, portal_protocol_config],
|
||||
./content_db_custom_sql_functions
|
||||
|
||||
export kvstore_sqlite3
|
||||
|
||||
|
@ -51,7 +52,9 @@ type
|
|||
distance: array[32, byte]
|
||||
|
||||
ContentDB* = ref object
|
||||
backend: SqStoreRef
|
||||
kv: KvStoreRef
|
||||
manualCheckpoint: bool
|
||||
storageCapacity*: uint64
|
||||
sizeStmt: SqliteStmt[NoParams, int64]
|
||||
unusedSizeStmt: SqliteStmt[NoParams, int64]
|
||||
|
@ -59,6 +62,7 @@ type
|
|||
contentCountStmt: SqliteStmt[NoParams, int64]
|
||||
contentSizeStmt: SqliteStmt[NoParams, int64]
|
||||
getAllOrderedByDistanceStmt: SqliteStmt[array[32, byte], RowInfo]
|
||||
deleteOutOfRadiusStmt: SqliteStmt[(array[32, byte], array[32, byte]), void]
|
||||
|
||||
PutResultType* = enum
|
||||
ContentStored, DbPruned
|
||||
|
@ -72,30 +76,14 @@ type
|
|||
deletedFraction*: float64
|
||||
deletedElements*: int64
|
||||
|
||||
func xorDistance(
|
||||
a: openArray[byte],
|
||||
b: openArray[byte]
|
||||
): Result[seq[byte], cstring] {.cdecl.} =
|
||||
var s: seq[byte] = newSeq[byte](32)
|
||||
|
||||
if len(a) != 32 or len(b) != 32:
|
||||
return err("Blobs should have 32 byte length")
|
||||
|
||||
var i = 0
|
||||
while i < 32:
|
||||
s[i] = a[i] xor b[i]
|
||||
inc i
|
||||
|
||||
return ok(s)
|
||||
|
||||
template expectDb(x: auto): untyped =
|
||||
# There's no meaningful error handling implemented for a corrupt database or
|
||||
# full disk - this requires manual intervention, so we'll panic for now
|
||||
x.expect("working database (disk broken/full?)")
|
||||
|
||||
proc new*(
|
||||
T: type ContentDB, path: string, storageCapacity: uint64, inMemory = false):
|
||||
ContentDB =
|
||||
T: type ContentDB, path: string, storageCapacity: uint64,
|
||||
inMemory = false, manualCheckpoint = false): ContentDB =
|
||||
doAssert(storageCapacity <= uint64(int64.high))
|
||||
|
||||
let db =
|
||||
|
@ -103,10 +91,13 @@ proc new*(
|
|||
SqStoreRef.init("", "fluffy-test", inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
else:
|
||||
SqStoreRef.init(path, "fluffy").expectDb()
|
||||
SqStoreRef.init(path, "fluffy", manualCheckpoint = false).expectDb()
|
||||
|
||||
db.registerCustomScalarFunction("xorDistance", xorDistance)
|
||||
.expect("Couldn't register custom xor function")
|
||||
db.createCustomFunction("xorDistance", 2, xorDistance).expect(
|
||||
"Custom function xorDistance creation OK")
|
||||
|
||||
db.createCustomFunction("isInRadius", 3, isInRadius).expect(
|
||||
"Custom function isInRadius creation OK")
|
||||
|
||||
let sizeStmt = db.prepareStmt(
|
||||
"SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();",
|
||||
|
@ -134,17 +125,39 @@ proc new*(
|
|||
"SELECT key, length(value), xorDistance(?, key) as distance FROM kvstore ORDER BY distance DESC",
|
||||
array[32, byte], RowInfo).get()
|
||||
|
||||
let deleteOutOfRadiusStmt = db.prepareStmt(
|
||||
"DELETE FROM kvstore WHERE isInRadius(?, key, ?) == 0",
|
||||
(array[32, byte], array[32, byte]), void).get()
|
||||
|
||||
ContentDB(
|
||||
kv: kvStore,
|
||||
backend: db,
|
||||
manualCheckpoint: manualCheckpoint,
|
||||
storageCapacity: storageCapacity,
|
||||
sizeStmt: sizeStmt,
|
||||
unusedSizeStmt: unusedSizeStmt,
|
||||
vacuumStmt: vacuumStmt,
|
||||
contentSizeStmt: contentSizeStmt,
|
||||
contentCountStmt: contentCountStmt,
|
||||
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt
|
||||
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt,
|
||||
deleteOutOfRadiusStmt: deleteOutOfRadiusStmt
|
||||
)
|
||||
|
||||
template disposeSafe(s: untyped): untyped =
|
||||
if distinctBase(s) != nil:
|
||||
s.dispose()
|
||||
s = typeof(s)(nil)
|
||||
|
||||
proc close*(db: ContentDB) =
|
||||
db.sizeStmt.disposeSafe()
|
||||
db.unusedSizeStmt.disposeSafe()
|
||||
db.vacuumStmt.disposeSafe()
|
||||
db.contentCountStmt.disposeSafe()
|
||||
db.contentSizeStmt.disposeSafe()
|
||||
db.getAllOrderedByDistanceStmt.disposeSafe()
|
||||
db.deleteOutOfRadiusStmt.disposeSafe()
|
||||
discard db.kv.close()
|
||||
|
||||
## Private KvStoreRef Calls
|
||||
|
||||
proc get(kv: KvStoreRef, key: openArray[byte]): Opt[seq[byte]] =
|
||||
|
@ -210,15 +223,7 @@ proc del*(db: ContentDB, key: ContentId) =
|
|||
proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Opt[T] =
|
||||
db.getSszDecoded(key.toBytesBE(), T)
|
||||
|
||||
## Public database size, content and pruning related calls
|
||||
|
||||
proc reclaimSpace*(db: ContentDB): void =
|
||||
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
|
||||
## minimal amount of disk space.
|
||||
## Ideal mode of operation, is to run it after several deletes.
|
||||
## Another option would be to run 'PRAGMA auto_vacuum = FULL;' statement at
|
||||
## the start of db to leave it up to sqlite to clean up
|
||||
db.vacuumStmt.exec().expectDb()
|
||||
## Public calls to get database size, content size and similar.
|
||||
|
||||
proc size*(db: ContentDB): int64 =
|
||||
## Return current size of DB as product of sqlite page_count and page_size:
|
||||
|
@ -260,12 +265,14 @@ proc contentCount*(db: ContentDB): int64 =
|
|||
count = res).expectDb()
|
||||
return count
|
||||
|
||||
## Pruning related calls
|
||||
|
||||
proc deleteContentFraction*(
|
||||
db: ContentDB,
|
||||
target: UInt256,
|
||||
fraction: float64): (UInt256, int64, int64, int64) =
|
||||
## Deletes at most `fraction` percent of content form database.
|
||||
## Content furthest from provided `target` is deleted first.
|
||||
## Deletes at most `fraction` percent of content from the database.
|
||||
## The content furthest from the provided `target` is deleted first.
|
||||
# TODO: The usage of `db.contentSize()` for the deletion calculation versus
|
||||
# `db.usedSize()` for the pruning threshold leads sometimes to some unexpected
|
||||
# results of how much content gets up deleted.
|
||||
|
@ -294,6 +301,38 @@ proc deleteContentFraction*(
|
|||
deletedElements
|
||||
)
|
||||
|
||||
proc reclaimSpace*(db: ContentDB): void =
|
||||
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
|
||||
## minimal amount of disk space.
|
||||
## Ideal mode of operation is to run it after several deletes.
|
||||
## Another option would be to run 'PRAGMA auto_vacuum = FULL;' statement at
|
||||
## the start of db to leave it up to sqlite to clean up.
|
||||
db.vacuumStmt.exec().expectDb()
|
||||
|
||||
proc deleteContentOutOfRadius*(
|
||||
db: ContentDB, localId: UInt256, radius: UInt256) =
|
||||
## Deletes all content that falls outside of the given radius range.
|
||||
db.deleteOutOfRadiusStmt.exec(
|
||||
(localId.toBytesBE(), radius.toBytesBE())).expect("SQL query OK")
|
||||
|
||||
proc forcePrune*(db: ContentDB, localId: UInt256, radius: UInt256) =
|
||||
## Force prune the database to a statically set radius. This will also run
|
||||
## the reclaimSpace (vacuum) to free unused pages. As side effect this will
|
||||
## cause the pruned database size to double in size on disk (wal file will be
|
||||
## approximately the same size as the db). A truncate checkpoint is done to
|
||||
## clean that up. In order to be able do the truncate checkpoint, the db needs
|
||||
## to be initialized in with `manualCheckpoint` on, else this step will be
|
||||
## skipped.
|
||||
notice "Starting the pruning of content"
|
||||
db.deleteContentOutOfRadius(localId, radius)
|
||||
notice "Reclaiming unused pages"
|
||||
db.reclaimSpace()
|
||||
if db.manualCheckpoint:
|
||||
notice "Truncating WAL file"
|
||||
db.backend.checkpoint(SqStoreCheckpointKind.truncate)
|
||||
db.close()
|
||||
notice "Finished database pruning"
|
||||
|
||||
proc put*(
|
||||
db: ContentDB,
|
||||
key: ContentId,
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
# Fluffy
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
stew/ptrops,
|
||||
stint,
|
||||
sqlite3_abi,
|
||||
eth/db/kvstore_sqlite3
|
||||
|
||||
func xorDistance(a: openArray[byte], b: openArray[byte]): seq[byte] =
|
||||
doAssert(a.len == b.len)
|
||||
|
||||
let length = a.len
|
||||
var distance: seq[byte] = newSeq[byte](length)
|
||||
for i in 0..<length:
|
||||
distance[i] = a[i] xor b[i]
|
||||
|
||||
return distance
|
||||
|
||||
proc xorDistance*(
|
||||
ctx: SqliteContext, n: cint, v: SqliteValue)
|
||||
{.cdecl, gcsafe, raises: [].} =
|
||||
doAssert(n == 2)
|
||||
|
||||
let
|
||||
ptrs = makeUncheckedArray(v)
|
||||
blob1Len = sqlite3_value_bytes(ptrs[][0])
|
||||
blob2Len = sqlite3_value_bytes(ptrs[][1])
|
||||
|
||||
bytes = xorDistance(
|
||||
makeOpenArray(sqlite3_value_blob(ptrs[][0]), byte, blob1Len),
|
||||
makeOpenArray(sqlite3_value_blob(ptrs[][1]), byte, blob2Len)
|
||||
)
|
||||
|
||||
sqlite3_result_blob(ctx, baseAddr bytes, cint bytes.len, SQLITE_TRANSIENT)
|
||||
|
||||
func isInRadius(contentId: UInt256, localId: UInt256, radius: UInt256): bool =
|
||||
let distance = contentId xor localId
|
||||
|
||||
radius > distance
|
||||
|
||||
func isInRadius*(
|
||||
ctx: SqliteContext, n: cint, v: SqliteValue)
|
||||
{.cdecl, gcsafe, raises: [].} =
|
||||
doAssert(n == 3)
|
||||
|
||||
let
|
||||
ptrs = makeUncheckedArray(v)
|
||||
blob1Len = sqlite3_value_bytes(ptrs[][0])
|
||||
blob2Len = sqlite3_value_bytes(ptrs[][1])
|
||||
blob3Len = sqlite3_value_bytes(ptrs[][2])
|
||||
|
||||
doAssert(blob1Len == 32 and blob2Len == 32 and blob3Len == 32)
|
||||
|
||||
let
|
||||
localId = UInt256.fromBytesBE(
|
||||
makeOpenArray(sqlite3_value_blob(ptrs[][0]), byte, blob1Len))
|
||||
contentId = UInt256.fromBytesBE(
|
||||
makeOpenArray(sqlite3_value_blob(ptrs[][1]), byte, blob2Len))
|
||||
radius = UInt256.fromBytesBE(
|
||||
makeOpenArray(sqlite3_value_blob(ptrs[][2]), byte, blob3Len))
|
||||
|
||||
if isInRadius(contentId, localId, radius):
|
||||
ctx.sqlite3_result_int(cint 1)
|
||||
else:
|
||||
ctx.sqlite3_result_int(cint 0)
|
|
@ -123,6 +123,15 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
|
|||
|
||||
d.open()
|
||||
|
||||
# Force pruning
|
||||
if config.forcePrune and config.radiusConfig.kind == Static:
|
||||
let db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
|
||||
d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
|
||||
storageCapacity = config.storageCapacityMB * 1_000_000,
|
||||
manualCheckpoint = true)
|
||||
db.forcePrune(d.localNode.id, UInt256.fromLogRadius(config.radiusConfig.logRadius))
|
||||
db.close()
|
||||
|
||||
# Store the database at contentdb prefixed with the first 8 chars of node id.
|
||||
# This is done because the content in the db is dependant on the `NodeId` and
|
||||
# the selected `Radius`.
|
||||
|
|
|
@ -463,22 +463,6 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
|||
debug "Packet decoding error", error = decoded.error, srcId, srcUdpAddress
|
||||
@[]
|
||||
|
||||
proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
|
||||
# Get the max value of the logRadius range
|
||||
pow((2).stuint(256), logRadius) - 1
|
||||
|
||||
proc getInitialRadius(rc: RadiusConfig): UInt256 =
|
||||
case rc.kind
|
||||
of Static:
|
||||
return UInt256.fromLogRadius(rc.logRadius)
|
||||
of Dynamic:
|
||||
# In case of a dynamic radius we start from the maximum value to quickly
|
||||
# gather as much data as possible, and also make sure each data piece in
|
||||
# the database is in our range after a node restart.
|
||||
# Alternative would be to store node the radius in database, and initialize
|
||||
# it from database after a restart
|
||||
return UInt256.high()
|
||||
|
||||
proc new*(T: type PortalProtocol,
|
||||
baseProtocol: protocol.Protocol,
|
||||
protocolId: PortalProtocolId,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# Nimbus
|
||||
# Fluffy
|
||||
# Copyright (c) 2021-2023 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
|
@ -11,6 +11,7 @@ import
|
|||
std/strutils,
|
||||
confutils,
|
||||
chronos,
|
||||
stint,
|
||||
eth/p2p/discoveryv5/routing_table
|
||||
|
||||
type
|
||||
|
@ -30,7 +31,6 @@ type
|
|||
radiusConfig*: RadiusConfig
|
||||
disablePoke*: bool
|
||||
|
||||
|
||||
const
|
||||
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
|
||||
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
|
||||
|
@ -65,6 +65,24 @@ proc init*(
|
|||
disablePoke: disablePoke
|
||||
)
|
||||
|
||||
proc fromLogRadius*(T: type UInt256, logRadius: uint16): T =
|
||||
# Get the max value of the logRadius range
|
||||
pow((2).stuint(256), logRadius) - 1
|
||||
|
||||
proc getInitialRadius*(rc: RadiusConfig): UInt256 =
|
||||
case rc.kind
|
||||
of Static:
|
||||
return UInt256.fromLogRadius(rc.logRadius)
|
||||
of Dynamic:
|
||||
# In case of a dynamic radius we start from the maximum value to quickly
|
||||
# gather as much data as possible, and also make sure each data piece in
|
||||
# the database is in our range after a node restart.
|
||||
# Alternative would be to store node the radius in database, and initialize
|
||||
# it from database after a restart
|
||||
return UInt256.high()
|
||||
|
||||
## Confutils parsers
|
||||
|
||||
proc parseCmdArg*(T: type RadiusConfig, p: string): T
|
||||
{.raises: [ValueError].} =
|
||||
if p.startsWith("dynamic") and len(p) == 7:
|
||||
|
|
|
@ -12,7 +12,8 @@ import
|
|||
strutils,
|
||||
eth/db/kvstore,
|
||||
eth/db/kvstore_sqlite3,
|
||||
stint
|
||||
stint,
|
||||
./content_db_custom_sql_functions
|
||||
|
||||
export kvstore_sqlite3
|
||||
|
||||
|
@ -34,22 +35,6 @@ type
|
|||
getStmt: SqliteStmt[array[32, byte], ContentData]
|
||||
getInRangeStmt: SqliteStmt[(array[32, byte], array[32, byte], int64, int64), ContentDataDist]
|
||||
|
||||
func xorDistance(
|
||||
a: openArray[byte],
|
||||
b: openArray[byte]
|
||||
): Result[seq[byte], cstring] {.cdecl.} =
|
||||
var s: seq[byte] = newSeq[byte](32)
|
||||
|
||||
if len(a) != 32 or len(b) != 32:
|
||||
return err("Blobs should have 32 byte length")
|
||||
|
||||
var i = 0
|
||||
while i < 32:
|
||||
s[i] = a[i] xor b[i]
|
||||
inc i
|
||||
|
||||
return ok(s)
|
||||
|
||||
template expectDb(x: auto): untyped =
|
||||
# There's no meaningful error handling implemented for a corrupt database or
|
||||
# full disk - this requires manual intervention, so we'll panic for now
|
||||
|
@ -98,8 +83,8 @@ proc new*(T: type SeedDb, path: string, name: string, inMemory = false): SeedDb
|
|||
ContentData
|
||||
).get()
|
||||
|
||||
db.registerCustomScalarFunction("xorDistance", xorDistance)
|
||||
.expect("Couldn't register custom xor function")
|
||||
db.createCustomFunction("xorDistance", 2, xorDistance).expect(
|
||||
"Custom function xorDistance creation OK")
|
||||
|
||||
let getInRangeStmt =
|
||||
db.prepareStmt(
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit ca4898e24a4ffb61759d57469f208b542123a092
|
||||
Subproject commit e5c2b1784ec7badc6162325e37daee2229f53d8b
|
Loading…
Reference in New Issue