Add database contains handler to portal protocol and use it
This commit is contained in:
parent
bb88b50298
commit
98cebad9f9
|
@ -404,3 +404,9 @@ proc createStoreHandler*(
|
||||||
# so we will effectivly store fraction of the network
|
# so we will effectivly store fraction of the network
|
||||||
db.put(contentId, content)
|
db.put(contentId, content)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc createContainsHandler*(db: ContentDB): DbContainsHandler =
|
||||||
|
return (
|
||||||
|
proc(contentKey: ByteList, contentId: ContentId): bool =
|
||||||
|
db.contains(contentId)
|
||||||
|
)
|
||||||
|
|
|
@ -146,6 +146,9 @@ proc get(db: BeaconDb, key: openArray[byte]): results.Opt[seq[byte]] =
|
||||||
proc put(db: BeaconDb, key, value: openArray[byte]) =
|
proc put(db: BeaconDb, key, value: openArray[byte]) =
|
||||||
db.kv.put(key, value).expectDb()
|
db.kv.put(key, value).expectDb()
|
||||||
|
|
||||||
|
proc contains(db: BeaconDb, key: openArray[byte]): bool =
|
||||||
|
db.kv.contains(key).expectDb()
|
||||||
|
|
||||||
## Public ContentId based ContentDB calls
|
## Public ContentId based ContentDB calls
|
||||||
proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] =
|
proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] =
|
||||||
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
|
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
|
||||||
|
@ -154,6 +157,9 @@ proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] =
|
||||||
proc put*(db: BeaconDb, key: ContentId, value: openArray[byte]) =
|
proc put*(db: BeaconDb, key: ContentId, value: openArray[byte]) =
|
||||||
db.put(key.toBytesBE(), value)
|
db.put(key.toBytesBE(), value)
|
||||||
|
|
||||||
|
proc contains*(db: BeaconDb, key: ContentId): bool =
|
||||||
|
db.contains(key.toBytesBE())
|
||||||
|
|
||||||
# TODO Add checks that uint64 can be safely casted to int64
|
# TODO Add checks that uint64 can be safely casted to int64
|
||||||
proc getLightClientUpdates(
|
proc getLightClientUpdates(
|
||||||
db: BeaconDb, start: uint64, to: uint64):
|
db: BeaconDb, start: uint64, to: uint64):
|
||||||
|
@ -340,3 +346,52 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
|
||||||
lastOptimisticUpdate: content
|
lastOptimisticUpdate: content
|
||||||
))
|
))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc createContainsHandler*(db: BeaconDb): DbContainsHandler =
|
||||||
|
return (
|
||||||
|
proc(contentKey: ByteList, contentId: ContentId): bool =
|
||||||
|
let contentKey = contentKey.decode().valueOr:
|
||||||
|
# TODO: as this should not fail, maybe it is better to raiseAssert ?
|
||||||
|
return true
|
||||||
|
|
||||||
|
case contentKey.contentType:
|
||||||
|
of lightClientBootstrap:
|
||||||
|
db.contains(contentId)
|
||||||
|
of lightClientUpdate:
|
||||||
|
let
|
||||||
|
# TODO: add validation that startPeriod is not from the future,
|
||||||
|
# this requires db to be aware off the current beacon time
|
||||||
|
startPeriod = contentKey.lightClientUpdateKey.startPeriod
|
||||||
|
# get max 128 updates
|
||||||
|
numOfUpdates = min(
|
||||||
|
uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES),
|
||||||
|
contentKey.lightClientUpdateKey.count
|
||||||
|
)
|
||||||
|
toPeriod = startPeriod + numOfUpdates # Not inclusive
|
||||||
|
updates = db.getLightClientUpdates(startPeriod, toPeriod)
|
||||||
|
|
||||||
|
if len(updates) == 0:
|
||||||
|
false
|
||||||
|
else:
|
||||||
|
true
|
||||||
|
of lightClientFinalityUpdate:
|
||||||
|
if db.finalityUpdateCache.isSome():
|
||||||
|
let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot
|
||||||
|
let cache = db.finalityUpdateCache.get()
|
||||||
|
if cache.lastFinalityUpdateSlot >= slot:
|
||||||
|
true
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
of lightClientOptimisticUpdate:
|
||||||
|
if db.optimisticUpdateCache.isSome():
|
||||||
|
let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot
|
||||||
|
let cache = db.optimisticUpdateCache.get()
|
||||||
|
if cache.lastOptimisticUpdateSlot >= slot:
|
||||||
|
true
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
|
@ -173,7 +173,9 @@ proc new*(
|
||||||
portalProtocol = PortalProtocol.new(
|
portalProtocol = PortalProtocol.new(
|
||||||
baseProtocol, lightClientProtocolId,
|
baseProtocol, lightClientProtocolId,
|
||||||
toContentIdHandler,
|
toContentIdHandler,
|
||||||
createGetHandler(beaconDb), stream, bootstrapRecords,
|
createGetHandler(beaconDb),
|
||||||
|
createContainsHandler(beaconDb),
|
||||||
|
stream, bootstrapRecords,
|
||||||
config = portalConfigAdjusted)
|
config = portalConfigAdjusted)
|
||||||
|
|
||||||
portalProtocol.dbPut = createStoreHandler(beaconDb)
|
portalProtocol.dbPut = createStoreHandler(beaconDb)
|
||||||
|
|
|
@ -715,7 +715,10 @@ proc new*(
|
||||||
|
|
||||||
portalProtocol = PortalProtocol.new(
|
portalProtocol = PortalProtocol.new(
|
||||||
baseProtocol, historyProtocolId,
|
baseProtocol, historyProtocolId,
|
||||||
toContentIdHandler, createGetHandler(contentDB), stream, bootstrapRecords,
|
toContentIdHandler,
|
||||||
|
createGetHandler(contentDB),
|
||||||
|
createContainsHandler(contentDB),
|
||||||
|
stream, bootstrapRecords,
|
||||||
config = portalConfig)
|
config = portalConfig)
|
||||||
|
|
||||||
portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol)
|
portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol)
|
||||||
|
|
|
@ -69,23 +69,26 @@ proc new*(
|
||||||
streamManager: StreamManager,
|
streamManager: StreamManager,
|
||||||
bootstrapRecords: openArray[Record] = [],
|
bootstrapRecords: openArray[Record] = [],
|
||||||
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
|
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
|
||||||
|
let
|
||||||
|
contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
|
||||||
|
|
||||||
let cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
|
stream = streamManager.registerNewStream(contentQueue)
|
||||||
|
|
||||||
let s = streamManager.registerNewStream(cq)
|
portalProtocol = PortalProtocol.new(
|
||||||
|
baseProtocol, stateProtocolId,
|
||||||
let portalProtocol = PortalProtocol.new(
|
toContentIdHandler,
|
||||||
baseProtocol, stateProtocolId,
|
createGetHandler(contentDB),
|
||||||
toContentIdHandler, createGetHandler(contentDB), s,
|
createContainsHandler(contentDB),
|
||||||
bootstrapRecords, stateDistanceCalculator,
|
stream,
|
||||||
config = portalConfig)
|
bootstrapRecords, stateDistanceCalculator,
|
||||||
|
config = portalConfig)
|
||||||
|
|
||||||
portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol)
|
portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol)
|
||||||
|
|
||||||
return StateNetwork(
|
return StateNetwork(
|
||||||
portalProtocol: portalProtocol,
|
portalProtocol: portalProtocol,
|
||||||
contentDB: contentDB,
|
contentDB: contentDB,
|
||||||
contentQueue: cq
|
contentQueue: contentQueue
|
||||||
)
|
)
|
||||||
|
|
||||||
proc processContentLoop(n: StateNetwork) {.async.} =
|
proc processContentLoop(n: StateNetwork) {.async.} =
|
||||||
|
|
|
@ -134,6 +134,9 @@ type
|
||||||
contentId: ContentId,
|
contentId: ContentId,
|
||||||
content: seq[byte]) {.raises: [], gcsafe.}
|
content: seq[byte]) {.raises: [], gcsafe.}
|
||||||
|
|
||||||
|
DbContainsHandler* =
|
||||||
|
proc(contentKey: ByteList, contentId: ContentId): bool {.raises: [], gcsafe.}
|
||||||
|
|
||||||
PortalProtocolId* = array[2, byte]
|
PortalProtocolId* = array[2, byte]
|
||||||
|
|
||||||
RadiusCache* = LRUCache[NodeId, UInt256]
|
RadiusCache* = LRUCache[NodeId, UInt256]
|
||||||
|
@ -160,6 +163,7 @@ type
|
||||||
toContentId*: ToContentIdHandler
|
toContentId*: ToContentIdHandler
|
||||||
dbGet*: DbGetHandler
|
dbGet*: DbGetHandler
|
||||||
dbPut*: DbStoreHandler
|
dbPut*: DbStoreHandler
|
||||||
|
dbContains*: DbContainsHandler
|
||||||
radiusConfig: RadiusConfig
|
radiusConfig: RadiusConfig
|
||||||
dataRadius*: UInt256
|
dataRadius*: UInt256
|
||||||
bootstrapRecords*: seq[Record]
|
bootstrapRecords*: seq[Record]
|
||||||
|
@ -392,7 +396,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
|
||||||
if contentIdResult.isOk():
|
if contentIdResult.isOk():
|
||||||
let contentId = contentIdResult.get()
|
let contentId = contentIdResult.get()
|
||||||
if p.inRange(contentId):
|
if p.inRange(contentId):
|
||||||
if p.dbGet(contentKey, contentId).isErr:
|
if not p.dbContains(contentKey, contentId):
|
||||||
contentKeysBitList.setBit(i)
|
contentKeysBitList.setBit(i)
|
||||||
discard contentKeys.add(contentKey)
|
discard contentKeys.add(contentKey)
|
||||||
else:
|
else:
|
||||||
|
@ -484,6 +488,7 @@ proc new*(T: type PortalProtocol,
|
||||||
protocolId: PortalProtocolId,
|
protocolId: PortalProtocolId,
|
||||||
toContentId: ToContentIdHandler,
|
toContentId: ToContentIdHandler,
|
||||||
dbGet: DbGetHandler,
|
dbGet: DbGetHandler,
|
||||||
|
dbContains: DbContainsHandler,
|
||||||
stream: PortalStream,
|
stream: PortalStream,
|
||||||
bootstrapRecords: openArray[Record] = [],
|
bootstrapRecords: openArray[Record] = [],
|
||||||
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
|
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
|
||||||
|
@ -501,6 +506,7 @@ proc new*(T: type PortalProtocol,
|
||||||
baseProtocol: baseProtocol,
|
baseProtocol: baseProtocol,
|
||||||
toContentId: toContentId,
|
toContentId: toContentId,
|
||||||
dbGet: dbGet,
|
dbGet: dbGet,
|
||||||
|
dbContains: dbContains,
|
||||||
radiusConfig: config.radiusConfig,
|
radiusConfig: config.radiusConfig,
|
||||||
dataRadius: initialRadius,
|
dataRadius: initialRadius,
|
||||||
bootstrapRecords: @bootstrapRecords,
|
bootstrapRecords: @bootstrapRecords,
|
||||||
|
|
Loading…
Reference in New Issue