From 39aa06f0ad8954d767cf3c12350ac3a0fb45e0cc Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:02:02 +0800 Subject: [PATCH] Add contains handler to portal protocol. --- fluffy/database/content_db.nim | 6 + fluffy/network/beacon/beacon_db.nim | 128 ++++++++++-------- fluffy/network/beacon/beacon_network.nim | 1 + fluffy/network/history/history_network.nim | 1 + fluffy/network/state/state_network.nim | 1 + fluffy/network/wire/portal_protocol.nim | 10 +- fluffy/tests/test_content_db.nim | 18 ++- .../test_portal_wire_protocol.nim | 2 + 8 files changed, 102 insertions(+), 65 deletions(-) diff --git a/fluffy/database/content_db.nim b/fluffy/database/content_db.nim index 32ef725ff..c6f02b5bd 100644 --- a/fluffy/database/content_db.nim +++ b/fluffy/database/content_db.nim @@ -497,6 +497,12 @@ proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler = db.put(contentId, content) ) +proc createContainsHandler*(db: ContentDB): DbContainsHandler = + return ( + proc(contentKey: ContentKeyByteList, contentId: ContentId): bool = + db.contains(contentId) + ) + proc createRadiusHandler*(db: ContentDB): DbRadiusHandler = return ( proc(): UInt256 {.raises: [], gcsafe.} = diff --git a/fluffy/network/beacon/beacon_db.nim b/fluffy/network/beacon/beacon_db.nim index 6481ed605..c70c7a1a5 100644 --- a/fluffy/network/beacon/beacon_db.nim +++ b/fluffy/network/beacon/beacon_db.nim @@ -442,68 +442,72 @@ func keepBootstrapsFrom*(db: BeaconDb, minSlot: Slot) = let res = db.bootstraps.keepFromStmt.exec(minSlot.int64) res.expect("SQL query OK") +proc getHandlerImpl( + db: BeaconDb, contentKey: ContentKeyByteList, contentId: ContentId +): results.Opt[seq[byte]] = + let contentKey = contentKey.decode().valueOr: + # TODO: as this should not fail, maybe it is better to raiseAssert ? + return Opt.none(seq[byte]) + + case contentKey.contentType + of unused: + raiseAssert "Should not be used and fail at decoding" + of lightClientBootstrap: + db.getBootstrap(contentId) + of lightClientUpdate: + let + # TODO: add validation that startPeriod is not from the future, + # this requires db to be aware off the current beacon time + startPeriod = contentKey.lightClientUpdateKey.startPeriod + # get max 128 updates + numOfUpdates = min( + uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), contentKey.lightClientUpdateKey.count + ) + toPeriod = startPeriod + numOfUpdates # Not inclusive + updates = db.getLightClientUpdates(startPeriod, toPeriod) + + if len(updates) == 0: + Opt.none(seq[byte]) + else: + # Note that this might not return all of the requested updates. + # This might seem faulty/tricky as it is also used in handleOffer to + # check if an offer should be accepted. + # But it is actually fine as this will occur only when the node is + # synced and it would not be able to verify the older updates in the + # range anyhow. + Opt.some(SSZ.encode(updates)) + of lightClientFinalityUpdate: + # TODO: + # Return only when the update is better than what is requested by + # contentKey. This is currently not possible as the contentKey does not + # include best update information. + if db.finalityUpdateCache.isSome(): + let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot + let cache = db.finalityUpdateCache.get() + if cache.lastFinalityUpdateSlot >= slot: + Opt.some(cache.lastFinalityUpdate) + else: + Opt.none(seq[byte]) + else: + Opt.none(seq[byte]) + of lightClientOptimisticUpdate: + # TODO same as above applies here too. + if db.optimisticUpdateCache.isSome(): + let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot + let cache = db.optimisticUpdateCache.get() + if cache.lastOptimisticUpdateSlot >= slot: + Opt.some(cache.lastOptimisticUpdate) + else: + Opt.none(seq[byte]) + else: + Opt.none(seq[byte]) + of beacon_content.ContentType.historicalSummaries: + db.get(contentId) + proc createGetHandler*(db: BeaconDb): DbGetHandler = return ( proc(contentKey: ContentKeyByteList, contentId: ContentId): results.Opt[seq[byte]] = - let contentKey = contentKey.decode().valueOr: - # TODO: as this should not fail, maybe it is better to raiseAssert ? - return Opt.none(seq[byte]) - - case contentKey.contentType - of unused: - raiseAssert "Should not be used and fail at decoding" - of lightClientBootstrap: - db.getBootstrap(contentId) - of lightClientUpdate: - let - # TODO: add validation that startPeriod is not from the future, - # this requires db to be aware off the current beacon time - startPeriod = contentKey.lightClientUpdateKey.startPeriod - # get max 128 updates - numOfUpdates = min( - uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), - contentKey.lightClientUpdateKey.count, - ) - toPeriod = startPeriod + numOfUpdates # Not inclusive - updates = db.getLightClientUpdates(startPeriod, toPeriod) - - if len(updates) == 0: - Opt.none(seq[byte]) - else: - # Note that this might not return all of the requested updates. - # This might seem faulty/tricky as it is also used in handleOffer to - # check if an offer should be accepted. - # But it is actually fine as this will occur only when the node is - # synced and it would not be able to verify the older updates in the - # range anyhow. - Opt.some(SSZ.encode(updates)) - of lightClientFinalityUpdate: - # TODO: - # Return only when the update is better than what is requested by - # contentKey. This is currently not possible as the contentKey does not - # include best update information. - if db.finalityUpdateCache.isSome(): - let slot = contentKey.lightClientFinalityUpdateKey.finalizedSlot - let cache = db.finalityUpdateCache.get() - if cache.lastFinalityUpdateSlot >= slot: - Opt.some(cache.lastFinalityUpdate) - else: - Opt.none(seq[byte]) - else: - Opt.none(seq[byte]) - of lightClientOptimisticUpdate: - # TODO same as above applies here too. - if db.optimisticUpdateCache.isSome(): - let slot = contentKey.lightClientOptimisticUpdateKey.optimisticSlot - let cache = db.optimisticUpdateCache.get() - if cache.lastOptimisticUpdateSlot >= slot: - Opt.some(cache.lastOptimisticUpdate) - else: - Opt.none(seq[byte]) - else: - Opt.none(seq[byte]) - of beacon_content.ContentType.historicalSummaries: - db.get(contentId) + db.getHandlerImpl(contentKey, contentId) ) proc createStoreHandler*(db: BeaconDb): DbStoreHandler = @@ -573,6 +577,12 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler = db.put(contentId, content) ) +proc createContainsHandler*(db: BeaconDb): DbContainsHandler = + return ( + proc(contentKey: ContentKeyByteList, contentId: ContentId): bool = + db.getHandlerImpl(contentKey, contentId).isSome() + ) + proc createRadiusHandler*(db: BeaconDb): DbRadiusHandler = return ( proc(): UInt256 {.raises: [], gcsafe.} = diff --git a/fluffy/network/beacon/beacon_network.nim b/fluffy/network/beacon/beacon_network.nim index 3806fd4f9..7ad438648 100644 --- a/fluffy/network/beacon/beacon_network.nim +++ b/fluffy/network/beacon/beacon_network.nim @@ -208,6 +208,7 @@ proc new*( toContentIdHandler, createGetHandler(beaconDb), createStoreHandler(beaconDb), + createContainsHandler(beaconDb), createRadiusHandler(beaconDb), stream, bootstrapRecords, diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 0a9d59114..e01147f73 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -677,6 +677,7 @@ proc new*( toContentIdHandler, createGetHandler(contentDB), createStoreHandler(contentDB, portalConfig.radiusConfig), + createContainsHandler(contentDB), createRadiusHandler(contentDB), stream, bootstrapRecords, diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 5ed2fac9d..37afa9f48 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -64,6 +64,7 @@ proc new*( toContentIdHandler, createGetHandler(contentDB), createStoreHandler(contentDB, portalConfig.radiusConfig), + createContainsHandler(contentDB), createRadiusHandler(contentDB), s, bootstrapRecords, diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index d50839632..5cfb4acb6 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -151,6 +151,10 @@ type contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte] ) {.raises: [], gcsafe.} + DbContainsHandler* = proc(contentKey: ContentKeyByteList, contentId: ContentId): bool {. + raises: [], gcsafe + .} + DbRadiusHandler* = proc(): UInt256 {.raises: [], gcsafe.} PortalProtocolId* = array[2, byte] @@ -183,6 +187,7 @@ type contentCache: ContentCache dbGet*: DbGetHandler dbPut*: DbStoreHandler + dbContains*: DbContainsHandler dataRadius*: DbRadiusHandler bootstrapRecords*: seq[Record] lastLookup: chronos.Moment @@ -474,7 +479,7 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] = ) if p.inRange(contentId): - if p.dbGet(contentKey, contentId).isErr: + if not p.dbContains(contentKey, contentId): contentKeysBitList.setBit(i) discard contentKeys.add(contentKey) else: @@ -561,6 +566,7 @@ proc new*( toContentId: ToContentIdHandler, dbGet: DbGetHandler, dbPut: DbStoreHandler, + dbContains: DbContainsHandler, dbRadius: DbRadiusHandler, stream: PortalStream, bootstrapRecords: openArray[Record] = [], @@ -580,6 +586,7 @@ proc new*( ContentCache.init(if config.disableContentCache: 0 else: config.contentCacheSize), dbGet: dbGet, dbPut: dbPut, + dbContains: dbContains, dataRadius: dbRadius, bootstrapRecords: @bootstrapRecords, stream: stream, @@ -1627,7 +1634,6 @@ proc getLocalContent*( # Check first if content is in range, as this is a cheaper operation # than the database lookup. if p.inRange(contentId): - doAssert(p.dbGet != nil) p.dbGet(contentKey, contentId) else: Opt.none(seq[byte]) diff --git a/fluffy/tests/test_content_db.nim b/fluffy/tests/test_content_db.nim index cfee320cb..fac0ac007 100644 --- a/fluffy/tests/test_content_db.nim +++ b/fluffy/tests/test_content_db.nim @@ -24,29 +24,39 @@ suite "Content Database": "", uint32.high, RadiusConfig(kind: Dynamic), testId, inMemory = true ) key = ContentId(UInt256.high()) # Some key - dbGet = db.createGetHandler() block: - let val = dbGet(ContentKeyByteList.init(@[]), key) + var val = Opt.none(seq[byte]) + proc onData(data: openArray[byte]) = + val = Opt.some(@data) check: + db.get(key, onData) == false val.isNone() db.contains(key) == false block: discard db.putAndPrune(key, [byte 0, 1, 2, 3]) - let val = dbGet(ContentKeyByteList.init(@[]), key) + + var val = Opt.none(seq[byte]) + proc onData(data: openArray[byte]) = + val = Opt.some(@data) check: + db.get(key, onData) == true val.isSome() val.get() == [byte 0, 1, 2, 3] db.contains(key) == true block: db.del(key) - let val = dbGet(ContentKeyByteList.init(@[]), key) + + var val = Opt.none(seq[byte]) + proc onData(data: openArray[byte]) = + val = Opt.some(@data) check: + db.get(key, onData) == false val.isNone() db.contains(key) == false diff --git a/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim b/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim index 42c20725f..a18aeecf9 100644 --- a/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/wire_protocol_tests/test_portal_wire_protocol.nim @@ -50,6 +50,7 @@ proc initPortalProtocol( toContentId, createGetHandler(db), createStoreHandler(db, defaultRadiusConfig), + createContainsHandler(db), createRadiusHandler(db), stream, bootstrapRecords = bootstrapRecords, @@ -346,6 +347,7 @@ procSuite "Portal Wire Protocol Tests": toContentId, createGetHandler(db), createStoreHandler(db, defaultRadiusConfig), + createContainsHandler(db), createRadiusHandler(db), stream, )