From 733d7e5ceb84c5909aa5bd4831bea90650e91e44 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Fri, 16 Dec 2022 17:47:52 +0100 Subject: [PATCH] Add new portal JSON-RPC OfferReal as Offer is doing gossip now (#1387) The portal_*Offer call was changed in the specs to actually do gossip. Make it no longer possible to test purely an offer with one node. Add OfferReal call for now until spec potentially gets adjusted. Also Add some Node information logging for FindContent and Offer to be able to better debug failures and interoperability. --- fluffy/data/history_data_seeding.nim | 2 +- fluffy/network/wire/portal_protocol.nim | 39 +++++++++++++++-------- fluffy/rpc/rpc_calls/rpc_portal_calls.nim | 4 +++ fluffy/rpc/rpc_portal_api.nim | 16 ++++++++++ 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim index b99911b84..fcb035371 100644 --- a/fluffy/data/history_data_seeding.nim +++ b/fluffy/data/history_data_seeding.nim @@ -52,7 +52,7 @@ proc propagateEpochAccumulator*( # Note: The file actually holds the SSZ encoded accumulator, but we need # to decode as we need the root for the content key. encodedAccumulator = SSZ.encode(accumulator) - info "Gossiping epoch accumulator", rootHash + info "Gossiping epoch accumulator", rootHash, contentKey = encKey p.storeContent( encKey, diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index aea970e7d..62b76399f 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -589,6 +589,9 @@ proc findNodes*( proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): Future[PortalResult[FoundContent]] {.async.} = + logScope: + node = dst + contentKey let contentMessageResponse = await p.findContentImpl(dst, contentKey) @@ -596,15 +599,15 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): let m = contentMessageResponse.get() case m.contentMessageType: of connectionIdType: - # uTP protocol uses BE for all values in the header, incl. connection id let nodeAddress = NodeAddress.init(dst) if nodeAddress.isNone(): - # It should not happen as we are already after succesfull talkreq/talkresp - # cycle + # It should not happen as we are already after the succesfull + # talkreq/talkresp cycle error "Trying to connect to node with unknown address", id = dst.id return err("Trying to connect to node with unknown address") + # uTP protocol uses BE for all values in the header, incl. connection id let connFuture = p.stream.connectTo( nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId) @@ -620,7 +623,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): raise connFuture.error if connectionResult.isErr(): - debug "Utp connection error while trying to find content", + debug "uTP connection error while trying to find content", error = connectionResult.error return err("Error connecting uTP socket") @@ -675,7 +678,8 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): else: return err("Content message returned invalid ENRs") else: - warn "FindContent failed due to find content request failure ", error = contentMessageResponse.error, contentKey = contentKey + warn "FindContent failed due to find content request failure ", + error = contentMessageResponse.error return err("No content response") @@ -710,8 +714,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): Future[PortalResult[void]] {.async.} = ## Offer triggers offer-accept interaction with one peer ## Whole flow has two phases: - ## 1. Come to an agreement on what content to transfer, by using offer and accept - ## messages. + ## 1. Come to an agreement on what content to transfer, by using offer and + ## accept messages. ## 2. Open uTP stream from content provider to content receiver and transfer ## agreed content. ## There are two types of possible offer requests: @@ -727,7 +731,11 @@ proc offer(p: PortalProtocol, o: OfferRequest): ## guarantee content transfer. let contentKeys = getContentKeys(o) - debug "Offering content", contentKeys = contentKeys + logScope: + node = o.dst + contentKeys + + debug "Offering content" portal_content_keys_offered.observe(contentKeys.len().int64) @@ -752,7 +760,7 @@ proc offer(p: PortalProtocol, o: OfferRequest): let acceptedKeysAmount = m.contentKeys.countOnes() portal_content_keys_accepted.observe(acceptedKeysAmount.int64) if acceptedKeysAmount == 0: - debug "No content acceppted", contentKeys = contentKeys + debug "No content accepted" # Don't open an uTP stream if no content was requested return ok() @@ -772,7 +780,7 @@ proc offer(p: PortalProtocol, o: OfferRequest): if connectionResult.isErr(): debug "Utp connection error while trying to offer content", - error = connectionResult.error, contentKeys = contentKeys + error = connectionResult.error return err("Error connecting uTP socket") let socket = connectionResult.get() @@ -792,7 +800,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): let dataWritten = await socket.write(output.getOutput) if dataWritten.isErr: - debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys + debug "Error writing requested data", + error = dataWritten.error # No point in trying to continue writing data socket.close() return err("Error writing requested data") @@ -819,17 +828,19 @@ proc offer(p: PortalProtocol, o: OfferRequest): let dataWritten = await socket.write(output.getOutput) if dataWritten.isErr: - debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys + debug "Error writing requested data", + error = dataWritten.error # No point in trying to continue writing data socket.close() return err("Error writing requested data") - debug "Content successfully offered", contentKeys = contentKeys + debug "Content successfully offered" await socket.closeWait() return ok() else: - warn "Offer failed due to accept request failure ", error = acceptMessageResponse.error, contentKeys = contentKeys + warn "Offer failed due to accept request failure ", + error = acceptMessageResponse.error return err("No accept response") proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): diff --git a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim index 2b39fd195..f47e54a5c 100644 --- a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim +++ b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim @@ -16,6 +16,8 @@ proc portal_stateFindContent(enr: Record, contentKey: string): tuple[ proc portal_stateFindContentFull(enr: Record, contentKey: string): tuple[ content: Option[string], enrs: Option[seq[Record]]] +proc portal_stateOfferReal( + enr: Record, contentKey: string, contentValue: string): bool proc portal_stateOffer(contentKey: string, contentValue: string): int proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record] proc portal_stateRecursiveFindContent(contentKey: string): string @@ -40,6 +42,8 @@ proc portal_historyFindContent(enr: Record, contentKey: string): tuple[ proc portal_historyFindContentFull(enr: Record, contentKey: string): tuple[ content: Option[string], enrs: Option[seq[Record]]] +proc portal_historyOfferReal( + enr: Record, contentKey: string, contentValue: string): bool proc portal_historyOffer(contentKey: string, contentValue: string): int proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record] proc portal_historyRecursiveFindContent(contentKey: string): string diff --git a/fluffy/rpc/rpc_portal_api.nim b/fluffy/rpc/rpc_portal_api.nim index bacc821d0..68bda3301 100644 --- a/fluffy/rpc/rpc_portal_api.nim +++ b/fluffy/rpc/rpc_portal_api.nim @@ -168,6 +168,22 @@ proc installPortalApiHandlers*( none(string), some(foundContent.nodes.map(proc(n: Node): Record = n.record))) + rpcServer.rpc("portal_" & network & "OfferReal") do( + enr: Record, contentKey: string, contentValue: string) -> bool: + # Note: unspecified RPC, but the spec took over the Offer call to actually + # do gossip. This should be adjusted. + let + node = toNodeWithAddress(enr) + key = hexToSeqByte(contentKey) + content = hexToSeqByte(contentValue) + contentInfo = ContentInfo(contentKey: ByteList.init(key), content: content) + res = await p.offer(node, @[contentInfo]) + + if res.isOk(): + return true + else: + raise newException(ValueError, $res.error) + rpcServer.rpc("portal_" & network & "Offer") do( contentKey: string, contentValue: string) -> int: let