diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim index b9ad787e2..d85e3e3b2 100644 --- a/fluffy/content_db.nim +++ b/fluffy/content_db.nim @@ -404,3 +404,9 @@ proc createStoreHandler*( # so we will effectivly store fraction of the network db.put(contentId, content) ) + +proc createContainsHandler*(db: ContentDB): DbContainsHandler = + return ( + proc(contentKey: ByteList, contentId: ContentId): bool = + db.contains(contentId) + ) diff --git a/fluffy/network/beacon/beacon_db.nim b/fluffy/network/beacon/beacon_db.nim index a3edc50b4..fb8e0d054 100644 --- a/fluffy/network/beacon/beacon_db.nim +++ b/fluffy/network/beacon/beacon_db.nim @@ -146,6 +146,9 @@ proc get(db: BeaconDb, key: openArray[byte]): results.Opt[seq[byte]] = proc put(db: BeaconDb, key, value: openArray[byte]) = db.kv.put(key, value).expectDb() +proc contains(db: BeaconDb, key: openArray[byte]): bool = + db.kv.contains(key).expectDb() + ## Public ContentId based ContentDB calls proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] = # TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256. @@ -154,6 +157,9 @@ proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] = proc put*(db: BeaconDb, key: ContentId, value: openArray[byte]) = db.put(key.toBytesBE(), value) +proc contains*(db: BeaconDb, key: ContentId): bool = + db.contains(key.toBytesBE()) + # TODO Add checks that uint64 can be safely casted to int64 proc getLightClientUpdates( db: BeaconDb, start: uint64, to: uint64): @@ -340,3 +346,52 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler = lastOptimisticUpdate: content )) ) + +proc createContainsHandler*(db: BeaconDb): DbContainsHandler = + return ( + proc(contentKey: ByteList, contentId: ContentId): bool = + let contentKey = contentKey.decode().valueOr: + # TODO: as this should not fail, maybe it is better to raiseAssert ? + return true + + case contentKey.contentType: + of lightClientBootstrap: + db.contains(contentId) + of lightClientUpdate: + let + # TODO: add validation that startPeriod is not from the future, + # this requires db to be aware off the current beacon time + startPeriod = contentKey.lightClientUpdateKey.startPeriod + # get max 128 updates + numOfUpdates = min( + uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), + contentKey.lightClientUpdateKey.count + ) + toPeriod = startPeriod + numOfUpdates # Not inclusive + updates = db.getLightClientUpdates(startPeriod, toPeriod) + + if len(updates) == 0: + false + else: + true + of lightClientFinalityUpdate: + if db.finalityUpdateCache.isSome(): + let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot + let cache = db.finalityUpdateCache.get() + if cache.lastFinalityUpdateSlot >= slot: + true + else: + false + else: + false + of lightClientOptimisticUpdate: + if db.optimisticUpdateCache.isSome(): + let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot + let cache = db.optimisticUpdateCache.get() + if cache.lastOptimisticUpdateSlot >= slot: + true + else: + false + else: + false + ) diff --git a/fluffy/network/beacon/beacon_network.nim b/fluffy/network/beacon/beacon_network.nim index 3d318d17f..b373867f2 100644 --- a/fluffy/network/beacon/beacon_network.nim +++ b/fluffy/network/beacon/beacon_network.nim @@ -173,7 +173,9 @@ proc new*( portalProtocol = PortalProtocol.new( baseProtocol, lightClientProtocolId, toContentIdHandler, - createGetHandler(beaconDb), stream, bootstrapRecords, + createGetHandler(beaconDb), + createContainsHandler(beaconDb), + stream, bootstrapRecords, config = portalConfigAdjusted) portalProtocol.dbPut = createStoreHandler(beaconDb) diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index f909a19da..1f5befb88 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -715,7 +715,10 @@ proc new*( portalProtocol = PortalProtocol.new( baseProtocol, historyProtocolId, - toContentIdHandler, createGetHandler(contentDB), stream, bootstrapRecords, + toContentIdHandler, + createGetHandler(contentDB), + createContainsHandler(contentDB), + stream, bootstrapRecords, config = portalConfig) portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index ff96b211b..7f455ccba 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -69,23 +69,26 @@ proc new*( streamManager: StreamManager, bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = + let + contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) - let cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) + stream = streamManager.registerNewStream(contentQueue) - let s = streamManager.registerNewStream(cq) - - let portalProtocol = PortalProtocol.new( - baseProtocol, stateProtocolId, - toContentIdHandler, createGetHandler(contentDB), s, - bootstrapRecords, stateDistanceCalculator, - config = portalConfig) + portalProtocol = PortalProtocol.new( + baseProtocol, stateProtocolId, + toContentIdHandler, + createGetHandler(contentDB), + createContainsHandler(contentDB), + stream, + bootstrapRecords, stateDistanceCalculator, + config = portalConfig) portalProtocol.dbPut = createStoreHandler(contentDB, portalConfig.radiusConfig, portalProtocol) return StateNetwork( portalProtocol: portalProtocol, contentDB: contentDB, - contentQueue: cq + contentQueue: contentQueue ) proc processContentLoop(n: StateNetwork) {.async.} = diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 310208530..55349bfe6 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -134,6 +134,9 @@ type contentId: ContentId, content: seq[byte]) {.raises: [], gcsafe.} + DbContainsHandler* = + proc(contentKey: ByteList, contentId: ContentId): bool {.raises: [], gcsafe.} + PortalProtocolId* = array[2, byte] RadiusCache* = LRUCache[NodeId, UInt256] @@ -160,6 +163,7 @@ type toContentId*: ToContentIdHandler dbGet*: DbGetHandler dbPut*: DbStoreHandler + dbContains*: DbContainsHandler radiusConfig: RadiusConfig dataRadius*: UInt256 bootstrapRecords*: seq[Record] @@ -392,7 +396,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] = if contentIdResult.isOk(): let contentId = contentIdResult.get() if p.inRange(contentId): - if p.dbGet(contentKey, contentId).isErr: + if not p.dbContains(contentKey, contentId): contentKeysBitList.setBit(i) discard contentKeys.add(contentKey) else: @@ -484,6 +488,7 @@ proc new*(T: type PortalProtocol, protocolId: PortalProtocolId, toContentId: ToContentIdHandler, dbGet: DbGetHandler, + dbContains: DbContainsHandler, stream: PortalStream, bootstrapRecords: openArray[Record] = [], distanceCalculator: DistanceCalculator = XorDistanceCalculator, @@ -501,6 +506,7 @@ proc new*(T: type PortalProtocol, baseProtocol: baseProtocol, toContentId: toContentId, dbGet: dbGet, + dbContains: dbContains, radiusConfig: config.radiusConfig, dataRadius: initialRadius, bootstrapRecords: @bootstrapRecords,