Add verification of block data by use of accumulator (#1178)

Kim De Mey 2022-08-01 21:00:21 +02:00 committed by GitHub
parent a5d4759bfd
commit 50af402f59
22 changed files with 1199 additions and 461 deletions


@ -117,6 +117,22 @@ type
defaultValueDesc: "none"
name: "netkey-unsafe" .}: Option[PrivateKey]
accumulatorDataFile* {.
desc:
"Build the master accumulator snapshot from a data file containing " &
"blocks instead of getting it from peers on the network."
defaultValue: none(InputFile)
defaultValueDesc: "none"
name: "accumulator-data-file" .}: Option[InputFile]
accumulatorFile* {.
desc:
"Get the master accumulator snapshot from a file containing a " &
"pre-build master accumulator."
defaultValue: none(InputFile)
defaultValueDesc: "none"
name: "accumulator-file" .}: Option[InputFile]
metricsEnabled* {.
defaultValue: false
desc: "Enable the metrics server"


@ -42,6 +42,7 @@ type
ContentDB* = ref object
kv: KvStoreRef
kvPermanent: KvStoreRef
maxSize: uint32
sizeStmt: SqliteStmt[NoParams, int64]
unusedSizeStmt: SqliteStmt[NoParams, int64]
@ -61,6 +62,14 @@ type
fractionOfDeletedContent*: float64
numOfDeletedElements*: int64
DbKey* = enum
kLatestAccumulator
# Note: Might eventually evolve into a DbKey prefix + the actual key, but for
# now this is enough
func subkey*(kind: DbKey): array[1, byte] =
[byte ord(kind)]
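# For illustration: kLatestAccumulator is the first (and currently only) enum
# value, so subkey(kLatestAccumulator) == [byte 0].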
func xorDistance(
a: openArray[byte],
b: openArray[byte]
@ -82,7 +91,9 @@ template expectDb(x: auto): untyped =
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")
proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): ContentDB =
proc new*(
T: type ContentDB, path: string, maxSize: uint32, inMemory = false):
ContentDB =
let db =
if inMemory:
SqStoreRef.init("", "fluffy-test", inMemory = true).expect(
@ -117,8 +128,21 @@ proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): C
array[32, byte], RowInfo
).get()
# Using a whole new db for the "permanent" (meaning: non-pruned) data, as it
# might otherwise interfere with the pruning mechanism of the regular db. Might put
# them together in the future though.
let dbPerm =
if inMemory:
SqStoreRef.init("", "fluffy-test-perm", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "fluffy-perm").expectDb()
let kvPermanentStore = kvStore dbPerm.openKvStore("kv_permanent").expectDb()
ContentDB(
kv: kvStore,
kvPermanent: kvPermanentStore,
maxSize: maxSize,
sizeStmt: getSizeStmt,
vacStmt: vacStmt,
@ -127,6 +151,56 @@ proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): C
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt
)
## Private KvStoreRef Calls
proc get(kv: KvStoreRef, key: openArray[byte]): Option[seq[byte]] =
var res: Option[seq[byte]]
proc onData(data: openArray[byte]) = res = some(@data)
discard kv.get(key, onData).expectDb()
return res
proc getSszDecoded(kv: KvStoreRef, key: openArray[byte], T: type auto): Option[T] =
let res = kv.get(key)
if res.isSome():
try:
some(SSZ.decode(res.get(), T))
except SszError:
raiseAssert("Stored data should always be serialized correctly")
else:
none(T)
## Private ContentDB calls
proc get(db: ContentDB, key: openArray[byte]): Option[seq[byte]] =
db.kv.get(key)
proc put(db: ContentDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains(db: ContentDB, key: openArray[byte]): bool =
db.kv.contains(key).expectDb()
proc del(db: ContentDB, key: openArray[byte]) =
db.kv.del(key).expectDb()
proc getSszDecoded*(
db: ContentDB, key: openArray[byte], T: type auto): Option[T] =
db.kv.getSszDecoded(key, T)
## Public permanent kvstore calls
proc getPermanent*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] =
db.kvPermanent.get(key)
proc putPermanent*(db: ContentDB, key, value: openArray[byte]) =
db.kvPermanent.put(key, value).expectDb()
proc getPermanentSszDecoded*(
db: ContentDB, key: openArray[byte], T: type auto): Option[T] =
db.kvPermanent.getSszDecoded(key, T)
proc reclaimSpace*(db: ContentDB): void =
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
## minimal amount of disk space.
@ -168,22 +242,7 @@ proc contentSize(db: ContentDB): int64 =
size = res).expectDb()
return size
proc get*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] =
var res: Option[seq[byte]]
proc onData(data: openArray[byte]) = res = some(@data)
discard db.kv.get(key, onData).expectDb()
return res
proc put(db: ContentDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains*(db: ContentDB, key: openArray[byte]): bool =
db.kv.contains(key).expectDb()
proc del*(db: ContentDB, key: openArray[byte]) =
db.kv.del(key).expectDb()
## Public ContentId based ContentDB calls
# TODO: Could also decide to use the ContentKey SSZ bytestring, as this is what
# gets sent over the network in requests, but that would be a bigger key. Or the
@ -206,6 +265,9 @@ proc contains*(db: ContentDB, key: ContentId): bool =
proc del*(db: ContentDB, key: ContentId) =
db.del(key.toByteArrayBE())
proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Option[T] =
db.getSszDecoded(key.toByteArrayBE(), T)
proc deleteContentFraction(
db: ContentDB,
target: UInt256,
@ -214,7 +276,7 @@ proc deleteContentFraction(
## First, content furthest from provided `target` is deleted.
doAssert(
fraction > 0 and fraction < 1,
"Deleted fraction should be > 0 and < 1"
)
@ -234,7 +296,7 @@ proc deleteContentFraction(
return (
UInt256.fromBytesBE(ri.distance),
bytesDeleted,
totalContentSize,
numOfDeletedElements
)
@ -245,7 +307,7 @@ proc put*(
target: UInt256): PutResult =
db.put(key, value)
# We use the real size for our pruning threshold, which means the database file
# will reach the size specified in db.maxSize and stay at that size throughout
# the node's lifetime, as free pages are reused after content deletion.


@ -161,9 +161,21 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
let bridgeClient = initializeBridgeClient(config.bridgeUri)
d.start()
stateNetwork.start()
historyNetwork.start()
let accumulator =
if config.accumulatorDataFile.isSome():
some(buildAccumulator(string config.accumulatorDataFile.get()).expect(
"Need a valid data file to build the master accumulator locally"))
elif config.accumulatorFile.isSome():
some(readAccumulator(string config.accumulatorFile.get()).expect(
"Need a valid accumulator file to store the master accumulator locally"))
else:
none(Accumulator)
waitFor historyNetwork.initMasterAccumulator(accumulator)
stateNetwork.start()
runForever()
when isMainModule:


@ -8,15 +8,13 @@
{.push raises: [Defect].}
import
eth/db/kvstore,
eth/db/kvstore_sqlite3,
std/hashes,
eth/common/eth_types,
ssz_serialization, ssz_serialization/[proofs, merkleization],
../../common/common_types,
../../populate_db,
./history_content
export kvstore_sqlite3, merkleization
export ssz_serialization, merkleization, proofs
# Header Accumulator
# Part from specification
@ -60,163 +58,35 @@ func updateAccumulator*(a: var Accumulator, header: BlockHeader) =
let res = a.currentEpoch.add(headerRecord)
doAssert(res, "Can't fail because of currentEpoch length check")
type
# Note:
# This database should eventually just be a part of the ContentDB.
# The reason it is currently separate is that it is experimental and
# because accumulator data will, in the first tests, be used on the side to
# verify headers without actually transferring the data over the network. Hence,
# all data needs to be available and no pruning should be done on this data.
AccumulatorDB* = ref object
kv: KvStoreRef
func hash*(a: Accumulator): hashes.Hash =
# TODO: This is used for the CountTable but it will be expensive.
hash(hash_tree_root(a).data)
# This is a bit of a hacky way to access the latest accumulator right now,
# hacky in the sense that in theory some contentId could result in this key.
# Could have a prefix for each key access, but that would not play nicely
# with calls that use the distance function (pruning, range access).
# Could drop it in a separate table/kvstore. And could have a mapping of
# certain specific requests (e.g. latest) to their hash.
DbKey = enum
kLatestAccumulator
func subkey(kind: DbKey): array[1, byte] =
[byte ord(kind)]
template expectDb(x: auto): untyped =
# There's no meaningful error handling implemented for a corrupt database or
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")
proc new*(T: type AccumulatorDB, path: string, inMemory = false): AccumulatorDB =
let db =
if inMemory:
SqStoreRef.init("", "fluffy-acc-db", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "fluffy-acc-db").expectDb()
AccumulatorDB(kv: kvStore db.openKvStore().expectDb())
proc get(db: AccumulatorDB, key: openArray[byte]): Option[seq[byte]] =
var res: Option[seq[byte]]
proc onData(data: openArray[byte]) = res = some(@data)
discard db.kv.get(key, onData).expectDb()
return res
proc put(db: AccumulatorDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains(db: AccumulatorDB, key: openArray[byte]): bool =
db.kv.contains(key).expectDb()
proc del(db: AccumulatorDB, key: openArray[byte]) =
db.kv.del(key).expectDb()
proc get*(db: AccumulatorDB, key: ContentId): Option[seq[byte]] =
db.get(key.toByteArrayBE())
proc put*(db: AccumulatorDB, key: ContentId, value: openArray[byte]) =
db.put(key.toByteArrayBE(), value)
proc contains*(db: AccumulatorDB, key: ContentId): bool =
db.contains(key.toByteArrayBE())
proc del*(db: AccumulatorDB, key: ContentId) =
db.del(key.toByteArrayBE())
proc get(
db: AccumulatorDB, key: openArray[byte],
T: type auto): Option[T] =
let res = db.get(key)
if res.isSome():
try:
some(SSZ.decode(res.get(), T))
except SszError:
raiseAssert("Stored data should always be serialized correctly")
else:
none(T)
# TODO: Will it be required to store more than just the latest accumulator?
proc getAccumulator*(db: AccumulatorDB, key: ContentId): Option[Accumulator] =
db.get(key.toByteArrayBE, Accumulator)
proc getAccumulator*(db: AccumulatorDB): Option[Accumulator] =
db.get(subkey(kLatestAccumulator), Accumulator)
proc getAccumulatorSSZ*(db: AccumulatorDB): Option[seq[byte]] =
db.get(subkey(kLatestAccumulator))
proc putAccumulator*(db: AccumulatorDB, value: openArray[byte]) =
db.put(subkey(kLatestAccumulator), value)
proc getEpochAccumulator*(
db: AccumulatorDB, key: ContentId): Option[EpochAccumulator] =
db.get(key.toByteArrayBE(), EpochAccumulator)
# The following calls are there for building up the accumulator from a set of
# headers, which can then be injected into the network and used to generate
# header proofs.
# They will not be used in the more general usage of Fluffy.
# Note: One could also make a Portal network and/or json-rpc eth1 endpoint
# version of this.
proc buildAccumulator*(db: AccumulatorDB, headers: seq[BlockHeader]) =
var accumulator: Accumulator
for header in headers:
updateAccumulator(accumulator, header)
if accumulator.currentEpoch.len() == epochSize:
let rootHash = accumulator.currentEpoch.hash_tree_root()
let key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
db.put(key.toContentId(), SSZ.encode(accumulator.currentEpoch))
db.putAccumulator(SSZ.encode(accumulator))
proc buildAccumulator*(
db: AccumulatorDB, dataFile: string): Result[void, string] =
let blockData = ? readBlockDataTable(dataFile)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
db.buildAccumulator(headers)
ok()
func buildAccumulator(headers: seq[BlockHeader]): Accumulator =
func buildAccumulator*(headers: seq[BlockHeader]): Accumulator =
var accumulator: Accumulator
for header in headers:
updateAccumulator(accumulator, header)
accumulator
proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
let blockData = ? readBlockDataTable(dataFile)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
ok(buildAccumulator(headers))
func buildAccumulatorData*(headers: seq[BlockHeader]):
seq[(ContentKey, EpochAccumulator)] =
var accumulator: Accumulator
var epochAccumulators: seq[(ContentKey, EpochAccumulator)]
for header in headers:
updateAccumulator(accumulator, header)
if accumulator.currentEpoch.len() == epochSize:
let
rootHash = accumulator.currentEpoch.hash_tree_root()
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
epochAccumulators.add((key, accumulator.currentEpoch))
epochAccumulators
## Calls and helper calls for building header proofs and verifying headers
## against the Accumulator and the header proofs.
@ -226,43 +96,14 @@ func inCurrentEpoch*(header: BlockHeader, a: Accumulator): bool =
blockNumber > uint64(a.historicalEpochs.len() * epochSize) - 1
func getEpochIndex(header: BlockHeader): uint64 =
func getEpochIndex*(header: BlockHeader): uint64 =
## Get the index for the historical epochs
header.blockNumber.truncate(uint64) div epochSize
func getHeaderRecordIndex(header: BlockHeader, epochIndex: uint64): uint64 =
func getHeaderRecordIndex*(header: BlockHeader, epochIndex: uint64): uint64 =
## Get the relative header index for the epoch accumulator
uint64(header.blockNumber.truncate(uint64) - epochIndex * epochSize)
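# Worked example (illustrative numbers only, assuming epochSize == 8192 as in
# the Portal spec): for block number 1_000_000,
#   getEpochIndex        -> 1_000_000 div 8192 = 122
#   getHeaderRecordIndex -> 1_000_000 - 122 * 8192 = 576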
proc buildProof*(db: AccumulatorDB, header: BlockHeader):
Result[seq[Digest], string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
let
accumulator = accumulatorOpt.get()
epochIndex = getEpochIndex(header)
epochHash = Digest(data: accumulator.historicalEpochs[epochIndex])
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: epochHash))
epochAccumulatorOpt = db.getEpochAccumulator(key.toContentId())
if epochAccumulatorOpt.isNone():
return err("Epoch accumulator not found in database")
let
epochAccumulator = epochAccumulatorOpt.get()
headerRecordIndex = getHeaderRecordIndex(header, epochIndex)
# TODO: Implement more generalized `get_generalized_index`
gIndex = GeneralizedIndex(epochSize*2*2 + (headerRecordIndex*2))
epochAccumulator.build_proof(gIndex)
func verifyProof*(
a: Accumulator, proof: openArray[Digest], header: BlockHeader): bool =
let
@ -277,27 +118,9 @@ func verifyProof*(
verify_merkle_multiproof(@[leave], proof, @[gIndex], epochAccumulatorHash)
proc verifyProof*(
db: AccumulatorDB, proof: openArray[Digest], header: BlockHeader):
Result[void, string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
if accumulatorOpt.get().verifyProof(proof, header):
ok()
else:
err("Proof verification failed")
proc verifyHeader*(
db: AccumulatorDB, header: BlockHeader, proof: Option[seq[Digest]]):
accumulator: Accumulator, header: BlockHeader, proof: Option[seq[Digest]]):
Result[void, string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
let accumulator = accumulatorOpt.get()
if header.inCurrentEpoch(accumulator):
let blockNumber = header.blockNumber.truncate(uint64)
let relIndex = blockNumber - uint64(accumulator.historicalEpochs.len()) * epochSize


@ -0,0 +1,200 @@
# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
eth/db/kvstore,
eth/db/kvstore_sqlite3,
eth/common/eth_types,
ssz_serialization, ssz_serialization/[proofs, merkleization],
../../common/common_types,
../../populate_db,
"."/[history_content, accumulator]
type
# TODO:
# This database should probably disappear and become part of ContentDB and
# SeedDB. Some of the ContentDB work has been done regarding this.
AccumulatorDB* = ref object
kv: KvStoreRef
# This is a bit of a hacky way to access the latest accumulator right now,
# hacky in the sense that in theory some contentId could result in this key.
# Could have a prefix for each key access, but that would not play nicely
# with calls that use the distance function (pruning, range access).
# Could drop it in a separate table/kvstore. And could have a mapping of
# certain specific requests (e.g. latest) to their hash.
DbKey = enum
kLatestAccumulator
func subkey(kind: DbKey): array[1, byte] =
[byte ord(kind)]
template expectDb(x: auto): untyped =
# There's no meaningful error handling implemented for a corrupt database or
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")
proc new*(T: type AccumulatorDB, path: string, inMemory = false): AccumulatorDB =
let db =
if inMemory:
SqStoreRef.init("", "fluffy-acc-db", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "fluffy-acc-db").expectDb()
AccumulatorDB(kv: kvStore db.openKvStore().expectDb())
proc get(db: AccumulatorDB, key: openArray[byte]): Option[seq[byte]] =
var res: Option[seq[byte]]
proc onData(data: openArray[byte]) = res = some(@data)
discard db.kv.get(key, onData).expectDb()
return res
proc put(db: AccumulatorDB, key, value: openArray[byte]) =
db.kv.put(key, value).expectDb()
proc contains(db: AccumulatorDB, key: openArray[byte]): bool =
db.kv.contains(key).expectDb()
proc del(db: AccumulatorDB, key: openArray[byte]) =
db.kv.del(key).expectDb()
proc get*(db: AccumulatorDB, key: ContentId): Option[seq[byte]] =
db.get(key.toByteArrayBE())
proc put*(db: AccumulatorDB, key: ContentId, value: openArray[byte]) =
db.put(key.toByteArrayBE(), value)
proc contains*(db: AccumulatorDB, key: ContentId): bool =
db.contains(key.toByteArrayBE())
proc del*(db: AccumulatorDB, key: ContentId) =
db.del(key.toByteArrayBE())
proc get(
db: AccumulatorDB, key: openArray[byte],
T: type auto): Option[T] =
let res = db.get(key)
if res.isSome():
try:
some(SSZ.decode(res.get(), T))
except SszError:
raiseAssert("Stored data should always be serialized correctly")
else:
none(T)
# TODO: Will it be required to store more than just the latest accumulator?
proc getAccumulator*(db: AccumulatorDB, key: ContentId): Option[Accumulator] =
db.get(key.toByteArrayBE, Accumulator)
proc getAccumulator*(db: AccumulatorDB): Option[Accumulator] =
db.get(subkey(kLatestAccumulator), Accumulator)
proc getAccumulatorSSZ*(db: AccumulatorDB): Option[seq[byte]] =
db.get(subkey(kLatestAccumulator))
proc putAccumulator*(db: AccumulatorDB, value: openArray[byte]) =
db.put(subkey(kLatestAccumulator), value)
proc getEpochAccumulator*(
db: AccumulatorDB, key: ContentId): Option[EpochAccumulator] =
db.get(key.toByteArrayBE(), EpochAccumulator)
# The following calls are there for building up the accumulator from a set of
# headers, which can then be injected into the network and used to generate
# header proofs.
# They will not be used in the more general usage of Fluffy.
# Note: One could also make a Portal network and/or json-rpc eth1 endpoint
# version of this.
proc buildAccumulator*(db: AccumulatorDB, headers: seq[BlockHeader]) =
var accumulator: Accumulator
for header in headers:
updateAccumulator(accumulator, header)
if accumulator.currentEpoch.len() == epochSize:
let rootHash = accumulator.currentEpoch.hash_tree_root()
let key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
db.put(key.toContentId(), SSZ.encode(accumulator.currentEpoch))
db.putAccumulator(SSZ.encode(accumulator))
proc buildAccumulator*(
db: AccumulatorDB, dataFile: string): Result[void, string] =
let blockData = ? readJsonType(dataFile, BlockDataTable)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
db.buildAccumulator(headers)
ok()
proc buildProof*(db: AccumulatorDB, header: BlockHeader):
Result[seq[Digest], string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
let
accumulator = accumulatorOpt.get()
epochIndex = getEpochIndex(header)
epochHash = Digest(data: accumulator.historicalEpochs[epochIndex])
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: epochHash))
epochAccumulatorOpt = db.getEpochAccumulator(key.toContentId())
if epochAccumulatorOpt.isNone():
return err("Epoch accumulator not found in database")
let
epochAccumulator = epochAccumulatorOpt.get()
headerRecordIndex = getHeaderRecordIndex(header, epochIndex)
# TODO: Implement more generalized `get_generalized_index`
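# Assumed derivation (sketch): EpochAccumulator is an SSZ List of HeaderRecords
# with two fields each, so the root of record `i` sits at generalized index
# 2 * epochSize + i and its blockHash leaf at twice that, which gives the
# epochSize*2*2 + headerRecordIndex*2 formula below.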
gIndex = GeneralizedIndex(epochSize*2*2 + (headerRecordIndex*2))
epochAccumulator.build_proof(gIndex)
proc verifyProof*(
db: AccumulatorDB, proof: openArray[Digest], header: BlockHeader):
Result[void, string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
if accumulatorOpt.get().verifyProof(proof, header):
ok()
else:
err("Proof verification failed")
proc verifyHeader*(
db: AccumulatorDB, header: BlockHeader, proof: Option[seq[Digest]]):
Result[void, string] =
let accumulatorOpt = db.getAccumulator()
if accumulatorOpt.isNone():
err("Master accumulator not found in database")
else:
verifyHeader(accumulatorOpt.get(), header, proof)


@ -8,18 +8,31 @@
{.push raises: [Defect].}
import
std/options,
std/[options, tables],
stew/results, chronos, chronicles, nimcrypto/[keccak, hash],
eth/[common/eth_types, rlp, trie, trie/db],
eth/p2p/discoveryv5/[protocol, enr],
../../content_db,
../../../nimbus/constants,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
"."/[history_content, accumulator]
"."/[history_content, accumulator],
../../populate_db
logScope:
topics = "portal_hist"
export accumulator
# TODO: Currently, to verify whether content is from the canonical chain, it is
# required to download the right epoch accumulator, which is ~0.5 MB. This is
# too much, at least for the local testnet tests. This needs to be improved
# by adding the proofs to the block header content. Another independent
# improvement would be to have a content cache (LRU or so). The latter would
# probably help mostly for the local testnet tests.
# For now, this verification is disabled by default until further improvements
# are made.
const canonicalVerify* {.booldefine.} = false
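# (Being a {.booldefine.} const, it can be enabled at compile time, e.g. by
# building with `-d:canonicalVerify=true`.)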
const
historyProtocolId* = [byte 0x50, 0x0B]
@ -206,19 +219,7 @@ proc validateReceiptsBytes*(
seq[Receipt].fromReceipts(receipts)
## ContentDB getters for specific history network types
proc getSszDecoded(
db: ContentDB, contentId: ContentID,
T: type auto): Option[T] =
let res = db.get(contentId)
if res.isSome():
try:
some(SSZ.decode(res.get(), T))
except SszError as e:
raiseAssert("Stored data should always be serialized correctly: " & e.msg)
else:
none(T)
## ContentDB helper calls for specific history network types
proc get(db: ContentDB, T: type BlockHeader, contentId: ContentID): Option[T] =
let contentFromDB = db.get(contentId)
@ -253,6 +254,16 @@ proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentID): Option[T] =
else:
none(T)
proc get(
db: ContentDB, T: type EpochAccumulator, contentId: ContentID): Option[T] =
db.getSszDecoded(contentId, T)
proc getAccumulator(db: ContentDB): Option[Accumulator] =
db.getPermanentSszDecoded(subkey(kLatestAccumulator), Accumulator)
proc putAccumulator*(db: ContentDB, value: openArray[byte]) =
db.putPermanent(subkey(kLatestAccumulator), value)
proc getContentFromDb(
n: HistoryNetwork, T: type, contentId: ContentId): Option[T] =
if n.portalProtocol.inRange(contentId):
@ -260,6 +271,21 @@ proc getContentFromDb(
else:
none(T)
proc dbGetHandler(db: ContentDB, contentKey: ByteList):
(Option[ContentId], Option[seq[byte]]) {.raises: [Defect], gcsafe.} =
let keyOpt = decode(contentKey)
if keyOpt.isNone():
return (none(ContentId), none(seq[byte]))
let key = keyOpt.get()
case key.contentType:
of masterAccumulator:
(none(ContentId), db.getPermanent(subkey(kLatestAccumulator)))
else:
let contentId = key.toContentId()
(some(contentId), db.get(contentId))
## Public API to get the history network specific types, either from database
## or through a lookup on the Portal Network
@ -427,21 +453,146 @@ proc getReceipts*(
return none(seq[Receipt])
func validateEpochAccumulator(bytes: openArray[byte]): bool =
# For now just validate by checking if de-serialization works
try:
discard SSZ.decode(bytes, EpochAccumulator)
true
except SszError:
false
func validateMasterAccumulator(bytes: openArray[byte]): bool =
# For now just validate by checking if de-serialization works
try:
discard SSZ.decode(bytes, Accumulator)
true
except SszError:
false
proc getEpochAccumulator(
n: HistoryNetwork, epochHash: Digest):
Future[Option[EpochAccumulator]] {.async.} =
let
contentKey = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(epochHash: epochHash))
keyEncoded = encode(contentKey)
contentId = toContentId(keyEncoded)
accumulatorFromDb = n.getContentFromDb(EpochAccumulator, contentId)
if accumulatorFromDb.isSome():
info "Fetched epoch accumulator from database", epochHash
return accumulatorFromDb
for i in 0..<requestRetries:
let contentLookup =
await n.portalProtocol.contentLookup(keyEncoded, contentId)
if contentLookup.isNone():
warn "Failed fetching epoch accumulator from the network", epochHash
return none(EpochAccumulator)
let accumulatorContent = contentLookup.unsafeGet()
let epochAccumulator =
try:
SSZ.decode(accumulatorContent.content, EpochAccumulator)
except SszError:
continue
# return none(EpochAccumulator)
let hash = hash_tree_root(epochAccumulator)
if hash == epochHash:
info "Fetched epoch accumulator from the network", epochHash
n.portalProtocol.triggerPoke(
accumulatorContent.nodesInterestedInContent,
keyEncoded,
accumulatorContent.content
)
n.portalProtocol.storeContent(contentId, accumulatorContent.content)
return some(epochAccumulator)
else:
warn "Validation of epoch accumulator failed",
hash, expectedHash = epochHash
return none(EpochAccumulator)
proc getInitialMasterAccumulator*(
n: HistoryNetwork):
Future[bool] {.async.} =
let
contentKey = ContentKey(
contentType: masterAccumulator,
masterAccumulatorKey: MasterAccumulatorKey(accumulaterKeyType: latest))
keyEncoded = encode(contentKey)
let nodes = await n.portalProtocol.queryRandom()
var hashes: CountTable[Accumulator]
for node in nodes:
# TODO: Could make concurrent
let foundContentRes = await n.portalProtocol.findContent(node, keyEncoded)
if foundContentRes.isOk():
let foundContent = foundContentRes.get()
if foundContent.kind == Content:
let masterAccumulator =
try:
SSZ.decode(foundContent.content, Accumulator)
except SszError:
continue
hashes.inc(masterAccumulator)
let (accumulator, count) = hashes.largest()
if count > 1: # Should be increased eventually
n.contentDB.putAccumulator(foundContent.content)
return true
# Could not find a common accumulator from all the queried nodes
return false
proc buildProof*(n: HistoryNetwork, header: BlockHeader):
Future[Result[seq[Digest], string]] {.async.} =
# Note: Temporarily needed proc until proofs are sent over with headers.
let accumulatorOpt = n.contentDB.getAccumulator()
if accumulatorOpt.isNone():
return err("Master accumulator not found in database")
let
accumulator = accumulatorOpt.get()
epochIndex = getEpochIndex(header)
epochHash = Digest(data: accumulator.historicalEpochs[epochIndex])
epochAccumulatorOpt = await n.getEpochAccumulator(epochHash)
if epochAccumulatorOpt.isNone():
return err("Epoch accumulator not found")
let
epochAccumulator = epochAccumulatorOpt.get()
headerRecordIndex = getHeaderRecordIndex(header, epochIndex)
# TODO: Implement more generalized `get_generalized_index`
gIndex = GeneralizedIndex(epochSize*2*2 + (headerRecordIndex*2))
return epochAccumulator.build_proof(gIndex)
proc verifyCanonicalChain(
n: HistoryNetwork, header: BlockHeader):
Future[Result[void, string]] {.async.} =
when not canonicalVerify:
return ok()
let accumulatorOpt = n.contentDB.getAccumulator()
if accumulatorOpt.isNone():
# Should acquire a master accumulator first
return err("Cannot accept any data without a master accumulator")
let accumulator = accumulatorOpt.get()
# Note: It is a bit silly to build a proof, as we still need to request the
# epoch accumulators for it, and could just verify it with those. But the
# idea here is that eventually this gets changed so that the proof is sent
# together with the header.
let proofOpt =
if header.inCurrentEpoch(accumulator):
none(seq[Digest])
else:
let proof = await n.buildProof(header)
if proof.isErr():
# Can't verify without master and epoch accumulators
return err("Cannot build proof: " & proof.error)
else:
some(proof.get())
return verifyHeader(accumulator, header, proofOpt)
proc validateContent(
n: HistoryNetwork, content: seq[byte], contentKey: ByteList):
@ -455,33 +606,97 @@ proc validateContent(
case key.contentType:
of blockHeader:
# TODO: Add validation based on accumulator data.
return validateBlockHeaderBytes(content, key.blockHeaderKey.blockHash).isOk()
let validateResult =
validateBlockHeaderBytes(content, key.blockHeaderKey.blockHash)
if validateResult.isErr():
warn "Invalid block header offered", error = validateResult.error
return false
let header = validateResult.get()
let verifyResult = await n.verifyCanonicalChain(header)
if verifyResult.isErr():
warn "Failed on check if header is part of canonical chain",
error = verifyResult.error
return false
else:
return true
of blockBody:
let headerOpt = await n.getBlockHeader(
key.blockBodyKey.chainId, key.blockBodyKey.blockHash)
if headerOpt.isSome():
let header = headerOpt.get()
return validateBlockBodyBytes(content, header.txRoot, header.ommersHash).isOk()
else:
# Can't find the header, no way to validate the block body
if headerOpt.isNone():
warn "Cannot find the header, no way to validate the block body"
return false
let header = headerOpt.get()
let validationResult =
validateBlockBodyBytes(content, header.txRoot, header.ommersHash)
if validationResult.isErr():
warn "Failed validating block body", error = validationResult.error
return false
let verifyResult = await n.verifyCanonicalChain(header)
if verifyResult.isErr():
warn "Failed on check if header is part of canonical chain",
error = verifyResult.error
return false
else:
return true
of receipts:
let headerOpt = await n.getBlockHeader(
key.receiptsKey.chainId, key.receiptsKey.blockHash)
if headerOpt.isSome():
let header = headerOpt.get()
return validateReceiptsBytes(content, header.receiptRoot).isOk()
else:
# Can't find the header, no way to validate the receipts
if headerOpt.isNone():
warn "Cannot find the header, no way to validate the receipts"
return false
let header = headerOpt.get()
let validationResult =
validateReceiptsBytes(content, header.receiptRoot)
if validationResult.isErr():
warn "Failed validating receipts", error = validationResult.error
return false
let verifyResult = await n.verifyCanonicalChain(header)
if verifyResult.isErr():
warn "Failed on check if header is part of canonical chain",
error = verifyResult.error
return false
else:
return true
of epochAccumulator:
# TODO: Add validation based on MasterAccumulator
return validateEpochAccumulator(content)
# Check first if epochHash is part of master accumulator
let masterAccumulator = n.contentDB.getAccumulator()
if masterAccumulator.isNone():
error "Cannot accept any data without a master accumulator"
return false
let epochHash = key.epochAccumulatorKey.epochHash
if not masterAccumulator.get().historicalEpochs.contains(epochHash.data):
warn "Offered epoch accumulator is not part of master accumulator"
return false
let epochAccumulator =
try:
SSZ.decode(content, EpochAccumulator)
except SszError:
warn "Failed decoding epoch accumulator"
return false
# Next check the hash tree root, as this is probably more expensive
let hash = hash_tree_root(epochAccumulator)
if hash != epochHash:
warn "Epoch accumulator has invalid root hash"
return false
else:
return true
of masterAccumulator:
return validateMasterAccumulator(content)
# Don't allow a master accumulator to be offered, we only request it.
warn "Node does not accept master accumulators through offer/accept"
return false
proc new*(
T: type HistoryNetwork,
@ -491,7 +706,7 @@ proc new*(
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, historyProtocolId, contentDB,
toContentIdHandler, bootstrapRecords,
toContentIdHandler, dbGetHandler, bootstrapRecords,
config = portalConfig)
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
@ -514,9 +729,9 @@ proc processContentLoop(n: HistoryNetwork) {.async.} =
n.portalProtocol.storeContent(contentId, contentItem)
info "Received valid offered content", contentKey
info "Received offered content validated successfully", contentKey
else:
error "Received invalid offered content", contentKey
error "Received offered content failed validation", contentKey
# On one invalid piece of content we drop all and don't forward any of it
# TODO: Could also filter it out and still gossip the rest.
continue
@ -532,6 +747,24 @@ proc start*(n: HistoryNetwork) =
n.processContentLoop = processContentLoop(n)
proc initMasterAccumulator*(
n: HistoryNetwork,
accumulator: Option[Accumulator]) {.async.} =
if accumulator.isSome():
n.contentDB.putAccumulator(SSZ.encode(accumulator.get()))
info "Successfully retrieved master accumulator from local data"
else:
while true:
if await n.getInitialMasterAccumulator():
info "Successfully retrieved master accumulator from the network"
return
else:
warn "Could not retrieve initial master accumulator from the network"
when not canonicalVerify:
return
else:
await sleepAsync(2.seconds)
proc stop*(n: HistoryNetwork) =
n.portalProtocol.stop()


@ -28,9 +28,17 @@ type StateNetwork* = ref object
func setStreamTransport*(n: StateNetwork, transport: UtpDiscv5Protocol) =
setTransport(n.portalProtocol.stream, transport)
proc toContentIdHandler(contentKey: ByteList): Option[ContentId] =
func toContentIdHandler(contentKey: ByteList): Option[ContentId] =
toContentId(contentKey)
proc dbGetHandler(db: ContentDB, contentKey: ByteList):
(Option[ContentId], Option[seq[byte]]) =
let contentIdOpt = contentKey.toContentId()
if contentIdOpt.isSome():
(contentIdOpt, db.get(contentIdOpt.get()))
else:
(contentIdOpt, none(seq[byte]))
proc getContent*(n: StateNetwork, key: ContentKey):
Future[Option[seq[byte]]] {.async.} =
let
@ -73,7 +81,7 @@ proc new*(
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, stateProtocolId, contentDB,
toContentIdHandler,
toContentIdHandler, dbGetHandler,
bootstrapRecords, stateDistanceCalculator,
config = portalConfig)


@ -131,6 +131,10 @@ type
ToContentIdHandler* =
proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
DbGetHandler* =
proc(contentDB: ContentDB, contentKey: ByteList):
(Option[ContentId], Option[seq[byte]]) {.raises: [Defect], gcsafe.}
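# A DbGetHandler returns the content id derived from the key (if any) together
# with the stored content (if any); special-case keys such as the latest master
# accumulator can return content without a content id.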
PortalProtocolId* = array[2, byte]
RadiusCache* = LRUCache[NodeId, UInt256]
@ -156,6 +160,7 @@ type
baseProtocol*: protocol.Protocol
contentDB*: ContentDB
toContentId*: ToContentIdHandler
dbGet*: DbGetHandler
radiusConfig: RadiusConfig
dataRadius*: UInt256
bootstrapRecords*: seq[Record]
@ -303,41 +308,38 @@ proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
proc handleFindContent(
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId): seq[byte] =
let contentIdOpt = p.toContentId(fc.contentKey)
if contentIdOpt.isSome():
const
contentOverhead = 1 + 1 # msg id + SSZ Union selector
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
const
contentOverhead = 1 + 1 # msg id + SSZ Union selector
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
let
contentId = contentIdOpt.get()
# TODO: Should we first do a simple check on ContentId versus Radius
# before accessing the database?
maybeContent = p.contentDB.get(contentId)
if maybeContent.isSome():
let content = maybeContent.get()
if content.len <= maxPayloadSize:
encodeMessage(ContentMessage(
contentMessageType: contentType, content: ByteList(content)))
else:
let connectionId = p.stream.addContentRequest(srcId, content)
encodeMessage(ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId))
let (contentIdOpt, contentOpt) = p.dbGet(p.contentDb, fc.contentKey)
if contentOpt.isSome():
let content = contentOpt.get()
if content.len <= maxPayloadSize:
encodeMessage(ContentMessage(
contentMessageType: contentType, content: ByteList(content)))
else:
# Don't have the content, send closest neighbours to content id.
let
closestNodes = p.routingTable.neighbours(
NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64)
let connectionId = p.stream.addContentRequest(srcId, content)
encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
encodeMessage(ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId))
elif contentIdOpt.isSome():
# Don't have the content, send closest neighbours to content id.
let
closestNodes = p.routingTable.neighbours(
NodeId(contentIdOpt.get()), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64)
encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
else:
# Return empty response when content key validation fails
# TODO: Better would be to return no message at all, needs changes on
# Return empty response when:
# a. content key validation fails
# b. it is a special case such as "latest accumulator"
# TODO: Better would be to return no message at all for a, needs changes on
# discv5 layer.
# TODO: Better would be to have a specific protocol message for b.
@[]
proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
@ -441,6 +443,7 @@ proc new*(T: type PortalProtocol,
protocolId: PortalProtocolId,
contentDB: ContentDB,
toContentId: ToContentIdHandler,
dbGet: DbGetHandler,
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig
@ -457,6 +460,7 @@ proc new*(T: type PortalProtocol,
baseProtocol: baseProtocol,
contentDB: contentDB,
toContentId: toContentId,
dbGet: dbGet,
radiusConfig: config.radiusConfig,
dataRadius: initialRadius,
bootstrapRecords: @bootstrapRecords,


@ -23,8 +23,8 @@ logScope:
const
utpProtocolId* = "utp".toBytes()
defaultConnectionTimeout = 5.seconds
defaultContentReadTimeout = 15.seconds
defaultConnectionTimeout = 15.seconds
defaultContentReadTimeout = 60.seconds
# TalkReq message is used as transport for uTP. It is assumed here that Portal
# protocol messages were exchanged before sending uTP over discv5 data. This


@ -16,7 +16,7 @@ import
../nimbus/[chain_config, genesis],
"."/[content_db, seed_db],
./network/wire/portal_protocol,
./network/history/history_content
./network/history/[history_content, accumulator]
export results, tables
@ -35,17 +35,28 @@ type
# Fix in nim-json-serialization or should I overload something here?
number*: int
AccumulatorData = object
accumulatorHash: string
maxBlockNumber: int
accumulator: string
AccumulatorObject = object
accumulator: AccumulatorData
EpochAccumulatorObject = object
epochAccumulator: string
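# Rough sketch of the JSON shapes these objects map to (derived from the writer
# procs in the eth-data exporter; the hex strings hold SSZ-encoded data):
#   {"accumulator": {"accumulatorHash": "0x..", "maxBlockNumber": N, "accumulator": "0x.."}}
#   {"epochAccumulator": "0x.."}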
BlockDataTable* = Table[string, BlockData]
proc readBlockDataTable*(dataFile: string): Result[BlockDataTable, string] =
let blockData = readAllFile(dataFile)
if blockData.isErr(): # TODO: map errors
proc readJsonType*(dataFile: string, T: type): Result[T, string] =
let data = readAllFile(dataFile)
if data.isErr(): # TODO: map errors
return err("Failed reading data-file")
let decoded =
try:
Json.decode(blockData.get(), BlockDataTable)
except CatchableError as e:
Json.decode(data.get(), T)
except SerializationError as e:
return err("Failed decoding json data-file: " & e.msg)
ok(decoded)
@ -152,10 +163,70 @@ proc getGenesisHeader*(id: NetworkId = MainNet): BlockHeader =
except RlpError:
raise (ref Defect)(msg: "Genesis should be valid")
proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
let blockData = ? readJsonType(dataFile, BlockDataTable)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
ok(buildAccumulator(headers))
proc buildAccumulatorData*(
dataFile: string):
Result[seq[(ContentKey, EpochAccumulator)], string] =
let blockData = ? readJsonType(dataFile, BlockDataTable)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
ok(buildAccumulatorData(headers))
proc readAccumulator*(dataFile: string): Result[Accumulator, string] =
let res = ? readJsonType(dataFile, AccumulatorObject)
let encodedAccumulator =
try:
res.accumulator.accumulator.hexToSeqByte()
except ValueError as e:
return err("Invalid hex data for accumulator: " & e.msg)
try:
ok(SSZ.decode(encodedAccumulator, Accumulator))
except SszError as e:
err("Decoding accumulator failed: " & e.msg)
proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] =
let res = ? readJsonType(dataFile, EpochAccumulatorObject)
let encodedAccumulator =
try:
res.epochAccumulator.hexToSeqByte()
except ValueError as e:
return err("Invalid hex data for accumulator: " & e.msg)
try:
ok(SSZ.decode(encodedAccumulator, EpochAccumulator))
except SszError as e:
err("Decoding epoch accumulator failed: " & e.msg)
proc historyStore*(
p: PortalProtocol, dataFile: string, verify = false):
Result[void, string] =
let blockData = ? readBlockDataTable(dataFile)
let blockData = ? readJsonType(dataFile, BlockDataTable)
for b in blocks(blockData, verify):
for value in b:
@ -164,6 +235,50 @@ proc historyStore*(
ok()
proc propagateAccumulatorData*(
p: PortalProtocol, dataFile: string):
Future[Result[void, string]] {.async.} =
## Propagate all epoch accumulators created when building the accumulator
## from the block headers.
## dataFile holds block data
let epochAccumulators = buildAccumulatorData(dataFile)
if epochAccumulators.isErr():
return err(epochAccumulators.error)
else:
for (key, epochAccumulator) in epochAccumulators.get():
let content = SSZ.encode(epochAccumulator)
p.storeContent(
history_content.toContentId(key), content)
await p.neighborhoodGossip(
ContentKeysList(@[encode(key)]), @[content])
return ok()
proc propagateEpochAccumulator*(
p: PortalProtocol, dataFile: string):
Future[Result[void, string]] {.async.} =
## Propagate a specific epoch accumulator into the network.
## dataFile holds the SSZ serialized epoch accumulator
let epochAccumulatorRes = readEpochAccumulator(dataFile)
if epochAccumulatorRes.isErr():
return err(epochAccumulatorRes.error)
else:
let
accumulator = epochAccumulatorRes.get()
rootHash = accumulator.hash_tree_root()
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
p.storeContent(
history_content.toContentId(key), SSZ.encode(accumulator))
await p.neighborhoodGossip(
ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
return ok()
proc historyPropagate*(
p: PortalProtocol, dataFile: string, verify = false):
Future[Result[void, string]] {.async.} =
@ -182,7 +297,7 @@ proc historyPropagate*(
for i in 0 ..< concurrentGossips:
gossipWorkers.add(gossipWorker(p))
let blockData = readBlockDataTable(dataFile)
let blockData = readJsonType(dataFile, BlockDataTable)
if blockData.isOk():
for b in blocks(blockData.get(), verify):
for value in b:
@ -205,7 +320,7 @@ proc historyPropagate*(
proc historyPropagateBlock*(
p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
Future[Result[void, string]] {.async.} =
let blockDataTable = readBlockDataTable(dataFile)
let blockDataTable = readJsonType(dataFile, BlockDataTable)
if blockDataTable.isOk():
let b =


@ -3,20 +3,16 @@ proc portal_history_store(contentKey: string, content: string): bool
proc portal_history_storeContent(dataFile: string): bool
proc portal_history_propagate(dataFile: string): bool
proc portal_history_propagateBlock(dataFile: string, blockHash: string): bool
proc portal_history_propagateAccumulatorData(
dataFile: string): bool
proc portal_history_propagateEpochAccumulator(
dataFile: string): bool
proc portal_history_storeContentInNodeRange(
dbPath: string,
max: uint32,
starting: uint32): bool
dbPath: string, max: uint32, starting: uint32): bool
proc portal_history_offerContentInNodeRange(
dbPath: string,
nodeId: NodeId,
max: uint32,
starting: uint32): bool
dbPath: string, nodeId: NodeId, max: uint32, starting: uint32): bool
proc portal_history_depthContentPropagate(
dbPath: string,
max: uint32): bool
dbPath: string, max: uint32): bool
proc portal_history_breadthContentPropagate(
dbPath: string): bool


@ -56,11 +56,27 @@ proc installPortalDebugApiHandlers*(
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateAccumulatorData") do(
dataFile: string) -> bool:
let res = await p.propagateAccumulatorData(dataFile)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateEpochAccumulator") do(
dataFile: string) -> bool:
let res = await p.propagateEpochAccumulator(dataFile)
if res.isOk():
return true
else:
echo $res.error
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_storeContentInNodeRange") do(
dbPath: string,
max: uint32,
starting: uint32) -> bool:
let storeResult = p.storeContentInNodeRange(dbPath, max, starting)
if storeResult.isOk():
@ -73,19 +89,18 @@ proc installPortalDebugApiHandlers*(
nodeId: NodeId,
max: uint32,
starting: uint32) -> bool:
# waiting for offer result, by the end of this call remote node should
# have received offered content
let offerResult = await p.offerContentInNodeRange(dbPath, nodeId, max, starting)
if offerResult.isOk():
return true
else:
raise newException(ValueError, $offerResult.error)
rpcServer.rpc("portal_" & network & "_depthContentPropagate") do(
dbPath: string,
max: uint32) -> bool:
# TODO Consider making this call asynchronously without waiting for result
# as for big seed db size it could take a loot of time.
let propagateResult = await p.depthContentPropagate(dbPath, max)
@ -97,7 +112,6 @@ proc installPortalDebugApiHandlers*(
rpcServer.rpc("portal_" & network & "_breadthContentPropagate") do(
dbPath: string) -> bool:
# TODO Consider making this call asynchronously without waiting for result
# as for a big seed db size it could take a lot of time.
let propagateResult = await p.breadthContentPropagate(dbPath)


@ -233,6 +233,10 @@ dump_logs() {
BOOTSTRAP_NODE=0
BOOTSTRAP_TIMEOUT=5 # in seconds
BOOTSTRAP_ENR_FILE="${DATA_DIR}/node${BOOTSTRAP_NODE}/fluffy_node.enr"
# Amount of nodes in the testnet that will build their master accumulator
# locally from a block data file.
# TODO: Currently not enabled
LOCAL_ACCUMULATOR_NODES=$((NUM_NODES / 4))
for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
@ -242,6 +246,10 @@ done
echo "Starting ${NUM_NODES} nodes."
for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
# Reset arguments
BOOTSTRAP_ARG=""
ACCUMULATOR_ARG=""
NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
if [[ ${NUM_NODE} != ${BOOTSTRAP_NODE} ]]; then
@ -263,6 +271,10 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
done
fi
# if [[ ${NUM_NODE} -lt ${LOCAL_ACCUMULATOR_NODES} ]]; then
# ACCUMULATOR_ARG="--accumulator-file=./fluffy/scripts/eth-accumulator.json"
# fi
# Running with bits-per-hop of 1 to make the lookups more likely requiring
# to request to nodes over the network instead of having most of them in the
# own routing table.
@ -283,6 +295,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
--bucket-ip-limit=24 \
--bits-per-hop=1 \
${RADIUS_ARG} \
${ACCUMULATOR_ARG} \
${EXTRA_ARGS} \
> "${DATA_DIR}/log${NUM_NODE}.txt" 2>&1 &


@ -232,13 +232,18 @@ procSuite "Portal testnet tests":
await client.close()
nodeInfos.add(nodeInfo)
const dataFile = "./fluffy/tests/blocks/mainnet_blocks_selected.json"
# const dataFileEpoch = "./fluffy/scripts/eth-epoch-accumulator.json"
# check (await clients[0].portal_history_propagateEpochAccumulator(dataFileEpoch))
# await clients[0].close()
# await sleepAsync(60.seconds)
const dataFile = "./fluffy/tests/blocks/mainnet_blocks_1000001_1000010.json"
# This will fill the first node's db with blocks from the data file. Next,
# this node will offer all these blocks' headers one by one.
check (await clients[0].portal_history_propagate(dataFile))
await clients[0].close()
let blockData = readBlockDataTable(dataFile)
let blockData = readJsonType(dataFile, BlockDataTable)
check blockData.isOk()
for i, client in clients:
@ -311,15 +316,15 @@ procSuite "Portal testnet tests":
await client.close()
nodeInfos.add(nodeInfo)
const dataPath = "./fluffy/tests/blocks/mainnet_blocks_1000000_1000020.json"
const dataPath = "./fluffy/tests/blocks/mainnet_blocks_1000011_1000030.json"
# path for temporary db; a separate dir is used as sqlite usually also creates
# wal files, and we do not want those to linger in the filesystem
const tempDbPath = "./fluffy/tests/blocks/tempDir/mainnet_blocks_1000000_1000020.sqlite3"
const tempDbPath = "./fluffy/tests/blocks/tempDir/mainnet_blocks_1000011_1000030.sqlite3"
let (dbFile, dbName) = getDbBasePathAndName(tempDbPath).unsafeGet()
let blockData = readBlockDataTable(dataPath)
let blockData = readJsonType(dataPath, BlockDataTable)
check blockData.isOk()
let bd = blockData.get()
@ -408,7 +413,7 @@ procSuite "Portal testnet tests":
let (dbFile, dbName) = getDbBasePathAndName(tempDbPath).unsafeGet()
let blockData = readBlockDataTable(dataPath)
let blockData = readJsonType(dataPath, BlockDataTable)
check blockData.isOk()
let bd = blockData.get()

File diff suppressed because one or more lines are too long


@ -13,7 +13,7 @@ import
unittest2, stint, stew/byteutils,
eth/common/eth_types,
../populate_db,
../network/history/accumulator
../network/history/[accumulator, accumulator_db]
suite "Header Accumulator":
test "Header Accumulator Update":
@ -25,7 +25,7 @@ suite "Header Accumulator":
dataFile = "./fluffy/tests/blocks/mainnet_blocks_1-2.json"
let blockDataRes = readBlockDataTable(dataFile)
let blockDataRes = readJsonType(dataFile, BlockDataTable)
check blockDataRes.isOk()
let blockData = blockDataRes.get()


@ -24,7 +24,7 @@ const
"0xce8f770a56203e10afe19c7dd7e2deafc356e6cce0a560a30a85add03da56137"
suite "History Network Content Validation":
let blockDataTable = readBlockDataTable(dataFile).expect(
let blockDataTable = readJsonType(dataFile, BlockDataTable).expect(
"Valid data file should parse")
let blockData =


@ -25,6 +25,14 @@ proc toContentId(contentKey: ByteList): Option[ContentId] =
let idHash = sha256.digest(contentKey.asSeq())
some(readUintBE[256](idHash.data))
proc dbGetHandler(db: ContentDB, contentKey: ByteList):
(Option[ContentId], Option[seq[byte]]) =
let contentIdOpt = contentKey.toContentId()
if contentIdOpt.isSome():
(contentIdOpt, db.get(contentIdOpt.get()))
else:
(contentIdOpt, none(seq[byte]))
proc initPortalProtocol(
rng: ref HmacDrbgContext,
privKey: PrivateKey,
@ -34,7 +42,7 @@ proc initPortalProtocol(
d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
db = ContentDB.new("", uint32.high, inMemory = true)
proto = PortalProtocol.new(
d, protocolId, db, toContentId,
d, protocolId, db, toContentId, dbGetHandler,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
@ -338,7 +346,8 @@ procSuite "Portal Wire Protocol Tests":
dbLimit = 100_000'u32
db = ContentDB.new("", dbLimit, inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db, toContentId)
proto1 = PortalProtocol.new(
node1, protocolId, db, toContentId, dbGetHandler)
let item = genByteSeq(10_000)
var distances: seq[UInt256] = @[]


@ -46,8 +46,8 @@ import
eth/common/eth_types_json_serialization,
json_rpc/rpcclient,
../seed_db,
../../premix/downloader,
../network/history/history_content
../../premix/[downloader, parser],
../network/history/[history_content, accumulator]
# Need to be selective due to the `Block` type conflict from downloader
from ../network/history/history_network import encode
@ -64,9 +64,14 @@ proc defaultDataDir*(): string =
const
defaultDataDirDesc = defaultDataDir()
defaultFileName = "eth-history-data"
defaultBlockFileName = "eth-block-data.json"
defaultAccumulatorFileName = "eth-accumulator.json"
type
ExporterCmd* = enum
exportBlockData
exportAccumulatorData
StorageMode* = enum
Json, Db
@ -77,34 +82,57 @@ type
desc: "Sets the log level"
name: "log-level" .}: LogLevel
initialBlock* {.
desc: "Number of first block which should be downloaded"
desc: "Number of the first block which should be downloaded"
defaultValue: 0
name: "initial-block" .}: uint64
endBlock* {.
desc: "Number of last block which should be downloaded"
desc: "Number of the last block which should be downloaded"
defaultValue: 0
name: "end-block" .}: uint64
dataDir* {.
desc: "The directory where generated file will be placed"
desc: "The directory where generated data files will be exported to"
defaultValue: defaultDataDir()
defaultValueDesc: $defaultDataDirDesc
name: "data-dir" .}: OutDir
filename* {.
desc: "File name (minus extension) where history data will be exported to"
defaultValue: defaultFileName
defaultValueDesc: $defaultFileName
name: "filename" .}: string
storageMode* {.
desc: "Storage mode of data export"
defaultValue: Json
name: "storage-mode" .}: StorageMode
case cmd* {.
command
defaultValue: exportBlockData .}: ExporterCmd
of exportBlockData:
fileName* {.
desc: "File name (minus extension) where block data will be exported to"
defaultValue: defaultBlockFileName
defaultValueDesc: $defaultBlockFileName
name: "file-name" .}: string
storageMode* {.
desc: "Storage mode of block data export"
defaultValue: Json
name: "storage-mode" .}: StorageMode
headersOnly* {.
desc: "Only export the headers instead of full blocks and receipts"
defaultValue: false
name: "headers-only" .}: bool
of exportAccumulatorData:
accumulatorFileName* {.
desc: "File to which the serialized accumulator data is written"
defaultValue: defaultAccumulatorFileName
defaultValueDesc: $defaultAccumulatorFileName
name: "accumulator-file-name" .}: string
DataRecord = object
HeaderRecord = object
header: string
number: uint64
BlockRecord = object
header: string
body: string
receipts: string
number: uint64
AccumulatorRecord = object
accumulatorHash: string
maxBlockNumber: uint64
accumulator: string
proc parseCmdArg*(T: type StorageMode, p: TaintedString): T
{.raises: [Defect, ConfigurationError].} =
if p == "db":
@ -118,10 +146,23 @@ proc parseCmdArg*(T: type StorageMode, p: TaintedString): T
proc completeCmdArg*(T: type StorageMode, val: TaintedString): seq[string] =
return @[]
proc writeBlock(writer: var JsonWriter, blck: Block)
proc writeHeaderRecord(
writer: var JsonWriter, header: BlockHeader)
{.raises: [IOError, Defect].} =
let
dataRecord = DataRecord(
dataRecord = HeaderRecord(
header: rlp.encode(header).to0xHex(),
number: header.blockNumber.truncate(uint64))
headerHash = to0xHex(rlpHash(header).data)
writer.writeField(headerHash, dataRecord)
proc writeBlockRecord(
writer: var JsonWriter, blck: Block)
{.raises: [IOError, Defect].} =
let
dataRecord = BlockRecord(
header: rlp.encode(blck.header).to0xHex(),
body: encode(blck.body).to0xHex(),
receipts: encode(blck.receipts).to0xHex(),
@ -131,6 +172,36 @@ proc writeBlock(writer: var JsonWriter, blck: Block)
writer.writeField(headerHash, dataRecord)
proc writeAccumulatorRecord(
writer: var JsonWriter, accumulator: Accumulator)
{.raises: [IOError, Defect].} =
let
maxBlockNumber =
accumulator.historicalEpochs.len() * epochSize +
accumulator.currentEpoch.len()
accumulatorHash = hash_tree_root(accumulator).data.to0xHex()
accumulatorRecord = AccumulatorRecord(
accumulatorHash: accumulatorHash,
maxBlockNumber: uint64(maxBlockNumber),
accumulator: SSZ.encode(accumulator).to0xHex())
writer.writeField("accumulator", accumulatorRecord)
proc writeEpochAccumulatorRecord(
writer: var JsonWriter, accumulator: EpochAccumulator)
{.raises: [IOError, Defect].} =
writer.writeField("epochAccumulator", SSZ.encode(accumulator).to0xHex())
proc downloadHeader(client: RpcClient, i: uint64): BlockHeader =
let blockNumber = u256(i)
try:
let jsonHeader = requestHeader(blockNumber, some(client))
parseBlockHeader(jsonHeader)
except CatchableError as e:
fatal "Error while requesting BlockHeader", error = e.msg, number = i
quit 1
proc downloadBlock(i: uint64, client: RpcClient): Block =
let num = u256(i)
try:
@ -139,59 +210,81 @@ proc downloadBlock(i: uint64, client: RpcClient): Block =
fatal "Error while requesting Block", error = e.msg, number = i
quit 1
proc createAndOpenFile(config: ExporterConf): OutputStreamHandle =
# Creates directory and file specified in config, if file already exists
proc createAndOpenFile(dataDir: string, fileName: string): OutputStreamHandle =
# Creates the directory and file; if the file already exists the
# program is aborted with info to the user, to avoid losing data
let fileName: string =
if not config.filename.endsWith(".json"):
config.filename & ".json"
if not filename.endsWith(".json"):
filename & ".json"
else:
config.filename
filename
let filePath = config.dataDir / fileName
let filePath = dataDir / fileName
if isFile(filePath):
fatal "File under provided path already exists and would be overwritten",
path = filePath
quit 1
let res = createPath(distinctBase(config.dataDir))
let res = createPath(dataDir)
if res.isErr():
fatal "Error occurred while creating directory", error = res.error
fatal "Error occurred while creating directory",
error = ioErrorMsg(res.error)
quit 1
try:
# This means the file will be overwritten each time, but that is ok for
# such a one-off tool
return fileOutput(filePath)
except IOError as e:
fatal "Error occurred while opening the file", error = e.msg
quit 1
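For illustration, a minimal usage sketch of the helper above (hypothetical paths, not part of this commit): it appends ".json" when the extension is missing and aborts rather than overwriting an existing file:

let fh = createAndOpenFile("/tmp/fluffy-export", "eth-block-data")
try:
  # Write through the underlying output stream that the JSON writers use.
  fh.s.write("{}")
finally:
  fh.close()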
proc writeToJson(config: ExporterConf, client: RpcClient) =
let fh = createAndOpenFile(config)
proc writeHeadersToJson(config: ExporterConf, client: RpcClient) =
let fh = createAndOpenFile(string config.dataDir, string config.fileName)
try:
var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
writer.beginRecord()
for i in config.initialBlock..config.endBlock:
let blck = client.downloadHeader(i)
writer.writeHeaderRecord(blck)
if ((i - config.initialBlock) mod 8192) == 0 and i != config.initialBlock:
info "Downloaded 8192 new block headers", currentHeader = i
writer.endRecord()
info "File successfully written", path = config.dataDir / config.fileName
except IOError as e:
fatal "Error occured while writing to file", error = e.msg
quit 1
finally:
try:
fh.close()
except IOError as e:
fatal "Error occured while closing file", error = e.msg
quit 1
proc writeBlocksToJson(config: ExporterConf, client: RpcClient) =
let fh = createAndOpenFile(string config.dataDir, string config.fileName)
try:
var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
writer.beginRecord()
for i in config.initialBlock..config.endBlock:
let blck = downloadBlock(i, client)
writer.writeBlock(blck)
writer.writeBlockRecord(blck)
if ((i - config.initialBlock) mod 8192) == 0 and i != config.initialBlock:
info "Downloaded 8192 new blocks", currentBlock = i
writer.endRecord()
info "File successfully written"
info "File successfully written", path = config.dataDir / config.fileName
except IOError as e:
fatal "Error occoured while writing to file", error = e.msg
fatal "Error occured while writing to file", error = e.msg
quit 1
finally:
try:
fh.close()
except IOError as e:
fatal "Error occoured while closing file", error = e.msg
fatal "Error occured while closing file", error = e.msg
quit 1
proc writeToDb(config: ExporterConf, client: RpcClient) =
proc writeBlocksToDb(config: ExporterConf, client: RpcClient) =
let db = SeedDb.new(distinctBase(config.dataDir), config.filename)
defer:
@ -222,39 +315,110 @@ proc writeToDb(config: ExporterConf, client: RpcClient) =
info "Data successfuly written to db"
proc run(config: ExporterConf, client: RpcClient) =
proc writeAccumulatorToJson(
dataDir: string, fileName: string, accumulator: Accumulator) =
let fh = createAndOpenFile(dataDir, fileName)
try:
var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
writer.beginRecord()
writer.writeAccumulatorRecord(accumulator)
writer.endRecord()
info "File successfully written", path = dataDir / fileName
except IOError as e:
fatal "Error occured while writing to file", error = e.msg
quit 1
finally:
try:
fh.close()
except IOError as e:
fatal "Error occured while closing file", error = e.msg
quit 1
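A minimal read-back sketch (the file name is just an example, not part of this commit) of the JSON produced above: a single top-level "accumulator" field carrying the hash, the max block number and the SSZ-encoded accumulator as hex strings:

import std/json

let
  exported = parseFile("eth-accumulator.json")
  record = exported["accumulator"]

# Print the three fields written by writeAccumulatorRecord.
echo "accumulator hash: ", record["accumulatorHash"].getStr()
echo "max block number: ", record["maxBlockNumber"]
echo "ssz payload bytes: ", (record["accumulator"].getStr().len - 2) div 2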
proc writeEpochAccumulatorToJson(
dataDir: string, fileName: string, accumulator: EpochAccumulator) =
let fh = createAndOpenFile(dataDir, fileName)
try:
var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
writer.beginRecord()
writer.writeEpochAccumulatorRecord(accumulator)
writer.endRecord()
info "File successfully written", path = dataDir / fileName
except IOError as e:
fatal "Error occured while writing to file", error = e.msg
quit 1
finally:
try:
fh.close()
except IOError as e:
fatal "Error occured while closing file", error = e.msg
quit 1
proc exportBlocks(config: ExporterConf, client: RpcClient) =
case config.storageMode
of Json:
writeToJson(config, client)
if config.headersOnly:
writeHeadersToJson(config, client)
else:
writeBlocksToJson(config, client)
of Db:
writeToDb(config, client)
if config.headersOnly:
fatal "Db mode not available for headers only"
quit 1
else:
writeBlocksToDb(config, client)
when isMainModule:
{.pop.}
let config = ExporterConf.load()
{.push raises: [Defect].}
setLogLevel(config.logLevel)
if (config.endBlock < config.initialBlock):
fatal "Initial block number should be smaller than end block number",
initialBlock = config.initialBlock,
endBlock = config.endBlock
quit 1
setLogLevel(config.logLevel)
var client: RpcClient
try:
let c = newRpcWebSocketClient()
# TODO Currently hardcoded to default geth ws address, at some point it may
# be moved to config
# TODO: Hardcoded to the default geth ws address. This should become
# a configurable cli option
waitFor c.connect("ws://127.0.0.1:8546")
client = c
except CatchableError as e:
fatal "Error while connecting to data provider", error = e.msg
quit 1
try:
run(config, client)
finally:
case config.cmd
of ExporterCmd.exportBlockData:
try:
exportBlocks(config, client)
finally:
waitFor client.close()
of ExporterCmd.exportAccumulatorData:
var headers: seq[BlockHeader]
for i in config.initialBlock..config.endBlock:
let header = client.downloadHeader(i)
headers.add(header)
if ((i - config.initialBlock) mod 8192) == 0 and i != config.initialBlock:
info "Downloaded 8192 new block headers", currentBlock = i
waitFor client.close()
info "Building the accumulator"
let accumulator = buildAccumulator(headers)
writeAccumulatorToJson(
string config.dataDir, string config.accumulatorFileName, accumulator)
let epochAccumulators = buildAccumulatorData(headers)
for i, epochAccumulator in epochAccumulators:
writeEpochAccumulatorToJson(
string config.dataDir, "eth-epoch-accumulator_" & $i & ".json",
epochAccumulator[1])

View File

@ -190,7 +190,7 @@ proc discover(d: discv5_protocol.Protocol) {.async.} =
info "Lookup finished", nodes = discovered.len
await sleepAsync(30.seconds)
proc testHandler(contentKey: ByteList): Option[ContentId] =
proc testContentIdHandler(contentKey: ByteList): Option[ContentId] =
# Note: Returning a static content id here, as in practice this depends
# on the content key to content id derivation, which is different for the
# different content networks. And we want these tests to be independent from
@ -198,6 +198,14 @@ proc testHandler(contentKey: ByteList): Option[ContentId] =
let idHash = sha256.digest("test")
some(readUintBE[256](idHash.data))
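For contrast with the static id above, a sketch (not from this commit) of what a network-specific handler could look like, hashing the content key itself, assuming the same sha256, readUintBE and List helpers this file already imports:

proc exampleContentIdHandler(contentKey: ByteList): Option[ContentId] =
  # Hypothetical derivation: content id as the sha256 hash of the raw key.
  let idHash = sha256.digest(contentKey.asSeq())
  some(readUintBE[256](idHash.data))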
proc dbGetHandler(db: ContentDB, contentKey: ByteList):
(Option[ContentId], Option[seq[byte]]) =
let contentIdOpt = contentKey.toContentId()
if contentIdOpt.isSome():
(contentIdOpt, db.get(contentIdOpt.get()))
else:
(contentIdOpt, none(seq[byte]))
proc run(config: PortalCliConf) =
let
rng = newRng()
@ -224,7 +232,7 @@ proc run(config: PortalCliConf) =
let
db = ContentDB.new("", config.storageSize, inMemory = true)
portal = PortalProtocol.new(d, config.protocolId, db,
testHandler,
testContentIdHandler, dbGetHandler,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration))

View File

@ -80,7 +80,7 @@ task utp_test, "Run uTP integration tests":
test "fluffy/tools/utp_testing", "utp_test", "-d:chronicles_log_level=ERROR -d:chronosStrictException"
task test_portal_testnet, "Build test_portal_testnet":
buildBinary "test_portal_testnet", "fluffy/scripts/", "-d:chronicles_log_level=DEBUG -d:chronosStrictException -d:unittest2DisableParamFiltering"
buildBinary "test_portal_testnet", "fluffy/scripts/", "-d:chronicles_log_level=DEBUG -d:chronosStrictException -d:unittest2DisableParamFiltering -d:PREFER_BLST_SHA256=false"
task testfluffy, "Run fluffy tests":
# Need the nimbus_db_backend in state network tests as we need a Hexary to