diff --git a/fluffy/eth_data/history_data_seeding.nim b/fluffy/eth_data/history_data_seeding.nim index 88e53523e..de1e432ac 100644 --- a/fluffy/eth_data/history_data_seeding.nim +++ b/fluffy/eth_data/history_data_seeding.nim @@ -60,7 +60,7 @@ proc propagateEpochAccumulator*( encodedAccumulator ) discard await p.neighborhoodGossip( - ContentKeysList(@[encKey]), @[encodedAccumulator]) + Opt.none(NodeId), ContentKeysList(@[encKey]), @[encodedAccumulator]) return ok() @@ -87,14 +87,14 @@ proc historyPropagate*( const concurrentGossips = 20 var gossipQueue = - newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips) + newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[byte])](concurrentGossips) var gossipWorkers: seq[Future[void]] proc gossipWorker(p: PortalProtocol) {.async.} = while true: - let (keys, content) = await gossipQueue.popFirst() + let (srcNodeId, keys, content) = await gossipQueue.popFirst() - discard await p.neighborhoodGossip(keys, @[content]) + discard await p.neighborhoodGossip(srcNodeId, keys, @[content]) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) @@ -121,7 +121,7 @@ proc historyPropagate*( p.storeContent(encKey, contentId, value[1]) await gossipQueue.addLast( - (ContentKeysList(@[encode(value[0])]), value[1])) + (Opt.none(NodeId), ContentKeysList(@[encode(value[0])]), value[1])) return ok() else: @@ -152,7 +152,7 @@ proc historyPropagateBlock*( contentId = history_content.toContentId(encKey) p.storeContent(encKey, contentId, value[1]) - discard await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) + discard await p.neighborhoodGossip(Opt.none(NodeId), ContentKeysList(@[encode(value[0])]), @[value[1]]) return ok() else: @@ -190,7 +190,7 @@ proc historyPropagateHeadersWithProof*( p.storeContent(encKey, contentId, encodedContent) let keys = ContentKeysList(@[encode(contentKey)]) - discard await p.neighborhoodGossip(keys, @[encodedContent]) + discard await p.neighborhoodGossip(Opt.none(NodeId), keys, @[encodedContent]) return ok() @@ -229,7 +229,7 @@ proc historyPropagateHeaders*( while true: let (keys, content) = await gossipQueue.popFirst() - discard await p.neighborhoodGossip(keys, @[content]) + discard await p.neighborhoodGossip(Opt.none(NodeId), keys, @[content]) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) diff --git a/fluffy/network/beacon_light_client/beacon_light_client_network.nim b/fluffy/network/beacon_light_client/beacon_light_client_network.nim index 03a18c0a7..3c8fe91d3 100644 --- a/fluffy/network/beacon_light_client/beacon_light_client_network.nim +++ b/fluffy/network/beacon_light_client/beacon_light_client_network.nim @@ -29,7 +29,7 @@ type LightClientNetwork* = ref object portalProtocol*: PortalProtocol lightClientDb*: LightClientDb - contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] + contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] forkDigests*: ForkDigests processContentLoop: Future[void] @@ -173,7 +173,7 @@ proc new*( bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = let - contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) stream = streamManager.registerNewStream(contentQueue) @@ -227,14 +227,15 @@ proc validateContent( proc neighborhoodGossipDiscardPeers( p: PortalProtocol, + srcNodeId: Opt[NodeId], contentKeys: ContentKeysList, content: seq[seq[byte]]): Future[void] {.async.} = - discard await p.neighborhoodGossip(contentKeys, content) + discard await p.neighborhoodGossip(srcNodeId, contentKeys, content) proc processContentLoop(n: LightClientNetwork) {.async.} = try: while true: - let (contentKeys, contentItems) = + let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst() # When there is one invalid content item, all other content items are @@ -243,7 +244,7 @@ proc processContentLoop(n: LightClientNetwork) {.async.} = # due to missing network data for validation. if await n.validateContent(contentKeys, contentItems): asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers( - contentKeys, contentItems + srcNodeId, contentKeys, contentItems ) except CancelledError: diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index a41dff7db..c5c19ea5d 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -55,7 +55,7 @@ type HistoryNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB - contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] + contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] accumulator*: FinishedAccumulator processContentLoop: Future[void] statusLogLoop: Future[void] @@ -707,7 +707,7 @@ proc new*( bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = let - contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) stream = streamManager.registerNewStream(contentQueue) @@ -748,14 +748,15 @@ proc validateContent( proc neighborhoodGossipDiscardPeers( p: PortalProtocol, + srcNodeId: Opt[NodeId], contentKeys: ContentKeysList, content: seq[seq[byte]]): Future[void] {.async.} = - discard await p.neighborhoodGossip(contentKeys, content) + discard await p.neighborhoodGossip(srcNodeId, contentKeys, content) proc processContentLoop(n: HistoryNetwork) {.async.} = try: while true: - let (contentKeys, contentItems) = + let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst() # When there is one invalid content item, all other content items are @@ -764,7 +765,7 @@ proc processContentLoop(n: HistoryNetwork) {.async.} = # due to missing network data for validation. if await n.validateContent(contentKeys, contentItems): asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers( - contentKeys, contentItems + srcNodeId, contentKeys, contentItems ) except CancelledError: diff --git a/fluffy/network/network_seed.nim b/fluffy/network/network_seed.nim index a083671bd..50aa3f967 100644 --- a/fluffy/network/network_seed.nim +++ b/fluffy/network/network_seed.nim @@ -132,13 +132,14 @@ proc depthContentPropagate*( return ok() func contentDataToKeys( - contentData: seq[ContentDataDist]): (ContentKeysList, seq[seq[byte]]) = + contentData: seq[ContentDataDist]): + (Opt[NodeId], ContentKeysList, seq[seq[byte]]) = var contentKeys: seq[ByteList] var content: seq[seq[byte]] for cd in contentData: contentKeys.add(ByteList.init(cd.contentKey)) content.add(cd.content) - return (ContentKeysList(contentKeys), content) + return (Opt.none(NodeId), ContentKeysList(contentKeys), content) proc breadthContentPropagate*( p: PortalProtocol, seedDbPath: string): @@ -152,15 +153,15 @@ proc breadthContentPropagate*( const gossipsPerBatch = 5 var gossipQueue = - newAsyncQueue[(ContentKeysList, seq[seq[byte]])](concurrentGossips) + newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](concurrentGossips) var gossipWorkers: seq[Future[void]] proc gossipWorker(p: PortalProtocol) {.async.} = while true: - let (keys, content) = await gossipQueue.popFirst() + let (srcNodeId, keys, content) = await gossipQueue.popFirst() - discard await p.neighborhoodGossip(keys, content) + discard await p.neighborhoodGossip(srcNodeId, keys, content) for i in 0 ..< concurrentGossips: gossipWorkers.add(gossipWorker(p)) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index edae2ad1b..51a512f47 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -22,7 +22,7 @@ const type StateNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB - contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] + contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] processContentLoop: Future[void] func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] = @@ -70,7 +70,7 @@ proc new*( bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = - let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + let cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) let s = streamManager.registerNewStream(cq) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 03d8728ec..5c3a6db24 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -20,7 +20,7 @@ import "."/[portal_stream, portal_protocol_config], ./messages -export messages, routing_table +export messages, routing_table, protocol declareCounter portal_message_requests_incoming, "Portal wire protocol incoming message requests", @@ -1128,10 +1128,10 @@ proc getNClosestNodesWithRadius*( proc neighborhoodGossip*( p: PortalProtocol, + srcNodeId: Opt[NodeId], contentKeys: ContentKeysList, content: seq[seq[byte]]): Future[int] {.async.} = ## Returns number of peers to which content was gossiped - if content.len() == 0: return 0 @@ -1170,7 +1170,10 @@ proc neighborhoodGossip*( let radius = p.radiusCache.get(node.id) if radius.isSome(): if p.inRange(node.id, radius.unsafeGet(), contentId): - gossipNodes.add(node) + if srcNodeId.isNone: + gossipNodes.add(node) + elif node.id != srcNodeId.get(): + gossipNodes.add(node) if gossipNodes.len >= 8: # use local nodes for gossip portal_gossip_without_lookup.inc(labelValues = [$p.protocolId]) diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index 102a0d452..68496645c 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -67,7 +67,7 @@ type connectionTimeout: Duration contentReadTimeout*: Duration rng: ref HmacDrbgContext - contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] + contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] StreamManager* = ref object transport: UtpDiscv5Protocol @@ -248,12 +248,12 @@ proc readContentOffer( # as `AcceptConnectionCallback` is `asyncSpawn`'ed and there are no limits # on the `contentOffers`. Might move the queue to before the reading of the # socket, and let the specific networks handle that. - await stream.contentQueue.put((offer.contentKeys, contentItems)) + await stream.contentQueue.put((Opt.some(offer.nodeId), offer.contentKeys, contentItems)) proc new( T: type PortalStream, transport: UtpDiscv5Protocol, - contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])], + contentQueue: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])], connectionTimeout: Duration, contentReadTimeout: Duration, rng: ref HmacDrbgContext): T = @@ -340,7 +340,7 @@ proc new*(T: type StreamManager, d: protocol.Protocol): T = proc registerNewStream*( m : StreamManager, - contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])], + contentQueue: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])], connectionTimeout = defaultConnectionTimeout, contentReadTimeout = defaultContentReadTimeout): PortalStream = diff --git a/fluffy/rpc/rpc_portal_api.nim b/fluffy/rpc/rpc_portal_api.nim index a3433ef6e..bf5cd2091 100644 --- a/fluffy/rpc/rpc_portal_api.nim +++ b/fluffy/rpc/rpc_portal_api.nim @@ -202,6 +202,6 @@ proc installPortalApiHandlers*( key = hexToSeqByte(contentKey) content = hexToSeqByte(contentValue) contentKeys = ContentKeysList(@[ByteList.init(key)]) - numberOfPeers = await p.neighborhoodGossip(contentKeys, @[content]) + numberOfPeers = await p.neighborhoodGossip(Opt.none(NodeId), contentKeys, @[content]) return numberOfPeers diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index 933033480..8a7dd7f82 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -35,7 +35,7 @@ proc initPortalProtocol( d = initDiscoveryNode(rng, privKey, address, bootstrapRecords) db = ContentDB.new("", uint32.high, inMemory = true) manager = StreamManager.new(d) - q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) stream = manager.registerNewStream(q) proto = PortalProtocol.new( @@ -171,7 +171,7 @@ procSuite "Portal Wire Protocol Tests": check res.isOk() - let (contentKeys, contentItems) = + let (srcNodeId, contentKeys, contentItems) = await proto2.stream.contentQueue.popFirst() check contentItems.len() == content.len() @@ -332,7 +332,7 @@ procSuite "Portal Wire Protocol Tests": dbLimit = 100_000'u32 db = ContentDB.new("", dbLimit, inMemory = true) m = StreamManager.new(node1) - q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) stream = m.registerNewStream(q) proto1 = PortalProtocol.new( diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index e24795495..6fedfd395 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -224,7 +224,7 @@ proc run(config: PortalCliConf) = let db = ContentDB.new("", config.storageSize, inMemory = true) sm = StreamManager.new(d) - cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50) stream = sm.registerNewStream(cq) portal = PortalProtocol.new(d, config.protocolId, testContentIdHandler, createGetHandler(db), stream,