Add async raises annotations for Portal wire and Portal networks (#2361)

Kim De Mey 2024-06-14 14:21:30 +02:00 committed by GitHub
parent debba5a620
commit c31fc37c62
9 changed files with 169 additions and 107 deletions
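
Every hunk below applies the same change: procs that previously used a bare {.async.} pragma, or returned a future without any pragma at all, now declare the exceptions they can raise through chronos' typed raises tracking. A minimal sketch of the two annotation forms used throughout the commit; the proc names and bodies are illustrative only and do not appear in the diff:

import chronos

# Regular form: the compiler checks that the body can only raise CancelledError.
proc fetchValue(n: int): Future[int] {.async: (raises: [CancelledError]).} =
  await sleepAsync(10.milliseconds)
  return n * 2

# Raw form: no async transformation is applied, so the proc must itself return
# a future whose raises list matches the annotation.
proc fetchValueRaw(n: int): Future[int] {.
    async: (raises: [CancelledError], raw: true)
.} =
  fetchValue(n)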


@ -96,7 +96,9 @@ proc new*(
proc lightClientVerifier(
obj: SomeForkedLightClientObject
): Future[Result[void, VerifierError]] =
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true)
.} =
let resfut = Future[Result[void, VerifierError]].Raising([CancelledError]).init(
"lightClientVerifier"
)
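
With raw: true the verifier above never awaits anything; it builds and returns the future itself, and Future[T].Raising([CancelledError]) ties the declared exception list to that future's type. A self-contained sketch of the same shape, where the VerifierError enum and the immediate completion are stand-ins rather than code from this commit:

import chronos, results

type VerifierError = enum
  Invalid

proc verifier(v: int): Future[Result[void, VerifierError]] {.
    async: (raises: [CancelledError], raw: true)
.} =
  let resfut =
    Future[Result[void, VerifierError]].Raising([CancelledError]).init("verifier")
  # Real code would hand resfut to another component and complete it later;
  # here it is completed on the spot to keep the sketch runnable.
  if v >= 0:
    resfut.complete(Result[void, VerifierError].ok())
  else:
    resfut.complete(Result[void, VerifierError].err(Invalid))
  resfut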


@ -38,8 +38,9 @@ type
FinalityUpdate = Endpoint[Slot, ForkedLightClientFinalityUpdate]
OptimisticUpdate = Endpoint[Slot, ForkedLightClientOptimisticUpdate]
ValueVerifier[V] =
proc(v: V): Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
ValueVerifier[V] = proc(v: V): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true)
.}
BootstrapVerifier* = ValueVerifier[ForkedLightClientBootstrap]
UpdateVerifier* = ValueVerifier[ForkedLightClientUpdate]
FinalityUpdateVerifier* = ValueVerifier[ForkedLightClientFinalityUpdate]
@ -104,7 +105,9 @@ proc getOptimisticPeriod(self: LightClientManager): SyncCommitteePeriod =
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
proc doRequest(
e: typedesc[Bootstrap], n: BeaconNetwork, blockRoot: Eth2Digest
): Future[NetRes[ForkedLightClientBootstrap]] =
): Future[NetRes[ForkedLightClientBootstrap]] {.
async: (raises: [CancelledError], raw: true)
.} =
n.getLightClientBootstrap(blockRoot)
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
@ -113,7 +116,9 @@ proc doRequest(
e: typedesc[UpdatesByRange],
n: BeaconNetwork,
key: tuple[startPeriod: SyncCommitteePeriod, count: uint64],
): Future[LightClientUpdatesByRangeResponse] {.async.} =
): Future[LightClientUpdatesByRangeResponse] {.
async: (raises: [ResponseError, CancelledError])
.} =
let (startPeriod, count) = key
doAssert count > 0 and count <= MAX_REQUEST_LIGHT_CLIENT_UPDATES
let response = await n.getLightClientUpdatesByRange(startPeriod, count)
@ -126,13 +131,17 @@ proc doRequest(
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
proc doRequest(
e: typedesc[FinalityUpdate], n: BeaconNetwork, finalizedSlot: Slot
): Future[NetRes[ForkedLightClientFinalityUpdate]] =
): Future[NetRes[ForkedLightClientFinalityUpdate]] {.
async: (raises: [CancelledError], raw: true)
.} =
n.getLightClientFinalityUpdate(distinctBase(finalizedSlot))
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
proc doRequest(
e: typedesc[OptimisticUpdate], n: BeaconNetwork, optimisticSlot: Slot
): Future[NetRes[ForkedLightClientOptimisticUpdate]] =
): Future[NetRes[ForkedLightClientOptimisticUpdate]] {.
async: (raises: [CancelledError], raw: true)
.} =
n.getLightClientOptimisticUpdate(distinctBase(optimisticSlot))
template valueVerifier[E](
@ -161,7 +170,7 @@ iterator values(v: auto): auto =
proc workerTask[E](
self: LightClientManager, e: typedesc[E], key: E.K
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
var didProgress = false
try:
let value =
@ -230,13 +239,12 @@ proc workerTask[E](
warn "Received invalid response", error = exc.msg, endpoint = E.name
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Unexpected exception while receiving value", exc = exc.msg
raise exc
return didProgress
proc query[E](self: LightClientManager, e: typedesc[E], key: E.K): Future[bool] =
proc query[E](
self: LightClientManager, e: typedesc[E], key: E.K
): Future[bool] {.async: (raises: [CancelledError], raw: true).} =
# Note:
# The libp2p version does concurrent requests here. But it seems to be done
# for the same key and thus as a redundant request to avoid waiting on a not
@ -249,7 +257,7 @@ proc query[E](self: LightClientManager, e: typedesc[E], key: E.K): Future[bool]
self.workerTask(e, key)
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/altair/light-client/light-client.md#light-client-sync-process
proc loop(self: LightClientManager) {.async.} =
proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
var nextSyncTaskTime = self.getBeaconTime()
while true:
# Periodically wake and check for changes
@ -312,8 +320,8 @@ proc start*(self: var LightClientManager) =
doAssert self.loopFuture == nil
self.loopFuture = self.loop()
proc stop*(self: var LightClientManager) {.async.} =
proc stop*(self: var LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await self.loopFuture.cancelAndWait()
await noCancel self.loopFuture.cancelAndWait()
self.loopFuture = nil
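
The stop change above pairs raises: [] with noCancel: cancelling the loop future must not itself be interruptible, otherwise a CancelledError could escape from a proc that promises to raise nothing. A small sketch of that shutdown pattern, with a hypothetical Manager type standing in for LightClientManager:

import chronos

type Manager = object
  loopFuture: Future[void]

proc loop() {.async: (raises: [CancelledError]).} =
  while true:
    await sleepAsync(1.seconds)

proc start(self: var Manager) =
  doAssert self.loopFuture == nil
  self.loopFuture = loop()

proc stop(self: var Manager) {.async: (raises: []).} =
  ## Stop the manager's loop; guaranteed not to raise.
  if self.loopFuture != nil:
    # noCancel shields the wait itself from cancellation, keeping the
    # raises: [] contract even if stop() is cancelled mid-way.
    await noCancel self.loopFuture.cancelAndWait()
    self.loopFuture = nil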


@ -59,7 +59,7 @@ proc validateHistoricalSummaries(
proc getContent(
n: BeaconNetwork, contentKey: ContentKey
): Future[results.Opt[seq[byte]]] {.async.} =
): Future[results.Opt[seq[byte]]] {.async: (raises: [CancelledError]).} =
let
contentKeyEncoded = encode(contentKey)
contentId = toContentId(contentKeyEncoded)
@ -79,7 +79,7 @@ proc getContent(
proc getLightClientBootstrap*(
n: BeaconNetwork, trustedRoot: Digest
): Future[results.Opt[ForkedLightClientBootstrap]] {.async.} =
): Future[results.Opt[ForkedLightClientBootstrap]] {.async: (raises: [CancelledError]).} =
let
contentKey = bootstrapContentKey(trustedRoot)
contentResult = await n.getContent(contentKey)
@ -100,7 +100,9 @@ proc getLightClientBootstrap*(
proc getLightClientUpdatesByRange*(
n: BeaconNetwork, startPeriod: SyncCommitteePeriod, count: uint64
): Future[results.Opt[ForkedLightClientUpdateList]] {.async.} =
): Future[results.Opt[ForkedLightClientUpdateList]] {.
async: (raises: [CancelledError])
.} =
let
contentKey = updateContentKey(distinctBase(startPeriod), count)
contentResult = await n.getContent(contentKey)
@ -121,7 +123,9 @@ proc getLightClientUpdatesByRange*(
proc getLightClientFinalityUpdate*(
n: BeaconNetwork, finalizedSlot: uint64
): Future[results.Opt[ForkedLightClientFinalityUpdate]] {.async.} =
): Future[results.Opt[ForkedLightClientFinalityUpdate]] {.
async: (raises: [CancelledError])
.} =
let
contentKey = finalityUpdateContentKey(finalizedSlot)
contentResult = await n.getContent(contentKey)
@ -141,7 +145,9 @@ proc getLightClientFinalityUpdate*(
proc getLightClientOptimisticUpdate*(
n: BeaconNetwork, optimisticSlot: uint64
): Future[results.Opt[ForkedLightClientOptimisticUpdate]] {.async.} =
): Future[results.Opt[ForkedLightClientOptimisticUpdate]] {.
async: (raises: [CancelledError])
.} =
let
contentKey = optimisticUpdateContentKey(optimisticSlot)
contentResult = await n.getContent(contentKey)
@ -161,7 +167,7 @@ proc getLightClientOptimisticUpdate*(
proc getHistoricalSummaries*(
n: BeaconNetwork, epoch: uint64
): Future[results.Opt[HistoricalSummaries]] {.async.} =
): Future[results.Opt[HistoricalSummaries]] {.async: (raises: [CancelledError]).} =
# Note: when taken from the db, the proof does not need to be verified.
let
contentKey = historicalSummariesContentKey(epoch)
@ -279,7 +285,7 @@ proc validateContent(
proc validateContent(
n: BeaconNetwork, contentKeys: ContentKeysList, contentItems: seq[seq[byte]]
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
# content passed here can have fewer items than contentKeys, but not more.
for i, contentItem in contentItems:
let
@ -302,7 +308,7 @@ proc validateContent(
return true
proc processContentLoop(n: BeaconNetwork) {.async.} =
proc processContentLoop(n: BeaconNetwork) {.async: (raises: []).} =
try:
while true:
let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst()
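
processContentLoop can carry raises: [] because its whole body sits inside a try/except that absorbs the CancelledError used to stop it, and nothing else in the loop raises a tracked exception. A sketch of that loop shape, with a plain AsyncQueue standing in for the Portal content queue:

import chronos

proc processContentLoop(queue: AsyncQueue[seq[byte]]) {.async: (raises: []).} =
  try:
    while true:
      # popFirst only raises CancelledError, which the handler below absorbs.
      let contentItem = await queue.popFirst()
      discard contentItem # validation and storage would happen here
  except CancelledError:
    discard # graceful shutdown; nothing may escape a raises: [] proc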


@ -424,7 +424,7 @@ func verifyHeader(
proc getVerifiedBlockHeader*(
n: HistoryNetwork, hash: BlockHash
): Future[Opt[BlockHeader]] {.async.} =
): Future[Opt[BlockHeader]] {.async: (raises: [CancelledError]).} =
let
contentKey = ContentKey.init(blockHeader, hash).encode()
contentId = contentKey.toContentId()
@ -473,7 +473,7 @@ proc getVerifiedBlockHeader*(
proc getBlockBody*(
n: HistoryNetwork, hash: BlockHash, header: BlockHeader
): Future[Opt[BlockBody]] {.async.} =
): Future[Opt[BlockBody]] {.async: (raises: [CancelledError]).} =
if header.txRoot == EMPTY_ROOT_HASH and header.ommersHash == EMPTY_UNCLE_HASH:
# Short path for empty body indicated by txRoot and ommersHash
return Opt.some(BlockBody(transactions: @[], uncles: @[]))
@ -513,7 +513,9 @@ proc getBlockBody*(
# Bodies were requested `requestRetries` times and all failed on validation
return Opt.none(BlockBody)
proc getBlock*(n: HistoryNetwork, hash: BlockHash): Future[Opt[Block]] {.async.} =
proc getBlock*(
n: HistoryNetwork, hash: BlockHash
): Future[Opt[Block]] {.async: (raises: [CancelledError]).} =
debug "Trying to retrieve block with hash", hash
# Note: Using `getVerifiedBlockHeader` instead of getBlockHeader even though
@ -531,7 +533,7 @@ proc getBlock*(n: HistoryNetwork, hash: BlockHash): Future[Opt[Block]] {.async.}
proc getReceipts*(
n: HistoryNetwork, hash: BlockHash, header: BlockHeader
): Future[Opt[seq[Receipt]]] {.async.} =
): Future[Opt[seq[Receipt]]] {.async: (raises: [CancelledError]).} =
if header.receiptsRoot == EMPTY_ROOT_HASH:
# Short path for empty receipts indicated by receipts root
return Opt.some(newSeq[Receipt]())
@ -569,7 +571,7 @@ proc getReceipts*(
proc getEpochAccumulator(
n: HistoryNetwork, epochHash: Digest
): Future[Opt[EpochAccumulator]] {.async.} =
): Future[Opt[EpochAccumulator]] {.async: (raises: [CancelledError]).} =
let
contentKey = ContentKey.init(epochAccumulator, epochHash).encode()
contentId = contentKey.toContentId()
@ -592,7 +594,7 @@ proc getEpochAccumulator(
epochAccumulator =
try:
SSZ.decode(accumulatorContent.content, EpochAccumulator)
except SszError:
except SerializationError:
continue
let hash = hash_tree_root(epochAccumulator)
@ -612,7 +614,7 @@ proc getEpochAccumulator(
proc getBlockHashByNumber*(
n: HistoryNetwork, bn: UInt256
): Future[Result[BlockHash, string]] {.async.} =
): Future[Result[BlockHash, string]] {.async: (raises: [CancelledError]).} =
let
epochData = n.accumulator.getBlockEpochDataForBlockNumber(bn).valueOr:
return err(error)
@ -624,7 +626,7 @@ proc getBlockHashByNumber*(
proc getBlock*(
n: HistoryNetwork, bn: UInt256
): Future[Result[Opt[Block], string]] {.async.} =
): Future[Result[Opt[Block], string]] {.async: (raises: [CancelledError]).} =
let
blockHash = ?(await n.getBlockHashByNumber(bn))
maybeBlock = await n.getBlock(blockHash)
@ -633,7 +635,7 @@ proc getBlock*(
proc validateContent(
n: HistoryNetwork, content: seq[byte], contentKey: ByteList
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
let key = contentKey.decode().valueOr:
return false
@ -687,7 +689,7 @@ proc validateContent(
let epochAccumulator =
try:
SSZ.decode(content, EpochAccumulator)
except SszError:
except SerializationError:
warn "Failed decoding epoch accumulator"
return false
@ -737,7 +739,7 @@ proc new*(
proc validateContent(
n: HistoryNetwork, contentKeys: ContentKeysList, contentItems: seq[seq[byte]]
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
# content passed here can have fewer items than contentKeys, but not more.
for i, contentItem in contentItems:
let contentKey = contentKeys[i]
@ -755,7 +757,7 @@ proc validateContent(
return true
proc processContentLoop(n: HistoryNetwork) {.async.} =
proc processContentLoop(n: HistoryNetwork) {.async: (raises: []).} =
try:
while true:
let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst()
@ -771,7 +773,7 @@ proc processContentLoop(n: HistoryNetwork) {.async.} =
except CancelledError:
trace "processContentLoop canceled"
proc statusLogLoop(n: HistoryNetwork) {.async.} =
proc statusLogLoop(n: HistoryNetwork) {.async: (raises: []).} =
try:
while true:
# This is the data radius percentage compared to full storage. This will
@ -804,3 +806,6 @@ proc stop*(n: HistoryNetwork) =
if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()
if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
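
Several hunks above also widen except SszError to except SerializationError, the parent error type from nim-serialization, so the handler still matches if decoding fails with a different subtype such as a size mismatch. A small sketch of that decode step; the Accumulator list type and helper proc are stand-ins, not the real fluffy types:

import results, serialization, ssz_serialization

type Accumulator = List[uint64, 16]

proc decodeAccumulator(content: seq[byte]): Opt[Accumulator] =
  let accumulator =
    try:
      SSZ.decode(content, Accumulator)
    except SerializationError:
      # Covers SszError, size mismatches and any other decoding failure.
      return Opt.none(Accumulator)
  Opt.some(accumulator)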


@ -73,7 +73,7 @@ proc getNextNodeHash(
proc getAccountProof(
n: StateNetwork, stateRoot: KeccakHash, address: Address
): Future[Opt[TrieProof]] {.async.} =
): Future[Opt[TrieProof]] {.async: (raises: [CancelledError]).} =
let nibbles = address.toPath().unpackNibbles()
var
@ -100,7 +100,7 @@ proc getAccountProof(
proc getStorageProof(
n: StateNetwork, storageRoot: KeccakHash, address: Address, storageKey: UInt256
): Future[Opt[TrieProof]] {.async.} =
): Future[Opt[TrieProof]] {.async: (raises: [CancelledError]).} =
let nibbles = storageKey.toPath().unpackNibbles()
var
@ -127,7 +127,7 @@ proc getStorageProof(
proc getAccount(
n: StateNetwork, blockHash: BlockHash, address: Address
): Future[Opt[Account]] {.async.} =
): Future[Opt[Account]] {.async: (raises: [CancelledError]).} =
let
stateRoot = (await n.getStateRootByBlockHash(blockHash)).valueOr:
warn "Failed to get state root by block hash"
@ -144,7 +144,7 @@ proc getAccount(
# Used by: eth_getBalance,
proc getBalance*(
n: StateNetwork, blockHash: BlockHash, address: Address
): Future[Opt[UInt256]] {.async.} =
): Future[Opt[UInt256]] {.async: (raises: [CancelledError]).} =
let account = (await n.getAccount(blockHash, address)).valueOr:
return Opt.none(UInt256)
@ -153,7 +153,7 @@ proc getBalance*(
# Used by: eth_getTransactionCount
proc getTransactionCount*(
n: StateNetwork, blockHash: BlockHash, address: Address
): Future[Opt[AccountNonce]] {.async.} =
): Future[Opt[AccountNonce]] {.async: (raises: [CancelledError]).} =
let account = (await n.getAccount(blockHash, address)).valueOr:
return Opt.none(AccountNonce)
@ -162,7 +162,7 @@ proc getTransactionCount*(
# Used by: eth_getStorageAt
proc getStorageAt*(
n: StateNetwork, blockHash: BlockHash, address: Address, slotKey: UInt256
): Future[Opt[UInt256]] {.async.} =
): Future[Opt[UInt256]] {.async: (raises: [CancelledError]).} =
let
account = (await n.getAccount(blockHash, address)).valueOr:
return Opt.none(UInt256)
@ -178,7 +178,7 @@ proc getStorageAt*(
# Used by: eth_getCode
proc getCode*(
n: StateNetwork, blockHash: BlockHash, address: Address
): Future[Opt[Bytecode]] {.async.} =
): Future[Opt[Bytecode]] {.async: (raises: [CancelledError]).} =
let
account = (await n.getAccount(blockHash, address)).valueOr:
return Opt.none(Bytecode)
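
The state getters above all follow the same shape: await a lower-level lookup, return Opt.none early via valueOr on failure, and continue otherwise, so the only tracked exception anywhere in the chain is CancelledError. A minimal sketch of that chaining, with hypothetical account and balance helpers rather than the real StateNetwork API:

import chronos, results

type Account = object
  balance: uint64

proc getAccount(address: string): Future[Opt[Account]] {.
    async: (raises: [CancelledError])
.} =
  await sleepAsync(1.milliseconds) # stands in for the network lookup
  Opt.some(Account(balance: 100))

proc getBalance(address: string): Future[Opt[uint64]] {.
    async: (raises: [CancelledError])
.} =
  let account = (await getAccount(address)).valueOr:
    return Opt.none(uint64)
  Opt.some(account.balance)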


@ -98,7 +98,7 @@ proc gossipOffer*(
offerBytes: seq[byte],
key: AccountTrieNodeKey,
offer: AccountTrieNodeOffer,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
@ -111,7 +111,7 @@ proc gossipOffer*(
offerBytes: seq[byte],
key: ContractTrieNodeKey,
offer: ContractTrieNodeOffer,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
@ -124,7 +124,7 @@ proc gossipOffer*(
offerBytes: seq[byte],
key: ContractCodeKey,
offer: ContractCodeOffer,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
@ -139,7 +139,7 @@ proc recursiveGossipOffer*(
offerBytes: seq[byte],
key: AccountTrieNodeKey,
offer: AccountTrieNodeOffer,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
asyncSpawn gossipOffer(p, srcNodeId, keyBytes, offerBytes, key, offer)
# root node, recursive gossip is finished
@ -164,7 +164,7 @@ proc recursiveGossipOffer*(
offerBytes: seq[byte],
key: ContractTrieNodeKey,
offer: ContractTrieNodeOffer,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
asyncSpawn gossipOffer(p, srcNodeId, keyBytes, offerBytes, key, offer)
# root node, recursive gossip is finished
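
The recursive gossip procs above hand the actual offer off with asyncSpawn instead of awaiting it, so one recursion step never blocks on the gossip round it triggers. A tiny sketch of that fire-and-forget step, with a dummy gossipOffer in place of the real one:

import chronos

proc gossipOffer(keyBytes, offerBytes: seq[byte]) {.
    async: (raises: [CancelledError])
.} =
  await sleepAsync(1.milliseconds) # stands in for neighborhoodGossip

proc recursiveGossipOffer(keyBytes, offerBytes: seq[byte]) {.
    async: (raises: [CancelledError])
.} =
  # The offer runs detached; chronos treats a failure of a spawned future as
  # a defect, so nothing is added to this proc's raises list.
  asyncSpawn gossipOffer(keyBytes, offerBytes)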


@ -78,7 +78,7 @@ proc getContent(
n: StateNetwork,
key: AccountTrieNodeKey | ContractTrieNodeKey | ContractCodeKey,
V: type ContentRetrievalType,
): Future[Opt[V]] {.async.} =
): Future[Opt[V]] {.async: (raises: [CancelledError]).} =
let
contentKeyBytes = key.toContentKey().encode()
contentId = contentKeyBytes.toContentId()
@ -114,22 +114,26 @@ proc getContent(
proc getAccountTrieNode*(
n: StateNetwork, key: AccountTrieNodeKey
): Future[Opt[AccountTrieNodeRetrieval]] {.inline.} =
): Future[Opt[AccountTrieNodeRetrieval]] {.
async: (raw: true, raises: [CancelledError])
.} =
n.getContent(key, AccountTrieNodeRetrieval)
proc getContractTrieNode*(
n: StateNetwork, key: ContractTrieNodeKey
): Future[Opt[ContractTrieNodeRetrieval]] {.inline.} =
): Future[Opt[ContractTrieNodeRetrieval]] {.
async: (raw: true, raises: [CancelledError])
.} =
n.getContent(key, ContractTrieNodeRetrieval)
proc getContractCode*(
n: StateNetwork, key: ContractCodeKey
): Future[Opt[ContractCodeRetrieval]] {.inline.} =
): Future[Opt[ContractCodeRetrieval]] {.async: (raw: true, raises: [CancelledError]).} =
n.getContent(key, ContractCodeRetrieval)
proc getStateRootByBlockHash*(
n: StateNetwork, hash: BlockHash
): Future[Opt[KeccakHash]] {.async.} =
): Future[Opt[KeccakHash]] {.async: (raises: [CancelledError]).} =
if n.historyNetwork.isNone():
warn "History network is not available. Unable to get state root by block hash"
return Opt.none(KeccakHash)
@ -147,7 +151,7 @@ proc processOffer*(
contentValueBytes: seq[byte],
contentKey: AccountTrieNodeKey | ContractTrieNodeKey | ContractCodeKey,
V: type ContentOfferType,
): Future[Result[void, string]] {.async.} =
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
let contentValue = V.decode(contentValueBytes).valueOr:
return err("Unable to decode offered content value")
@ -178,7 +182,7 @@ proc processOffer*(
ok()
proc processContentLoop(n: StateNetwork) {.async.} =
proc processContentLoop(n: StateNetwork) {.async: (raises: []).} =
try:
while true:
let (srcNodeId, contentKeys, contentValues) = await n.contentQueue.popFirst()


@ -564,7 +564,7 @@ proc new*(
# validates the proper response, and updates the Portal Network routing table.
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
p: PortalProtocol, dst: Node, request: Request
): Future[PortalResult[Response]] {.async.} =
): Future[PortalResult[Response]] {.async: (raises: [CancelledError]).} =
logScope:
protocolId = p.protocolId
@ -613,7 +613,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
proc pingImpl*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async.} =
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
let customPayload = CustomPayload(dataRadius: p.dataRadius)
let ping = PingMessage(
enrSeq: p.localNode.record.seqNum,
@ -624,7 +624,7 @@ proc pingImpl*(
proc findNodesImpl*(
p: PortalProtocol, dst: Node, distances: List[uint16, 256]
): Future[PortalResult[NodesMessage]] {.async.} =
): Future[PortalResult[NodesMessage]] {.async: (raises: [CancelledError]).} =
let fn = FindNodesMessage(distances: distances)
# TODO Add nodes validation
@ -632,14 +632,14 @@ proc findNodesImpl*(
proc findContentImpl*(
p: PortalProtocol, dst: Node, contentKey: ByteList
): Future[PortalResult[ContentMessage]] {.async.} =
): Future[PortalResult[ContentMessage]] {.async: (raises: [CancelledError]).} =
let fc = FindContentMessage(contentKey: contentKey)
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
proc offerImpl*(
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
): Future[PortalResult[AcceptMessage]] {.async.} =
): Future[PortalResult[AcceptMessage]] {.async: (raises: [CancelledError]).} =
let offer = OfferMessage(contentKeys: contentKeys)
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
@ -657,7 +657,9 @@ proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]
ok(records)
proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.async.} =
proc ping*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
let pongResponse = await p.pingImpl(dst)
if pongResponse.isOk():
@ -669,7 +671,7 @@ proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.as
let customPayloadDecoded =
try:
SSZ.decode(pong.customPayload.asSeq(), CustomPayload)
except MalformedSszError, SszSizeMismatchError:
except SerializationError:
# invalid custom payload
return err("Pong message contains invalid custom payload")
@ -679,7 +681,7 @@ proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.as
proc findNodes*(
p: PortalProtocol, dst: Node, distances: seq[uint16]
): Future[PortalResult[seq[Node]]] {.async.} =
): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances))
if nodesMessage.isOk():
let records = recordsFromBytes(nodesMessage.get().enrs)
@ -693,7 +695,7 @@ proc findNodes*(
proc findContent*(
p: PortalProtocol, dst: Node, contentKey: ByteList
): Future[PortalResult[FoundContent]] {.async.} =
): Future[PortalResult[FoundContent]] {.async: (raises: [CancelledError]).} =
logScope:
node = dst
contentKey
@ -735,7 +737,7 @@ proc findContent*(
socket.close()
if await readFut.withTimeout(p.stream.contentReadTimeout):
let content = readFut.read
let content = await readFut
# socket received remote FIN and drained whole buffer, it can be
# safely destroyed without notifying remote
debug "Socket read fully", socketKey = socket.socketKey
@ -805,7 +807,7 @@ func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int =
proc offer(
p: PortalProtocol, o: OfferRequest
): Future[PortalResult[ContentKeysBitList]] {.async.} =
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
## Offer triggers offer-accept interaction with one peer
## Whole flow has two phases:
## 1. Come to an agreement on what content to transfer, by using offer and
@ -885,10 +887,14 @@ proc offer(
for i, b in m.contentKeys:
if b:
let content = o.contentList[i].content
# TODO: stop using faststreams for this
var output = memoryOutput()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
try:
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
except IOError as e:
# This should not happen with in-memory streams
raiseAssert e.msg
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
@ -911,12 +917,18 @@ proc offer(
var output = memoryOutput()
if contentResult.isOk():
let content = contentResult.get()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
try:
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
except IOError as e:
# This should not happen with in-memory streams
raiseAssert e.msg
else:
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
try:
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
except IOError as e:
raiseAssert e.msg
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
@ -936,13 +948,13 @@ proc offer(
proc offer*(
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
): Future[PortalResult[ContentKeysBitList]] {.async.} =
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
return await p.offer(req)
proc offer*(
p: PortalProtocol, dst: Node, content: seq[ContentKV]
): Future[PortalResult[ContentKeysBitList]] {.async.} =
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
if len(content) > contentKeysLimit:
return err("Cannot offer more than 64 content items")
@ -950,7 +962,7 @@ proc offer*(
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
return await p.offer(req)
proc offerWorker(p: PortalProtocol) {.async.} =
proc offerWorker(p: PortalProtocol) {.async: (raises: [CancelledError]).} =
while true:
let req = await p.offerQueue.popFirst()
@ -965,7 +977,7 @@ proc offerQueueEmpty*(p: PortalProtocol): bool =
proc lookupWorker(
p: PortalProtocol, dst: Node, target: NodeId
): Future[seq[Node]] {.async.} =
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
let distances = lookupDistances(target, dst.id)
let nodesMessage = await p.findNodes(dst, distances)
if nodesMessage.isOk():
@ -977,7 +989,9 @@ proc lookupWorker(
else:
return @[]
proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
proc lookup*(
p: PortalProtocol, target: NodeId
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
@ -990,7 +1004,7 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
for node in closestNodes:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
var requestAmount = 0'i64
while true:
@ -1009,7 +1023,12 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got lookup query response"
let index = pendingQueries.find(query)
@ -1018,7 +1037,7 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
else:
error "Resulting query should have been in the pending queries"
let nodes = query.read
let nodes = await query
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
@ -1070,7 +1089,7 @@ proc triggerPoke*(
# and make it more generic
proc contentLookup*(
p: PortalProtocol, target: ByteList, targetId: UInt256
): Future[Opt[ContentLookupResult]] {.async.} =
): Future[Opt[ContentLookupResult]] {.async: (raises: [CancelledError]).} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
@ -1086,7 +1105,8 @@ proc contentLookup*(
for node in closestNodes:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
var pendingQueries =
newSeqOfCap[Future[PortalResult[FoundContent]].Raising([CancelledError])](alpha)
var requestAmount = 0'i64
var nodesWithoutContent: seq[Node] = newSeq[Node]()
@ -1107,7 +1127,12 @@ proc contentLookup*(
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got lookup query response"
let index = pendingQueries.find(query)
@ -1116,8 +1141,7 @@ proc contentLookup*(
else:
error "Resulting query should have been in the pending queries"
let contentResult = query.read
let contentResult = await query
if contentResult.isOk():
let content = contentResult.get()
@ -1169,7 +1193,7 @@ proc contentLookup*(
proc traceContentLookup*(
p: PortalProtocol, target: ByteList, targetId: UInt256
): Future[TraceContentLookupResult] {.async.} =
): Future[TraceContentLookupResult] {.async: (raises: [CancelledError]).} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
@ -1203,7 +1227,8 @@ proc traceContentLookup*(
metadata["0x" & $cn.id] =
NodeMetadata(enr: cn.record, distance: p.distance(cn.id, targetId))
var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
var pendingQueries =
newSeqOfCap[Future[PortalResult[FoundContent]].Raising([CancelledError])](alpha)
var pendingNodes = newSeq[Node]()
var requestAmount = 0'i64
@ -1226,7 +1251,11 @@ proc traceContentLookup*(
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got lookup query response"
let index = pendingQueries.find(query)
@ -1236,7 +1265,7 @@ proc traceContentLookup*(
else:
error "Resulting query should have been in the pending queries"
let contentResult = query.read
let contentResult = await query
if contentResult.isOk():
let content = contentResult.get()
@ -1346,7 +1375,7 @@ proc traceContentLookup*(
proc query*(
p: PortalProtocol, target: NodeId, k = BUCKET_SIZE
): Future[seq[Node]] {.async.} =
): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Query k nodes for the given target, returns all nodes found, including the
## nodes queried.
##
@ -1361,7 +1390,7 @@ proc query*(
for node in queryBuffer:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
while true:
var i = 0
@ -1376,7 +1405,11 @@ proc query*(
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got lookup query response"
let index = pendingQueries.find(query)
@ -1385,7 +1418,7 @@ proc query*(
else:
error "Resulting query should have been in the pending queries"
let nodes = query.read
let nodes = await query
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
@ -1394,7 +1427,9 @@ proc query*(
p.lastLookup = now(chronos.Moment)
return queryBuffer
proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
proc queryRandom*(
p: PortalProtocol
): Future[seq[Node]] {.async: (raw: true, raises: [CancelledError]).} =
## Perform a query for a random target, return all nodes discovered.
p.query(NodeId.random(p.baseProtocol.rng[]))
@ -1416,7 +1451,7 @@ proc neighborhoodGossip*(
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]],
): Future[int] {.async.} =
): Future[int] {.async: (raises: [CancelledError]).} =
## Run neighborhood gossip for provided content.
## Returns the number of peers to which gossiping the content was attempted.
if content.len() == 0:
@ -1486,7 +1521,7 @@ proc neighborhoodGossipDiscardPeers*(
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]],
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
discard await p.neighborhoodGossip(srcNodeId, contentKeys, content)
proc randomGossip*(
@ -1494,7 +1529,7 @@ proc randomGossip*(
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]],
): Future[int] {.async.} =
): Future[int] {.async: (raises: [CancelledError]).} =
## Run random gossip for provided content.
## Returns the number of peers to which gossiping the content was attempted.
if content.len() == 0:
@ -1518,7 +1553,7 @@ proc randomGossipDiscardPeers*(
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]],
): Future[void] {.async.} =
): Future[void] {.async: (raises: [CancelledError]).} =
discard await p.randomGossip(srcNodeId, contentKeys, content)
proc storeContent*(
@ -1546,7 +1581,7 @@ proc seedTable*(p: PortalProtocol) =
error "Bootstrap node could not be added",
uri = toURI(record), protocolId = p.protocolId
proc populateTable(p: PortalProtocol) {.async.} =
proc populateTable(p: PortalProtocol) {.async: (raises: [CancelledError]).} =
## Do a set of initial lookups to quickly populate the table.
# start with a self target query (neighbour nodes)
logScope:
@ -1561,7 +1596,7 @@ proc populateTable(p: PortalProtocol) {.async.} =
debug "Total nodes in routing table after populate", total = p.routingTable.len()
proc revalidateNode*(p: PortalProtocol, n: Node) {.async.} =
proc revalidateNode*(p: PortalProtocol, n: Node) {.async: (raises: [CancelledError]).} =
let pong = await p.ping(n)
if pong.isOk():
@ -1587,7 +1622,7 @@ proc getNodeForRevalidation(p: PortalProtocol): Opt[Node] =
else:
Opt.none(Node)
proc revalidateLoop(p: PortalProtocol) {.async.} =
proc revalidateLoop(p: PortalProtocol) {.async: (raises: []).} =
## Loop which revalidates the nodes in the routing table by sending the ping
## message.
try:
@ -1599,7 +1634,7 @@ proc revalidateLoop(p: PortalProtocol) {.async.} =
except CancelledError:
trace "revalidateLoop canceled"
proc refreshLoop(p: PortalProtocol) {.async.} =
proc refreshLoop(p: PortalProtocol) {.async: (raises: []).} =
## Loop that refreshes the routing table by starting a random query in case
## no queries were done since `refreshInterval` or more.
## It also refreshes the majority address voted for via pong responses.
@ -1642,7 +1677,9 @@ proc stop*(p: PortalProtocol) =
worker.cancelSoon()
p.offerWorkers = @[]
proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} =
proc resolve*(
p: PortalProtocol, id: NodeId
): Future[Opt[Node]] {.async: (raises: [CancelledError]).} =
## Resolve a `Node` based on provided `NodeId`.
##
## This will first look in the own routing table. If the node is known, it
@ -1671,7 +1708,7 @@ proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} =
proc resolveWithRadius*(
p: PortalProtocol, id: NodeId
): Future[Opt[(Node, UInt256)]] {.async.} =
): Future[Opt[(Node, UInt256)]] {.async: (raises: [CancelledError]).} =
## Resolve a `Node` based on provided `NodeId`, also try to establish what
## is known radius of found node.
##
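
The lookup changes above are the most involved part of the commit: the pending-query seqs now carry Future[...].Raising([CancelledError]) so awaiting them type-checks inside raises-annotated procs, one() gets a ValueError handler turned into an assert, and the old query.read calls become plain awaits on already-finished futures. A condensed sketch of that loop over hypothetical integer lookups:

import chronos

proc lookupWorker(target: int): Future[seq[int]] {.
    async: (raises: [CancelledError])
.} =
  await sleepAsync(1.milliseconds) # stands in for findNodes on one peer
  return @[target, target + 1]

proc lookup(targets: seq[int]): Future[seq[int]] {.
    async: (raises: [CancelledError])
.} =
  # The element type carries the raises list, so the results can be awaited
  # later inside this raises-annotated proc.
  var pendingQueries =
    newSeqOfCap[Future[seq[int]].Raising([CancelledError])](targets.len)
  for t in targets:
    pendingQueries.add(lookupWorker(t))

  var found: seq[int]
  while pendingQueries.len > 0:
    let query =
      try:
        # one() only raises ValueError for an empty seq, which the loop
        # condition rules out, so the handler turns into an assert.
        await one(pendingQueries)
      except ValueError:
        raiseAssert("pendingQueries should not have been empty")
    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
    # Awaiting a finished future returns its value (or re-raises), which
    # replaces the old query.read calls.
    found.add(await query)
  return found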


@ -138,7 +138,7 @@ proc addContentRequest*(
proc connectTo*(
stream: PortalStream, nodeAddress: NodeAddress, connectionId: uint16
): Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
): Future[Result[UtpSocket[NodeAddress], string]] {.async: (raises: [CancelledError]).} =
let connectRes = await stream.transport.connectTo(nodeAddress, connectionId)
if connectRes.isErr():
case connectRes.error