diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim index 73fa35d81..87e876ca6 100644 --- a/fluffy/data/history_data_seeding.nim +++ b/fluffy/data/history_data_seeding.nim @@ -77,7 +77,7 @@ proc propagateAccumulatorData*( p.storeContent( history_content.toContentId(key), content) - await p.neighborhoodGossip( + discard await p.neighborhoodGossip( ContentKeysList(@[encode(key)]), @[content]) return ok() @@ -101,7 +101,7 @@ proc propagateEpochAccumulator*( p.storeContent( history_content.toContentId(key), SSZ.encode(accumulator)) - await p.neighborhoodGossip( + discard await p.neighborhoodGossip( ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)]) return ok() @@ -119,7 +119,7 @@ proc historyPropagate*( while true: let (keys, content) = await gossipQueue.popFirst() - await p.neighborhoodGossip(keys, @[content]) + discard await p.neighborhoodGossip(keys, @[content]) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) @@ -173,7 +173,7 @@ proc historyPropagateBlock*( let contentId = history_content.toContentId(value[0]) p.storeContent(contentId, value[1]) - await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) + discard await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) return ok() else: @@ -193,7 +193,7 @@ proc historyPropagateHeaders*( while true: let (keys, content) = await gossipQueue.popFirst() - await p.neighborhoodGossip(keys, @[content]) + discard await p.neighborhoodGossip(keys, @[content]) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index a01c4b79e..9b1f8920a 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -768,6 +768,12 @@ proc validateContent( return true +proc neighborhoodGossipDiscardPeers( + p: PortalProtocol, + contentKeys: ContentKeysList, + content: seq[seq[byte]]): Future[void] {.async.} = + discard await p.neighborhoodGossip(contentKeys, content) + proc processContentLoop(n: HistoryNetwork) {.async.} = try: while true: @@ -779,7 +785,9 @@ proc processContentLoop(n: HistoryNetwork) {.async.} = # TODO: Differentiate between failures due to invalid data and failures # due to missing network data for validation. if await n.validateContent(contentKeys, contentItems): - asyncSpawn n.portalProtocol.neighborhoodGossip(contentKeys, contentItems) + asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers( + contentKeys, contentItems + ) except CancelledError: trace "processContentLoop canceled" diff --git a/fluffy/network/network_seed.nim b/fluffy/network/network_seed.nim index 1d402e091..f71d8d8f5 100644 --- a/fluffy/network/network_seed.nim +++ b/fluffy/network/network_seed.nim @@ -151,7 +151,7 @@ proc breadthContentPropagate*( while true: let (keys, content) = await gossipQueue.popFirst() - await p.neighborhoodGossip(keys, content) + discard await p.neighborhoodGossip(keys, content) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index e86635ab2..95d8ccd13 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -1118,10 +1118,13 @@ proc getNClosestNodesWithRadius*( return nodesWithRadiuses proc neighborhoodGossip*( - p: PortalProtocol, contentKeys: ContentKeysList, content: seq[seq[byte]]) - {.async.} = + p: PortalProtocol, + contentKeys: ContentKeysList, + content: seq[seq[byte]]): Future[int] {.async.} = + ## Returns number of peers to which content was gossiped + if content.len() == 0: - return + return 0 var contentList = List[ContentInfo, contentKeysLimit].init(@[]) for i, contentItem in content: @@ -1133,7 +1136,7 @@ proc neighborhoodGossip*( # TODO: come up with something better? let contentIdOpt = p.toContentId(contentList[0].contentKey) if contentIdOpt.isNone(): - return + return 0 let contentId = contentIdOpt.get() @@ -1166,19 +1169,23 @@ proc neighborhoodGossip*( if gossipNodes.len >= 8: # use local nodes for gossip portal_gossip_without_lookup.inc(labelValues = [$p.protocolId]) - for node in gossipNodes[0.. bool: - # Only allow 1 content key for now + contentKey: string, content: string) -> int: + let - node = toNodeWithAddress(enr) - contentKeys = ContentKeysList(@[ByteList.init(hexToSeqByte(contentKey))]) - accept = await p.offer(node, contentKeys) - if accept.isErr(): - raise newException(ValueError, $accept.error) - else: - return true + ck = hexToSeqByte(contentKey) + ct = hexToSeqByte(content) + contentKeys = ContentKeysList(@[ByteList.init(ck)]) + numberOfPeers = await p.neighborhoodGossip(contentKeys, @[ct]) + + return numberOfPeers rpcServer.rpc("portal_" & network & "RecursiveFindNodes") do() -> seq[Record]: let discovered = await p.queryRandom()