From c0e329d768fafebcf0d814ba9f3534137ec897d9 Mon Sep 17 00:00:00 2001 From: kdeme <7857583+kdeme@users.noreply.github.com> Date: Sun, 16 Feb 2025 20:08:28 +0100 Subject: [PATCH] Add log message on content query failure in lookup + refactor (#3079) * Add log message on content query failure in lookup * Refactor response handling Portal wire --- fluffy/network/wire/portal_protocol.nim | 348 +++++++++++------------- 1 file changed, 161 insertions(+), 187 deletions(-) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index c48dab4cb..4ec4e11ac 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -758,41 +758,31 @@ proc ping*( ): Future[PortalResult[(uint64, CapabilitiesPayload)]] {. async: (raises: [CancelledError]) .} = - let pongResponse = await p.pingImpl(dst) + let pong = ?(await p.pingImpl(dst)) - if pongResponse.isOk(): - # Update last time we pinged this node - p.pingTimings[dst.id] = now(chronos.Moment) + # Update last time we pinged this node + p.pingTimings[dst.id] = now(chronos.Moment) - let pong = pongResponse.get() + # Note: currently only decoding as capabilities payload as this is the only + # one that we support sending. + if pong.payload_type != CapabilitiesType: + return err("Pong message contains invalid or error payload") - # Note: currently only decoding as capabilities payload as this is the only - # one that we support sending. - if pong.payload_type != CapabilitiesType: - return err("Pong message contains invalid or error payload") + let payload = decodeSsz(pong.payload.asSeq(), CapabilitiesPayload).valueOr: + return err("Pong message contains invalid CapabilitiesPayload") - let payload = decodeSsz(pong.payload.asSeq(), CapabilitiesPayload).valueOr: - return err("Pong message contains invalid CapabilitiesPayload") + p.radiusCache.put(dst.id, payload.data_radius) - p.radiusCache.put(dst.id, payload.data_radius) - - ok((pong.enrSeq, payload)) - else: - err(pongResponse.error) + ok((pong.enrSeq, payload)) proc findNodes*( p: PortalProtocol, dst: Node, distances: seq[uint16] ): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} = - let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances)) - if nodesMessage.isOk(): - let records = recordsFromBytes(nodesMessage.get().enrs) - if records.isOk(): - # TODO: distance function is wrong here for state, fix + tests - return ok(verifyNodesRecords(records.get(), dst, enrsResultLimit, distances)) - else: - return err(records.error) - else: - return err(nodesMessage.error) + let response = ?(await p.findNodesImpl(dst, List[uint16, 256](distances))) + + let records = ?recordsFromBytes(response.enrs) + # TODO: distance function is wrong here for state, fix + tests + ok(verifyNodesRecords(records, dst, enrsResultLimit, distances)) proc findContent*( p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList @@ -801,82 +791,71 @@ proc findContent*( node = dst contentKey - let contentMessageResponse = await p.findContentImpl(dst, contentKey) + let response = ?(await p.findContentImpl(dst, contentKey)) - if contentMessageResponse.isOk(): - let m = contentMessageResponse.get() - case m.contentMessageType - of connectionIdType: - let nodeAddress = NodeAddress.init(dst).valueOr: - # This should not happen as it comes a after succesfull talkreq/talkresp - return err("Trying to connect to node with unknown address: " & $dst.id) + case response.contentMessageType + of connectionIdType: + let nodeAddress = NodeAddress.init(dst).valueOr: + # This should not happen as it comes a after succesfull talkreq/talkresp + return err("Trying to connect to node with unknown address: " & $dst.id) - let socket = - ?( - await p.stream.connectTo( - # uTP protocol uses BE for all values in the header, incl. connection id - nodeAddress, - uint16.fromBytesBE(m.connectionId), - ) - ) - - try: - # Read all bytes from the socket - # This will either end with a FIN, or because the read action times out. - # A FIN does not necessarily mean that the data read is complete. - # Further validation is required, using a length prefix here might be - # beneficial for this. - let readFut = socket.read() - - readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} = - debug "Socket read cancelled", socketKey = socket.socketKey - # In case this `findContent` gets cancelled while reading the data, - # send a FIN and clean up the socket. - socket.close() - - if await readFut.withTimeout(p.stream.contentReadTimeout): - let content = await readFut - # socket received remote FIN and drained whole buffer, it can be - # safely destroyed without notifing remote - debug "Socket read fully", socketKey = socket.socketKey - socket.destroy() - return ok( - FoundContent(src: dst, kind: Content, content: content, utpTransfer: true) - ) - else: - debug "Socket read time-out", socketKey = socket.socketKey - # Note: This might look a bit strange, but not doing a socket.close() - # here as this is already done internally. utp_socket `checkTimeouts` - # already does a socket.destroy() on timeout. Might want to change the - # API on this later though. - return err("Reading data from socket timed out, content request failed") - except CancelledError as exc: - # even though we already installed cancelCallback on readFut, it is worth - # catching CancelledError in case that withTimeout throws CancelledError - # but readFut have already finished. - debug "Socket read cancelled", socketKey = socket.socketKey - - socket.close() - raise exc - of contentType: - return ok( - FoundContent( - src: dst, kind: Content, content: m.content.asSeq(), utpTransfer: false + let socket = + ?( + await p.stream.connectTo( + # uTP protocol uses BE for all values in the header, incl. connection id + nodeAddress, + uint16.fromBytesBE(response.connectionId), ) ) - of enrsType: - let records = recordsFromBytes(m.enrs) - if records.isOk(): - let verifiedNodes = verifyNodesRecords(records.get(), dst, enrsResultLimit) - return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes)) + try: + # Read all bytes from the socket + # This will either end with a FIN, or because the read action times out. + # A FIN does not necessarily mean that the data read is complete. + # Further validation is required, using a length prefix here might be + # beneficial for this. + let readFut = socket.read() + + readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} = + debug "Socket read cancelled", socketKey = socket.socketKey + # In case this `findContent` gets cancelled while reading the data, + # send a FIN and clean up the socket. + socket.close() + + if await readFut.withTimeout(p.stream.contentReadTimeout): + let content = await readFut + # socket received remote FIN and drained whole buffer, it can be + # safely destroyed without notifing remote + trace "Socket read fully", socketKey = socket.socketKey + socket.destroy() + return + ok(FoundContent(src: dst, kind: Content, content: content, utpTransfer: true)) else: - return err("Content message returned invalid ENRs") - else: - debug "FindContent failed due to find content request failure ", - error = contentMessageResponse.error + debug "Socket read time-out", socketKey = socket.socketKey + # Note: This might look a bit strange, but not doing a socket.close() + # here as this is already done internally. utp_socket `checkTimeouts` + # already does a socket.destroy() on timeout. Might want to change the + # API on this later though. + return err("Reading data from socket timed out, content request failed") + except CancelledError as exc: + # even though we already installed cancelCallback on readFut, it is worth + # catching CancelledError in case that withTimeout throws CancelledError + # but readFut have already finished. + debug "Socket read cancelled", socketKey = socket.socketKey - return err("No content response") + socket.close() + raise exc + of contentType: + ok( + FoundContent( + src: dst, kind: Content, content: response.content.asSeq(), utpTransfer: false + ) + ) + of enrsType: + let records = ?recordsFromBytes(response.enrs) + let verifiedNodes = verifyNodesRecords(records, dst, enrsResultLimit) + + ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes)) proc getContentKeys(o: OfferRequest): ContentKeysList = case o.kind @@ -884,9 +863,10 @@ proc getContentKeys(o: OfferRequest): ContentKeysList = var contentKeys: ContentKeysList for info in o.contentList: discard contentKeys.add(info.contentKey) - return contentKeys + + contentKeys of Database: - return o.contentKeys + o.contentKeys func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int = ## Calculates how many ContentKeys will fit in one offer message which @@ -929,62 +909,93 @@ proc offer( node = o.dst contentKeys - debug "Offering content" + trace "Offering content" portal_content_keys_offered.observe( contentKeys.len().int64, labelValues = [$p.protocolId] ) - let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys) - - if acceptMessageResponse.isOk(): - let m = acceptMessageResponse.get() - - let contentKeysLen = - case o.kind - of Direct: - o.contentList.len() - of Database: - o.contentKeys.len() - - if m.contentKeys.len() != contentKeysLen: - # TODO: - # When there is such system, the peer should get scored negatively here. - error "Accepted content key bitlist has invalid size", - bitListLen = m.contentKeys.len(), contentKeysLen - return err("Accepted content key bitlist has invalid size") - - let acceptedKeysAmount = m.contentKeys.countOnes() - portal_content_keys_accepted.observe( - acceptedKeysAmount.int64, labelValues = [$p.protocolId] - ) - if acceptedKeysAmount == 0: - debug "No content accepted" - # Don't open an uTP stream if no content was requested - return ok(m.contentKeys) - - let nodeAddress = NodeAddress.init(o.dst).valueOr: - # This should not happen as it comes a after succesfull talkreq/talkresp - return err("Trying to connect to node with unknown address: " & $o.dst.id) - - let socket = - ?(await p.stream.connectTo(nodeAddress, uint16.fromBytesBE(m.connectionId))) - - template lenu32(x: untyped): untyped = - uint32(len(x)) + let response = ?(await p.offerImpl(o.dst, contentKeys)) + let contentKeysLen = case o.kind of Direct: - for i, b in m.contentKeys: - if b: - let content = o.contentList[i].content + o.contentList.len() + of Database: + o.contentKeys.len() + + if response.contentKeys.len() != contentKeysLen: + # TODO: + # When there is such system, the peer should get scored negatively here. + error "Accepted content key bitlist has invalid size", + bitListLen = response.contentKeys.len(), contentKeysLen + return err("Accepted content key bitlist has invalid size") + + let acceptedKeysAmount = response.contentKeys.countOnes() + portal_content_keys_accepted.observe( + acceptedKeysAmount.int64, labelValues = [$p.protocolId] + ) + if acceptedKeysAmount == 0: + debug "No content accepted" + # Don't open an uTP stream if no content was requested + return ok(response.contentKeys) + + let nodeAddress = NodeAddress.init(o.dst).valueOr: + # This should not happen as it comes a after succesfull talkreq/talkresp + return err("Trying to connect to node with unknown address: " & $o.dst.id) + + let socket = + ?(await p.stream.connectTo(nodeAddress, uint16.fromBytesBE(response.connectionId))) + + template lenu32(x: untyped): untyped = + uint32(len(x)) + + case o.kind + of Direct: + for i, b in response.contentKeys: + if b: + let content = o.contentList[i].content + var output = memoryOutput() + try: + output.write(toBytes(content.lenu32, Leb128).toOpenArray()) + output.write(content) + except IOError as e: + # This should not happen in case of in-memory streams + raiseAssert e.msg + + let dataWritten = (await socket.write(output.getOutput)).valueOr: + debug "Error writing requested data", error + # No point in trying to continue writing data + socket.close() + return err("Error writing requested data") + + trace "Offered content item send", dataWritten = dataWritten + of Database: + for i, b in response.contentKeys: + if b: + let + contentKey = o.contentKeys[i] + contentIdResult = p.toContentId(contentKey) + if contentIdResult.isOk(): + let + contentId = contentIdResult.get() + contentResult = p.dbGet(contentKey, contentId) + var output = memoryOutput() - try: - output.write(toBytes(content.lenu32, Leb128).toOpenArray()) - output.write(content) - except IOError as e: - # This should not happen in case of in-memory streams - raiseAssert e.msg + if contentResult.isOk(): + let content = contentResult.get() + try: + output.write(toBytes(content.lenu32, Leb128).toOpenArray()) + output.write(content) + except IOError as e: + # This should not happen in case of in-memory streams + raiseAssert e.msg + else: + try: + # When data turns out missing, add a 0 size varint + output.write(toBytes(0'u8, Leb128).toOpenArray()) + except IOError as e: + raiseAssert e.msg let dataWritten = (await socket.write(output.getOutput)).valueOr: debug "Error writing requested data", error @@ -993,54 +1004,16 @@ proc offer( return err("Error writing requested data") trace "Offered content item send", dataWritten = dataWritten - of Database: - for i, b in m.contentKeys: - if b: - let - contentKey = o.contentKeys[i] - contentIdResult = p.toContentId(contentKey) - if contentIdResult.isOk(): - let - contentId = contentIdResult.get() - contentResult = p.dbGet(contentKey, contentId) + await socket.closeWait() + trace "Content successfully offered" - var output = memoryOutput() - if contentResult.isOk(): - let content = contentResult.get() - try: - output.write(toBytes(content.lenu32, Leb128).toOpenArray()) - output.write(content) - except IOError as e: - # This should not happen in case of in-memory streams - raiseAssert e.msg - else: - try: - # When data turns out missing, add a 0 size varint - output.write(toBytes(0'u8, Leb128).toOpenArray()) - except IOError as e: - raiseAssert e.msg - - let dataWritten = (await socket.write(output.getOutput)).valueOr: - debug "Error writing requested data", error - # No point in trying to continue writing data - socket.close() - return err("Error writing requested data") - - trace "Offered content item send", dataWritten = dataWritten - await socket.closeWait() - debug "Content successfully offered" - - return ok(m.contentKeys) - else: - debug "Offer failed due to accept request failure ", - error = acceptMessageResponse.error - return err("No or invalid accept response: " & acceptMessageResponse.error) + return ok(response.contentKeys) proc offer*( p: PortalProtocol, dst: Node, contentKeys: ContentKeysList ): Future[PortalResult[ContentKeysBitList]] {.async: (raises: [CancelledError]).} = let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys) - return await p.offer(req) + await p.offer(req) proc offer*( p: PortalProtocol, dst: Node, content: seq[ContentKV] @@ -1052,7 +1025,7 @@ proc offer*( let contentList = List[ContentKV, contentKeysLimit].init(content) let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList) - return await p.offer(req) + await p.offer(req) proc offerWorker(p: PortalProtocol) {.async: (raises: [CancelledError]).} = while true: @@ -1297,6 +1270,7 @@ proc contentLookup*( ) ) else: + debug "Content query failed", error = contentResult.error # Note: Not doing any retries here as retries can/should be done on a # higher layer. However, depending on the failure we could attempt a retry, # e.g. on uTP specific errors.