Rework of Portal json-rpc debug API and related functionality (#1102)
- More consistent naming in calls used by the JSON-RPC debug API - Use storeContent everywhere to make sure content is only stored if in range - Remove populateHistoryDb subcommand and replace by JSON-RPC call storeContent - Remove some whitespace and fix some indentations
This commit is contained in:
parent
96bb09457e
commit
f219c69840
|
@ -42,7 +42,6 @@ const
|
|||
type
|
||||
PortalCmd* = enum
|
||||
noCommand
|
||||
populateHistoryDb
|
||||
|
||||
PortalNetwork* = enum
|
||||
none
|
||||
|
@ -206,17 +205,6 @@ type
|
|||
defaultValue: noCommand .}: PortalCmd
|
||||
of noCommand:
|
||||
discard
|
||||
of populateHistoryDb:
|
||||
# Note: we could use the existing data dir here, but it would require
|
||||
# also to properly store the network key and default use the one available
|
||||
dbDir* {.
|
||||
desc: "The directory of the fluffy content database"
|
||||
defaultValue: ""
|
||||
name: "db-dir" }: OutDir
|
||||
dataFile* {.
|
||||
desc: "Specify a json file with a map of k:v pairs representing BlockHash : Rlp encoded block"
|
||||
defaultValue: ""
|
||||
name: "data-file" }: InputFile
|
||||
|
||||
proc parseCmdArg*(T: type enr.Record, p: TaintedString): T
|
||||
{.raises: [Defect, ConfigurationError].} =
|
||||
|
|
|
@ -31,7 +31,7 @@ export kvstore_sqlite3
|
|||
|
||||
const
|
||||
# Maximal number of ObjInfo objects held in memory per database scan. 100k
|
||||
# objects should result in memory usage of around 7mb which should be
|
||||
# objects should result in memory usage of around 7mb which should be
|
||||
# appropriate for even low resource devices
|
||||
maxObjPerScan = 100000
|
||||
|
||||
|
@ -138,7 +138,7 @@ proc getNFurthestElements*(
|
|||
else:
|
||||
if obj > heap[0]:
|
||||
discard heap.replace(obj)
|
||||
|
||||
|
||||
totalContentSize = totalContentSize + ri.payloadLength
|
||||
|
||||
var res: seq[ObjInfo] = newSeq[ObjInfo](heap.len())
|
||||
|
@ -211,9 +211,9 @@ proc del*(db: ContentDB, key: ContentId) =
|
|||
db.del(key.toByteArrayBE())
|
||||
|
||||
proc deleteFractionOfContent(
|
||||
db: ContentDB,
|
||||
target: Uint256,
|
||||
targetFraction: float64): (UInt256, int64, int64, int64) =
|
||||
db: ContentDB,
|
||||
target: Uint256,
|
||||
targetFraction: float64): (UInt256, int64, int64, int64) =
|
||||
## Procedure which tries to delete fraction of database by scanning maxObjPerScan
|
||||
## furthest elements.
|
||||
## If the maxObjPerScan furthest elements, is not enough to attain required fraction
|
||||
|
@ -238,7 +238,7 @@ proc deleteFractionOfContent(
|
|||
# this is our last element, do not delete it and report it as last non deleted
|
||||
# element
|
||||
return (elem.distFrom, bytesDeleted, totalContentSize, numOfDeletedElements)
|
||||
|
||||
|
||||
if bytesDeleted + elem.payloadLength < bytesToDelete:
|
||||
db.del(elem.contentId)
|
||||
bytesDeleted = bytesDeleted + elem.payloadLength
|
||||
|
@ -247,15 +247,15 @@ proc deleteFractionOfContent(
|
|||
return (elem.distFrom, bytesDeleted, totalContentSize, numOfDeletedElements)
|
||||
|
||||
proc put*(
|
||||
db: ContentDB,
|
||||
key: ContentId,
|
||||
value: openArray[byte],
|
||||
target: UInt256): PutResult =
|
||||
db: ContentDB,
|
||||
key: ContentId,
|
||||
value: openArray[byte],
|
||||
target: UInt256): PutResult =
|
||||
|
||||
db.put(key, value)
|
||||
|
||||
let dbSize = db.size()
|
||||
|
||||
|
||||
if dbSize < int64(db.maxSize):
|
||||
return PutResult(kind: ContentStored)
|
||||
else:
|
||||
|
|
|
@ -97,9 +97,9 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
|||
d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex(), maxSize = config.storageSize)
|
||||
|
||||
portalConfig = PortalProtocolConfig.init(
|
||||
config.tableIpLimit,
|
||||
config.bucketIpLimit,
|
||||
config.bitsPerHop,
|
||||
config.tableIpLimit,
|
||||
config.bucketIpLimit,
|
||||
config.bitsPerHop,
|
||||
config.radiusConfig
|
||||
)
|
||||
stateNetwork = StateNetwork.new(d, db,
|
||||
|
@ -176,10 +176,3 @@ when isMainModule:
|
|||
case config.cmd
|
||||
of PortalCmd.noCommand:
|
||||
run(config)
|
||||
of PortalCmd.populateHistoryDb:
|
||||
let
|
||||
db = ContentDB.new(config.dbDir.string, config.storageSize)
|
||||
res = populateHistoryDb(db, config.dataFile.string)
|
||||
if res.isErr():
|
||||
fatal "Failed populating the history content db", error = $res.error
|
||||
quit 1
|
||||
|
|
|
@ -46,6 +46,7 @@ func decode*(contentKey: ByteList): Option[ContentKey] =
|
|||
return none[ContentKey]()
|
||||
|
||||
func toContentId*(contentKey: ByteList): ContentId =
|
||||
# TODO: Should we try to parse the content key here for invalid ones?
|
||||
let idHash = sha2.sha_256.digest(contentKey.asSeq())
|
||||
readUintBE[256](idHash.data)
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ type
|
|||
routingTable*: RoutingTable
|
||||
baseProtocol*: protocol.Protocol
|
||||
contentDB*: ContentDB
|
||||
toContentId: ToContentIdHandler
|
||||
toContentId*: ToContentIdHandler
|
||||
validateContent: ContentValidationHandler
|
||||
radiusConfig: RadiusConfig
|
||||
dataRadius*: UInt256
|
||||
|
@ -1093,9 +1093,9 @@ proc neighborhoodGossip*(
|
|||
await p.offerQueue.addLast(req)
|
||||
|
||||
proc adjustRadius(
|
||||
p: PortalProtocol,
|
||||
fractionOfDeletedContent: float64,
|
||||
furthestElementInDbDistance: UInt256) =
|
||||
p: PortalProtocol,
|
||||
fractionOfDeletedContent: float64,
|
||||
furthestElementInDbDistance: UInt256) =
|
||||
|
||||
if fractionOfDeletedContent == 0.0:
|
||||
# even though pruning was triggered no content was deleted, it could happen
|
||||
|
@ -1108,14 +1108,13 @@ proc adjustRadius(
|
|||
# multiplication by float
|
||||
let invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent)
|
||||
|
||||
let scaledRadius = p.dataRadius div u256(invertedFractionAsInt)
|
||||
let scaledRadius = p.dataRadius div u256(invertedFractionAsInt)
|
||||
|
||||
# Chose larger value to avoid situation, where furthestElementInDbDistance
|
||||
# is super close to local id, so local radius would end up too small
|
||||
# to accept any more data to local database
|
||||
# If scaledRadius radius will be larger it will still contain all elements
|
||||
let newRadius = max(scaledRadius, furthestElementInDbDistance)
|
||||
|
||||
|
||||
debug "Database pruned",
|
||||
oldRadius = p.dataRadius,
|
||||
|
|
|
@ -56,7 +56,7 @@ iterator blockHashes*(blockData: BlockDataTable): BlockHash =
|
|||
|
||||
yield blockHash
|
||||
|
||||
proc readBlockData(
|
||||
func readBlockData(
|
||||
hash: string, blockData: BlockData, verify = false):
|
||||
Result[seq[(ContentKey, seq[byte])], string] =
|
||||
var res: seq[(ContentKey, seq[byte])]
|
||||
|
@ -139,7 +139,7 @@ iterator blocks*(
|
|||
else:
|
||||
error "Failed reading block from block data", error = res.error
|
||||
|
||||
proc readBlockHeader*(blockData: BlockData): Result[BlockHeader, string] =
|
||||
func readBlockHeader*(blockData: BlockData): Result[BlockHeader, string] =
|
||||
var rlp =
|
||||
try:
|
||||
rlpFromHex(blockData.rlp)
|
||||
|
@ -155,21 +155,19 @@ proc readBlockHeader*(blockData: BlockData): Result[BlockHeader, string] =
|
|||
else:
|
||||
return err("Item is not a valid rlp list, number " & $blockData.number)
|
||||
|
||||
# TODO pass nodeid as uint256 so it will be possible to use put method which
|
||||
# preserves size
|
||||
proc populateHistoryDb*(
|
||||
db: ContentDB, dataFile: string, verify = false): Result[void, string] =
|
||||
proc historyStore*(
|
||||
p: PortalProtocol, dataFile: string, verify = false):
|
||||
Result[void, string] =
|
||||
let blockData = ? readBlockDataTable(dataFile)
|
||||
|
||||
for b in blocks(blockData, verify):
|
||||
for value in b:
|
||||
# Note: This is the slowest part due to the hashing that takes place.
|
||||
# TODO use put method which preserves size
|
||||
db.put(history_content.toContentId(value[0]), value[1])
|
||||
p.storeContent(history_content.toContentId(value[0]), value[1])
|
||||
|
||||
ok()
|
||||
|
||||
proc propagateHistoryDb*(
|
||||
proc historyPropagate*(
|
||||
p: PortalProtocol, dataFile: string, verify = false):
|
||||
Future[Result[void, string]] {.async.} =
|
||||
|
||||
|
@ -196,8 +194,7 @@ proc propagateHistoryDb*(
|
|||
info "Seeding block content into the network", contentKey = value[0]
|
||||
# Note: This is the slowest part due to the hashing that takes place.
|
||||
let contentId = history_content.toContentId(value[0])
|
||||
if p.inRange(contentId):
|
||||
p.contentDB.put(contentId, value[1])
|
||||
p.storeContent(contentId, value[1])
|
||||
|
||||
await gossipQueue.addLast(
|
||||
(ContentKeysList(@[encode(value[0])]), value[1]))
|
||||
|
@ -206,7 +203,7 @@ proc propagateHistoryDb*(
|
|||
else:
|
||||
return err(blockData.error)
|
||||
|
||||
proc propagateBlockHistoryDb*(
|
||||
proc historyPropagateBlock*(
|
||||
p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
|
||||
Future[Result[void, string]] {.async.} =
|
||||
let blockDataTable = readBlockDataTable(dataFile)
|
||||
|
@ -227,8 +224,7 @@ proc propagateBlockHistoryDb*(
|
|||
for value in blockData:
|
||||
info "Seeding block content into the network", contentKey = value[0]
|
||||
let contentId = history_content.toContentId(value[0])
|
||||
if p.inRange(contentId):
|
||||
p.contentDB.put(contentId, value[1])
|
||||
p.storeContent(contentId, value[1])
|
||||
|
||||
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), value[1])
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
## Portal History Network json-rpc debug & testing calls
|
||||
proc portal_history_store(contentId: string, content: string): bool
|
||||
proc portal_history_store(contentKey: string, content: string): bool
|
||||
proc portal_history_storeContent(dataFile: string): bool
|
||||
proc portal_history_propagate(dataFile: string): bool
|
||||
proc portal_history_propagateBlock(dataFile: string, blockHash: string): bool
|
||||
|
|
|
@ -20,17 +20,28 @@ proc installPortalDebugApiHandlers*(
|
|||
{.raises: [Defect, CatchableError].} =
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_store") do(
|
||||
contentId: string, content: string) -> bool:
|
||||
# Using content id as parameter to make it more easy to store. Might evolve
|
||||
# in using content key.
|
||||
let cId = UInt256.fromBytesBE(hexToSeqByte(contentId))
|
||||
discard p.contentDB.put(cId, hexToSeqByte(content), p.localNode.id)
|
||||
contentKey: string, content: string) -> bool:
|
||||
let key = ByteList.init(hexToSeqByte(contentKey))
|
||||
let contentId = p.toContentId(key)
|
||||
|
||||
return true
|
||||
if contentId.isSome():
|
||||
p.storeContent(contentId.get(), hexToSeqByte(content))
|
||||
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, "Invalid content key")
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_storeContent") do(
|
||||
dataFile: string) -> bool:
|
||||
let res = p.historyStore(dataFile)
|
||||
if res.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $res.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_propagate") do(
|
||||
dataFile: string) -> bool:
|
||||
let res = await p.propagateHistoryDb(dataFile)
|
||||
let res = await p.historyPropagate(dataFile)
|
||||
if res.isOk():
|
||||
return true
|
||||
else:
|
||||
|
@ -38,7 +49,7 @@ proc installPortalDebugApiHandlers*(
|
|||
|
||||
rpcServer.rpc("portal_" & network & "_propagateBlock") do(
|
||||
dataFile: string, blockHash: string) -> bool:
|
||||
let res = await p.propagateBlockHistoryDb(dataFile, blockHash)
|
||||
let res = await p.historyPropagateBlock(dataFile, blockHash)
|
||||
if res.isOk():
|
||||
return true
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue