Add put method which preserves max size (#1065)

* Add put method which preserves max size
This commit is contained in:
KonradStaniec 2022-05-09 17:18:57 +02:00 committed by GitHub
parent 58e0543920
commit d3f231e3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 213 additions and 49 deletions

View File

@ -33,6 +33,9 @@ const
defaultAdminListenAddressDesc = $defaultAdminListenAddress
defaultDataDirDesc = defaultDataDir()
defaultClientConfigDesc = $(defaultClientConfig.httpUri)
# 100mb seems a bit smallish we may consider increasing defaults after some
# network measurements
defaultStorageSize* = uint32(1000 * 1000 * 100)
type
PortalCmd* = enum
@ -184,6 +187,14 @@ type
defaultValue: DefaultBitsPerHop
name: "bits-per-hop" .}: int
# TODO maybe it is worth defining minimal storage size and throw error if
# value provided is smaller than minimum
storageSize* {.
desc: "Maximum amount (in bytes) of content which will be stored " &
"in local database."
defaultValue: defaultStorageSize
name: "storage-size" .}: uint32
case cmd* {.
command
defaultValue: noCommand .}: PortalCmd

View File

@ -29,6 +29,12 @@ export kvstore_sqlite3
# 3. Or databases are created per network (and kvstores pre content type) and
# thus depending on the network the right db needs to be selected.
const
# Maximal number of ObjInfo objects held in memory per database scan. 100k
# objects should result in memory usage of around 7mb which should be
# appropriate for even low resource devices
maxObjPerScan = 100000
type
RowInfo = tuple
contentId: array[32, byte]
@ -41,10 +47,22 @@ type
ContentDB* = ref object
kv: KvStoreRef
maxSize: uint32
sizeStmt: SqliteStmt[NoParams, int64]
vacStmt: SqliteStmt[NoParams, void]
getAll: SqliteStmt[NoParams, RowInfo]
PutResultType* = enum
ContentStored, DbPruned
PutResult* = object
case kind*: PutResultType
of ContentStored:
discard
of DbPruned:
furthestStoredElementDistance*: UInt256
fractionOfDeletedContent*: float64
# Objects must be sorted from largest to closest distance
proc `<`(a, b: ObjInfo): bool =
return a.distFrom < b.distFrom
@ -54,7 +72,7 @@ template expectDb(x: auto): untyped =
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")
proc new*(T: type ContentDB, path: string, inMemory = false): ContentDB =
proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): ContentDB =
let db =
if inMemory:
SqStoreRef.init("", "fluffy-test", inMemory = true).expect(
@ -80,10 +98,10 @@ proc new*(T: type ContentDB, path: string, inMemory = false): ContentDB =
).get()
ContentDB(
kv: kvStore, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt)
kv: kvStore, maxSize: maxSize, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt)
proc getNFurthestElements*(
db: ContentDB, target: UInt256, n: uint64): seq[ObjInfo] =
db: ContentDB, target: UInt256, n: uint64): (seq[ObjInfo], int64) =
## Get at most n furthest elements from db in order from furthest to closest.
## Payload lengths are also returned so the caller can decide how many of
## those elements need to be deleted.
@ -99,9 +117,10 @@ proc getNFurthestElements*(
## would be possible we may be able to all this work by one sql query
if n == 0:
return newSeq[ObjInfo]()
return (newSeq[ObjInfo](), 0'i64)
var heap = initHeapQueue[ObjInfo]()
var totalContentSize: int64 = 0
var ri: RowInfo
for e in db.getAll.exec(ri):
@ -119,6 +138,8 @@ proc getNFurthestElements*(
if obj > heap[0]:
discard heap.replace(obj)
totalContentSize = totalContentSize + ri.payloadLength
var res: seq[ObjInfo] = newSeq[ObjInfo](heap.len())
var i = heap.len() - 1
@ -126,7 +147,7 @@ proc getNFurthestElements*(
res[i] = heap.pop()
dec i
return res
return (res, totalContentSize)
proc reclaimSpace*(db: ContentDB): void =
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
@ -158,7 +179,7 @@ proc get*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] =
return res
proc put*(db: ContentDB, key, value: openArray[byte]) =
proc put(db: ContentDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains*(db: ContentDB, key: openArray[byte]): bool =
@ -179,6 +200,8 @@ proc get*(db: ContentDB, key: ContentId): Option[seq[byte]] =
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
db.get(key.toByteArrayBE())
# TODO: Public due to usage in populating portal db, should be made private after
# improving db populating to use local node id
proc put*(db: ContentDB, key: ContentId, value: openArray[byte]) =
db.put(key.toByteArrayBE(), value)
@ -187,3 +210,61 @@ proc contains*(db: ContentDB, key: ContentId): bool =
proc del*(db: ContentDB, key: ContentId) =
db.del(key.toByteArrayBE())
proc deleteFractionOfContent(
db: ContentDB,
target: Uint256,
targetFraction: float64): (UInt256, int64, int64) =
## Procedure which tries to delete fraction of database by scanning maxObjPerScan
## furthest elements.
## If the maxObjPerScan furthest elements, is not enough to attain required fraction
## procedure deletes all but one element and report how many bytes have been
## deleted
## Procedure do not call reclaim space, it is left to the caller.
let (furthestElements, totalContentSize) = db.getNFurthestElements(target, maxObjPerScan)
var bytesDeleted: int64 = 0
let bytesToDelete = int64(targetFraction * float64(totalContentSize))
let numOfElements = len(furthestElements)
if numOfElements == 0:
# no elements in database, return some zero value
return (UInt256.zero, 0'i64, 0'i64)
let lastIdx = len(furthestElements) - 1
for i, elem in furthestElements:
if i == lastIdx:
# this is our last element, do not delete it and report it as last non deleted
# element
return (elem.distFrom, bytesDeleted, totalContentSize)
if bytesDeleted + elem.payloadLength < bytesToDelete:
db.del(elem.contentId)
bytesDeleted = bytesDeleted + elem.payloadLength
else:
return (elem.distFrom, bytesDeleted, totalContentSize)
proc put*(
db: ContentDB,
key: ContentId,
value: openArray[byte],
target: UInt256): PutResult =
db.put(key, value)
let dbSize = db.size()
if dbSize < int64(db.maxSize):
return PutResult(kind: ContentStored)
else:
# TODO Add some configuration for this magic number
let (furthestNonDeletedElement, deletedBytes, totalContentSize) =
db.deleteFractionOfContent(target, 0.25)
let deletedFraction = float64(deletedBytes) / float64(totalContentSize)
db.reclaimSpace()
return PutResult(
kind: DbPruned,
furthestStoredElementDistance: furthestNonDeletedElement,
fractionOfDeletedContent: deletedFraction)

View File

@ -100,7 +100,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
let
radius = UInt256.fromLogRadius(config.logRadius)
db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex())
d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex(), maxSize = config.storageSize)
portalConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
@ -180,7 +180,7 @@ when isMainModule:
run(config)
of PortalCmd.populateHistoryDb:
let
db = ContentDB.new(config.dbDir.string)
db = ContentDB.new(config.dbDir.string, config.storageSize)
res = populateHistoryDb(db, config.dataFile.string)
if res.isErr():
fatal "Failed populating the history content db", error = $res.error

View File

@ -148,7 +148,12 @@ proc getBlockHeader*(
if h.portalProtocol.inRange(contentId):
# content is valid and in our range, save it into our db
h.contentDB.put(contentId, headerContent.content)
# TODO handle radius adjustments
discard h.contentDB.put(
contentId,
headerContent.content,
h.portalProtocol.localNode.id
)
return maybeHeader
@ -198,7 +203,11 @@ proc getBlock*(
# content is in range and valid, put into db
if h.portalProtocol.inRange(contentId):
h.contentDB.put(contentId, bodyContent.content)
# TODO handle radius adjustments
discard h.contentDB.put(
contentId, bodyContent.content,
h.portalProtocol.localNode.id
)
return some[Block]((header, blockBody))

View File

@ -53,7 +53,7 @@ proc getContent*(n: StateNetwork, key: ContentKey):
# When content is found on the network and is in the radius range, store it.
if content.isSome() and contentInRange:
# TODO Add poke when working on state network
n.contentDB.put(contentId, contentResult.content)
discard n.contentDB.put(contentId, contentResult.content, n.portalProtocol.localNode.id)
# TODO: for now returning bytes, ultimately it would be nice to return proper
# domain types.

View File

@ -1084,7 +1084,12 @@ proc processContent(
let contentId = contentIdOpt.get()
# Store content, should we recheck radius?
p.contentDB.put(contentId, content)
# TODO handle radius adjustments
discard p.contentDB.put(
contentId,
content,
p.baseProtocol.localNode.id
)
info "Received valid offered content", contentKey

View File

@ -137,6 +137,8 @@ iterator blocks*(
else:
error "Failed reading block from block data", error = res.error
# TODO pass nodeid as uint256 so it will be possible to use put method which
# preserves size
proc populateHistoryDb*(
db: ContentDB, dataFile: string, verify = false): Result[void, string] =
let blockData = ? readBlockDataTable(dataFile)
@ -144,6 +146,7 @@ proc populateHistoryDb*(
for b in blocks(blockData, verify):
for value in b:
# Note: This is the slowest part due to the hashing that takes place.
# TODO use put method which preserves size
db.put(history_content.toContentId(value[0]), value[1])
ok()

View File

@ -23,7 +23,8 @@ proc installPortalDebugApiHandlers*(
contentId: string, content: string) -> bool:
# Using content id as parameter to make it more easy to store. Might evolve
# in using content key.
p.contentDB.put(hexToSeqByte(contentId), hexToSeqByte(content))
let cId = UInt256.fromBytesBE(hexToSeqByte(contentId))
discard p.contentDB.put(cId, hexToSeqByte(content), p.localNode.id)
return true

View File

@ -35,12 +35,12 @@ proc generateNRandomU256(rng: var BrHmacDrbgContext, n: int): seq[UInt256] =
suite "Content Database":
let rng = newRng()
let testId = u256(0)
# Note: We are currently not really testing something new here just basic
# underlying kvstore.
test "ContentDB basic API":
let
db = ContentDB.new("", inMemory = true)
db = ContentDB.new("", uint32.high, inMemory = true)
key = ContentId(UInt256.high()) # Some key
block:
@ -51,7 +51,7 @@ suite "Content Database":
db.contains(key) == false
block:
db.put(key, [byte 0, 1, 2, 3])
discard db.put(key, [byte 0, 1, 2, 3], testId)
let val = db.get(key)
check:
@ -69,15 +69,15 @@ suite "Content Database":
test "ContentDB size":
let
db = ContentDB.new("", inMemory = true)
db = ContentDB.new("", uint32.high, inMemory = true)
let numBytes = 10000
let size1 = db.size()
db.put(@[1'u8], genByteSeq(numBytes))
discard db.put(u256(1), genByteSeq(numBytes), testId)
let size2 = db.size()
db.put(@[2'u8], genByteSeq(numBytes))
discard db.put(u256(2), genByteSeq(numBytes), testId)
let size3 = db.size()
db.put(@[2'u8], genByteSeq(numBytes))
discard db.put(u256(2), genByteSeq(numBytes), testId)
let size4 = db.size()
check:
@ -85,8 +85,8 @@ suite "Content Database":
size3 > size2
size3 == size4
db.del(@[2'u8])
db.del(@[1'u8])
db.del(u256(2))
db.del(u256(1))
let size5 = db.size()
@ -134,12 +134,12 @@ suite "Content Database":
for testCase in testCases:
let
db = ContentDB.new("", inMemory = true)
db = ContentDB.new("", uint32.high, inMemory = true)
for elem in testCase.keys:
db.put(elem, genByteSeq(32))
discard db.put(elem, genByteSeq(32), testId)
let furthest = db.getNFurthestElements(zero, testCase.n)
let (furthest, _) = db.getNFurthestElements(zero, testCase.n)
var sortedKeys = testCase.keys
@ -153,3 +153,46 @@ suite "Content Database":
uint64(len(furthest)) == testCase.n
check:
furthest.hasCorrectOrder(sortedKeys)
test "ContentDB pruning":
let
maxDbSize = uint32(100000)
db = ContentDB.new("", maxDbSize, inMemory = true)
let furthestElement = u256(40)
let secondFurthest = u256(30)
let thirdFurthest = u256(20)
let numBytes = 10000
let pr1 = db.put(u256(1), genByteSeq(numBytes), u256(0))
let pr2 = db.put(thirdFurthest, genByteSeq(numBytes), u256(0))
let pr3 = db.put(u256(3), genByteSeq(numBytes), u256(0))
let pr4 = db.put(u256(10), genByteSeq(numBytes), u256(0))
let pr5 = db.put(u256(5), genByteSeq(numBytes), u256(0))
let pr6 = db.put(u256(10), genByteSeq(numBytes), u256(0))
let pr7 = db.put(furthestElement, genByteSeq(numBytes), u256(0))
let pr8 = db.put(secondFurthest, genByteSeq(numBytes), u256(0))
let pr9 = db.put(u256(2), genByteSeq(numBytes), u256(0))
let pr10 = db.put(u256(4), genByteSeq(numBytes), u256(0))
check:
pr1.kind == ContentStored
pr2.kind == ContentStored
pr3.kind == ContentStored
pr4.kind == ContentStored
pr5.kind == ContentStored
pr6.kind == ContentStored
pr7.kind == ContentStored
pr8.kind == ContentStored
pr9.kind == ContentStored
pr10.kind == DbPruned
check:
uint32(db.size()) < maxDbSize
# With current settings 2 furthers elements will be delted i.e 30 and 40
# so the furthest non deleted one will be 20
pr10.furthestStoredElementDistance == thirdFurthest
db.get(furthestElement).isNone()
db.get(secondFurthest).isNone()
db.get(thirdFurthest).isSome()

View File

@ -49,8 +49,8 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db1 = ContentDB.new("", uint32.high, inMemory = true)
db2 = ContentDB.new("", uint32.high, inMemory = true)
proto1 =
PortalProtocol.new(node1, protocolId, db1, testHandler, validateContent)
@ -207,9 +207,9 @@ procSuite "Portal Wire Protocol Tests":
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db3 = ContentDB.new("", inMemory = true)
db1 = ContentDB.new("", uint32.high, inMemory = true)
db2 = ContentDB.new("", uint32.high, inMemory = true)
db3 = ContentDB.new("", uint32.high, inMemory = true)
proto1 = PortalProtocol.new(
node1, protocolId, db1, testHandler, validateContent)
@ -243,9 +243,9 @@ procSuite "Portal Wire Protocol Tests":
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db3 = ContentDB.new("", inMemory = true)
db1 = ContentDB.new("", uint32.high, inMemory = true)
db2 = ContentDB.new("", uint32.high, inMemory = true)
db3 = ContentDB.new("", uint32.high, inMemory = true)
proto1 = PortalProtocol.new(
node1, protocolId, db1, testHandlerSha256, validateContent)
@ -259,7 +259,7 @@ procSuite "Portal Wire Protocol Tests":
contentId = readUintBE[256](sha256.digest(content).data)
# Only node3 have content
db3.put(contentId, content)
discard db3.put(contentId, content, proto3.localNode.id)
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
# Node1 needs to known Node2 radius to determine if node2 is interested in content
@ -291,8 +291,8 @@ procSuite "Portal Wire Protocol Tests":
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
db1 = ContentDB.new("", inMemory = true)
db2 = ContentDB.new("", inMemory = true)
db1 = ContentDB.new("", uint32.high, inMemory = true)
db2 = ContentDB.new("", uint32.high, inMemory = true)
proto1 = PortalProtocol.new(
node1, protocolId, db1, testHandler, validateContent)
@ -317,7 +317,7 @@ procSuite "Portal Wire Protocol Tests":
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
db = ContentDB.new("", inMemory = true)
db = ContentDB.new("", uint32.high, inMemory = true)
# No portal protocol for node1, hence an invalid bootstrap node
proto2 = PortalProtocol.new(node2, protocolId, db, testHandler,
validateContent, bootstrapRecords = [node1.localNode.record])

View File

@ -45,8 +45,8 @@ procSuite "State Content Network":
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
check proto2.portalProtocol.addNode(node1.localNode) == Added
@ -64,7 +64,7 @@ procSuite "State Content Network":
contentType: accountTrieNode, accountTrieNodeKey: accountTrieNodeKey)
contentId = toContentId(contentKey)
proto1.contentDB.put(contentId, v)
discard proto1.contentDB.put(contentId, v, proto1.portalProtocol.localNode.id)
for key in keys:
var nodeHash: NodeHash
@ -101,9 +101,9 @@ procSuite "State Content Network":
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true))
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
check proto1.portalProtocol.addNode(node2.localNode) == Added
@ -124,10 +124,10 @@ procSuite "State Content Network":
contentType: accountTrieNode, accountTrieNodeKey: accountTrieNodeKey)
contentId = toContentId(contentKey)
proto2.contentDB.put(contentId, v)
discard proto2.contentDB.put(contentId, v, proto2.portalProtocol.localNode.id)
# Not needed right now as 1 node is enough considering node 1 is connected
# to both.
proto3.contentDB.put(contentId, v)
discard proto3.contentDB.put(contentId, v, proto3.portalProtocol.localNode.id)
# Get first key
var nodeHash: NodeHash
@ -160,8 +160,8 @@ procSuite "State Content Network":
rng, PrivateKey.random(rng[]), localAddress(20303))
proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
check (await node1.ping(node2.localNode)).isOk()
check (await node2.ping(node1.localNode)).isOk()

View File

@ -24,6 +24,9 @@ const
defaultListenAddressDesc = $defaultListenAddress
defaultAdminListenAddressDesc = $defaultAdminListenAddress
# 100mb seems a bit smallish we may consider increasing defaults after some
# network measurements
defaultStorageSize* = uint32(1000 * 1000 * 100)
type
PortalCmd* = enum
@ -103,6 +106,14 @@ type
desc: "Portal wire protocol id for the network to connect to"
name: "protocol-id" .}: PortalProtocolId
# TODO maybe it is worth defining minimal storage size and throw error if
# value provided is smaller than minimum
storageSize* {.
desc: "Maximum amount (in bytes) of content which will be stored " &
"in local database."
defaultValue: defaultStorageSize
name: "storage-size" .}: uint32
case cmd* {.
command
defaultValue: noCommand }: PortalCmd
@ -214,7 +225,7 @@ proc run(config: PortalCliConf) =
d.open()
let
db = ContentDB.new("", inMemory = true)
db = ContentDB.new("", config.storageSize, inMemory = true)
portal = PortalProtocol.new(d, config.protocolId, db,
testHandler, validateContent,
bootstrapRecords = bootstrapRecords)