diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim index b99911b84..fcb035371 100644 --- a/fluffy/data/history_data_seeding.nim +++ b/fluffy/data/history_data_seeding.nim @@ -52,7 +52,7 @@ proc propagateEpochAccumulator*( # Note: The file actually holds the SSZ encoded accumulator, but we need # to decode as we need the root for the content key. encodedAccumulator = SSZ.encode(accumulator) - info "Gossiping epoch accumulator", rootHash + info "Gossiping epoch accumulator", rootHash, contentKey = encKey p.storeContent( encKey, diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index aea970e7d..62b76399f 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -589,6 +589,9 @@ proc findNodes*( proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): Future[PortalResult[FoundContent]] {.async.} = + logScope: + node = dst + contentKey let contentMessageResponse = await p.findContentImpl(dst, contentKey) @@ -596,15 +599,15 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): let m = contentMessageResponse.get() case m.contentMessageType: of connectionIdType: - # uTP protocol uses BE for all values in the header, incl. connection id let nodeAddress = NodeAddress.init(dst) if nodeAddress.isNone(): - # It should not happen as we are already after succesfull talkreq/talkresp - # cycle + # It should not happen as we are already after the succesfull + # talkreq/talkresp cycle error "Trying to connect to node with unknown address", id = dst.id return err("Trying to connect to node with unknown address") + # uTP protocol uses BE for all values in the header, incl. connection id let connFuture = p.stream.connectTo( nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId) @@ -620,7 +623,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): raise connFuture.error if connectionResult.isErr(): - debug "Utp connection error while trying to find content", + debug "uTP connection error while trying to find content", error = connectionResult.error return err("Error connecting uTP socket") @@ -675,7 +678,8 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): else: return err("Content message returned invalid ENRs") else: - warn "FindContent failed due to find content request failure ", error = contentMessageResponse.error, contentKey = contentKey + warn "FindContent failed due to find content request failure ", + error = contentMessageResponse.error return err("No content response") @@ -710,8 +714,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): Future[PortalResult[void]] {.async.} = ## Offer triggers offer-accept interaction with one peer ## Whole flow has two phases: - ## 1. Come to an agreement on what content to transfer, by using offer and accept - ## messages. + ## 1. Come to an agreement on what content to transfer, by using offer and + ## accept messages. ## 2. Open uTP stream from content provider to content receiver and transfer ## agreed content. ## There are two types of possible offer requests: @@ -727,7 +731,11 @@ proc offer(p: PortalProtocol, o: OfferRequest): ## guarantee content transfer. let contentKeys = getContentKeys(o) - debug "Offering content", contentKeys = contentKeys + logScope: + node = o.dst + contentKeys + + debug "Offering content" portal_content_keys_offered.observe(contentKeys.len().int64) @@ -752,7 +760,7 @@ proc offer(p: PortalProtocol, o: OfferRequest): let acceptedKeysAmount = m.contentKeys.countOnes() portal_content_keys_accepted.observe(acceptedKeysAmount.int64) if acceptedKeysAmount == 0: - debug "No content acceppted", contentKeys = contentKeys + debug "No content accepted" # Don't open an uTP stream if no content was requested return ok() @@ -772,7 +780,7 @@ proc offer(p: PortalProtocol, o: OfferRequest): if connectionResult.isErr(): debug "Utp connection error while trying to offer content", - error = connectionResult.error, contentKeys = contentKeys + error = connectionResult.error return err("Error connecting uTP socket") let socket = connectionResult.get() @@ -792,7 +800,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): let dataWritten = await socket.write(output.getOutput) if dataWritten.isErr: - debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys + debug "Error writing requested data", + error = dataWritten.error # No point in trying to continue writing data socket.close() return err("Error writing requested data") @@ -819,17 +828,19 @@ proc offer(p: PortalProtocol, o: OfferRequest): let dataWritten = await socket.write(output.getOutput) if dataWritten.isErr: - debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys + debug "Error writing requested data", + error = dataWritten.error # No point in trying to continue writing data socket.close() return err("Error writing requested data") - debug "Content successfully offered", contentKeys = contentKeys + debug "Content successfully offered" await socket.closeWait() return ok() else: - warn "Offer failed due to accept request failure ", error = acceptMessageResponse.error, contentKeys = contentKeys + warn "Offer failed due to accept request failure ", + error = acceptMessageResponse.error return err("No accept response") proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): diff --git a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim index 2b39fd195..f47e54a5c 100644 --- a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim +++ b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim @@ -16,6 +16,8 @@ proc portal_stateFindContent(enr: Record, contentKey: string): tuple[ proc portal_stateFindContentFull(enr: Record, contentKey: string): tuple[ content: Option[string], enrs: Option[seq[Record]]] +proc portal_stateOfferReal( + enr: Record, contentKey: string, contentValue: string): bool proc portal_stateOffer(contentKey: string, contentValue: string): int proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record] proc portal_stateRecursiveFindContent(contentKey: string): string @@ -40,6 +42,8 @@ proc portal_historyFindContent(enr: Record, contentKey: string): tuple[ proc portal_historyFindContentFull(enr: Record, contentKey: string): tuple[ content: Option[string], enrs: Option[seq[Record]]] +proc portal_historyOfferReal( + enr: Record, contentKey: string, contentValue: string): bool proc portal_historyOffer(contentKey: string, contentValue: string): int proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record] proc portal_historyRecursiveFindContent(contentKey: string): string diff --git a/fluffy/rpc/rpc_portal_api.nim b/fluffy/rpc/rpc_portal_api.nim index bacc821d0..68bda3301 100644 --- a/fluffy/rpc/rpc_portal_api.nim +++ b/fluffy/rpc/rpc_portal_api.nim @@ -168,6 +168,22 @@ proc installPortalApiHandlers*( none(string), some(foundContent.nodes.map(proc(n: Node): Record = n.record))) + rpcServer.rpc("portal_" & network & "OfferReal") do( + enr: Record, contentKey: string, contentValue: string) -> bool: + # Note: unspecified RPC, but the spec took over the Offer call to actually + # do gossip. This should be adjusted. + let + node = toNodeWithAddress(enr) + key = hexToSeqByte(contentKey) + content = hexToSeqByte(contentValue) + contentInfo = ContentInfo(contentKey: ByteList.init(key), content: content) + res = await p.offer(node, @[contentInfo]) + + if res.isOk(): + return true + else: + raise newException(ValueError, $res.error) + rpcServer.rpc("portal_" & network & "Offer") do( contentKey: string, contentValue: string) -> int: let