More verbose error handling for Portal stream content reading (#2737)

This commit is contained in:
Kim De Mey 2024-10-14 11:53:28 +02:00 committed by GitHub
parent bdbea9853e
commit 9c9a41a16d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 27 additions and 29 deletions

View File

@ -170,13 +170,13 @@ proc writeContentRequest(
proc readVarint(
socket: UtpSocket[NodeAddress]
): Future[Opt[uint32]] {.async: (raises: [CancelledError]).} =
): Future[Result[uint32, string]] {.async: (raises: [CancelledError]).} =
var buffer: array[5, byte]
for i in 0 ..< len(buffer):
let dataRead = await socket.read(1)
if dataRead.len() == 0:
return err()
return err("No data read")
buffer[i] = dataRead[0]
@ -186,53 +186,51 @@ proc readVarint(
elif bytesRead == 0:
continue
else:
return err()
return err("Failed to read varint")
proc readContentItem(
proc readContentValue(
socket: UtpSocket[NodeAddress]
): Future[Opt[seq[byte]]] {.async: (raises: [CancelledError]).} =
let len = await socket.readVarint()
): Future[Result[seq[byte], string]] {.async: (raises: [CancelledError]).} =
let len = (await socket.readVarint()).valueOr:
return err($error)
if len.isOk():
let contentItem = await socket.read(len.get())
if contentItem.len() == len.get().int:
return ok(contentItem)
else:
return err()
let contentValue = await socket.read(len)
if contentValue.len() == len.int:
ok(contentValue)
else:
return err()
err("Content value length mismatch")
proc readContentOffer(
socket: UtpSocket[NodeAddress], stream: PortalStream, offer: ContentOffer
) {.async: (raises: [CancelledError]).} =
# Read number of content items according to amount of ContentKeys accepted.
# Read number of content values according to amount of ContentKeys accepted.
# This will either end with a FIN, or because the read action times out or
# because the number of expected items was read (if this happens and no FIN
# because the number of expected values was read (if this happens and no FIN
# was received yet, a FIN will be send from this side).
# None of this means that the contentItems are valid, further validation is
# None of this means that the contentValues are valid, further validation is
# required.
# Socket will be closed when this call ends.
# TODO: Currently reading from the socket 1 item at a time, and validating
# items at later time. Uncertain what is best approach here (mostly from a
# TODO: Currently reading from the socket one value at a time, and validating
# values at later time. Uncertain what is best approach here (mostly from a
# security PoV), e.g. other options such as reading all content from socket at
# once, then processing the individual content items. Or reading and
# once, then processing the individual content values. Or reading and
# validating one per time.
let amount = offer.contentKeys.len()
var contentItems: seq[seq[byte]]
var contentValues: seq[seq[byte]]
for i in 0 ..< amount:
let contentItemFut = socket.readContentItem()
if await contentItemFut.withTimeout(stream.contentReadTimeout):
let contentItem = await contentItemFut
let contentValueFut = socket.readContentValue()
if await contentValueFut.withTimeout(stream.contentReadTimeout):
let contentValue = await contentValueFut
if contentItem.isOk():
contentItems.add(contentItem.get())
if contentValue.isOk():
contentValues.add(contentValue.get())
else:
# Invalid data, stop reading content, but still process data received
# so far.
debug "Reading content item failed, content offer failed",
contentKeys = offer.contentKeys
debug "Reading content value failed, content offer failed",
contentKeys = offer.contentKeys, error = contentValue.error
break
else:
# Read timed out, stop further reading, but still process data received
@ -267,12 +265,12 @@ proc readContentOffer(
asyncSpawn socket.delayedDestroy(4.seconds)
# TODO: This could currently create a backlog of content items to be validated
# TODO: This could currently create a backlog of content values to be validated
# as `AcceptConnectionCallback` is `asyncSpawn`'ed and there are no limits
# on the `contentOffers`. Might move the queue to before the reading of the
# socket, and let the specific networks handle that.
await stream.contentQueue.put(
(Opt.some(offer.nodeId), offer.contentKeys, contentItems)
(Opt.some(offer.nodeId), offer.contentKeys, contentValues)
)
proc new(