diff --git a/fluffy/conf.nim b/fluffy/conf.nim index ed7f2e0a3..17ddf72dd 100644 --- a/fluffy/conf.nim +++ b/fluffy/conf.nim @@ -33,6 +33,9 @@ const defaultAdminListenAddressDesc = $defaultAdminListenAddress defaultDataDirDesc = defaultDataDir() defaultClientConfigDesc = $(defaultClientConfig.httpUri) + # 100mb seems a bit smallish we may consider increasing defaults after some + # network measurements + defaultStorageSize* = uint32(1000 * 1000 * 100) type PortalCmd* = enum @@ -184,6 +187,14 @@ type defaultValue: DefaultBitsPerHop name: "bits-per-hop" .}: int + # TODO maybe it is worth defining minimal storage size and throw error if + # value provided is smaller than minimum + storageSize* {. + desc: "Maximum amount (in bytes) of content which will be stored " & + "in local database." + defaultValue: defaultStorageSize + name: "storage-size" .}: uint32 + case cmd* {. command defaultValue: noCommand .}: PortalCmd diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim index 7e516d9f6..29a0489b1 100644 --- a/fluffy/content_db.nim +++ b/fluffy/content_db.nim @@ -29,6 +29,12 @@ export kvstore_sqlite3 # 3. Or databases are created per network (and kvstores pre content type) and # thus depending on the network the right db needs to be selected. +const + # Maximal number of ObjInfo objects held in memory per database scan. 
100k + # objects should result in memory usage of around 7mb which should be + # appropriate for even low resource devices + maxObjPerScan = 100000 + type RowInfo = tuple contentId: array[32, byte] @@ -41,10 +47,22 @@ type ContentDB* = ref object kv: KvStoreRef + maxSize: uint32 sizeStmt: SqliteStmt[NoParams, int64] vacStmt: SqliteStmt[NoParams, void] getAll: SqliteStmt[NoParams, RowInfo] + PutResultType* = enum + ContentStored, DbPruned + + PutResult* = object + case kind*: PutResultType + of ContentStored: + discard + of DbPruned: + furthestStoredElementDistance*: UInt256 + fractionOfDeletedContent*: float64 + # Objects must be sorted from largest to closest distance proc `<`(a, b: ObjInfo): bool = return a.distFrom < b.distFrom @@ -54,7 +72,7 @@ template expectDb(x: auto): untyped = # full disk - this requires manual intervention, so we'll panic for now x.expect("working database (disk broken/full?)") -proc new*(T: type ContentDB, path: string, inMemory = false): ContentDB = +proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): ContentDB = let db = if inMemory: SqStoreRef.init("", "fluffy-test", inMemory = true).expect( @@ -80,10 +98,10 @@ proc new*(T: type ContentDB, path: string, inMemory = false): ContentDB = ).get() ContentDB( - kv: kvStore, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt) + kv: kvStore, maxSize: maxSize, sizeStmt: getSizeStmt, vacStmt: vacStmt, getAll: getKeysStmt) proc getNFurthestElements*( - db: ContentDB, target: UInt256, n: uint64): seq[ObjInfo] = + db: ContentDB, target: UInt256, n: uint64): (seq[ObjInfo], int64) = ## Get at most n furthest elements from db in order from furthest to closest. ## Payload lengths are also returned so the caller can decide how many of ## those elements need to be deleted. 
@@ -99,9 +117,10 @@ proc getNFurthestElements*( ## would be possible we may be able to all this work by one sql query if n == 0: - return newSeq[ObjInfo]() + return (newSeq[ObjInfo](), 0'i64) var heap = initHeapQueue[ObjInfo]() + var totalContentSize: int64 = 0 var ri: RowInfo for e in db.getAll.exec(ri): @@ -118,6 +137,8 @@ proc getNFurthestElements*( else: if obj > heap[0]: discard heap.replace(obj) + + totalContentSize = totalContentSize + ri.payloadLength var res: seq[ObjInfo] = newSeq[ObjInfo](heap.len()) @@ -126,7 +147,7 @@ proc getNFurthestElements*( res[i] = heap.pop() dec i - return res + return (res, totalContentSize) proc reclaimSpace*(db: ContentDB): void = ## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a @@ -158,7 +179,7 @@ proc get*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] = return res -proc put*(db: ContentDB, key, value: openArray[byte]) = +proc put(db: ContentDB, key, value: openArray[byte]) = db.kv.put(key, value).expectDb() proc contains*(db: ContentDB, key: openArray[byte]): bool = @@ -179,6 +200,8 @@ proc get*(db: ContentDB, key: ContentId): Option[seq[byte]] = # TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256. db.get(key.toByteArrayBE()) +# TODO: Public due to usage in populating portal db, should be made private after +# improving db populating to use local node id proc put*(db: ContentDB, key: ContentId, value: openArray[byte]) = db.put(key.toByteArrayBE(), value) @@ -187,3 +210,61 @@ proc contains*(db: ContentDB, key: ContentId): bool = proc del*(db: ContentDB, key: ContentId) = db.del(key.toByteArrayBE()) + +proc deleteFractionOfContent( + db: ContentDB, + target: Uint256, + targetFraction: float64): (UInt256, int64, int64) = + ## Procedure which tries to delete fraction of database by scanning maxObjPerScan + ## furthest elements. 
+ ## If the maxObjPerScan furthest elements are not enough to attain the required fraction, + ## the procedure deletes all but one element and reports how many bytes have been + ## deleted. + ## The procedure does not call reclaimSpace; it is left to the caller. + + let (furthestElements, totalContentSize) = db.getNFurthestElements(target, maxObjPerScan) + var bytesDeleted: int64 = 0 + let bytesToDelete = int64(targetFraction * float64(totalContentSize)) + let numOfElements = len(furthestElements) + + if numOfElements == 0: + # no elements in database, return some zero value + return (UInt256.zero, 0'i64, 0'i64) + + let lastIdx = len(furthestElements) - 1 + + for i, elem in furthestElements: + if i == lastIdx: + # this is our last element, do not delete it and report it as last non deleted + # element + return (elem.distFrom, bytesDeleted, totalContentSize) + + if bytesDeleted + elem.payloadLength < bytesToDelete: + db.del(elem.contentId) + bytesDeleted = bytesDeleted + elem.payloadLength + else: + return (elem.distFrom, bytesDeleted, totalContentSize) + +proc put*( + db: ContentDB, + key: ContentId, + value: openArray[byte], + target: UInt256): PutResult = + db.put(key, value) + let dbSize = db.size() + + if dbSize < int64(db.maxSize): + return PutResult(kind: ContentStored) + else: + # TODO Add some configuration for this magic number + let (furthestNonDeletedElement, deletedBytes, totalContentSize) = + db.deleteFractionOfContent(target, 0.25) + + let deletedFraction = float64(deletedBytes) / float64(totalContentSize) + + db.reclaimSpace() + + return PutResult( + kind: DbPruned, + furthestStoredElementDistance: furthestNonDeletedElement, + fractionOfDeletedContent: deletedFraction) diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index 1ca546c7d..81bf14759 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -100,7 +100,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = let radius = UInt256.fromLogRadius(config.logRadius) db = 
ContentDB.new(config.dataDir / "db" / "contentdb_" & - d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex()) + d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex(), maxSize = config.storageSize) portalConfig = PortalProtocolConfig.init( config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop) @@ -180,7 +180,7 @@ when isMainModule: run(config) of PortalCmd.populateHistoryDb: let - db = ContentDB.new(config.dbDir.string) + db = ContentDB.new(config.dbDir.string, config.storageSize) res = populateHistoryDb(db, config.dataFile.string) if res.isErr(): fatal "Failed populating the history content db", error = $res.error diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 1ce8e0ecf..4dfd377af 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -148,7 +148,12 @@ proc getBlockHeader*( if h.portalProtocol.inRange(contentId): # content is valid and in our range, save it into our db - h.contentDB.put(contentId, headerContent.content) + # TODO handle radius adjustments + discard h.contentDB.put( + contentId, + headerContent.content, + h.portalProtocol.localNode.id + ) return maybeHeader @@ -198,7 +203,11 @@ proc getBlock*( # content is in range and valid, put into db if h.portalProtocol.inRange(contentId): - h.contentDB.put(contentId, bodyContent.content) + # TODO handle radius adjustments + discard h.contentDB.put( + contentId, bodyContent.content, + h.portalProtocol.localNode.id + ) return some[Block]((header, blockBody)) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index c50caaaba..ffbf8331e 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -53,7 +53,7 @@ proc getContent*(n: StateNetwork, key: ContentKey): # When content is found on the network and is in the radius range, store it. 
if content.isSome() and contentInRange: # TODO Add poke when working on state network - n.contentDB.put(contentId, contentResult.content) + discard n.contentDB.put(contentId, contentResult.content, n.portalProtocol.localNode.id) # TODO: for now returning bytes, ultimately it would be nice to return proper # domain types. diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index c45917d5f..88a9acbdf 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -1084,7 +1084,12 @@ proc processContent( let contentId = contentIdOpt.get() # Store content, should we recheck radius? - p.contentDB.put(contentId, content) + # TODO handle radius adjustments + discard p.contentDB.put( + contentId, + content, + p.baseProtocol.localNode.id + ) info "Received valid offered content", contentKey diff --git a/fluffy/populate_db.nim b/fluffy/populate_db.nim index c3db791a7..aacb1f080 100644 --- a/fluffy/populate_db.nim +++ b/fluffy/populate_db.nim @@ -137,6 +137,8 @@ iterator blocks*( else: error "Failed reading block from block data", error = res.error +# TODO pass nodeid as uint256 so it will be possible to use put method which +# preserves size proc populateHistoryDb*( db: ContentDB, dataFile: string, verify = false): Result[void, string] = let blockData = ? readBlockDataTable(dataFile) @@ -144,6 +146,7 @@ proc populateHistoryDb*( for b in blocks(blockData, verify): for value in b: # Note: This is the slowest part due to the hashing that takes place. 
+ # TODO use put method which preserves size db.put(history_content.toContentId(value[0]), value[1]) ok() diff --git a/fluffy/rpc/rpc_portal_debug_api.nim b/fluffy/rpc/rpc_portal_debug_api.nim index 03d8fa0c7..cf8e10577 100644 --- a/fluffy/rpc/rpc_portal_debug_api.nim +++ b/fluffy/rpc/rpc_portal_debug_api.nim @@ -23,7 +23,8 @@ proc installPortalDebugApiHandlers*( contentId: string, content: string) -> bool: # Using content id as parameter to make it more easy to store. Might evolve # in using content key. - p.contentDB.put(hexToSeqByte(contentId), hexToSeqByte(content)) + let cId = UInt256.fromBytesBE(hexToSeqByte(contentId)) + discard p.contentDB.put(cId, hexToSeqByte(content), p.localNode.id) return true diff --git a/fluffy/tests/test_content_db.nim b/fluffy/tests/test_content_db.nim index 8d11bd1fa..9543d83ac 100644 --- a/fluffy/tests/test_content_db.nim +++ b/fluffy/tests/test_content_db.nim @@ -35,12 +35,12 @@ proc generateNRandomU256(rng: var BrHmacDrbgContext, n: int): seq[UInt256] = suite "Content Database": let rng = newRng() - + let testId = u256(0) # Note: We are currently not really testing something new here just basic # underlying kvstore. 
test "ContentDB basic API": let - db = ContentDB.new("", inMemory = true) + db = ContentDB.new("", uint32.high, inMemory = true) key = ContentId(UInt256.high()) # Some key block: @@ -51,7 +51,7 @@ suite "Content Database": db.contains(key) == false block: - db.put(key, [byte 0, 1, 2, 3]) + discard db.put(key, [byte 0, 1, 2, 3], testId) let val = db.get(key) check: @@ -69,15 +69,15 @@ suite "Content Database": test "ContentDB size": let - db = ContentDB.new("", inMemory = true) + db = ContentDB.new("", uint32.high, inMemory = true) let numBytes = 10000 let size1 = db.size() - db.put(@[1'u8], genByteSeq(numBytes)) + discard db.put(u256(1), genByteSeq(numBytes), testId) let size2 = db.size() - db.put(@[2'u8], genByteSeq(numBytes)) + discard db.put(u256(2), genByteSeq(numBytes), testId) let size3 = db.size() - db.put(@[2'u8], genByteSeq(numBytes)) + discard db.put(u256(2), genByteSeq(numBytes), testId) let size4 = db.size() check: @@ -85,8 +85,8 @@ suite "Content Database": size3 > size2 size3 == size4 - db.del(@[2'u8]) - db.del(@[1'u8]) + db.del(u256(2)) + db.del(u256(1)) let size5 = db.size() @@ -134,12 +134,12 @@ suite "Content Database": for testCase in testCases: let - db = ContentDB.new("", inMemory = true) + db = ContentDB.new("", uint32.high, inMemory = true) for elem in testCase.keys: - db.put(elem, genByteSeq(32)) + discard db.put(elem, genByteSeq(32), testId) - let furthest = db.getNFurthestElements(zero, testCase.n) + let (furthest, _) = db.getNFurthestElements(zero, testCase.n) var sortedKeys = testCase.keys @@ -153,3 +153,46 @@ suite "Content Database": uint64(len(furthest)) == testCase.n check: furthest.hasCorrectOrder(sortedKeys) + + test "ContentDB pruning": + let + maxDbSize = uint32(100000) + db = ContentDB.new("", maxDbSize, inMemory = true) + + let furthestElement = u256(40) + let secondFurthest = u256(30) + let thirdFurthest = u256(20) + + + let numBytes = 10000 + let pr1 = db.put(u256(1), genByteSeq(numBytes), u256(0)) + let pr2 = 
db.put(thirdFurthest, genByteSeq(numBytes), u256(0)) + let pr3 = db.put(u256(3), genByteSeq(numBytes), u256(0)) + let pr4 = db.put(u256(10), genByteSeq(numBytes), u256(0)) + let pr5 = db.put(u256(5), genByteSeq(numBytes), u256(0)) + let pr6 = db.put(u256(10), genByteSeq(numBytes), u256(0)) + let pr7 = db.put(furthestElement, genByteSeq(numBytes), u256(0)) + let pr8 = db.put(secondFurthest, genByteSeq(numBytes), u256(0)) + let pr9 = db.put(u256(2), genByteSeq(numBytes), u256(0)) + let pr10 = db.put(u256(4), genByteSeq(numBytes), u256(0)) + + check: + pr1.kind == ContentStored + pr2.kind == ContentStored + pr3.kind == ContentStored + pr4.kind == ContentStored + pr5.kind == ContentStored + pr6.kind == ContentStored + pr7.kind == ContentStored + pr8.kind == ContentStored + pr9.kind == ContentStored + pr10.kind == DbPruned + + check: + uint32(db.size()) < maxDbSize + # With current settings the 2 furthest elements will be deleted, i.e. 30 and 40, + # so the furthest non-deleted one will be 20 + pr10.furthestStoredElementDistance == thirdFurthest + db.get(furthestElement).isNone() + db.get(secondFurthest).isNone() + db.get(thirdFurthest).isSome() diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index a44983032..74c49ef49 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -49,8 +49,8 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest = node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - db1 = ContentDB.new("", inMemory = true) - db2 = ContentDB.new("", inMemory = true) + db1 = ContentDB.new("", uint32.high, inMemory = true) + db2 = ContentDB.new("", uint32.high, inMemory = true) proto1 = PortalProtocol.new(node1, protocolId, db1, testHandler, validateContent) @@ -207,9 +207,9 @@ procSuite "Portal Wire Protocol Tests": node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) - db1 = ContentDB.new("", 
inMemory = true) - db2 = ContentDB.new("", inMemory = true) - db3 = ContentDB.new("", inMemory = true) + db1 = ContentDB.new("", uint32.high, inMemory = true) + db2 = ContentDB.new("", uint32.high, inMemory = true) + db3 = ContentDB.new("", uint32.high, inMemory = true) proto1 = PortalProtocol.new( node1, protocolId, db1, testHandler, validateContent) @@ -243,9 +243,9 @@ procSuite "Portal Wire Protocol Tests": node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) - db1 = ContentDB.new("", inMemory = true) - db2 = ContentDB.new("", inMemory = true) - db3 = ContentDB.new("", inMemory = true) + db1 = ContentDB.new("", uint32.high, inMemory = true) + db2 = ContentDB.new("", uint32.high, inMemory = true) + db3 = ContentDB.new("", uint32.high, inMemory = true) proto1 = PortalProtocol.new( node1, protocolId, db1, testHandlerSha256, validateContent) @@ -259,7 +259,7 @@ procSuite "Portal Wire Protocol Tests": contentId = readUintBE[256](sha256.digest(content).data) # Only node3 have content - db3.put(contentId, content) + discard db3.put(contentId, content, proto3.localNode.id) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content # Node1 needs to known Node2 radius to determine if node2 is interested in content @@ -291,8 +291,8 @@ procSuite "Portal Wire Protocol Tests": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - db1 = ContentDB.new("", inMemory = true) - db2 = ContentDB.new("", inMemory = true) + db1 = ContentDB.new("", uint32.high, inMemory = true) + db2 = ContentDB.new("", uint32.high, inMemory = true) proto1 = PortalProtocol.new( node1, protocolId, db1, testHandler, validateContent) @@ -317,7 +317,7 @@ procSuite "Portal Wire Protocol Tests": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - db = ContentDB.new("", inMemory = true) + db = ContentDB.new("", uint32.high, inMemory = true) # No portal protocol for node1, hence an invalid bootstrap node proto2 
= PortalProtocol.new(node2, protocolId, db, testHandler, validateContent, bootstrapRecords = [node1.localNode.record]) diff --git a/fluffy/tests/test_state_network.nim b/fluffy/tests/test_state_network.nim index 2103f6aac..45ff678d5 100644 --- a/fluffy/tests/test_state_network.nim +++ b/fluffy/tests/test_state_network.nim @@ -45,8 +45,8 @@ procSuite "State Content Network": node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) check proto2.portalProtocol.addNode(node1.localNode) == Added @@ -64,7 +64,7 @@ procSuite "State Content Network": contentType: accountTrieNode, accountTrieNodeKey: accountTrieNodeKey) contentId = toContentId(contentKey) - proto1.contentDB.put(contentId, v) + discard proto1.contentDB.put(contentId, v, proto1.portalProtocol.localNode.id) for key in keys: var nodeHash: NodeHash @@ -101,9 +101,9 @@ procSuite "State Content Network": node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) - proto3 = StateNetwork.new(node3, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) + proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true)) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content check proto1.portalProtocol.addNode(node2.localNode) == Added @@ -124,10 +124,10 @@ procSuite "State Content Network": contentType: accountTrieNode, accountTrieNodeKey: 
accountTrieNodeKey) contentId = toContentId(contentKey) - proto2.contentDB.put(contentId, v) + discard proto2.contentDB.put(contentId, v, proto2.portalProtocol.localNode.id) # Not needed right now as 1 node is enough considering node 1 is connected # to both. - proto3.contentDB.put(contentId, v) + discard proto3.contentDB.put(contentId, v, proto3.portalProtocol.localNode.id) # Get first key var nodeHash: NodeHash @@ -160,8 +160,8 @@ procSuite "State Content Network": rng, PrivateKey.random(rng[]), localAddress(20303)) - proto1 = StateNetwork.new(node1, ContentDB.new("", inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) check (await node1.ping(node2.localNode)).isOk() check (await node2.ping(node1.localNode)).isOk() diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index f46313cde..2556e4ebf 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -24,7 +24,10 @@ const defaultListenAddressDesc = $defaultListenAddress defaultAdminListenAddressDesc = $defaultAdminListenAddress - + # 100mb seems a bit smallish we may consider increasing defaults after some + # network measurements + defaultStorageSize* = uint32(1000 * 1000 * 100) + type PortalCmd* = enum noCommand @@ -103,6 +106,14 @@ type desc: "Portal wire protocol id for the network to connect to" name: "protocol-id" .}: PortalProtocolId + # TODO maybe it is worth defining minimal storage size and throw error if + # value provided is smaller than minimum + storageSize* {. + desc: "Maximum amount (in bytes) of content which will be stored " & + "in local database." + defaultValue: defaultStorageSize + name: "storage-size" .}: uint32 + case cmd* {. 
command defaultValue: noCommand }: PortalCmd @@ -214,7 +225,7 @@ proc run(config: PortalCliConf) = d.open() let - db = ContentDB.new("", inMemory = true) + db = ContentDB.new("", config.storageSize, inMemory = true) portal = PortalProtocol.new(d, config.protocolId, db, testHandler, validateContent, bootstrapRecords = bootstrapRecords)