mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-11 12:54:13 +00:00
Make portal_historyOffer spec compliant (#1259)
* Make portal_historyOffer spec compliant
This commit is contained in:
parent
dd1748fd49
commit
181243b6c4
@ -77,7 +77,7 @@ proc propagateAccumulatorData*(
|
||||
|
||||
p.storeContent(
|
||||
history_content.toContentId(key), content)
|
||||
await p.neighborhoodGossip(
|
||||
discard await p.neighborhoodGossip(
|
||||
ContentKeysList(@[encode(key)]), @[content])
|
||||
|
||||
return ok()
|
||||
@ -101,7 +101,7 @@ proc propagateEpochAccumulator*(
|
||||
|
||||
p.storeContent(
|
||||
history_content.toContentId(key), SSZ.encode(accumulator))
|
||||
await p.neighborhoodGossip(
|
||||
discard await p.neighborhoodGossip(
|
||||
ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
|
||||
|
||||
return ok()
|
||||
@ -119,7 +119,7 @@ proc historyPropagate*(
|
||||
while true:
|
||||
let (keys, content) = await gossipQueue.popFirst()
|
||||
|
||||
await p.neighborhoodGossip(keys, @[content])
|
||||
discard await p.neighborhoodGossip(keys, @[content])
|
||||
|
||||
for i in 0 ..< concurrentGossips:
|
||||
gossipWorkers.add(gossipWorker(p))
|
||||
@ -173,7 +173,7 @@ proc historyPropagateBlock*(
|
||||
let contentId = history_content.toContentId(value[0])
|
||||
p.storeContent(contentId, value[1])
|
||||
|
||||
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
|
||||
discard await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
|
||||
|
||||
return ok()
|
||||
else:
|
||||
@ -193,7 +193,7 @@ proc historyPropagateHeaders*(
|
||||
while true:
|
||||
let (keys, content) = await gossipQueue.popFirst()
|
||||
|
||||
await p.neighborhoodGossip(keys, @[content])
|
||||
discard await p.neighborhoodGossip(keys, @[content])
|
||||
|
||||
for i in 0 ..< concurrentGossips:
|
||||
gossipWorkers.add(gossipWorker(p))
|
||||
|
@ -768,6 +768,12 @@ proc validateContent(
|
||||
|
||||
return true
|
||||
|
||||
proc neighborhoodGossipDiscardPeers(
|
||||
p: PortalProtocol,
|
||||
contentKeys: ContentKeysList,
|
||||
content: seq[seq[byte]]): Future[void] {.async.} =
|
||||
discard await p.neighborhoodGossip(contentKeys, content)
|
||||
|
||||
proc processContentLoop(n: HistoryNetwork) {.async.} =
|
||||
try:
|
||||
while true:
|
||||
@ -779,7 +785,9 @@ proc processContentLoop(n: HistoryNetwork) {.async.} =
|
||||
# TODO: Differentiate between failures due to invalid data and failures
|
||||
# due to missing network data for validation.
|
||||
if await n.validateContent(contentKeys, contentItems):
|
||||
asyncSpawn n.portalProtocol.neighborhoodGossip(contentKeys, contentItems)
|
||||
asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
|
||||
contentKeys, contentItems
|
||||
)
|
||||
|
||||
except CancelledError:
|
||||
trace "processContentLoop canceled"
|
||||
|
@ -151,7 +151,7 @@ proc breadthContentPropagate*(
|
||||
while true:
|
||||
let (keys, content) = await gossipQueue.popFirst()
|
||||
|
||||
await p.neighborhoodGossip(keys, content)
|
||||
discard await p.neighborhoodGossip(keys, content)
|
||||
|
||||
for i in 0 ..< concurrentGossips:
|
||||
gossipWorkers.add(gossipWorker(p))
|
||||
|
@ -1118,10 +1118,13 @@ proc getNClosestNodesWithRadius*(
|
||||
return nodesWithRadiuses
|
||||
|
||||
proc neighborhoodGossip*(
|
||||
p: PortalProtocol, contentKeys: ContentKeysList, content: seq[seq[byte]])
|
||||
{.async.} =
|
||||
p: PortalProtocol,
|
||||
contentKeys: ContentKeysList,
|
||||
content: seq[seq[byte]]): Future[int] {.async.} =
|
||||
## Returns number of peers to which content was gossiped
|
||||
|
||||
if content.len() == 0:
|
||||
return
|
||||
return 0
|
||||
|
||||
var contentList = List[ContentInfo, contentKeysLimit].init(@[])
|
||||
for i, contentItem in content:
|
||||
@ -1133,7 +1136,7 @@ proc neighborhoodGossip*(
|
||||
# TODO: come up with something better?
|
||||
let contentIdOpt = p.toContentId(contentList[0].contentKey)
|
||||
if contentIdOpt.isNone():
|
||||
return
|
||||
return 0
|
||||
|
||||
let contentId = contentIdOpt.get()
|
||||
|
||||
@ -1166,19 +1169,23 @@ proc neighborhoodGossip*(
|
||||
|
||||
if gossipNodes.len >= 8: # use local nodes for gossip
|
||||
portal_gossip_without_lookup.inc(labelValues = [$p.protocolId])
|
||||
for node in gossipNodes[0..<min(gossipNodes.len, maxGossipNodes)]:
|
||||
let numberOfGossipedNodes = min(gossipNodes.len, maxGossipNodes)
|
||||
for node in gossipNodes[0..<numberOfGossipedNodes]:
|
||||
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
|
||||
await p.offerQueue.addLast(req)
|
||||
return numberOfGossipedNodes
|
||||
else: # use looked up nodes for gossip
|
||||
portal_gossip_with_lookup.inc(labelValues = [$p.protocolId])
|
||||
let closestNodes = await p.lookup(NodeId(contentId))
|
||||
for node in closestNodes[0..<min(closestNodes.len, maxGossipNodes)]:
|
||||
let numberOfGossipedNodes = min(closestNodes.len, maxGossipNodes)
|
||||
for node in closestNodes[0..<numberOfGossipedNodes]:
|
||||
# Note: opportunistically not checking if the radius of the node is known
|
||||
# and thus if the node is in radius with the content. Reason is, these
|
||||
# should really be the closest nodes in the DHT, and thus are most likely
|
||||
# going to be in range of the requested content.
|
||||
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
|
||||
await p.offerQueue.addLast(req)
|
||||
return numberOfGossipedNodes
|
||||
|
||||
proc adjustRadius(
|
||||
p: PortalProtocol,
|
||||
|
@ -31,5 +31,5 @@ proc portal_historyFindContentRaw(enr: Record, contentKey: string): tuple[
|
||||
proc portal_historyFindContent(enr: Record, contentKey: string): tuple[
|
||||
content: Option[string],
|
||||
enrs: Option[seq[Record]]]
|
||||
proc portal_historyOffer(enr: Record, contentKey: string): bool
|
||||
proc portal_historyOffer(contentKey: string, content: string): int
|
||||
proc portal_historyRecursiveFindNodes(): seq[Record]
|
||||
|
@ -139,16 +139,15 @@ proc installPortalApiHandlers*(
|
||||
some(foundContent.nodes.map(proc(n: Node): Record = n.record)))
|
||||
|
||||
rpcServer.rpc("portal_" & network & "Offer") do(
|
||||
enr: Record, contentKey: string) -> bool:
|
||||
# Only allow 1 content key for now
|
||||
contentKey: string, content: string) -> int:
|
||||
|
||||
let
|
||||
node = toNodeWithAddress(enr)
|
||||
contentKeys = ContentKeysList(@[ByteList.init(hexToSeqByte(contentKey))])
|
||||
accept = await p.offer(node, contentKeys)
|
||||
if accept.isErr():
|
||||
raise newException(ValueError, $accept.error)
|
||||
else:
|
||||
return true
|
||||
ck = hexToSeqByte(contentKey)
|
||||
ct = hexToSeqByte(content)
|
||||
contentKeys = ContentKeysList(@[ByteList.init(ck)])
|
||||
numberOfPeers = await p.neighborhoodGossip(contentKeys, @[ct])
|
||||
|
||||
return numberOfPeers
|
||||
|
||||
rpcServer.rpc("portal_" & network & "RecursiveFindNodes") do() -> seq[Record]:
|
||||
let discovered = await p.queryRandom()
|
||||
|
Loading…
x
Reference in New Issue
Block a user