mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-24 11:11:59 +00:00
Add new portal JSON-RPC OfferReal as Offer is doing gossip now (#1387)
The portal_*Offer call was changed in the specs to actually do gossip. Make it no longer possible to test purely an offer with one node. Add OfferReal call for now until spec potentially gets adjusted. Also Add some Node information logging for FindContent and Offer to be able to better debug failures and interoperability.
This commit is contained in:
parent
e234a73b7e
commit
733d7e5ceb
@ -52,7 +52,7 @@ proc propagateEpochAccumulator*(
|
|||||||
# Note: The file actually holds the SSZ encoded accumulator, but we need
|
# Note: The file actually holds the SSZ encoded accumulator, but we need
|
||||||
# to decode as we need the root for the content key.
|
# to decode as we need the root for the content key.
|
||||||
encodedAccumulator = SSZ.encode(accumulator)
|
encodedAccumulator = SSZ.encode(accumulator)
|
||||||
info "Gossiping epoch accumulator", rootHash
|
info "Gossiping epoch accumulator", rootHash, contentKey = encKey
|
||||||
|
|
||||||
p.storeContent(
|
p.storeContent(
|
||||||
encKey,
|
encKey,
|
||||||
|
@ -589,6 +589,9 @@ proc findNodes*(
|
|||||||
|
|
||||||
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||||
Future[PortalResult[FoundContent]] {.async.} =
|
Future[PortalResult[FoundContent]] {.async.} =
|
||||||
|
logScope:
|
||||||
|
node = dst
|
||||||
|
contentKey
|
||||||
|
|
||||||
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
|
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
|
||||||
|
|
||||||
@ -596,15 +599,15 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||||||
let m = contentMessageResponse.get()
|
let m = contentMessageResponse.get()
|
||||||
case m.contentMessageType:
|
case m.contentMessageType:
|
||||||
of connectionIdType:
|
of connectionIdType:
|
||||||
# uTP protocol uses BE for all values in the header, incl. connection id
|
|
||||||
let nodeAddress = NodeAddress.init(dst)
|
let nodeAddress = NodeAddress.init(dst)
|
||||||
if nodeAddress.isNone():
|
if nodeAddress.isNone():
|
||||||
# It should not happen as we are already after succesfull talkreq/talkresp
|
# It should not happen as we are already after the succesfull
|
||||||
# cycle
|
# talkreq/talkresp cycle
|
||||||
error "Trying to connect to node with unknown address",
|
error "Trying to connect to node with unknown address",
|
||||||
id = dst.id
|
id = dst.id
|
||||||
return err("Trying to connect to node with unknown address")
|
return err("Trying to connect to node with unknown address")
|
||||||
|
|
||||||
|
# uTP protocol uses BE for all values in the header, incl. connection id
|
||||||
let connFuture = p.stream.connectTo(
|
let connFuture = p.stream.connectTo(
|
||||||
nodeAddress.unsafeGet(),
|
nodeAddress.unsafeGet(),
|
||||||
uint16.fromBytesBE(m.connectionId)
|
uint16.fromBytesBE(m.connectionId)
|
||||||
@ -620,7 +623,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||||||
raise connFuture.error
|
raise connFuture.error
|
||||||
|
|
||||||
if connectionResult.isErr():
|
if connectionResult.isErr():
|
||||||
debug "Utp connection error while trying to find content",
|
debug "uTP connection error while trying to find content",
|
||||||
error = connectionResult.error
|
error = connectionResult.error
|
||||||
return err("Error connecting uTP socket")
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
@ -675,7 +678,8 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||||||
else:
|
else:
|
||||||
return err("Content message returned invalid ENRs")
|
return err("Content message returned invalid ENRs")
|
||||||
else:
|
else:
|
||||||
warn "FindContent failed due to find content request failure ", error = contentMessageResponse.error, contentKey = contentKey
|
warn "FindContent failed due to find content request failure ",
|
||||||
|
error = contentMessageResponse.error
|
||||||
|
|
||||||
return err("No content response")
|
return err("No content response")
|
||||||
|
|
||||||
@ -710,8 +714,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
Future[PortalResult[void]] {.async.} =
|
Future[PortalResult[void]] {.async.} =
|
||||||
## Offer triggers offer-accept interaction with one peer
|
## Offer triggers offer-accept interaction with one peer
|
||||||
## Whole flow has two phases:
|
## Whole flow has two phases:
|
||||||
## 1. Come to an agreement on what content to transfer, by using offer and accept
|
## 1. Come to an agreement on what content to transfer, by using offer and
|
||||||
## messages.
|
## accept messages.
|
||||||
## 2. Open uTP stream from content provider to content receiver and transfer
|
## 2. Open uTP stream from content provider to content receiver and transfer
|
||||||
## agreed content.
|
## agreed content.
|
||||||
## There are two types of possible offer requests:
|
## There are two types of possible offer requests:
|
||||||
@ -727,7 +731,11 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
## guarantee content transfer.
|
## guarantee content transfer.
|
||||||
let contentKeys = getContentKeys(o)
|
let contentKeys = getContentKeys(o)
|
||||||
|
|
||||||
debug "Offering content", contentKeys = contentKeys
|
logScope:
|
||||||
|
node = o.dst
|
||||||
|
contentKeys
|
||||||
|
|
||||||
|
debug "Offering content"
|
||||||
|
|
||||||
portal_content_keys_offered.observe(contentKeys.len().int64)
|
portal_content_keys_offered.observe(contentKeys.len().int64)
|
||||||
|
|
||||||
@ -752,7 +760,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
let acceptedKeysAmount = m.contentKeys.countOnes()
|
let acceptedKeysAmount = m.contentKeys.countOnes()
|
||||||
portal_content_keys_accepted.observe(acceptedKeysAmount.int64)
|
portal_content_keys_accepted.observe(acceptedKeysAmount.int64)
|
||||||
if acceptedKeysAmount == 0:
|
if acceptedKeysAmount == 0:
|
||||||
debug "No content acceppted", contentKeys = contentKeys
|
debug "No content accepted"
|
||||||
# Don't open an uTP stream if no content was requested
|
# Don't open an uTP stream if no content was requested
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
@ -772,7 +780,7 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
|
|
||||||
if connectionResult.isErr():
|
if connectionResult.isErr():
|
||||||
debug "Utp connection error while trying to offer content",
|
debug "Utp connection error while trying to offer content",
|
||||||
error = connectionResult.error, contentKeys = contentKeys
|
error = connectionResult.error
|
||||||
return err("Error connecting uTP socket")
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
let socket = connectionResult.get()
|
let socket = connectionResult.get()
|
||||||
@ -792,7 +800,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
|
|
||||||
let dataWritten = await socket.write(output.getOutput)
|
let dataWritten = await socket.write(output.getOutput)
|
||||||
if dataWritten.isErr:
|
if dataWritten.isErr:
|
||||||
debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys
|
debug "Error writing requested data",
|
||||||
|
error = dataWritten.error
|
||||||
# No point in trying to continue writing data
|
# No point in trying to continue writing data
|
||||||
socket.close()
|
socket.close()
|
||||||
return err("Error writing requested data")
|
return err("Error writing requested data")
|
||||||
@ -819,17 +828,19 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||||||
|
|
||||||
let dataWritten = await socket.write(output.getOutput)
|
let dataWritten = await socket.write(output.getOutput)
|
||||||
if dataWritten.isErr:
|
if dataWritten.isErr:
|
||||||
debug "Error writing requested data", error = dataWritten.error, contentKeys = contentKeys
|
debug "Error writing requested data",
|
||||||
|
error = dataWritten.error
|
||||||
# No point in trying to continue writing data
|
# No point in trying to continue writing data
|
||||||
socket.close()
|
socket.close()
|
||||||
return err("Error writing requested data")
|
return err("Error writing requested data")
|
||||||
|
|
||||||
debug "Content successfully offered", contentKeys = contentKeys
|
debug "Content successfully offered"
|
||||||
|
|
||||||
await socket.closeWait()
|
await socket.closeWait()
|
||||||
return ok()
|
return ok()
|
||||||
else:
|
else:
|
||||||
warn "Offer failed due to accept request failure ", error = acceptMessageResponse.error, contentKeys = contentKeys
|
warn "Offer failed due to accept request failure ",
|
||||||
|
error = acceptMessageResponse.error
|
||||||
return err("No accept response")
|
return err("No accept response")
|
||||||
|
|
||||||
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||||
|
@ -16,6 +16,8 @@ proc portal_stateFindContent(enr: Record, contentKey: string): tuple[
|
|||||||
proc portal_stateFindContentFull(enr: Record, contentKey: string): tuple[
|
proc portal_stateFindContentFull(enr: Record, contentKey: string): tuple[
|
||||||
content: Option[string],
|
content: Option[string],
|
||||||
enrs: Option[seq[Record]]]
|
enrs: Option[seq[Record]]]
|
||||||
|
proc portal_stateOfferReal(
|
||||||
|
enr: Record, contentKey: string, contentValue: string): bool
|
||||||
proc portal_stateOffer(contentKey: string, contentValue: string): int
|
proc portal_stateOffer(contentKey: string, contentValue: string): int
|
||||||
proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record]
|
proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record]
|
||||||
proc portal_stateRecursiveFindContent(contentKey: string): string
|
proc portal_stateRecursiveFindContent(contentKey: string): string
|
||||||
@ -40,6 +42,8 @@ proc portal_historyFindContent(enr: Record, contentKey: string): tuple[
|
|||||||
proc portal_historyFindContentFull(enr: Record, contentKey: string): tuple[
|
proc portal_historyFindContentFull(enr: Record, contentKey: string): tuple[
|
||||||
content: Option[string],
|
content: Option[string],
|
||||||
enrs: Option[seq[Record]]]
|
enrs: Option[seq[Record]]]
|
||||||
|
proc portal_historyOfferReal(
|
||||||
|
enr: Record, contentKey: string, contentValue: string): bool
|
||||||
proc portal_historyOffer(contentKey: string, contentValue: string): int
|
proc portal_historyOffer(contentKey: string, contentValue: string): int
|
||||||
proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record]
|
proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record]
|
||||||
proc portal_historyRecursiveFindContent(contentKey: string): string
|
proc portal_historyRecursiveFindContent(contentKey: string): string
|
||||||
|
@ -168,6 +168,22 @@ proc installPortalApiHandlers*(
|
|||||||
none(string),
|
none(string),
|
||||||
some(foundContent.nodes.map(proc(n: Node): Record = n.record)))
|
some(foundContent.nodes.map(proc(n: Node): Record = n.record)))
|
||||||
|
|
||||||
|
rpcServer.rpc("portal_" & network & "OfferReal") do(
|
||||||
|
enr: Record, contentKey: string, contentValue: string) -> bool:
|
||||||
|
# Note: unspecified RPC, but the spec took over the Offer call to actually
|
||||||
|
# do gossip. This should be adjusted.
|
||||||
|
let
|
||||||
|
node = toNodeWithAddress(enr)
|
||||||
|
key = hexToSeqByte(contentKey)
|
||||||
|
content = hexToSeqByte(contentValue)
|
||||||
|
contentInfo = ContentInfo(contentKey: ByteList.init(key), content: content)
|
||||||
|
res = await p.offer(node, @[contentInfo])
|
||||||
|
|
||||||
|
if res.isOk():
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, $res.error)
|
||||||
|
|
||||||
rpcServer.rpc("portal_" & network & "Offer") do(
|
rpcServer.rpc("portal_" & network & "Offer") do(
|
||||||
contentKey: string, contentValue: string) -> int:
|
contentKey: string, contentValue: string) -> int:
|
||||||
let
|
let
|
||||||
|
Loading…
x
Reference in New Issue
Block a user