From 9430619272c03f753350f90ed089355d320b6213 Mon Sep 17 00:00:00 2001
From: Kim De Mey
Date: Mon, 17 Oct 2022 20:38:51 +0200
Subject: [PATCH] Remove Accumulator as network content type and add it to
 binary (#1267)

The Portal master accumulator was removed from the network specs as a content
type shared on the network, as since the merge it is a finite accumulator
(pre-merge only). So in this PR the accumulator gets removed as a network
content type and instead gets baked into the library. Building it is done by
separate tooling (eth_data_exporter).

Because of this, a lot of extra code can be removed from history_network,
content_db, portal_protocol, etc.

Also removed the option to build the accumulator at start-up of fluffy, as
this takes several minutes, making it not viable. It can still be loaded from
a provided file however.

The SSZ accumulator file is for now stored in the recently created
portal-spec-tests repository.
---
 .gitmodules                                |   5 +
 fluffy/conf.nim                            |  12 +-
 fluffy/content_db.nim                      |  34 ----
 fluffy/data/history_data_parser.nim        |  32 ++--
 fluffy/data/history_data_seeding.nim       |   4 +-
 fluffy/fluffy.nim                          |  30 ++--
 fluffy/network/history/accumulator.nim     |  44 ++---
 fluffy/network/history/history_content.nim |  21 ---
 fluffy/network/history/history_network.nim | 147 +++-------------
 fluffy/network/state/state_network.nim     |  10 +-
 fluffy/network/wire/portal_protocol.nim    |  23 ++-
 fluffy/network_metadata.nim                |  20 ++-
 fluffy/scripts/launch_local_testnet.sh     |   9 -
 .../mainnet/test_history_content.nim       |  67 --------
 fluffy/tests/test_accumulator.nim          |  28 +++-
 fluffy/tests/test_history_network.nim      | 102 ++----------
 fluffy/tests/test_portal_wire_protocol.nim |   9 +-
 fluffy/tools/eth_data_exporter.nim         | 157 +++++++++---------
 fluffy/tools/portalcli.nim                 |   8 +-
 vendor/portal-spec-tests                   |   1 +
 20 files changed, 239 insertions(+), 524 deletions(-)
 create mode 160000 vendor/portal-spec-tests

diff --git a/.gitmodules b/.gitmodules
index e95b2098b..2ae26205c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -212,3 +212,8 @@
   url = https://github.com/status-im/nimbus-security-resources.git
   ignore = untracked
   branch = master
+[submodule "vendor/portal-spec-tests"]
+  path = vendor/portal-spec-tests
+  url = https://github.com/status-im/portal-spec-tests.git
+  ignore = untracked
+  branch = master
diff --git a/fluffy/conf.nim b/fluffy/conf.nim
index 6a7ebed11..d0fc2391a 100644
--- a/fluffy/conf.nim
+++ b/fluffy/conf.nim
@@ -117,18 +117,10 @@ type
       defaultValueDesc: "none"
       name: "netkey-unsafe" .}: Option[PrivateKey]
 
-    accumulatorDataFile* {.
-      desc:
-        "Build the master accumulator snapshot from a data file containing " &
-        "blocks instead of getting it from peers on the network."
-      defaultValue: none(InputFile)
-      defaultValueDesc: "none"
-      name: "accumulator-data-file" .}: Option[InputFile]
-
    accumulatorFile* {.
      desc:
-        "Get the master accumulator snapshot from a file containing a " &
-        "pre-build master accumulator."
+        "Get the master accumulator snapshot from a file containing a " &
+        "pre-built SSZ encoded master accumulator."
      defaultValue: none(InputFile)
      defaultValueDesc: "none"
      name: "accumulator-file" .}: Option[InputFile]
diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim
index 9b203ad90..f8993688c 100644
--- a/fluffy/content_db.nim
+++ b/fluffy/content_db.nim
@@ -42,7 +42,6 @@ type
   ContentDB* = ref object
     kv: KvStoreRef
-    kvPermanent: KvStoreRef
     maxSize: uint32
     sizeStmt: SqliteStmt[NoParams, int64]
     unusedSizeStmt: SqliteStmt[NoParams, int64]
@@ -62,14 +61,6 @@ type
     fractionOfDeletedContent*: float64
     numOfDeletedElements*: int64
 
-  DbKey* = enum
-    kLatestAccumulator
-
-# Note: Might eventually evolve in DbKey Prefix + actual key, but for now this
-# is enough
-func subkey*(kind: DbKey): array[1, byte] =
-  [byte ord(kind)]
-
 func xorDistance(
   a: openArray[byte],
   b: openArray[byte]
@@ -128,21 +119,8 @@ proc new*(
       array[32, byte], RowInfo
     ).get()
 
-  # Using a whole new db for the "permanent" (meaning: non pruned) data, as else
-  # it might intervene with the pruning mechanism of the regular db. Might put
-  # them together in the future though.
-  let dbPerm =
-    if inMemory:
-      SqStoreRef.init("", "fluffy-test-perm", inMemory = true).expect(
-        "working database (out of memory?)")
-    else:
-      SqStoreRef.init(path, "fluffy-perm").expectDb()
-
-  let kvPermanentStore = kvStore dbPerm.openKvStore("kv_permanent").expectDb()
-
   ContentDB(
     kv: kvStore,
-    kvPermanent: kvPermanentStore,
     maxSize: maxSize,
     sizeStmt: getSizeStmt,
     vacStmt: vacStmt,
@@ -189,18 +167,6 @@ proc getSszDecoded*(
     db: ContentDB, key: openArray[byte], T: type auto): Option[T] =
   db.kv.getSszDecoded(key, T)
 
-## Public permanent kvstore calls
-
-proc getPermanent*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] =
-  db.kvPermanent.get(key)
-
-proc putPermanent*(db: ContentDB, key, value: openArray[byte]) =
-  db.kvPermanent.put(key, value).expectDb()
-
-proc getPermanentSszDecoded*(
-    db: ContentDB, key: openArray[byte], T: type auto): Option[T] =
-  db.kvPermanent.getSszDecoded(key, T)
-
 proc reclaimSpace*(db: ContentDB): void =
   ## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
   ## minimal amount of disk space.
diff --git a/fluffy/data/history_data_parser.nim b/fluffy/data/history_data_parser.nim
index 45d6d6b01..bbfe73eb8 100644
--- a/fluffy/data/history_data_parser.nim
+++ b/fluffy/data/history_data_parser.nim
@@ -33,14 +33,6 @@ type
     # Fix in nim-json-serialization or should I overload something here?
     number*: int
 
-  AccumulatorData = object
-    accumulatorHash: string
-    maxBlockNumber: int
-    accumulator: string
-
-  AccumulatorObject = object
-    accumulator: AccumulatorData
-
   EpochAccumulatorObject = object
     epochAccumulator: string
 
@@ -60,7 +52,7 @@ proc readJsonType*(dataFile: string, T: type): Result[T, string] =
   ok(decoded)
 
 iterator blockHashes*(blockData: BlockDataTable): BlockHash =
-  for k,v in blockData:
+  for k, v in blockData:
     var blockHash: BlockHash
     try:
       blockHash.data = hexToByteArray[sizeof(BlockHash)](k)
@@ -119,7 +111,7 @@ func readBlockData*(
 
 iterator blocks*(
     blockData: BlockDataTable, verify = false): seq[(ContentKey, seq[byte])] =
-  for k,v in blockData:
+  for k, v in blockData:
     let res = readBlockData(k, v, verify)
 
     if res.isOk():
@@ -180,7 +172,7 @@ func readHeaderData*(
 
 iterator headers*(
     blockData: BlockDataTable, verify = false): (ContentKey, seq[byte]) =
-  for k,v in blockData:
+  for k, v in blockData:
     let res = readHeaderData(k, v, verify)
 
     if res.isOk():
@@ -200,19 +192,19 @@ proc getGenesisHeader*(id: NetworkId = MainNet): BlockHeader =
   except RlpError:
     raise (ref Defect)(msg: "Genesis should be valid")
 
-proc readAccumulator*(dataFile: string): Result[Accumulator, string] =
-  let res = ? readJsonType(dataFile, AccumulatorObject)
-
-  let encodedAccumulator =
-    try:
-      res.accumulator.accumulator.hexToSeqByte()
-    except ValueError as e:
-      return err("Invalid hex data for accumulator: " & e.msg)
+proc toString*(v: IoErrorCode): string =
+  try: ioErrorMsg(v)
+  except Exception as e: raiseAssert e.msg
+
+proc readAccumulator*(file: string): Result[FinishedAccumulator, string] =
+  let encodedAccumulator = ? readAllFile(file).mapErr(toString)
 
   try:
-    ok(SSZ.decode(encodedAccumulator, Accumulator))
+    ok(SSZ.decode(encodedAccumulator, FinishedAccumulator))
   except SszError as e:
-    err("Decoding accumulator failed: " & e.msg)
+    err("Failed decoding accumulator: " & e.msg)
+
 proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] =
   let res = ? readJsonType(dataFile, EpochAccumulatorObject)
 
diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim
index 87e876ca6..92d8e7217 100644
--- a/fluffy/data/history_data_seeding.nim
+++ b/fluffy/data/history_data_seeding.nim
@@ -18,7 +18,7 @@ export results
 
 ### Helper calls to seed the local database and/or the network
 
-proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
+proc buildAccumulator*(dataFile: string): Result[FinishedAccumulator, string] =
   let blockData = ? readJsonType(dataFile, BlockDataTable)
 
   var headers: seq[BlockHeader]
@@ -31,7 +31,7 @@ proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
     let header = ? v.readBlockHeader()
     headers[header.blockNumber.truncate(int)] = header
 
-  ok(buildAccumulator(headers))
+  buildAccumulator(headers)
 
 proc buildAccumulatorData*(
     dataFile: string):
diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim
index a10b3612a..56c5034d7 100644
--- a/fluffy/fluffy.nim
+++ b/fluffy/fluffy.nim
@@ -108,7 +108,23 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
     stateNetwork = StateNetwork.new(d, db, streamManager,
       bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
 
-    historyNetwork = HistoryNetwork.new(d, db, streamManager,
+    accumulator =
+      # Building an accumulator from header epoch files takes > 2m30s and is
+      # thus not really a viable option at start-up.
+      # Options are:
+      # - Start with baked-in accumulator
+      # - Start with file containing SSZ encoded accumulator
+      if config.accumulatorFile.isSome():
+        readAccumulator(string config.accumulatorFile.get()).expect(
+          "Need a file with a valid SSZ encoded accumulator")
+      else:
+        # Get it from binary file containing SSZ encoded accumulator
+        try:
+          SSZ.decode(finishedAccumulator, FinishedAccumulator)
+        except SszError as err:
+          raiseAssert "Invalid baked-in accumulator: " & err.msg
+
+    historyNetwork = HistoryNetwork.new(d, db, streamManager, accumulator,
       bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
 
   # TODO: If no new network key is generated then we should first check if an
@@ -148,18 +164,6 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
 
   d.start()
   historyNetwork.start()
-
-  let accumulator =
-    if config.accumulatorDataFile.isSome():
-      some(buildAccumulator(string config.accumulatorDataFile.get()).expect(
-        "Need a valid data file to build the master accumulator locally"))
-    elif config.accumulatorFile.isSome():
-      some(readAccumulator(string config.accumulatorFile.get()).expect(
-        "Need a valid accumulator file to store the master accumulator locally"))
-    else:
-      none(Accumulator)
-
-  waitFor historyNetwork.initMasterAccumulator(accumulator)
   stateNetwork.start()
 
   runForever()
diff --git a/fluffy/network/history/accumulator.nim b/fluffy/network/history/accumulator.nim
index 2841f6c5e..14b33e45c 100644
--- a/fluffy/network/history/accumulator.nim
+++ b/fluffy/network/history/accumulator.nim
@@ -8,7 +8,6 @@
 {.push raises: [Defect].}
 
 import
-  std/hashes,
   eth/common/eth_types_rlp,
   ssz_serialization, ssz_serialization/[proofs, merkleization],
   ../../common/common_types,
@@ -22,14 +21,25 @@ export ssz_serialization, merkleization, proofs, eth_types_rlp
 const
   epochSize* = 8192 # blocks
 
-  # Allow this to be adjusted at compile time. If more constants need to be
-  # adjusted we can add some presets file.
+  # Allow this to be adjusted at compile time for testing. If more constants
+  # need to be adjusted we can add some presets file.
   mergeBlockNumber* {.intdefine.}: uint64 = 15537394
 
   # Note: This is like a ceil(mergeBlockNumber / epochSize)
  # Could use ceilDiv(mergeBlockNumber, epochSize) in future versions
   preMergeEpochs* = (mergeBlockNumber + epochSize - 1) div epochSize
 
+  # TODO:
+  # Currently disabled because of testing issues, but could be used as a value
+  # to double-check against at the merge block.
+  # TODO: Could also be used as the value to actually finish the accumulator,
+  # instead of `mergeBlockNumber`, but:
+  # - Still need to store the actual `mergeBlockNumber` at run-time somewhere,
+  #   as it is needed for pre- vs post-merge block header checking.
+  # - Can't limit `historicalEpochs` SSZ list at `preMergeEpochs` value.
+  # - Should probably be stated in the portal network specs.
+  # TERMINAL_TOTAL_DIFFICULTY = u256"58750000000000000000000"
+
 type
   HeaderRecord* = object
     blockHash*: BlockHash
@@ -54,8 +64,6 @@ func init*(T: type Accumulator): T =
     currentEpoch: EpochAccumulator.init(@[])
   )
 
-# TODO:
-# Could probably also make this work with TTD instead of merge block number.
 func updateAccumulator*(
     a: var Accumulator, header: BlockHeader) =
   doAssert(header.blockNumber.truncate(uint64) < mergeBlockNumber,
@@ -85,27 +93,25 @@ func updateAccumulator*(
     let res = a.currentEpoch.add(headerRecord)
     doAssert(res, "Can't fail because of currentEpoch length check")
 
-func isFinished*(a: Accumulator): bool =
-  a.historicalEpochs.len() == (int)(preMergeEpochs)
-
-func finishAccumulator*(a: var Accumulator) =
+func finishAccumulator*(a: var Accumulator): FinishedAccumulator =
+  # doAssert(a.currentEpoch[^2].totalDifficulty < TERMINAL_TOTAL_DIFFICULTY)
+  # doAssert(a.currentEpoch[^1].totalDifficulty >= TERMINAL_TOTAL_DIFFICULTY)
   let epochHash = hash_tree_root(a.currentEpoch)
 
   doAssert(a.historicalEpochs.add(epochHash.data))
 
-func hash*(a: Accumulator): hashes.Hash =
-  # TODO: This is used for the CountTable but it will be expensive.
-  hash(hash_tree_root(a).data)
+  FinishedAccumulator(historicalEpochs: a.historicalEpochs)
 
-func buildAccumulator*(headers: seq[BlockHeader]): Accumulator =
+func buildAccumulator*(
+    headers: seq[BlockHeader]): Result[FinishedAccumulator, string] =
   var accumulator: Accumulator
   for header in headers:
     updateAccumulator(accumulator, header)
 
     if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
-      finishAccumulator(accumulator)
+      return ok(finishAccumulator(accumulator))
 
-  accumulator
+  err("Not enough headers provided to finish the accumulator")
 
 func buildAccumulatorData*(headers: seq[BlockHeader]):
     seq[(ContentKey, EpochAccumulator)] =
@@ -137,7 +143,7 @@ func buildAccumulatorData*(headers: seq[BlockHeader]):
 
       epochAccumulators.add((key, accumulator.currentEpoch))
 
-  finishAccumulator(accumulator)
+  discard finishAccumulator(accumulator)
 
   epochAccumulators
 
@@ -167,7 +173,7 @@ func isPreMerge*(header: BlockHeader): bool =
   isPreMerge(header.blockNumber.truncate(uint64))
 
 func verifyProof*(
-    a: Accumulator, header: BlockHeader, proof: openArray[Digest]): bool =
+    a: FinishedAccumulator, header: BlockHeader, proof: openArray[Digest]): bool =
   let
     epochIndex = getEpochIndex(header)
     epochAccumulatorHash = Digest(data: a.historicalEpochs[epochIndex])
@@ -181,7 +187,7 @@ func verifyProof*(
   verify_merkle_multiproof(@[leave], proof, @[gIndex], epochAccumulatorHash)
 
 func verifyHeader*(
-    a: Accumulator, header: BlockHeader, proof: openArray[Digest]):
+    a: FinishedAccumulator, header: BlockHeader, proof: openArray[Digest]):
     Result[void, string] =
   if header.isPreMerge():
     if a.verifyProof(header, proof):
@@ -192,7 +198,7 @@ func verifyHeader*(
     err("Cannot verify post merge header with accumulator proof")
 
 func getBlockEpochDataForBlockNumber*(
-    a: Accumulator, bn: UInt256): Result[BlockEpochData, string] =
+    a: FinishedAccumulator, bn: UInt256): Result[BlockEpochData, string] =
   let blockNumber = bn.truncate(uint64)
 
   if blockNumber.isPreMerge:
diff --git a/fluffy/network/history/history_content.nim b/fluffy/network/history/history_content.nim
index 9f2310f90..a4228e47a 100644
--- a/fluffy/network/history/history_content.nim
+++ b/fluffy/network/history/history_content.nim
@@ -32,7 +32,6 @@ type
     blockBody = 0x01
     receipts = 0x02
     epochAccumulator = 0x03
-    masterAccumulator = 0x04
 
   BlockKey* = object
     blockHash*: BlockHash
@@ -40,17 +39,6 @@ type
   EpochAccumulatorKey* = object
     epochHash*: Digest # TODO: Perhaps this should be called epochRoot in the spec instead
 
-  MasterAccumulatorKeyType* = enum
-    latest = 0x00 # An SSZ Union None
-    masterHash = 0x01
-
-  MasterAccumulatorKey* = object
-    case accumulaterKeyType*: MasterAccumulatorKeyType
-    of latest:
-      discard
-    of masterHash:
-      masterHashKey*: Digest
-
   ContentKey* = object
     case contentType*: ContentType
     of blockHeader:
@@ -61,8 +49,6 @@ type
       receiptsKey*: BlockKey
     of epochAccumulator:
       epochAccumulatorKey*: EpochAccumulatorKey
-    of masterAccumulator:
-      masterAccumulatorKey*: MasterAccumulatorKey
 
 func encode*(contentKey: ContentKey): ByteList =
   ByteList.init(SSZ.encode(contentKey))
@@ -100,13 +86,6 @@ func `$`*(x: ContentKey): string =
   of epochAccumulator:
     let key = x.epochAccumulatorKey
     res.add("epochHash: " & $key.epochHash)
-  of masterAccumulator:
-    let key = x.masterAccumulatorKey
-    case key.accumulaterKeyType:
-    of latest:
-      res.add($key.accumulaterKeyType)
-    of masterHash:
-      res.add($key.accumulaterKeyType & ": " & $key.masterHashKey)
 
   res.add(")")
 
diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim
index 9b1f8920a..d7d487060 100644
--- a/fluffy/network/history/history_network.nim
+++ b/fluffy/network/history/history_network.nim
@@ -40,6 +40,7 @@ type
     portalProtocol*: PortalProtocol
     contentDB*: ContentDB
     contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
+    accumulator*: FinishedAccumulator
 
     processContentLoop: Future[void]
 
   Block* = (BlockHeader, BlockBody)
@@ -66,8 +67,6 @@ func getEncodedKeyForContent(
       ContentKey(contentType: cType, receiptsKey: contentKeyType)
     of epochAccumulator:
       raiseAssert("Not implemented")
-    of masterAccumulator:
-      raiseAssert("Not implemented")
 
   return encodeKey(contentKey)
 
@@ -255,12 +254,6 @@ proc get(
     db: ContentDB, T: type EpochAccumulator, contentId: ContentId): Option[T] =
   db.getSszDecoded(contentId, T)
 
-proc getAccumulator(db: ContentDB): Option[Accumulator] =
-  db.getPermanentSszDecoded(subkey(kLatestAccumulator), Accumulator)
-
-proc putAccumulator*(db: ContentDB, value: openArray[byte]) =
-  db.putPermanent(subkey(kLatestAccumulator), value)
-
 proc getContentFromDb(
     n: HistoryNetwork, T: type, contentId: ContentId): Option[T] =
   if n.portalProtocol.inRange(contentId):
@@ -268,20 +261,9 @@ proc getContentFromDb(
   else:
     none(T)
 
-proc dbGetHandler(db: ContentDB, contentKey: ByteList):
-    (Option[ContentId], Option[seq[byte]]) {.raises: [Defect], gcsafe.} =
-  let keyOpt = decode(contentKey)
-  if keyOpt.isNone():
-    return (none(ContentId), none(seq[byte]))
-
-  let key = keyOpt.get()
-
-  case key.contentType:
-  of masterAccumulator:
-    (none(ContentId), db.getPermanent(subkey(kLatestAccumulator)))
-  else:
-    let contentId = key.toContentId()
-    (some(contentId), db.get(contentId))
+proc dbGetHandler(db: ContentDB, contentId: ContentId):
+    Option[seq[byte]] {.raises: [Defect], gcsafe.} =
+  db.get(contentId)
 
 ## Public API to get the history network specific types, either from database
 ## or through a lookup on the Portal Network
@@ -504,18 +486,7 @@ proc getEpochAccumulator(
 
 proc getBlock*(
     n: HistoryNetwork, bn: UInt256):
     Future[Result[Option[Block], string]] {.async.} =
-
-  # TODO for now checking accumulator only in db, we could also ask our
-  # peers for it.
-  let accumulatorOpt = n.contentDB.getAccumulator()
-
-  if accumulatorOpt.isNone():
-    return err("Master accumulator not found in database")
-
-  let accumulator = accumulatorOpt.unsafeGet()
-
-  let epochDataRes = accumulator.getBlockEpochDataForBlockNumber(bn)
-
+  let epochDataRes = n.accumulator.getBlockEpochDataForBlockNumber(bn)
   if epochDataRes.isOk():
     let
       epochData = epochDataRes.get()
@@ -535,51 +506,12 @@ proc getBlock*(
   else:
     return err(epochDataRes.error)
 
-proc getInitialMasterAccumulator*(
-    n: HistoryNetwork):
-    Future[bool] {.async.} =
-  let
-    contentKey = ContentKey(
-      contentType: masterAccumulator,
-      masterAccumulatorKey: MasterAccumulatorKey(accumulaterKeyType: latest))
-    keyEncoded = encode(contentKey)
-
-  let nodes = await n.portalProtocol.queryRandom()
-
-  var hashes: CountTable[Accumulator]
-
-  for node in nodes:
-    # TODO: Could make concurrent
-    let foundContentRes = await n.portalProtocol.findContent(node, keyEncoded)
-    if foundContentRes.isOk():
-      let foundContent = foundContentRes.get()
-      if foundContent.kind == Content:
-        let masterAccumulator =
-          try:
-            SSZ.decode(foundContent.content, Accumulator)
-          except SszError:
-            continue
-        hashes.inc(masterAccumulator)
-        let (accumulator, count) = hashes.largest()
-
-        if count > 1: # Should be increased eventually
-          n.contentDB.putAccumulator(foundContent.content)
-          return true
-
-  # Could not find a common accumulator from all the queried nodes
-  return false
-
 proc buildProof*(n: HistoryNetwork, header: BlockHeader):
     Future[Result[seq[Digest], string]] {.async.} =
   # Note: Temporarily needed proc until proofs are sent over with headers.
-  let accumulatorOpt = n.contentDB.getAccumulator()
-  if accumulatorOpt.isNone():
-    return err("Master accumulator not found in database")
-
   let
-    accumulator = accumulatorOpt.get()
     epochIndex = getEpochIndex(header)
-    epochHash = Digest(data: accumulator.historicalEpochs[epochIndex])
+    epochHash = Digest(data: n.accumulator.historicalEpochs[epochIndex])
 
     epochAccumulatorOpt = await n.getEpochAccumulator(epochHash)
 
@@ -600,20 +532,13 @@ proc verifyCanonicalChain(
   when not canonicalVerify:
     return ok()
 
-  let accumulatorOpt = n.contentDB.getAccumulator()
-  if accumulatorOpt.isNone():
-    # Should acquire a master accumulator first
-    return err("Cannot accept any data without a master accumulator")
-
-  let accumulator = accumulatorOpt.get()
-
   # Note: It is a bit silly to build a proof, as we still need to request the
   # epoch accumulators for it, and could just verify it with those. But the
   # idea here is that eventually this gets changed so that the proof is sent
   # together with the header.
   let proof = await n.buildProof(header)
   if proof.isOk():
-    return verifyHeader(accumulator, header, proof.get())
+    return verifyHeader(n.accumulator, header, proof.get())
   else:
     # Can't verify without master and epoch accumulators
     return err("Cannot build proof: " & proof.error)
@@ -691,15 +616,10 @@ proc validateContent(
       return true
   of epochAccumulator:
     # Check first if epochHash is part of master accumulator
-    let masterAccumulator = n.contentDB.getAccumulator()
-    if masterAccumulator.isNone():
-      error "Cannot accept any data without a master accumulator"
-      return false
-
     let epochHash = key.epochAccumulatorKey.epochHash
-
-    if not masterAccumulator.get().historicalEpochs.contains(epochHash.data):
-      warn "Offered epoch accumulator is not part of master accumulator"
+    if not n.accumulator.historicalEpochs.contains(epochHash.data):
+      warn "Offered epoch accumulator is not part of master accumulator",
+        epochHash
       return false
 
     let epochAccumulator =
@@ -708,6 +628,7 @@ proc validateContent(
       except SszError:
         warn "Failed decoding epoch accumulator"
        return false
+
     # Next check the hash tree root, as this is probably more expensive
     let hash = hash_tree_root(epochAccumulator)
     if hash != epochHash:
@@ -715,32 +636,29 @@ proc validateContent(
       return false
     else:
       return true
-  of masterAccumulator:
-    # Don't allow a master accumulator to be offered, we only request it.
-    warn "Node does not accept master accumulators through offer/accept"
-    return false
 
 proc new*(
     T: type HistoryNetwork,
     baseProtocol: protocol.Protocol,
     contentDB: ContentDB,
     streamManager: StreamManager,
+    accumulator: FinishedAccumulator,
     bootstrapRecords: openArray[Record] = [],
     portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
+  let
+    contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
 
-  let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
+    stream = streamManager.registerNewStream(contentQueue)
 
-  let s = streamManager.registerNewStream(cq)
+    portalProtocol = PortalProtocol.new(
+      baseProtocol, historyProtocolId, contentDB,
+      toContentIdHandler, dbGetHandler, stream, bootstrapRecords,
+      config = portalConfig)
 
-  let portalProtocol = PortalProtocol.new(
-    baseProtocol, historyProtocolId, contentDB,
-    toContentIdHandler, dbGetHandler, s, bootstrapRecords,
-    config = portalConfig)
-
-  return HistoryNetwork(
+  HistoryNetwork(
     portalProtocol: portalProtocol,
     contentDB: contentDB,
-    contentQueue: cq
+    contentQueue: contentQueue,
+    accumulator: accumulator
   )
 
 proc validateContent(
@@ -794,29 +712,12 @@ proc processContentLoop(n: HistoryNetwork) {.async.} =
 
 proc start*(n: HistoryNetwork) =
   info "Starting Portal execution history network",
-    protocolId = n.portalProtocol.protocolId
+    protocolId = n.portalProtocol.protocolId,
+    accumulatorRoot = hash_tree_root(n.accumulator)
 
   n.portalProtocol.start()
 
   n.processContentLoop = processContentLoop(n)
 
-proc initMasterAccumulator*(
-    n: HistoryNetwork,
-    accumulator: Option[Accumulator]) {.async.} =
-  if accumulator.isSome():
-    n.contentDB.putAccumulator(SSZ.encode(accumulator.get()))
-    info "Successfully retrieved master accumulator from local data"
-  else:
-    while true:
-      if await n.getInitialMasterAccumulator():
-        info "Successfully retrieved master accumulator from the network"
-        return
-      else:
-        warn "Could not retrieve initial master accumulator from the network"
-        when not canonicalVerify:
-          return
-        else:
-          await sleepAsync(2.seconds)
-
 proc stop*(n: HistoryNetwork) =
   n.portalProtocol.stop()
 
diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim
index 9763b6a15..230c233ea 100644
--- a/fluffy/network/state/state_network.nim
+++ b/fluffy/network/state/state_network.nim
@@ -29,13 +29,9 @@ type StateNetwork* = ref object
 func toContentIdHandler(contentKey: ByteList): Option[ContentId] =
   toContentId(contentKey)
 
-proc dbGetHandler(db: ContentDB, contentKey: ByteList):
-    (Option[ContentId], Option[seq[byte]]) =
-  let contentIdOpt = contentKey.toContentId()
-  if contentIdOpt.isSome():
-    (contentIdOpt, db.get(contentIdOpt.get()))
-  else:
-    (contentIdOpt, none(seq[byte]))
+proc dbGetHandler(db: ContentDB, contentId: ContentId):
+    Option[seq[byte]] {.raises: [Defect], gcsafe.} =
+  db.get(contentId)
 
 proc getContent*(n: StateNetwork, key: ContentKey):
     Future[Option[seq[byte]]] {.async.} =
diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim
index 95d8ccd13..e94b9636a 100644
--- a/fluffy/network/wire/portal_protocol.nim
+++ b/fluffy/network/wire/portal_protocol.nim
@@ -132,8 +132,8 @@ type
     proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
 
   DbGetHandler* =
-    proc(contentDB: ContentDB, contentKey: ByteList):
-      (Option[ContentId], Option[seq[byte]]) {.raises: [Defect], gcsafe.}
+    proc(contentDB: ContentDB, contentId: ContentId):
+      Option[seq[byte]] {.raises: [Defect], gcsafe.}
 
   PortalProtocolId* = array[2, byte]
 
@@ -313,7 +313,14 @@ proc handleFindContent(
     maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
     enrOverhead = 4 # per added ENR, 4 bytes offset overhead
 
-  let (contentIdOpt, contentOpt) = p.dbGet(p.contentDB, fc.contentKey)
+  let contentIdOpt = p.toContentId(fc.contentKey)
+  if contentIdOpt.isNone:
+    # Return empty response when content key validation fails
+    # TODO: Better would be to return no message at all? Needs changes on
+    # discv5 layer.
+    return @[]
+
+  let contentOpt = p.dbGet(p.contentDB, contentIdOpt.get())
   if contentOpt.isSome():
     let content = contentOpt.get()
     if content.len <= maxPayloadSize:
@@ -324,7 +331,7 @@
       encodeMessage(ContentMessage(
         contentMessageType: connectionIdType, connectionId: connectionId))
-  elif contentIdOpt.isSome():
+  else:
     # Don't have the content, send closest neighbours to content id.
     let
       closestNodes = p.routingTable.neighbours(
@@ -333,14 +340,6 @@ proc handleFindContent(
     portal_content_enrs_packed.observe(enrs.len().int64)
 
     encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
-  else:
-    # Return empty response when:
-    # a. content key validation fails
-    # b. it is a special case such as "latest accumulator"
-    # TODO: Better would be to return no message at all for a, needs changes on
-    # discv5 layer.
-    # TODO: Better would be to have a specific protocol message for b.
-    @[]
 
 proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
   var contentKeysBitList = ContentKeysBitList.init(o.contentKeys.len)
diff --git a/fluffy/network_metadata.nim b/fluffy/network_metadata.nim
index 26b6b2dd2..3791c24a4 100644
--- a/fluffy/network_metadata.nim
+++ b/fluffy/network_metadata.nim
@@ -28,18 +28,29 @@ proc loadCompileTimeBootstrapNodes(
     macros.error "Failed to load bootstrap nodes metadata at '" &
       path & "': " & err.msg
 
+# Need to use std/io readFile because:
+# https://github.com/status-im/nim-stew/issues/145
+proc loadEncodedAccumulator(path: string): string =
+  try:
+    return readFile(path).string
+  except IOError as err:
+    macros.error "Failed to read finished accumulator at '" &
+      path & "': " & err.msg
+
 const
   # TODO: Change this from our local repo to an eth-client repo if/when this
   # gets created for the Portal networks.
   portalNetworksDir =
     currentSourcePath.parentDir.replace('\\', '/') / "network_data"
+
+  # TODO: Using a repo for test vectors for now, as it is something to test
+  # against, but at the same time could also go in a network metadata repo.
+  portalTestDir =
+    currentSourcePath.parentDir.parentDir.replace('\\', '/') / "vendor" /
+      "portal-spec-tests" / "tests"
 
   # Note:
   # For now it gets called testnet0 but this Portal network serves Eth1 mainnet
   # data. Giving the actual Portal (test)networks different names might not be
   # that useful as there is no way to distinguish the networks currently.
-  # Additionally, sub-networks like history network pass the eth1 network
-  # information in their requests, potentially supporting many eth1 networks
-  # over a single Portal Network.
   #
   # When more config data is required per Portal network, a metadata object can
   # be created, but right now only bootstrap nodes can be different.
@@ -48,3 +59,6 @@ const
   # rlp.rawData() in the enr code.
   testnet0BootstrapNodes* = loadCompileTimeBootstrapNodes(
     portalNetworksDir / "testnet0" / "bootstrap_nodes.txt")
+
+  finishedAccumulator* = loadEncodedAccumulator(
+    portalTestDir / "mainnet" / "accumulator" / "finished_accumulator.ssz")
diff --git a/fluffy/scripts/launch_local_testnet.sh b/fluffy/scripts/launch_local_testnet.sh
index befbaae9b..b9a079289 100755
--- a/fluffy/scripts/launch_local_testnet.sh
+++ b/fluffy/scripts/launch_local_testnet.sh
@@ -233,10 +233,6 @@ dump_logs() {
 BOOTSTRAP_NODE=0
 BOOTSTRAP_TIMEOUT=5 # in seconds
 BOOTSTRAP_ENR_FILE="${DATA_DIR}/node${BOOTSTRAP_NODE}/fluffy_node.enr"
-# Amount of nodes in the testnet that will build their master accumulator
-# locally from a block data file.
-# TODO: Currently not enabled
-LOCAL_ACCUMULATOR_NODES=$((NUM_NODES / 4))
 
 for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
   NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
@@ -271,10 +267,6 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
     done
   fi
 
-  # if [[ ${NUM_NODE} -lt ${LOCAL_ACCUMULATOR_NODES} ]]; then
-  #   ACCUMULATOR_ARG="--accumulator-file=./fluffy/scripts/eth-accumulator.json"
-  # fi
-
   # Running with bits-per-hop of 1 to make the lookups more likely requiring
   # to request to nodes over the network instead of having most of them in the
   # own routing table.
@@ -295,7 +287,6 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
     --bucket-ip-limit=24 \
     --bits-per-hop=1 \
     ${RADIUS_ARG} \
-    ${ACCUMULATOR_ARG} \
     ${EXTRA_ARGS} \
     > "${DATA_DIR}/log${NUM_NODE}.txt" 2>&1 &
 
diff --git a/fluffy/tests/portal_spec_tests/mainnet/test_history_content.nim b/fluffy/tests/portal_spec_tests/mainnet/test_history_content.nim
index 0372c51e8..a698c757b 100644
--- a/fluffy/tests/portal_spec_tests/mainnet/test_history_content.nim
+++ b/fluffy/tests/portal_spec_tests/mainnet/test_history_content.nim
@@ -147,70 +147,3 @@ suite "History ContentKey Encodings":
       toContentId(contentKey) == parse(contentId, StUint[256], 10)
       # In stint this does BE hex string
       toContentId(contentKey).toHex() == contentIdHexBE
-
-  test "Master Accumulator - Latest":
-    # Output
-    const
-      contentKeyHex =
-        "0400"
-      contentId =
-        "87173654316145541646904042090629917349369185510102051783618763191692466404071"
-      # or
-      contentIdHexBE =
-        "c0ba8a33ac67f44abff5984dfbb6f56c46b880ac2b86e1f23e7fa9c402c53ae7"
-
-    let contentKey = ContentKey(
-      contentType: masterAccumulator,
-      masterAccumulatorKey: MasterAccumulatorKey(accumulaterKeyType: latest))
-
-    let encoded = encode(contentKey)
-    check encoded.asSeq.toHex == contentKeyHex
-    let decoded = decode(encoded)
-    check decoded.isSome()
-
-    let contentKeyDecoded = decoded.get()
-    check:
-      contentKeyDecoded.contentType == contentKey.contentType
-      contentKeyDecoded.masterAccumulatorKey.accumulaterKeyType ==
-        contentKey.masterAccumulatorKey.accumulaterKeyType
-
-      toContentId(contentKey) == parse(contentId, StUint[256], 10)
-      # In stint this does BE hex string
-      toContentId(contentKey).toHex() == contentIdHexBE
-
-  test "Master Accumulator - Hash":
-    # Input
-    const accumulatorHash = Digest.fromHex(
-      "0x88cce8439ebc0c1d007177ffb6831c15c07b4361984cc52235b6fd728434f0c7")
-
-    # Output
-    const
-      contentKeyHex =
-        "040188cce8439ebc0c1d007177ffb6831c15c07b4361984cc52235b6fd728434f0c7"
-      contentId =
-        "79362820890138237094338894474079140563693945795365426184460738681339857347750"
-      # or
-      contentIdHexBE =
-        "af75c3c9d0e89a5083361a3334a9c5583955f0dbe9a413eb79ba26400d1824a6"
-
-    let contentKey = ContentKey(
-      contentType: masterAccumulator,
-      masterAccumulatorKey: MasterAccumulatorKey(
-        accumulaterKeyType: masterHash, masterHashKey: accumulatorHash))
-
-    let encoded = encode(contentKey)
-    check encoded.asSeq.toHex == contentKeyHex
-    let decoded = decode(encoded)
-    check decoded.isSome()
-
-    let contentKeyDecoded = decoded.get()
-    check:
-      contentKeyDecoded.contentType == contentKey.contentType
-      contentKeyDecoded.masterAccumulatorKey.accumulaterKeyType ==
-        contentKey.masterAccumulatorKey.accumulaterKeyType
-      contentKeyDecoded.masterAccumulatorKey.masterHashKey ==
-        contentKey.masterAccumulatorKey.masterHashKey
-
-      toContentId(contentKey) == parse(contentId, StUint[256], 10)
-      # In stint this does BE hex string
-      toContentId(contentKey).toHex() == contentIdHexBE
diff --git a/fluffy/tests/test_accumulator.nim b/fluffy/tests/test_accumulator.nim
index 4641e1939..28174f174 100644
--- a/fluffy/tests/test_accumulator.nim
+++ b/fluffy/tests/test_accumulator.nim
@@ -16,7 +16,6 @@ import
   ../network/history/[history_content, accumulator]
 
 func buildProof(
-    accumulator: Accumulator,
     epochAccumulators: seq[(ContentKey, EpochAccumulator)],
     header: BlockHeader): Result[seq[Digest], string] =
 
@@ -56,13 +55,17 @@ suite "Header Accumulator":
           blockNumber: i.stuint(256), difficulty: 1.stuint(256)))
 
     let
-      accumulator = buildAccumulator(headers)
+      accumulatorRes = buildAccumulator(headers)
       epochAccumulators = buildAccumulatorData(headers)
 
+    check accumulatorRes.isOk()
+
+    let accumulator = accumulatorRes.get()
+
     block: # Test valid headers
       for i in headersToTest:
         let header = headers[i]
-        let proof = buildProof(accumulator, epochAccumulators, header)
+        let proof = buildProof(epochAccumulators, header)
         check:
           proof.isOk()
           verifyHeader(accumulator, header, proof.get()).isOk()
@@ -74,7 +77,7 @@ suite "Header Accumulator":
 
       # Test altered block headers by altering the difficulty
       for i in headersToTest:
-        let proof = buildProof(accumulator, epochAccumulators, headers[i])
+        let proof = buildProof(epochAccumulators, headers[i])
         check:
           proof.isOk()
 
         # Alter the block header so the proof no longer matches
@@ -92,6 +95,19 @@ suite "Header Accumulator":
      for i in headersToTest:
         check verifyHeader(accumulator, headers[i], proof).isErr()
 
+  test "Header Accumulator - Not Finished":
+    # Fewer headers than needed to finish the accumulator
+    const amount = mergeBlockNumber - 1
+
+    var headers: seq[BlockHeader]
+    for i in 0..