Add validation for offered block bodies and receipts (#1158)

Also use an AsyncQueue now to handle offered data instead of the
network-specific validation callbacks.
Kim De Mey 2022-07-11 16:29:16 +02:00 committed by GitHub
parent 87ea6ffb92
commit 5abf05e455
7 changed files with 139 additions and 113 deletions
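For orientation, a minimal sketch (not part of this diff) of the pattern the commit moves to: each network owns an async loop that pops offered content from the stream's AsyncQueue and validates it before storing or gossiping, instead of the wire protocol calling a synchronous validation callback. Names such as validate and the queue element types are illustrative stand-ins, assuming only the chronos AsyncQueue API.

import chronos

type
  ContentKeys = seq[seq[byte]]   # stand-in for ContentKeysList
  ContentItems = seq[seq[byte]]

proc validate(key, item: seq[byte]): Future[bool] {.async.} =
  # Per-network validation, e.g. checking a block body against its header.
  return item.len > 0

proc processContentLoop(queue: AsyncQueue[(ContentKeys, ContentItems)]) {.async.} =
  try:
    while true:
      let (keys, items) = await queue.popFirst()
      for i, item in items:
        if await validate(keys[i], item):
          discard # store the item and gossip it to interested peers here
        else:
          break # drop the rest of the offer on the first invalid item
  except CancelledError:
    discard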

View File

@ -27,6 +27,7 @@ type
HistoryNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
processContentLoop: Future[void]
Block* = (BlockHeader, BlockBody)
@ -253,9 +254,9 @@ proc get(db: ContentDB, T: type seq[Receipt], contentId: ContentID): Option[T] =
none(T)
proc getContentFromDb(
h: HistoryNetwork, T: type, contentId: ContentId): Option[T] =
if h.portalProtocol.inRange(contentId):
h.contentDB.get(T, contentId)
n: HistoryNetwork, T: type, contentId: ContentId): Option[T] =
if n.portalProtocol.inRange(contentId):
n.contentDB.get(T, contentId)
else:
none(T)
@ -272,19 +273,19 @@ const requestRetries = 4
# however that response is not yet validated at that moment.
proc getBlockHeader*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash):
n: HistoryNetwork, chainId: uint16, hash: BlockHash):
Future[Option[BlockHeader]] {.async.} =
let (keyEncoded, contentId) =
getEncodedKeyForContent(blockHeader, chainId, hash)
let headerFromDb = h.getContentFromDb(BlockHeader, contentId)
let headerFromDb = n.getContentFromDb(BlockHeader, contentId)
if headerFromDb.isSome():
info "Fetched block header from database", hash
return headerFromDb
for i in 0..<requestRetries:
let headerContentLookup =
await h.portalProtocol.contentLookup(keyEncoded, contentId)
await n.portalProtocol.contentLookup(keyEncoded, contentId)
if headerContentLookup.isNone():
warn "Failed fetching block header from the network", hash
return none(BlockHeader)
@ -295,13 +296,13 @@ proc getBlockHeader*(
if res.isOk():
info "Fetched block header from the network", hash
# Content is valid, we can propagate it to interested peers
h.portalProtocol.triggerPoke(
n.portalProtocol.triggerPoke(
headerContent.nodesInterestedInContent,
keyEncoded,
headerContent.content
)
h.portalProtocol.storeContent(contentId, headerContent.content)
n.portalProtocol.storeContent(contentId, headerContent.content)
return some(res.get())
else:
@ -311,11 +312,11 @@ proc getBlockHeader*(
return none(BlockHeader)
proc getBlockBody*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash, header: BlockHeader):
n: HistoryNetwork, chainId: uint16, hash: BlockHash, header: BlockHeader):
Future[Option[BlockBody]] {.async.} =
let
(keyEncoded, contentId) = getEncodedKeyForContent(blockBody, chainId, hash)
bodyFromDb = h.getContentFromDb(BlockBody, contentId)
bodyFromDb = n.getContentFromDb(BlockBody, contentId)
if bodyFromDb.isSome():
info "Fetched block body from database", hash
@ -323,7 +324,7 @@ proc getBlockBody*(
for i in 0..<requestRetries:
let bodyContentLookup =
await h.portalProtocol.contentLookup(keyEncoded, contentId)
await n.portalProtocol.contentLookup(keyEncoded, contentId)
if bodyContentLookup.isNone():
warn "Failed fetching block body from the network", hash
@ -335,13 +336,13 @@ proc getBlockBody*(
info "Fetched block body from the network", hash
# body is valid, propagate it to interested peers
h.portalProtocol.triggerPoke(
n.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent,
keyEncoded,
bodyContent.content
)
h.portalProtocol.storeContent(contentId, bodyContent.content)
n.portalProtocol.storeContent(contentId, bodyContent.content)
return some(res.get())
else:
@ -350,16 +351,16 @@ proc getBlockBody*(
return none(BlockBody)
proc getBlock*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash):
n: HistoryNetwork, chainId: uint16, hash: BlockHash):
Future[Option[Block]] {.async.} =
let headerOpt = await h.getBlockHeader(chainId, hash)
let headerOpt = await n.getBlockHeader(chainId, hash)
if headerOpt.isNone():
# Cannot validate block without header.
return none(Block)
let header = headerOpt.unsafeGet()
let bodyOpt = await h.getBlockBody(chainId, hash, header)
let bodyOpt = await n.getBlockBody(chainId, hash, header)
if bodyOpt.isNone():
return none(Block)
@ -369,7 +370,7 @@ proc getBlock*(
return some((header, body))
proc getReceipts*(
h: HistoryNetwork,
n: HistoryNetwork,
chainId: uint16,
hash: BlockHash,
header: BlockHeader): Future[Option[seq[Receipt]]] {.async.} =
@ -379,7 +380,7 @@ proc getReceipts*(
let (keyEncoded, contentId) = getEncodedKeyForContent(receipts, chainId, hash)
let receiptsFromDb = h.getContentFromDb(seq[Receipt], contentId)
let receiptsFromDb = n.getContentFromDb(seq[Receipt], contentId)
if receiptsFromDb.isSome():
info "Fetched receipts from database", hash
@ -387,7 +388,7 @@ proc getReceipts*(
for i in 0..<requestRetries:
let receiptsContentLookup =
await h.portalProtocol.contentLookup(keyEncoded, contentId)
await n.portalProtocol.contentLookup(keyEncoded, contentId)
if receiptsContentLookup.isNone():
warn "Failed fetching receipts from the network", hash
return none(seq[Receipt])
@ -401,13 +402,13 @@ proc getReceipts*(
let receipts = res.get()
# receipts are valid, propagate them to interested peers
h.portalProtocol.triggerPoke(
n.portalProtocol.triggerPoke(
receiptsContent.nodesInterestedInContent,
keyEncoded,
receiptsContent.content
)
h.portalProtocol.storeContent(contentId, receiptsContent.content)
n.portalProtocol.storeContent(contentId, receiptsContent.content)
return some(res.get())
else:
@ -431,7 +432,9 @@ func validateMasterAccumulator(bytes: openArray[byte]): bool =
except SszError:
false
proc validateContent(content: openArray[byte], contentKey: ByteList): bool =
proc validateContent(
n: HistoryNetwork, content: seq[byte], contentKey: ByteList):
Future[bool] {.async.} =
let keyOpt = contentKey.decode()
if keyOpt.isNone():
@ -441,17 +444,33 @@ proc validateContent(content: openArray[byte], contentKey: ByteList): bool =
case key.contentType:
of blockHeader:
validateBlockHeaderBytes(content, key.blockHeaderKey.blockHash).isOk()
# TODO: Add validation based on accumulator data.
return validateBlockHeaderBytes(content, key.blockHeaderKey.blockHash).isOk()
of blockBody:
true
# TODO: Need to get the header from the db or the network for this. Or how
# to deal with this?
let headerOpt = await n.getBlockHeader(
key.blockBodyKey.chainId, key.blockBodyKey.blockHash)
if headerOpt.isSome():
let header = headerOpt.get()
return validateBlockBodyBytes(content, header.txRoot, header.ommersHash).isOk()
else:
# Can't find the header, no way to validate the block body
return false
of receipts:
true
let headerOpt = await n.getBlockHeader(
key.receiptsKey.chainId, key.receiptsKey.blockHash)
if headerOpt.isSome():
let header = headerOpt.get()
return validateReceiptsBytes(content, header.receiptRoot).isOk()
else:
# Can't find the header, no way to validate the receipts
return false
of epochAccumulator:
validateEpochAccumulator(content)
# TODO: Add validation based on MasterAccumulator
return validateEpochAccumulator(content)
of masterAccumulator:
validateMasterAccumulator(content)
return validateMasterAccumulator(content)
proc new*(
T: type HistoryNetwork,
@ -461,15 +480,50 @@ proc new*(
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, historyProtocolId, contentDB,
toContentIdHandler, validateContent, bootstrapRecords,
toContentIdHandler, bootstrapRecords,
config = portalConfig)
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
proc start*(p: HistoryNetwork) =
info "Starting Portal execution history network",
protocolId = p.portalProtocol.protocolId
p.portalProtocol.start()
proc processContentLoop(n: HistoryNetwork) {.async.} =
try:
while true:
let (contentKeys, contentItems) =
await n.portalProtocol.stream.contentQueue.popFirst()
proc stop*(p: HistoryNetwork) =
p.portalProtocol.stop()
# content passed here can have fewer items than contentKeys, but not more.
for i, contentItem in contentItems:
echo contentItem.len()
let contentKey = contentKeys[i]
if await n.validateContent(contentItem, contentKey):
let contentIdOpt = n.portalProtocol.toContentId(contentKey)
if contentIdOpt.isNone():
continue
let contentId = contentIdOpt.get()
n.portalProtocol.storeContent(contentId, contentItem)
info "Received valid offered content", contentKey
else:
error "Received invalid offered content", contentKey
# On one invalid piece of content we drop all and don't forward any of it
# TODO: Could also filter it out and still gossip the rest.
continue
asyncSpawn n.portalProtocol.neighborhoodGossip(contentKeys, contentItems)
except CancelledError:
trace "processContentLoop canceled"
proc start*(n: HistoryNetwork) =
info "Starting Portal execution history network",
protocolId = n.portalProtocol.protocolId
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)
proc stop*(n: HistoryNetwork) =
n.portalProtocol.stop()
if not n.processContentLoop.isNil:
n.processContentLoop.cancel()

View File

@ -23,6 +23,7 @@ const
type StateNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
processContentLoop: Future[void]
func setStreamTransport*(n: StateNetwork, transport: UtpDiscv5Protocol) =
setTransport(n.portalProtocol.stream, transport)
@ -72,15 +73,29 @@ proc new*(
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new(
baseProtocol, stateProtocolId, contentDB,
toContentIdHandler, validateContent,
bootstrapRecords, stateDistanceCalculator, config = portalConfig)
toContentIdHandler,
bootstrapRecords, stateDistanceCalculator,
config = portalConfig)
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
proc processContentLoop(n: StateNetwork) {.async.} =
try:
while true:
# Just dropping state data for now
discard await n.portalProtocol.stream.contentQueue.popFirst()
except CancelledError:
trace "processContentLoop canceled"
proc start*(n: StateNetwork) =
info "Starting Portal execution state network",
protocolId = n.portalProtocol.protocolId
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)
proc stop*(n: StateNetwork) =
n.portalProtocol.stop()
if not n.processContentLoop.isNil:
n.processContentLoop.cancel()

View File

@ -131,10 +131,6 @@ type
ToContentIdHandler* =
proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
ContentValidationHandler* =
proc(content: openArray[byte], contentKey: ByteList):
bool {.raises: [Defect], gcsafe.}
PortalProtocolId* = array[2, byte]
RadiusCache* = LRUCache[NodeId, UInt256]
@ -160,7 +156,6 @@ type
baseProtocol*: protocol.Protocol
contentDB*: ContentDB
toContentId*: ToContentIdHandler
validateContent: ContentValidationHandler
radiusConfig: RadiusConfig
dataRadius*: UInt256
bootstrapRecords*: seq[Record]
@ -425,10 +420,6 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
debug "Packet decoding error", error = decoded.error, srcId, srcUdpAddress
@[]
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList,
content: seq[seq[byte]]) {.gcsafe, raises: [Defect].}
proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
# Get the max value of the logRadius range
pow((2).stuint(256), logRadius) - 1
@ -450,7 +441,6 @@ proc new*(T: type PortalProtocol,
protocolId: PortalProtocolId,
contentDB: ContentDB,
toContentId: ToContentIdHandler,
validateContent: ContentValidationHandler,
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig
@ -467,7 +457,6 @@ proc new*(T: type PortalProtocol,
baseProtocol: baseProtocol,
contentDB: contentDB,
toContentId: toContentId,
validateContent: validateContent,
radiusConfig: config.radiusConfig,
dataRadius: initialRadius,
bootstrapRecords: @bootstrapRecords,
@ -477,8 +466,7 @@ proc new*(T: type PortalProtocol,
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id")
let stream = PortalStream.new(
processContent, udata = proto, rng = proto.baseProtocol.rng)
let stream = PortalStream.new(udata = proto, rng = proto.baseProtocol.rng)
proto.stream = stream
@ -1211,32 +1199,6 @@ proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte])
# so we will effectively store a fraction of the network
p.contentDB.put(key, content)
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList,
content: seq[seq[byte]]) {.gcsafe, raises: [Defect].} =
let p = getUserData[PortalProtocol](stream)
# content passed here can have less items then contentKeys, but not more.
for i, contentItem in content:
let contentKey = contentKeys[i]
if p.validateContent(contentItem, contentKey):
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isNone():
return
let contentId = contentIdOpt.get()
p.storeContent(contentId, contentItem)
info "Received valid offered content", contentKey
else:
error "Received invalid offered content", contentKey
# On one invalid piece of content we drop all and don't forward any of it
# TODO: Could also filter it out and still gossip the rest.
return
asyncSpawn neighborhoodGossip(p, contentKeys, content)
proc seedTable*(p: PortalProtocol) =
## Seed the table with specifically provided Portal bootstrap nodes. These are
## nodes that must support the wire protocol for the specific content network.

View File

@ -55,10 +55,6 @@ type
contentKeys: ContentKeysList
timeout: Moment
ContentHandlerCallback* = proc(
stream: PortalStream, contentKeys: ContentKeysList,
content: seq[seq[byte]]) {.gcsafe, raises: [Defect].}
PortalStream* = ref object
transport: UtpDiscv5Protocol
# TODO:
@ -80,7 +76,7 @@ type
contentReadTimeout*: Duration
rng: ref HmacDrbgContext
udata: pointer
contentHandler: ContentHandlerCallback
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
proc pruneAllowedConnections(stream: PortalStream) =
# Prune requests and offers that didn't receive a connection request
@ -235,7 +231,6 @@ proc readContentOffer(
# security PoV), e.g. other options such as reading all content from socket at
# once, then processing the individual content items. Or reading and
# validating one at a time.
let amount = offer.contentKeys.len()
var contentItems: seq[seq[byte]]
@ -268,12 +263,14 @@ proc readContentOffer(
# Not waiting here for its ACK however, so no `closeWait`
socket.close()
if not stream.contentHandler.isNil():
stream.contentHandler(stream, offer.contentKeys, contentItems)
# TODO: This could currently create a backlog of content items to be validated
# as `AcceptConnectionCallback` is `asyncSpawn`'ed and there are no limits
# on the `contentOffers`. Might move the queue to before the reading of the
# socket, and let the specific networks handle that.
await stream.contentQueue.put((offer.contentKeys, contentItems))
proc new*(
T: type PortalStream,
contentHandler: ContentHandlerCallback,
udata: ref,
connectionTimeout = defaultConnectionTimeout,
contentReadTimeout = defaultContentReadTimeout,
@ -281,10 +278,10 @@ proc new*(
GC_ref(udata)
let
stream = PortalStream(
contentHandler: contentHandler,
udata: cast[pointer](udata),
connectionTimeout: connectionTimeout,
contentReadTimeout: contentReadTimeout,
contentQueue: newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50),
rng: rng)
stream
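Worth noting for the TODO above: the queue is bounded (50 entries), so a put on a full chronos AsyncQueue suspends the offer-reading path until the consumer catches up, which at least caps the backlog. A small, self-contained sketch of that backpressure behaviour follows; the capacity and proc names are illustrative, only the chronos AsyncQueue/sleepAsync API is assumed.

import chronos

proc consumeOne(q: AsyncQueue[int]) {.async.} =
  await sleepAsync(10.milliseconds)
  discard await q.popFirst()      # frees one slot in the queue

proc demo() {.async.} =
  let q = newAsyncQueue[int](2)   # bounded, like the contentQueue of 50 above
  await q.put(1)
  await q.put(2)
  asyncSpawn consumeOne(q)
  # This put suspends until consumeOne pops an item; the same mechanism
  # throttles readContentOffer when the content processing loop lags.
  await q.put(3)

waitFor demo()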

View File

@ -158,7 +158,6 @@ proc historyStore*(
proc historyPropagate*(
p: PortalProtocol, dataFile: string, verify = false):
Future[Result[void, string]] {.async.} =
const concurrentGossips = 20
var gossipQueue =
@ -175,17 +174,20 @@ proc historyPropagate*(
gossipWorkers.add(gossipWorker(p))
let blockData = readBlockDataTable(dataFile)
if blockData.isOk():
for b in blocks(blockData.get(), verify):
for value in b:
info "Seeding block content into the network", contentKey = value[0]
# Note: This is the slowest part due to the hashing that takes place.
let contentId = history_content.toContentId(value[0])
p.storeContent(contentId, value[1])
# Only sending non-empty data, e.g. empty receipts are not sent
# TODO: Could do a similar thing for a combination of empty
# txs and empty uncles, as then the serialization is always the same.
if value[1].len() > 0:
info "Seeding block content into the network", contentKey = value[0]
# Note: This is the slowest part due to the hashing that takes place.
let contentId = history_content.toContentId(value[0])
p.storeContent(contentId, value[1])
await gossipQueue.addLast(
(ContentKeysList(@[encode(value[0])]), value[1]))
await gossipQueue.addLast(
(ContentKeysList(@[encode(value[0])]), value[1]))
return ok()
else:

View File

@ -25,9 +25,6 @@ proc toContentId(contentKey: ByteList): Option[ContentId] =
let idHash = sha256.digest(contentKey.asSeq())
some(readUintBE[256](idHash.data))
proc validateContent(content: openArray[byte], contentKey: ByteList): bool =
true
proc initPortalProtocol(
rng: ref HmacDrbgContext,
privKey: PrivateKey,
@ -37,7 +34,7 @@ proc initPortalProtocol(
d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
db = ContentDB.new("", uint32.high, inMemory = true)
proto = PortalProtocol.new(
d, protocolId, db, toContentId, validateContent,
d, protocolId, db, toContentId,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
@ -180,15 +177,18 @@ procSuite "Portal Wire Protocol Tests":
let res = await proto1.offer(proto2.baseProtocol.localNode, content)
check:
res.isOk()
check res.isOk()
for contentInfo in content:
let receivedContent = proto2.contentDB.get(
toContentId(contentInfo.contentKey).get())
let (contentKeys, contentItems) =
await proto2.stream.contentQueue.popFirst()
check contentItems.len() == content.len()
for i, contentItem in contentItems:
let contentInfo = content[i]
check:
receivedContent.isSome()
receivedContent.get() == contentInfo.content
contentItem == contentInfo.content
contentKeys[i] == contentInfo.contentKey
await proto1.stopPortalProtocol()
await proto2.stopPortalProtocol()
@ -338,8 +338,7 @@ procSuite "Portal Wire Protocol Tests":
dbLimit = 100_000'u32
db = ContentDB.new("", dbLimit, inMemory = true)
proto1 = PortalProtocol.new(node1, protocolId, db, toContentId,
validateContent)
proto1 = PortalProtocol.new(node1, protocolId, db, toContentId)
let item = genByteSeq(10_000)
var distances: seq[UInt256] = @[]

View File

@ -27,7 +27,7 @@ const
# 100mb seems a bit smallish, we may consider increasing defaults after some
# network measurements
defaultStorageSize* = uint32(1000 * 1000 * 100)
type
PortalCmd* = enum
noCommand
@ -198,9 +198,6 @@ proc testHandler(contentKey: ByteList): Option[ContentId] =
let idHash = sha256.digest("test")
some(readUintBE[256](idHash.data))
proc validateContent(content: openArray[byte], contentKey: ByteList): bool =
true
proc run(config: PortalCliConf) =
let
rng = newRng()
@ -227,7 +224,7 @@ proc run(config: PortalCliConf) =
let
db = ContentDB.new("", config.storageSize, inMemory = true)
portal = PortalProtocol.new(d, config.protocolId, db,
testHandler, validateContent,
testHandler,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration))