Add log message on content query failure in lookup + refactor (#3079)

* Add log message on content query failure in lookup

* Refactor response handling Portal wire
This commit is contained in:
kdeme 2025-02-16 20:08:28 +01:00 committed by GitHub
parent c300b41c07
commit c0e329d768
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -758,41 +758,31 @@ proc ping*(
): Future[PortalResult[(uint64, CapabilitiesPayload)]] {. ): Future[PortalResult[(uint64, CapabilitiesPayload)]] {.
async: (raises: [CancelledError]) async: (raises: [CancelledError])
.} = .} =
let pongResponse = await p.pingImpl(dst) let pong = ?(await p.pingImpl(dst))
if pongResponse.isOk(): # Update last time we pinged this node
# Update last time we pinged this node p.pingTimings[dst.id] = now(chronos.Moment)
p.pingTimings[dst.id] = now(chronos.Moment)
let pong = pongResponse.get() # Note: currently only decoding as capabilities payload as this is the only
# one that we support sending.
if pong.payload_type != CapabilitiesType:
return err("Pong message contains invalid or error payload")
# Note: currently only decoding as capabilities payload as this is the only let payload = decodeSsz(pong.payload.asSeq(), CapabilitiesPayload).valueOr:
# one that we support sending. return err("Pong message contains invalid CapabilitiesPayload")
if pong.payload_type != CapabilitiesType:
return err("Pong message contains invalid or error payload")
let payload = decodeSsz(pong.payload.asSeq(), CapabilitiesPayload).valueOr: p.radiusCache.put(dst.id, payload.data_radius)
return err("Pong message contains invalid CapabilitiesPayload")
p.radiusCache.put(dst.id, payload.data_radius) ok((pong.enrSeq, payload))
ok((pong.enrSeq, payload))
else:
err(pongResponse.error)
proc findNodes*( proc findNodes*(
p: PortalProtocol, dst: Node, distances: seq[uint16] p: PortalProtocol, dst: Node, distances: seq[uint16]
): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} = ): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances)) let response = ?(await p.findNodesImpl(dst, List[uint16, 256](distances)))
if nodesMessage.isOk():
let records = recordsFromBytes(nodesMessage.get().enrs) let records = ?recordsFromBytes(response.enrs)
if records.isOk(): # TODO: distance function is wrong here for state, fix + tests
# TODO: distance function is wrong here for state, fix + tests ok(verifyNodesRecords(records, dst, enrsResultLimit, distances))
return ok(verifyNodesRecords(records.get(), dst, enrsResultLimit, distances))
else:
return err(records.error)
else:
return err(nodesMessage.error)
proc findContent*( proc findContent*(
p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList
@ -801,82 +791,71 @@ proc findContent*(
node = dst node = dst
contentKey contentKey
let contentMessageResponse = await p.findContentImpl(dst, contentKey) let response = ?(await p.findContentImpl(dst, contentKey))
if contentMessageResponse.isOk(): case response.contentMessageType
let m = contentMessageResponse.get() of connectionIdType:
case m.contentMessageType let nodeAddress = NodeAddress.init(dst).valueOr:
of connectionIdType: # This should not happen as it comes a after succesfull talkreq/talkresp
let nodeAddress = NodeAddress.init(dst).valueOr: return err("Trying to connect to node with unknown address: " & $dst.id)
# This should not happen as it comes a after succesfull talkreq/talkresp
return err("Trying to connect to node with unknown address: " & $dst.id)
let socket = let socket =
?( ?(
await p.stream.connectTo( await p.stream.connectTo(
# uTP protocol uses BE for all values in the header, incl. connection id # uTP protocol uses BE for all values in the header, incl. connection id
nodeAddress, nodeAddress,
uint16.fromBytesBE(m.connectionId), uint16.fromBytesBE(response.connectionId),
)
)
try:
# Read all bytes from the socket
# This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete.
# Further validation is required, using a length prefix here might be
# beneficial for this.
let readFut = socket.read()
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
debug "Socket read cancelled", socketKey = socket.socketKey
# In case this `findContent` gets cancelled while reading the data,
# send a FIN and clean up the socket.
socket.close()
if await readFut.withTimeout(p.stream.contentReadTimeout):
let content = await readFut
# socket received remote FIN and drained whole buffer, it can be
# safely destroyed without notifing remote
debug "Socket read fully", socketKey = socket.socketKey
socket.destroy()
return ok(
FoundContent(src: dst, kind: Content, content: content, utpTransfer: true)
)
else:
debug "Socket read time-out", socketKey = socket.socketKey
# Note: This might look a bit strange, but not doing a socket.close()
# here as this is already done internally. utp_socket `checkTimeouts`
# already does a socket.destroy() on timeout. Might want to change the
# API on this later though.
return err("Reading data from socket timed out, content request failed")
except CancelledError as exc:
# even though we already installed cancelCallback on readFut, it is worth
# catching CancelledError in case that withTimeout throws CancelledError
# but readFut have already finished.
debug "Socket read cancelled", socketKey = socket.socketKey
socket.close()
raise exc
of contentType:
return ok(
FoundContent(
src: dst, kind: Content, content: m.content.asSeq(), utpTransfer: false
) )
) )
of enrsType:
let records = recordsFromBytes(m.enrs)
if records.isOk():
let verifiedNodes = verifyNodesRecords(records.get(), dst, enrsResultLimit)
return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes)) try:
# Read all bytes from the socket
# This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete.
# Further validation is required, using a length prefix here might be
# beneficial for this.
let readFut = socket.read()
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
debug "Socket read cancelled", socketKey = socket.socketKey
# In case this `findContent` gets cancelled while reading the data,
# send a FIN and clean up the socket.
socket.close()
if await readFut.withTimeout(p.stream.contentReadTimeout):
let content = await readFut
# socket received remote FIN and drained whole buffer, it can be
# safely destroyed without notifing remote
trace "Socket read fully", socketKey = socket.socketKey
socket.destroy()
return
ok(FoundContent(src: dst, kind: Content, content: content, utpTransfer: true))
else: else:
return err("Content message returned invalid ENRs") debug "Socket read time-out", socketKey = socket.socketKey
else: # Note: This might look a bit strange, but not doing a socket.close()
debug "FindContent failed due to find content request failure ", # here as this is already done internally. utp_socket `checkTimeouts`
error = contentMessageResponse.error # already does a socket.destroy() on timeout. Might want to change the
# API on this later though.
return err("Reading data from socket timed out, content request failed")
except CancelledError as exc:
# even though we already installed cancelCallback on readFut, it is worth
# catching CancelledError in case that withTimeout throws CancelledError
# but readFut have already finished.
debug "Socket read cancelled", socketKey = socket.socketKey
return err("No content response") socket.close()
raise exc
of contentType:
ok(
FoundContent(
src: dst, kind: Content, content: response.content.asSeq(), utpTransfer: false
)
)
of enrsType:
let records = ?recordsFromBytes(response.enrs)
let verifiedNodes = verifyNodesRecords(records, dst, enrsResultLimit)
ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
proc getContentKeys(o: OfferRequest): ContentKeysList = proc getContentKeys(o: OfferRequest): ContentKeysList =
case o.kind case o.kind
@ -884,9 +863,10 @@ proc getContentKeys(o: OfferRequest): ContentKeysList =
var contentKeys: ContentKeysList var contentKeys: ContentKeysList
for info in o.contentList: for info in o.contentList:
discard contentKeys.add(info.contentKey) discard contentKeys.add(info.contentKey)
return contentKeys
contentKeys
of Database: of Database:
return o.contentKeys o.contentKeys
func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int = func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int =
## Calculates how many ContentKeys will fit in one offer message which ## Calculates how many ContentKeys will fit in one offer message which
@ -929,62 +909,93 @@ proc offer(
node = o.dst node = o.dst
contentKeys contentKeys
debug "Offering content" trace "Offering content"
portal_content_keys_offered.observe( portal_content_keys_offered.observe(
contentKeys.len().int64, labelValues = [$p.protocolId] contentKeys.len().int64, labelValues = [$p.protocolId]
) )
let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys) let response = ?(await p.offerImpl(o.dst, contentKeys))
if acceptMessageResponse.isOk():
let m = acceptMessageResponse.get()
let contentKeysLen =
case o.kind
of Direct:
o.contentList.len()
of Database:
o.contentKeys.len()
if m.contentKeys.len() != contentKeysLen:
# TODO:
# When there is such system, the peer should get scored negatively here.
error "Accepted content key bitlist has invalid size",
bitListLen = m.contentKeys.len(), contentKeysLen
return err("Accepted content key bitlist has invalid size")
let acceptedKeysAmount = m.contentKeys.countOnes()
portal_content_keys_accepted.observe(
acceptedKeysAmount.int64, labelValues = [$p.protocolId]
)
if acceptedKeysAmount == 0:
debug "No content accepted"
# Don't open an uTP stream if no content was requested
return ok(m.contentKeys)
let nodeAddress = NodeAddress.init(o.dst).valueOr:
# This should not happen as it comes a after succesfull talkreq/talkresp
return err("Trying to connect to node with unknown address: " & $o.dst.id)
let socket =
?(await p.stream.connectTo(nodeAddress, uint16.fromBytesBE(m.connectionId)))
template lenu32(x: untyped): untyped =
uint32(len(x))
let contentKeysLen =
case o.kind case o.kind
of Direct: of Direct:
for i, b in m.contentKeys: o.contentList.len()
if b: of Database:
let content = o.contentList[i].content o.contentKeys.len()
if response.contentKeys.len() != contentKeysLen:
# TODO:
# When there is such system, the peer should get scored negatively here.
error "Accepted content key bitlist has invalid size",
bitListLen = response.contentKeys.len(), contentKeysLen
return err("Accepted content key bitlist has invalid size")
let acceptedKeysAmount = response.contentKeys.countOnes()
portal_content_keys_accepted.observe(
acceptedKeysAmount.int64, labelValues = [$p.protocolId]
)
if acceptedKeysAmount == 0:
debug "No content accepted"
# Don't open an uTP stream if no content was requested
return ok(response.contentKeys)
let nodeAddress = NodeAddress.init(o.dst).valueOr:
# This should not happen as it comes a after succesfull talkreq/talkresp
return err("Trying to connect to node with unknown address: " & $o.dst.id)
let socket =
?(await p.stream.connectTo(nodeAddress, uint16.fromBytesBE(response.connectionId)))
template lenu32(x: untyped): untyped =
uint32(len(x))
case o.kind
of Direct:
for i, b in response.contentKeys:
if b:
let content = o.contentList[i].content
var output = memoryOutput()
try:
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
except IOError as e:
# This should not happen in case of in-memory streams
raiseAssert e.msg
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
trace "Offered content item send", dataWritten = dataWritten
of Database:
for i, b in response.contentKeys:
if b:
let
contentKey = o.contentKeys[i]
contentIdResult = p.toContentId(contentKey)
if contentIdResult.isOk():
let
contentId = contentIdResult.get()
contentResult = p.dbGet(contentKey, contentId)
var output = memoryOutput() var output = memoryOutput()
try: if contentResult.isOk():
output.write(toBytes(content.lenu32, Leb128).toOpenArray()) let content = contentResult.get()
output.write(content) try:
except IOError as e: output.write(toBytes(content.lenu32, Leb128).toOpenArray())
# This should not happen in case of in-memory streams output.write(content)
raiseAssert e.msg except IOError as e:
# This should not happen in case of in-memory streams
raiseAssert e.msg
else:
try:
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
except IOError as e:
raiseAssert e.msg
let dataWritten = (await socket.write(output.getOutput)).valueOr: let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error debug "Error writing requested data", error
@ -993,54 +1004,16 @@ proc offer(
return err("Error writing requested data") return err("Error writing requested data")
trace "Offered content item send", dataWritten = dataWritten trace "Offered content item send", dataWritten = dataWritten
of Database: await socket.closeWait()
for i, b in m.contentKeys: trace "Content successfully offered"
if b:
let
contentKey = o.contentKeys[i]
contentIdResult = p.toContentId(contentKey)
if contentIdResult.isOk():
let
contentId = contentIdResult.get()
contentResult = p.dbGet(contentKey, contentId)
var output = memoryOutput() return ok(response.contentKeys)
if contentResult.isOk():
let content = contentResult.get()
try:
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
except IOError as e:
# This should not happen in case of in-memory streams
raiseAssert e.msg
else:
try:
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
except IOError as e:
raiseAssert e.msg
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
trace "Offered content item send", dataWritten = dataWritten
await socket.closeWait()
debug "Content successfully offered"
return ok(m.contentKeys)
else:
debug "Offer failed due to accept request failure ",
error = acceptMessageResponse.error
return err("No or invalid accept response: " & acceptMessageResponse.error)
proc offer*( proc offer*(
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} = ): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} =
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys) let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
return await p.offer(req) await p.offer(req)
proc offer*( proc offer*(
p: PortalProtocol, dst: Node, content: seq[ContentKV] p: PortalProtocol, dst: Node, content: seq[ContentKV]
@ -1052,7 +1025,7 @@ proc offer*(
let contentList = List[ContentKV, contentKeysLimit].init(content) let contentList = List[ContentKV, contentKeysLimit].init(content)
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList) let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
return await p.offer(req) await p.offer(req)
proc offerWorker(p: PortalProtocol) {.async: (raises: [CancelledError]).} = proc offerWorker(p: PortalProtocol) {.async: (raises: [CancelledError]).} =
while true: while true:
@ -1297,6 +1270,7 @@ proc contentLookup*(
) )
) )
else: else:
debug "Content query failed", error = contentResult.error
# Note: Not doing any retries here as retries can/should be done on a # Note: Not doing any retries here as retries can/should be done on a
# higher layer. However, depending on the failure we could attempt a retry, # higher layer. However, depending on the failure we could attempt a retry,
# e.g. on uTP specific errors. # e.g. on uTP specific errors.