mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-12 21:34:33 +00:00
Add contains handler to portal protocol.
This commit is contained in:
parent
37c795b391
commit
39aa06f0ad
@ -497,6 +497,12 @@ proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler =
|
|||||||
db.put(contentId, content)
|
db.put(contentId, content)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc createContainsHandler*(db: ContentDB): DbContainsHandler =
|
||||||
|
return (
|
||||||
|
proc(contentKey: ContentKeyByteList, contentId: ContentId): bool =
|
||||||
|
db.contains(contentId)
|
||||||
|
)
|
||||||
|
|
||||||
proc createRadiusHandler*(db: ContentDB): DbRadiusHandler =
|
proc createRadiusHandler*(db: ContentDB): DbRadiusHandler =
|
||||||
return (
|
return (
|
||||||
proc(): UInt256 {.raises: [], gcsafe.} =
|
proc(): UInt256 {.raises: [], gcsafe.} =
|
||||||
|
@ -442,68 +442,72 @@ func keepBootstrapsFrom*(db: BeaconDb, minSlot: Slot) =
|
|||||||
let res = db.bootstraps.keepFromStmt.exec(minSlot.int64)
|
let res = db.bootstraps.keepFromStmt.exec(minSlot.int64)
|
||||||
res.expect("SQL query OK")
|
res.expect("SQL query OK")
|
||||||
|
|
||||||
|
proc getHandlerImpl(
|
||||||
|
db: BeaconDb, contentKey: ContentKeyByteList, contentId: ContentId
|
||||||
|
): results.Opt[seq[byte]] =
|
||||||
|
let contentKey = contentKey.decode().valueOr:
|
||||||
|
# TODO: as this should not fail, maybe it is better to raiseAssert ?
|
||||||
|
return Opt.none(seq[byte])
|
||||||
|
|
||||||
|
case contentKey.contentType
|
||||||
|
of unused:
|
||||||
|
raiseAssert "Should not be used and fail at decoding"
|
||||||
|
of lightClientBootstrap:
|
||||||
|
db.getBootstrap(contentId)
|
||||||
|
of lightClientUpdate:
|
||||||
|
let
|
||||||
|
# TODO: add validation that startPeriod is not from the future,
|
||||||
|
# this requires db to be aware off the current beacon time
|
||||||
|
startPeriod = contentKey.lightClientUpdateKey.startPeriod
|
||||||
|
# get max 128 updates
|
||||||
|
numOfUpdates = min(
|
||||||
|
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), contentKey.lightClientUpdateKey.count
|
||||||
|
)
|
||||||
|
toPeriod = startPeriod + numOfUpdates # Not inclusive
|
||||||
|
updates = db.getLightClientUpdates(startPeriod, toPeriod)
|
||||||
|
|
||||||
|
if len(updates) == 0:
|
||||||
|
Opt.none(seq[byte])
|
||||||
|
else:
|
||||||
|
# Note that this might not return all of the requested updates.
|
||||||
|
# This might seem faulty/tricky as it is also used in handleOffer to
|
||||||
|
# check if an offer should be accepted.
|
||||||
|
# But it is actually fine as this will occur only when the node is
|
||||||
|
# synced and it would not be able to verify the older updates in the
|
||||||
|
# range anyhow.
|
||||||
|
Opt.some(SSZ.encode(updates))
|
||||||
|
of lightClientFinalityUpdate:
|
||||||
|
# TODO:
|
||||||
|
# Return only when the update is better than what is requested by
|
||||||
|
# contentKey. This is currently not possible as the contentKey does not
|
||||||
|
# include best update information.
|
||||||
|
if db.finalityUpdateCache.isSome():
|
||||||
|
let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot
|
||||||
|
let cache = db.finalityUpdateCache.get()
|
||||||
|
if cache.lastFinalityUpdateSlot >= slot:
|
||||||
|
Opt.some(cache.lastFinalityUpdate)
|
||||||
|
else:
|
||||||
|
Opt.none(seq[byte])
|
||||||
|
else:
|
||||||
|
Opt.none(seq[byte])
|
||||||
|
of lightClientOptimisticUpdate:
|
||||||
|
# TODO same as above applies here too.
|
||||||
|
if db.optimisticUpdateCache.isSome():
|
||||||
|
let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot
|
||||||
|
let cache = db.optimisticUpdateCache.get()
|
||||||
|
if cache.lastOptimisticUpdateSlot >= slot:
|
||||||
|
Opt.some(cache.lastOptimisticUpdate)
|
||||||
|
else:
|
||||||
|
Opt.none(seq[byte])
|
||||||
|
else:
|
||||||
|
Opt.none(seq[byte])
|
||||||
|
of beacon_content.ContentType.historicalSummaries:
|
||||||
|
db.get(contentId)
|
||||||
|
|
||||||
proc createGetHandler*(db: BeaconDb): DbGetHandler =
|
proc createGetHandler*(db: BeaconDb): DbGetHandler =
|
||||||
return (
|
return (
|
||||||
proc(contentKey: ContentKeyByteList, contentId: ContentId): results.Opt[seq[byte]] =
|
proc(contentKey: ContentKeyByteList, contentId: ContentId): results.Opt[seq[byte]] =
|
||||||
let contentKey = contentKey.decode().valueOr:
|
db.getHandlerImpl(contentKey, contentId)
|
||||||
# TODO: as this should not fail, maybe it is better to raiseAssert ?
|
|
||||||
return Opt.none(seq[byte])
|
|
||||||
|
|
||||||
case contentKey.contentType
|
|
||||||
of unused:
|
|
||||||
raiseAssert "Should not be used and fail at decoding"
|
|
||||||
of lightClientBootstrap:
|
|
||||||
db.getBootstrap(contentId)
|
|
||||||
of lightClientUpdate:
|
|
||||||
let
|
|
||||||
# TODO: add validation that startPeriod is not from the future,
|
|
||||||
# this requires db to be aware off the current beacon time
|
|
||||||
startPeriod = contentKey.lightClientUpdateKey.startPeriod
|
|
||||||
# get max 128 updates
|
|
||||||
numOfUpdates = min(
|
|
||||||
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES),
|
|
||||||
contentKey.lightClientUpdateKey.count,
|
|
||||||
)
|
|
||||||
toPeriod = startPeriod + numOfUpdates # Not inclusive
|
|
||||||
updates = db.getLightClientUpdates(startPeriod, toPeriod)
|
|
||||||
|
|
||||||
if len(updates) == 0:
|
|
||||||
Opt.none(seq[byte])
|
|
||||||
else:
|
|
||||||
# Note that this might not return all of the requested updates.
|
|
||||||
# This might seem faulty/tricky as it is also used in handleOffer to
|
|
||||||
# check if an offer should be accepted.
|
|
||||||
# But it is actually fine as this will occur only when the node is
|
|
||||||
# synced and it would not be able to verify the older updates in the
|
|
||||||
# range anyhow.
|
|
||||||
Opt.some(SSZ.encode(updates))
|
|
||||||
of lightClientFinalityUpdate:
|
|
||||||
# TODO:
|
|
||||||
# Return only when the update is better than what is requested by
|
|
||||||
# contentKey. This is currently not possible as the contentKey does not
|
|
||||||
# include best update information.
|
|
||||||
if db.finalityUpdateCache.isSome():
|
|
||||||
let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot
|
|
||||||
let cache = db.finalityUpdateCache.get()
|
|
||||||
if cache.lastFinalityUpdateSlot >= slot:
|
|
||||||
Opt.some(cache.lastFinalityUpdate)
|
|
||||||
else:
|
|
||||||
Opt.none(seq[byte])
|
|
||||||
else:
|
|
||||||
Opt.none(seq[byte])
|
|
||||||
of lightClientOptimisticUpdate:
|
|
||||||
# TODO same as above applies here too.
|
|
||||||
if db.optimisticUpdateCache.isSome():
|
|
||||||
let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot
|
|
||||||
let cache = db.optimisticUpdateCache.get()
|
|
||||||
if cache.lastOptimisticUpdateSlot >= slot:
|
|
||||||
Opt.some(cache.lastOptimisticUpdate)
|
|
||||||
else:
|
|
||||||
Opt.none(seq[byte])
|
|
||||||
else:
|
|
||||||
Opt.none(seq[byte])
|
|
||||||
of beacon_content.ContentType.historicalSummaries:
|
|
||||||
db.get(contentId)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
|
proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
|
||||||
@ -573,6 +577,12 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
|
|||||||
db.put(contentId, content)
|
db.put(contentId, content)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc createContainsHandler*(db: BeaconDb): DbContainsHandler =
|
||||||
|
return (
|
||||||
|
proc(contentKey: ContentKeyByteList, contentId: ContentId): bool =
|
||||||
|
db.getHandlerImpl(contentKey, contentId).isSome()
|
||||||
|
)
|
||||||
|
|
||||||
proc createRadiusHandler*(db: BeaconDb): DbRadiusHandler =
|
proc createRadiusHandler*(db: BeaconDb): DbRadiusHandler =
|
||||||
return (
|
return (
|
||||||
proc(): UInt256 {.raises: [], gcsafe.} =
|
proc(): UInt256 {.raises: [], gcsafe.} =
|
||||||
|
@ -208,6 +208,7 @@ proc new*(
|
|||||||
toContentIdHandler,
|
toContentIdHandler,
|
||||||
createGetHandler(beaconDb),
|
createGetHandler(beaconDb),
|
||||||
createStoreHandler(beaconDb),
|
createStoreHandler(beaconDb),
|
||||||
|
createContainsHandler(beaconDb),
|
||||||
createRadiusHandler(beaconDb),
|
createRadiusHandler(beaconDb),
|
||||||
stream,
|
stream,
|
||||||
bootstrapRecords,
|
bootstrapRecords,
|
||||||
|
@ -677,6 +677,7 @@ proc new*(
|
|||||||
toContentIdHandler,
|
toContentIdHandler,
|
||||||
createGetHandler(contentDB),
|
createGetHandler(contentDB),
|
||||||
createStoreHandler(contentDB, portalConfig.radiusConfig),
|
createStoreHandler(contentDB, portalConfig.radiusConfig),
|
||||||
|
createContainsHandler(contentDB),
|
||||||
createRadiusHandler(contentDB),
|
createRadiusHandler(contentDB),
|
||||||
stream,
|
stream,
|
||||||
bootstrapRecords,
|
bootstrapRecords,
|
||||||
|
@ -64,6 +64,7 @@ proc new*(
|
|||||||
toContentIdHandler,
|
toContentIdHandler,
|
||||||
createGetHandler(contentDB),
|
createGetHandler(contentDB),
|
||||||
createStoreHandler(contentDB, portalConfig.radiusConfig),
|
createStoreHandler(contentDB, portalConfig.radiusConfig),
|
||||||
|
createContainsHandler(contentDB),
|
||||||
createRadiusHandler(contentDB),
|
createRadiusHandler(contentDB),
|
||||||
s,
|
s,
|
||||||
bootstrapRecords,
|
bootstrapRecords,
|
||||||
|
@ -151,6 +151,10 @@ type
|
|||||||
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
|
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
|
||||||
) {.raises: [], gcsafe.}
|
) {.raises: [], gcsafe.}
|
||||||
|
|
||||||
|
DbContainsHandler* = proc(contentKey: ContentKeyByteList, contentId: ContentId): bool {.
|
||||||
|
raises: [], gcsafe
|
||||||
|
.}
|
||||||
|
|
||||||
DbRadiusHandler* = proc(): UInt256 {.raises: [], gcsafe.}
|
DbRadiusHandler* = proc(): UInt256 {.raises: [], gcsafe.}
|
||||||
|
|
||||||
PortalProtocolId* = array[2, byte]
|
PortalProtocolId* = array[2, byte]
|
||||||
@ -183,6 +187,7 @@ type
|
|||||||
contentCache: ContentCache
|
contentCache: ContentCache
|
||||||
dbGet*: DbGetHandler
|
dbGet*: DbGetHandler
|
||||||
dbPut*: DbStoreHandler
|
dbPut*: DbStoreHandler
|
||||||
|
dbContains*: DbContainsHandler
|
||||||
dataRadius*: DbRadiusHandler
|
dataRadius*: DbRadiusHandler
|
||||||
bootstrapRecords*: seq[Record]
|
bootstrapRecords*: seq[Record]
|
||||||
lastLookup: chronos.Moment
|
lastLookup: chronos.Moment
|
||||||
@ -474,7 +479,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
|
|||||||
)
|
)
|
||||||
|
|
||||||
if p.inRange(contentId):
|
if p.inRange(contentId):
|
||||||
if p.dbGet(contentKey, contentId).isErr:
|
if not p.dbContains(contentKey, contentId):
|
||||||
contentKeysBitList.setBit(i)
|
contentKeysBitList.setBit(i)
|
||||||
discard contentKeys.add(contentKey)
|
discard contentKeys.add(contentKey)
|
||||||
else:
|
else:
|
||||||
@ -561,6 +566,7 @@ proc new*(
|
|||||||
toContentId: ToContentIdHandler,
|
toContentId: ToContentIdHandler,
|
||||||
dbGet: DbGetHandler,
|
dbGet: DbGetHandler,
|
||||||
dbPut: DbStoreHandler,
|
dbPut: DbStoreHandler,
|
||||||
|
dbContains: DbContainsHandler,
|
||||||
dbRadius: DbRadiusHandler,
|
dbRadius: DbRadiusHandler,
|
||||||
stream: PortalStream,
|
stream: PortalStream,
|
||||||
bootstrapRecords: openArray[Record] = [],
|
bootstrapRecords: openArray[Record] = [],
|
||||||
@ -580,6 +586,7 @@ proc new*(
|
|||||||
ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
|
ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize),
|
||||||
dbGet: dbGet,
|
dbGet: dbGet,
|
||||||
dbPut: dbPut,
|
dbPut: dbPut,
|
||||||
|
dbContains: dbContains,
|
||||||
dataRadius: dbRadius,
|
dataRadius: dbRadius,
|
||||||
bootstrapRecords: @bootstrapRecords,
|
bootstrapRecords: @bootstrapRecords,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
@ -1627,7 +1634,6 @@ proc getLocalContent*(
|
|||||||
# Check first if content is in range, as this is a cheaper operation
|
# Check first if content is in range, as this is a cheaper operation
|
||||||
# than the database lookup.
|
# than the database lookup.
|
||||||
if p.inRange(contentId):
|
if p.inRange(contentId):
|
||||||
doAssert(p.dbGet != nil)
|
|
||||||
p.dbGet(contentKey, contentId)
|
p.dbGet(contentKey, contentId)
|
||||||
else:
|
else:
|
||||||
Opt.none(seq[byte])
|
Opt.none(seq[byte])
|
||||||
|
@ -24,29 +24,39 @@ suite "Content Database":
|
|||||||
"", uint32.high, RadiusConfig(kind: Dynamic), testId, inMemory = true
|
"", uint32.high, RadiusConfig(kind: Dynamic), testId, inMemory = true
|
||||||
)
|
)
|
||||||
key = ContentId(UInt256.high()) # Some key
|
key = ContentId(UInt256.high()) # Some key
|
||||||
dbGet = db.createGetHandler()
|
|
||||||
|
|
||||||
block:
|
block:
|
||||||
let val = dbGet(ContentKeyByteList.init(@[]), key)
|
var val = Opt.none(seq[byte])
|
||||||
|
proc onData(data: openArray[byte]) =
|
||||||
|
val = Opt.some(@data)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
db.get(key, onData) == false
|
||||||
val.isNone()
|
val.isNone()
|
||||||
db.contains(key) == false
|
db.contains(key) == false
|
||||||
|
|
||||||
block:
|
block:
|
||||||
discard db.putAndPrune(key, [byte 0, 1, 2, 3])
|
discard db.putAndPrune(key, [byte 0, 1, 2, 3])
|
||||||
let val = dbGet(ContentKeyByteList.init(@[]), key)
|
|
||||||
|
var val = Opt.none(seq[byte])
|
||||||
|
proc onData(data: openArray[byte]) =
|
||||||
|
val = Opt.some(@data)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
db.get(key, onData) == true
|
||||||
val.isSome()
|
val.isSome()
|
||||||
val.get() == [byte 0, 1, 2, 3]
|
val.get() == [byte 0, 1, 2, 3]
|
||||||
db.contains(key) == true
|
db.contains(key) == true
|
||||||
|
|
||||||
block:
|
block:
|
||||||
db.del(key)
|
db.del(key)
|
||||||
let val = dbGet(ContentKeyByteList.init(@[]), key)
|
|
||||||
|
var val = Opt.none(seq[byte])
|
||||||
|
proc onData(data: openArray[byte]) =
|
||||||
|
val = Opt.some(@data)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
db.get(key, onData) == false
|
||||||
val.isNone()
|
val.isNone()
|
||||||
db.contains(key) == false
|
db.contains(key) == false
|
||||||
|
|
||||||
|
@ -50,6 +50,7 @@ proc initPortalProtocol(
|
|||||||
toContentId,
|
toContentId,
|
||||||
createGetHandler(db),
|
createGetHandler(db),
|
||||||
createStoreHandler(db, defaultRadiusConfig),
|
createStoreHandler(db, defaultRadiusConfig),
|
||||||
|
createContainsHandler(db),
|
||||||
createRadiusHandler(db),
|
createRadiusHandler(db),
|
||||||
stream,
|
stream,
|
||||||
bootstrapRecords = bootstrapRecords,
|
bootstrapRecords = bootstrapRecords,
|
||||||
@ -346,6 +347,7 @@ procSuite "Portal Wire Protocol Tests":
|
|||||||
toContentId,
|
toContentId,
|
||||||
createGetHandler(db),
|
createGetHandler(db),
|
||||||
createStoreHandler(db, defaultRadiusConfig),
|
createStoreHandler(db, defaultRadiusConfig),
|
||||||
|
createContainsHandler(db),
|
||||||
createRadiusHandler(db),
|
createRadiusHandler(db),
|
||||||
stream,
|
stream,
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user