don't gossip data back to the node we rcvd it from [#1227] (#1719)

This commit is contained in:
Daniel Sobol 2023-09-04 13:21:01 +03:00 committed by GitHub
parent 156f4f0140
commit 69a1a988b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 43 additions and 37 deletions

View File

@ -60,7 +60,7 @@ proc propagateEpochAccumulator*(
encodedAccumulator
)
discard await p.neighborhoodGossip(
ContentKeysList(@[encKey]), @[encodedAccumulator])
Opt.none(NodeId), ContentKeysList(@[encKey]), @[encodedAccumulator])
return ok()
@ -87,14 +87,14 @@ proc historyPropagate*(
const concurrentGossips = 20
var gossipQueue =
newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips)
newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[byte])](concurrentGossips)
var gossipWorkers: seq[Future[void]]
proc gossipWorker(p: PortalProtocol) {.async.} =
while true:
let (keys, content) = await gossipQueue.popFirst()
let (srcNodeId, keys, content) = await gossipQueue.popFirst()
discard await p.neighborhoodGossip(keys, @[content])
discard await p.neighborhoodGossip(srcNodeId, keys, @[content])
for i in 0 ..< concurrentGossips:
gossipWorkers.add(gossipWorker(p))
@ -121,7 +121,7 @@ proc historyPropagate*(
p.storeContent(encKey, contentId, value[1])
await gossipQueue.addLast(
(ContentKeysList(@[encode(value[0])]), value[1]))
(Opt.none(NodeId), ContentKeysList(@[encode(value[0])]), value[1]))
return ok()
else:
@ -152,7 +152,7 @@ proc historyPropagateBlock*(
contentId = history_content.toContentId(encKey)
p.storeContent(encKey, contentId, value[1])
discard await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
discard await p.neighborhoodGossip(Opt.none(NodeId), ContentKeysList(@[encode(value[0])]), @[value[1]])
return ok()
else:
@ -190,7 +190,7 @@ proc historyPropagateHeadersWithProof*(
p.storeContent(encKey, contentId, encodedContent)
let keys = ContentKeysList(@[encode(contentKey)])
discard await p.neighborhoodGossip(keys, @[encodedContent])
discard await p.neighborhoodGossip(Opt.none(NodeId), keys, @[encodedContent])
return ok()
@ -229,7 +229,7 @@ proc historyPropagateHeaders*(
while true:
let (keys, content) = await gossipQueue.popFirst()
discard await p.neighborhoodGossip(keys, @[content])
discard await p.neighborhoodGossip(Opt.none(NodeId), keys, @[content])
for i in 0 ..< concurrentGossips:
gossipWorkers.add(gossipWorker(p))

View File

@ -29,7 +29,7 @@ type
LightClientNetwork* = ref object
portalProtocol*: PortalProtocol
lightClientDb*: LightClientDb
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
forkDigests*: ForkDigests
processContentLoop: Future[void]
@ -173,7 +173,7 @@ proc new*(
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let
contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = streamManager.registerNewStream(contentQueue)
@ -227,14 +227,15 @@ proc validateContent(
proc neighborhoodGossipDiscardPeers(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]]): Future[void] {.async.} =
discard await p.neighborhoodGossip(contentKeys, content)
discard await p.neighborhoodGossip(srcNodeId, contentKeys, content)
proc processContentLoop(n: LightClientNetwork) {.async.} =
try:
while true:
let (contentKeys, contentItems) =
let (srcNodeId, contentKeys, contentItems) =
await n.contentQueue.popFirst()
# When there is one invalid content item, all other content items are
@ -243,7 +244,7 @@ proc processContentLoop(n: LightClientNetwork) {.async.} =
# due to missing network data for validation.
if await n.validateContent(contentKeys, contentItems):
asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
contentKeys, contentItems
srcNodeId, contentKeys, contentItems
)
except CancelledError:

View File

@ -55,7 +55,7 @@ type
HistoryNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
accumulator*: FinishedAccumulator
processContentLoop: Future[void]
statusLogLoop: Future[void]
@ -707,7 +707,7 @@ proc new*(
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let
contentQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
contentQueue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = streamManager.registerNewStream(contentQueue)
@ -748,14 +748,15 @@ proc validateContent(
proc neighborhoodGossipDiscardPeers(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]]): Future[void] {.async.} =
discard await p.neighborhoodGossip(contentKeys, content)
discard await p.neighborhoodGossip(srcNodeId, contentKeys, content)
proc processContentLoop(n: HistoryNetwork) {.async.} =
try:
while true:
let (contentKeys, contentItems) =
let (srcNodeId, contentKeys, contentItems) =
await n.contentQueue.popFirst()
# When there is one invalid content item, all other content items are
@ -764,7 +765,7 @@ proc processContentLoop(n: HistoryNetwork) {.async.} =
# due to missing network data for validation.
if await n.validateContent(contentKeys, contentItems):
asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
contentKeys, contentItems
srcNodeId, contentKeys, contentItems
)
except CancelledError:

View File

@ -132,13 +132,14 @@ proc depthContentPropagate*(
return ok()
func contentDataToKeys(
contentData: seq[ContentDataDist]): (ContentKeysList, seq[seq[byte]]) =
contentData: seq[ContentDataDist]):
(Opt[NodeId], ContentKeysList, seq[seq[byte]]) =
var contentKeys: seq[ByteList]
var content: seq[seq[byte]]
for cd in contentData:
contentKeys.add(ByteList.init(cd.contentKey))
content.add(cd.content)
return (ContentKeysList(contentKeys), content)
return (Opt.none(NodeId), ContentKeysList(contentKeys), content)
proc breadthContentPropagate*(
p: PortalProtocol, seedDbPath: string):
@ -152,15 +153,15 @@ proc breadthContentPropagate*(
const gossipsPerBatch = 5
var gossipQueue =
newAsyncQueue[(ContentKeysList, seq[seq[byte]])](concurrentGossips)
newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](concurrentGossips)
var gossipWorkers: seq[Future[void]]
proc gossipWorker(p: PortalProtocol) {.async.} =
while true:
let (keys, content) = await gossipQueue.popFirst()
let (srcNodeId, keys, content) = await gossipQueue.popFirst()
discard await p.neighborhoodGossip(keys, content)
discard await p.neighborhoodGossip(srcNodeId, keys, content)
for i in 0 ..< concurrentGossips:
gossipWorkers.add(gossipWorker(p))

View File

@ -22,7 +22,7 @@ const
type StateNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
processContentLoop: Future[void]
func toContentIdHandler(contentKey: ByteList): results.Opt[ContentId] =
@ -70,7 +70,7 @@ proc new*(
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
let cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
let s = streamManager.registerNewStream(cq)

View File

@ -20,7 +20,7 @@ import
"."/[portal_stream, portal_protocol_config],
./messages
export messages, routing_table
export messages, routing_table, protocol
declareCounter portal_message_requests_incoming,
"Portal wire protocol incoming message requests",
@ -1128,10 +1128,10 @@ proc getNClosestNodesWithRadius*(
proc neighborhoodGossip*(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
content: seq[seq[byte]]): Future[int] {.async.} =
## Returns number of peers to which content was gossiped
if content.len() == 0:
return 0
@ -1170,7 +1170,10 @@ proc neighborhoodGossip*(
let radius = p.radiusCache.get(node.id)
if radius.isSome():
if p.inRange(node.id, radius.unsafeGet(), contentId):
gossipNodes.add(node)
if srcNodeId.isNone:
gossipNodes.add(node)
elif node.id != srcNodeId.get():
gossipNodes.add(node)
if gossipNodes.len >= 8: # use local nodes for gossip
portal_gossip_without_lookup.inc(labelValues = [$p.protocolId])

View File

@ -67,7 +67,7 @@ type
connectionTimeout: Duration
contentReadTimeout*: Duration
rng: ref HmacDrbgContext
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
StreamManager* = ref object
transport: UtpDiscv5Protocol
@ -248,12 +248,12 @@ proc readContentOffer(
# as `AcceptConnectionCallback` is `asyncSpawn`'ed and there are no limits
# on the `contentOffers`. Might move the queue to before the reading of the
# socket, and let the specific networks handle that.
await stream.contentQueue.put((offer.contentKeys, contentItems))
await stream.contentQueue.put((Opt.some(offer.nodeId), offer.contentKeys, contentItems))
proc new(
T: type PortalStream,
transport: UtpDiscv5Protocol,
contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])],
contentQueue: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])],
connectionTimeout: Duration,
contentReadTimeout: Duration,
rng: ref HmacDrbgContext): T =
@ -340,7 +340,7 @@ proc new*(T: type StreamManager, d: protocol.Protocol): T =
proc registerNewStream*(
m : StreamManager,
contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])],
contentQueue: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])],
connectionTimeout = defaultConnectionTimeout,
contentReadTimeout = defaultContentReadTimeout): PortalStream =

View File

@ -202,6 +202,6 @@ proc installPortalApiHandlers*(
key = hexToSeqByte(contentKey)
content = hexToSeqByte(contentValue)
contentKeys = ContentKeysList(@[ByteList.init(key)])
numberOfPeers = await p.neighborhoodGossip(contentKeys, @[content])
numberOfPeers = await p.neighborhoodGossip(Opt.none(NodeId), contentKeys, @[content])
return numberOfPeers

View File

@ -35,7 +35,7 @@ proc initPortalProtocol(
d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
db = ContentDB.new("", uint32.high, inMemory = true)
manager = StreamManager.new(d)
q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = manager.registerNewStream(q)
proto = PortalProtocol.new(
@ -171,7 +171,7 @@ procSuite "Portal Wire Protocol Tests":
check res.isOk()
let (contentKeys, contentItems) =
let (srcNodeId, contentKeys, contentItems) =
await proto2.stream.contentQueue.popFirst()
check contentItems.len() == content.len()
@ -332,7 +332,7 @@ procSuite "Portal Wire Protocol Tests":
dbLimit = 100_000'u32
db = ContentDB.new("", dbLimit, inMemory = true)
m = StreamManager.new(node1)
q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = m.registerNewStream(q)
proto1 = PortalProtocol.new(

View File

@ -224,7 +224,7 @@ proc run(config: PortalCliConf) =
let
db = ContentDB.new("", config.storageSize, inMemory = true)
sm = StreamManager.new(d)
cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
stream = sm.registerNewStream(cq)
portal = PortalProtocol.new(d, config.protocolId,
testContentIdHandler, createGetHandler(db), stream,