Fluffy: Implement contains db handler to improve lookup performance when accepting offers (#2815)

* Remove getSszDecoded from ContentDb.

* Update ContentDb get to use onData callback to reduce copies.

* Use templates for helper procs in ContentDb.

* Add contains handler to portal protocol.

* Improve performance of DbGetHandler.
This commit is contained in:
bhartnett 2024-11-04 22:02:51 +08:00 committed by GitHub
parent 89fac051cd
commit 22653c83dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 138 additions and 118 deletions

View File

@ -285,45 +285,21 @@ proc close*(db: ContentDB) =
db.largestDistanceStmt.disposeSafe()
discard db.kv.close()
## Private KvStoreRef Calls
proc get(kv: KvStoreRef, key: openArray[byte]): Opt[seq[byte]] =
var res: Opt[seq[byte]]
proc onData(data: openArray[byte]) =
res = Opt.some(@data)
discard kv.get(key, onData).expectDb()
return res
proc getSszDecoded(kv: KvStoreRef, key: openArray[byte], T: type auto): Opt[T] =
let res = kv.get(key)
if res.isSome():
try:
Opt.some(SSZ.decode(res.get(), T))
except SerializationError:
raiseAssert("Stored data should always be serialized correctly")
else:
Opt.none(T)
## Private ContentDB calls
proc get(db: ContentDB, key: openArray[byte]): Opt[seq[byte]] =
db.kv.get(key)
template get(db: ContentDB, key: openArray[byte], onData: DataProc): bool =
db.kv.get(key, onData).expectDb()
proc put(db: ContentDB, key, value: openArray[byte]) =
template put(db: ContentDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains(db: ContentDB, key: openArray[byte]): bool =
template contains(db: ContentDB, key: openArray[byte]): bool =
db.kv.contains(key).expectDb()
proc del(db: ContentDB, key: openArray[byte]) =
template del(db: ContentDB, key: openArray[byte]) =
# TODO: Do we want to return the bool here too?
discard db.kv.del(key).expectDb()
proc getSszDecoded(db: ContentDB, key: openArray[byte], T: type auto): Opt[T] =
db.kv.getSszDecoded(key, T)
## Public ContentId based ContentDB calls
# TODO: Could also decide to use the ContentKey SSZ bytestring, as this is what
@ -334,9 +310,9 @@ proc getSszDecoded(db: ContentDB, key: openArray[byte], T: type auto): Opt[T] =
# checked with the Radius/distance of the node anyhow. So lets see how we end up
# using this mostly in the code.
proc get*(db: ContentDB, key: ContentId): Opt[seq[byte]] =
proc get*(db: ContentDB, key: ContentId, onData: DataProc): bool =
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
db.get(key.toBytesBE())
db.get(key.toBytesBE(), onData)
proc put*(db: ContentDB, key: ContentId, value: openArray[byte]) =
db.put(key.toBytesBE(), value)
@ -347,9 +323,6 @@ proc contains*(db: ContentDB, key: ContentId): bool =
proc del*(db: ContentDB, key: ContentId) =
db.del(key.toBytesBE())
proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Opt[T] =
db.getSszDecoded(key.toBytesBE(), T)
## Pruning related calls
proc deleteContentFraction*(
@ -484,10 +457,15 @@ proc adjustRadius(
proc createGetHandler*(db: ContentDB): DbGetHandler =
return (
proc(contentKey: ContentKeyByteList, contentId: ContentId): Opt[seq[byte]] =
let content = db.get(contentId).valueOr:
return Opt.none(seq[byte])
var res: seq[byte]
ok(content)
proc onData(data: openArray[byte]) =
res = @data
if db.get(contentId, onData):
Opt.some(res)
else:
Opt.none(seq[byte])
)
proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler =
@ -520,6 +498,12 @@ proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler =
db.put(contentId, content)
)
proc createContainsHandler*(db: ContentDB): DbContainsHandler =
## Builds the handler that reports whether `contentId` is present in `db`.
## The handler's `contentKey` argument is accepted to match the handler
## signature but is not used for the lookup.
return (
proc(contentKey: ContentKeyByteList, contentId: ContentId): bool =
db.contains(contentId)
)
proc createRadiusHandler*(db: ContentDB): DbRadiusHandler =
return (
proc(): UInt256 {.raises: [], gcsafe.} =

View File

@ -13,7 +13,7 @@ func xorDistance(a: openArray[byte], b: openArray[byte]): seq[byte] =
doAssert(a.len == b.len)
let length = a.len
var distance: seq[byte] = newSeq[byte](length)
var distance: seq[byte] = newSeqUninitialized[byte](length)
for i in 0 ..< length:
distance[i] = a[i] xor b[i]

View File

@ -442,68 +442,72 @@ func keepBootstrapsFrom*(db: BeaconDb, minSlot: Slot) =
let res = db.bootstraps.keepFromStmt.exec(minSlot.int64)
res.expect("SQL query OK")
proc getHandlerImpl(
db: BeaconDb, contentKey: ContentKeyByteList, contentId: ContentId
): results.Opt[seq[byte]] =
## Looks up beacon content for the given `contentKey` / `contentId` pair,
## dispatching on the `contentType` of the decoded key.
##
## Returns `Opt.none(seq[byte])` when the key cannot be decoded; otherwise the
## result is whatever the branch for that content type produces.
## Used by both the get handler and the contains handler of `BeaconDb`.
let contentKey = contentKey.decode().valueOr:
# TODO: as this should not fail, maybe it is better to raiseAssert ?
return Opt.none(seq[byte])
case contentKey.contentType
of unused:
raiseAssert "Should not be used and fail at decoding"
of lightClientBootstrap:
db.getBootstrap(contentId)
of lightClientUpdate:
let
# TODO: add validation that startPeriod is not from the future,
# this requires db to be aware of the current beacon time
startPeriod = contentKey.lightClientUpdateKey.startPeriod
# get max 128 updates
numOfUpdates = min(
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), contentKey.lightClientUpdateKey.count
)
toPeriod = startPeriod + numOfUpdates # Not inclusive
updates = db.getLightClientUpdates(startPeriod, toPeriod)
if len(updates) == 0:
Opt.none(seq[byte])
else:
# Note that this might not return all of the requested updates.
# This might seem faulty/tricky as it is also used in handleOffer to
# check if an offer should be accepted.
# But it is actually fine as this will occur only when the node is
# synced and it would not be able to verify the older updates in the
# range anyhow.
Opt.some(SSZ.encode(updates))
of lightClientFinalityUpdate:
# TODO:
# Return only when the update is better than what is requested by
# contentKey. This is currently not possible as the contentKey does not
# include best update information.
if db.finalityUpdateCache.isSome():
let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot
let cache = db.finalityUpdateCache.get()
# Only serve the cached update when it is at least as recent as the
# requested finalized slot.
if cache.lastFinalityUpdateSlot >= slot:
Opt.some(cache.lastFinalityUpdate)
else:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
of lightClientOptimisticUpdate:
# TODO same as above applies here too.
if db.optimisticUpdateCache.isSome():
let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot
let cache = db.optimisticUpdateCache.get()
# Only serve the cached update when it is at least as recent as the
# requested optimistic slot.
if cache.lastOptimisticUpdateSlot >= slot:
Opt.some(cache.lastOptimisticUpdate)
else:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
of beacon_content.ContentType.historicalSummaries:
db.get(contentId)
proc createGetHandler*(db: BeaconDb): DbGetHandler =
return (
proc(contentKey: ContentKeyByteList, contentId: ContentId): results.Opt[seq[byte]] =
let contentKey = contentKey.decode().valueOr:
# TODO: as this should not fail, maybe it is better to raiseAssert ?
return Opt.none(seq[byte])
case contentKey.contentType
of unused:
raiseAssert "Should not be used and fail at decoding"
of lightClientBootstrap:
db.getBootstrap(contentId)
of lightClientUpdate:
let
# TODO: add validation that startPeriod is not from the future,
# this requires db to be aware of the current beacon time
startPeriod = contentKey.lightClientUpdateKey.startPeriod
# get max 128 updates
numOfUpdates = min(
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES),
contentKey.lightClientUpdateKey.count,
)
toPeriod = startPeriod + numOfUpdates # Not inclusive
updates = db.getLightClientUpdates(startPeriod, toPeriod)
if len(updates) == 0:
Opt.none(seq[byte])
else:
# Note that this might not return all of the requested updates.
# This might seem faulty/tricky as it is also used in handleOffer to
# check if an offer should be accepted.
# But it is actually fine as this will occur only when the node is
# synced and it would not be able to verify the older updates in the
# range anyhow.
Opt.some(SSZ.encode(updates))
of lightClientFinalityUpdate:
# TODO:
# Return only when the update is better than what is requested by
# contentKey. This is currently not possible as the contentKey does not
# include best update information.
if db.finalityUpdateCache.isSome():
let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot
let cache = db.finalityUpdateCache.get()
if cache.lastFinalityUpdateSlot >= slot:
Opt.some(cache.lastFinalityUpdate)
else:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
of lightClientOptimisticUpdate:
# TODO same as above applies here too.
if db.optimisticUpdateCache.isSome():
let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot
let cache = db.optimisticUpdateCache.get()
if cache.lastOptimisticUpdateSlot >= slot:
Opt.some(cache.lastOptimisticUpdate)
else:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
of beacon_content.ContentType.historicalSummaries:
db.get(contentId)
db.getHandlerImpl(contentKey, contentId)
)
proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
@ -573,6 +577,12 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
db.put(contentId, content)
)
proc createContainsHandler*(db: BeaconDb): DbContainsHandler =
## Builds the handler that reports whether content is available for the given
## key / id by running the same lookup as the get handler (`getHandlerImpl`)
## and checking whether it produced a value.
# NOTE(review): this materializes the content (e.g. SSZ-encodes light client
# updates) only to test for presence; a cheaper existence check may be
# possible per content type — confirm before relying on this in hot paths.
return (
proc(contentKey: ContentKeyByteList, contentId: ContentId): bool =
db.getHandlerImpl(contentKey, contentId).isSome()
)
proc createRadiusHandler*(db: BeaconDb): DbRadiusHandler =
return (
proc(): UInt256 {.raises: [], gcsafe.} =

View File

@ -208,6 +208,7 @@ proc new*(
toContentIdHandler,
createGetHandler(beaconDb),
createStoreHandler(beaconDb),
createContainsHandler(beaconDb),
createRadiusHandler(beaconDb),
stream,
bootstrapRecords,

View File

@ -677,6 +677,7 @@ proc new*(
toContentIdHandler,
createGetHandler(contentDB),
createStoreHandler(contentDB, portalConfig.radiusConfig),
createContainsHandler(contentDB),
createRadiusHandler(contentDB),
stream,
bootstrapRecords,

View File

@ -64,6 +64,7 @@ proc new*(
toContentIdHandler,
createGetHandler(contentDB),
createStoreHandler(contentDB, portalConfig.radiusConfig),
createContainsHandler(contentDB),
createRadiusHandler(contentDB),
s,
bootstrapRecords,

View File

@ -151,6 +151,10 @@ type
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
) {.raises: [], gcsafe.}
# Callback that reports whether content for the given content key / content id
# is present in the backing database. Must not raise and must be gcsafe.
DbContainsHandler* = proc(contentKey: ContentKeyByteList, contentId: ContentId): bool {.
raises: [], gcsafe
.}
DbRadiusHandler* = proc(): UInt256 {.raises: [], gcsafe.}
PortalProtocolId* = array[2, byte]
@ -183,6 +187,7 @@ type
contentCache: ContentCache
dbGet*: DbGetHandler
dbPut*: DbStoreHandler
dbContains*: DbContainsHandler
dataRadius*: DbRadiusHandler
bootstrapRecords*: seq[Record]
lastLookup: chronos.Moment
@ -319,7 +324,7 @@ func inRange(
let distance = p.distance(nodeId, contentId)
distance <= nodeRadius
proc inRange*(p: PortalProtocol, contentId: ContentId): bool =
template inRange*(p: PortalProtocol, contentId: ContentId): bool =
p.inRange(p.localNode.id, p.dataRadius(), contentId)
func truncateEnrs(
@ -474,7 +479,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
)
if p.inRange(contentId):
if p.dbGet(contentKey, contentId).isErr:
if not p.dbContains(contentKey, contentId):
contentKeysBitList.setBit(i)
discard contentKeys.add(contentKey)
else:
@ -561,6 +566,7 @@ proc new*(
toContentId: ToContentIdHandler,
dbGet: DbGetHandler,
dbPut: DbStoreHandler,
dbContains: DbContainsHandler,
dbRadius: DbRadiusHandler,
stream: PortalStream,
bootstrapRecords: openArray[Record] = [],
@ -580,6 +586,7 @@ proc new*(
ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
dbGet: dbGet,
dbPut: dbPut,
dbContains: dbContains,
dataRadius: dbRadius,
bootstrapRecords: @bootstrapRecords,
stream: stream,
@ -1627,7 +1634,6 @@ proc getLocalContent*(
# Check first if content is in range, as this is a cheaper operation
# than the database lookup.
if p.inRange(contentId):
doAssert(p.dbGet != nil)
p.dbGet(contentKey, contentId)
else:
Opt.none(seq[byte])

View File

@ -54,7 +54,7 @@ proc stop(hn: HistoryNode) {.async.} =
await hn.discoveryProtocol.closeWait()
proc containsId(hn: HistoryNode, contentId: ContentId): bool =
return hn.historyNetwork.contentDB.get(contentId).isSome()
return hn.historyNetwork.contentDB.contains(contentId)
proc createEmptyHeaders(fromNum: int, toNum: int): seq[Header] =
var headers: seq[Header]

View File

@ -57,7 +57,7 @@ proc stop(hn: HistoryNode) {.async.} =
await hn.discoveryProtocol.closeWait()
proc containsId(hn: HistoryNode, contentId: ContentId): bool =
return hn.historyNetwork.contentDB.get(contentId).isSome()
return hn.historyNetwork.contentDB.contains(contentId)
proc store*(hn: HistoryNode, blockHash: Hash32, blockHeader: Header) =
let

View File

@ -142,11 +142,10 @@ proc stop*(sn: StateNode) {.async.} =
await sn.discoveryProtocol.closeWait()
proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} =
return sn.stateNetwork.portalProtocol
# The contentKey parameter isn't used but is required for compatibility
# with the dbGet handler inside getLocalContent.
.getLocalContent(ContentKeyByteList.init(@[]), contentId)
.isSome()
# The contentKey parameter isn't used but is required for compatibility with
# the dbContains handler
return
sn.stateNetwork.portalProtocol.dbContains(ContentKeyByteList.init(@[]), contentId)
proc mockStateRootLookup*(
sn: StateNode, blockNumOrHash: uint64 | Hash32, stateRoot: Hash32

View File

@ -26,26 +26,37 @@ suite "Content Database":
key = ContentId(UInt256.high()) # Some key
block:
let val = db.get(key)
var val = Opt.none(seq[byte])
proc onData(data: openArray[byte]) =
val = Opt.some(@data)
check:
db.get(key, onData) == false
val.isNone()
db.contains(key) == false
block:
discard db.putAndPrune(key, [byte 0, 1, 2, 3])
let val = db.get(key)
var val = Opt.none(seq[byte])
proc onData(data: openArray[byte]) =
val = Opt.some(@data)
check:
db.get(key, onData) == true
val.isSome()
val.get() == [byte 0, 1, 2, 3]
db.contains(key) == true
block:
db.del(key)
let val = db.get(key)
var val = Opt.none(seq[byte])
proc onData(data: openArray[byte]) =
val = Opt.some(@data)
check:
db.get(key, onData) == false
val.isNone()
db.contains(key) == false
@ -137,9 +148,9 @@ suite "Content Database":
# With the current settings the 2 furthest elements will be deleted,
# i.e key 30 and 40. The furthest non deleted one will have key 20.
pr10.distanceOfFurthestElement == thirdFurthest
db.get(furthestElement).isNone()
db.get(secondFurthest).isNone()
db.get(thirdFurthest).isSome()
not db.contains(furthestElement)
not db.contains(secondFurthest)
db.contains(thirdFurthest)
test "ContentDB force pruning":
const

View File

@ -50,6 +50,7 @@ proc initPortalProtocol(
toContentId,
createGetHandler(db),
createStoreHandler(db, defaultRadiusConfig),
createContainsHandler(db),
createRadiusHandler(db),
stream,
bootstrapRecords = bootstrapRecords,
@ -346,6 +347,7 @@ procSuite "Portal Wire Protocol Tests":
toContentId,
createGetHandler(db),
createStoreHandler(db, defaultRadiusConfig),
createContainsHandler(db),
createRadiusHandler(db),
stream,
)
@ -364,10 +366,10 @@ procSuite "Portal Wire Protocol Tests":
# Index 2 should still be in database and its distance should be <=
# updated radius
check:
db.get((distances[0] xor proto1.localNode.id)).isNone()
db.get((distances[1] xor proto1.localNode.id)).isNone()
db.get((distances[2] xor proto1.localNode.id)).isNone()
db.get((distances[3] xor proto1.localNode.id)).isSome()
not db.contains((distances[0] xor proto1.localNode.id))
not db.contains((distances[1] xor proto1.localNode.id))
not db.contains((distances[2] xor proto1.localNode.id))
db.contains((distances[3] xor proto1.localNode.id))
# The radius has been updated and is lower than the maximum start value.
proto1.dataRadius() < UInt256.high
# Yet higher than or equal to the furthest non deleted element.

View File

@ -108,7 +108,11 @@ proc cmdBench(conf: DbConf) =
for key in keys:
withTimer(timers[tDbGet]):
let _ = db.get(key)
var val = Opt.none(seq[byte])
proc onData(data: openArray[byte]) =
val = Opt.some(@data)
let _ = db.get(key, onData)
for key in keys:
withTimer(timers[tDbContains]):

View File

@ -255,6 +255,7 @@ proc run(config: PortalCliConf) =
testContentIdHandler,
createGetHandler(db),
createStoreHandler(db, defaultRadiusConfig),
createContainsHandler(db),
createRadiusHandler(db),
stream,
bootstrapRecords = bootstrapRecords,