nimbus-eth1/fluffy/network/network_seed.nim

299 lines
9.1 KiB
Nim

# Fluffy
# Copyright (c) 2022-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/math,
chronos,
eth/p2p/discoveryv5/[node, random2],
./wire/portal_protocol,
./history/[history_content, history_network],
../database/seed_db
# Experimental module which implements different content seeding strategies.
# The module is oblivious to the content stored in the seed database, as all
# content-related parameters should be available in the seed db, i.e.
# (contentId, contentKey, content).
# One thing which might need to be parameterized on a per-network basis in the
# future is the distance function.
# TODO: At this point all calls are one-shot calls, but we can also experiment
# with approaches which start some process which continuously seeds data.
# This would require the creation of a separate object which would manage the
# started task, like:
#   type NetworkSeedingManager = ref object
#     seedTask: Future[void]
# and creating a few procs which would start/stop a given seedTask or even a
# few seed tasks.
const
  # TODO: this value is currently derived from the history network parameters,
  # but it should be calculated on a per-network basis.
  maxItemsPerOfferBySize = getMaxOfferedContentKeys(
    uint32(history_network.historyProtocolId.len),
    uint32(history_content.maxContentKeySize)
  )

  # A single offer is restricted to at most 64 items.
  maxItemPerOfferByLen = 64

  # Effective per-offer limit: the smaller of the two bounds above.
  maxItemsPerOffer = min(maxItemPerOfferByLen, maxItemsPerOfferBySize)
proc depthContentPropagate*(
    p: PortalProtocol, seedDbPath: string, maxClosestNodes: uint32):
    Future[Result[void, string]] {.async.} =
  ## Selects up to `maxClosestNodes` of the closest known nodes with a known
  ## radius and offers each of them as much seed db content from its range as
  ## possible. Each selected peer is served by its own worker, so offers to
  ## different peers run concurrently while a single peer gets at most one
  ## offer at a time. Seed db content in range of the local node is also
  ## stored in the local database.
  ##
  ## Returns an error when `seedDbPath` is not a valid sqlite database path.

  const batchSize = maxItemsPerOffer

  var gossipWorkers: seq[Future[void]]

  # TODO: improve the peer selection strategy so that more of the network is
  # covered. It does not need to be perfect though, as nodes which receive
  # content will propagate it further by neighbour gossip.
  let targets = p.getNClosestNodesWithRadius(
    p.localNode.id,
    int(maxClosestNodes),
    seenOnly = true
  )

  proc worker(
      p: PortalProtocol, db: SeedDb, peer: Node, peerRadius: UInt256):
      Future[void] {.async.} =
    # Pages through the seed db content in range of `peer`, sending one offer
    # per page.
    var
      offset = 0
      morePages = true

    while morePages:
      let batch = db.getContentInRange(peer.id, peerRadius, batchSize, offset)

      if batch.len == 0:
        break

      var offers = newSeqOfCap[ContentKV](batch.len)
      for item in batch:
        offers.add(
          ContentKV(
            contentKey: ByteList.init(item.contentKey),
            content: item.content
          )
        )

      let offerResult = await p.offer(peer, offers)

      # Stop when the peer failed or when a short page signals the end of the
      # database.
      morePages = offerResult.isOk() and batch.len >= batchSize
      offset += batchSize

  proc saveDataToLocalDb(p: PortalProtocol, db: SeedDb) =
    # Copies all seed db content in range of the local node into the local
    # database, one page at a time.
    const localBatchSize = 10000

    var offset = 0
    while true:
      let batch = db.getContentInRange(
        p.localNode.id, p.dataRadius, localBatchSize, offset)

      for item in batch:
        p.storeContent(
          ByteList.init(item.contentKey),
          UInt256.fromBytesBE(item.contentId),
          item.content
        )

      # An empty or short page means the end of the database was reached.
      if batch.len < localBatchSize:
        break

      offset += localBatchSize

  let dbLocation = getDbBasePathAndName(seedDbPath)
  if dbLocation.isNone():
    return err("Provided path is not valid sqlite database path")

  let (dbPath, dbName) = dbLocation.unsafeGet()
  let db = SeedDb.new(path = dbPath, name = dbName)

  for entry in targets:
    let (peer, peerRadius) = entry
    gossipWorkers.add(worker(p, db, peer, peerRadius))

  saveDataToLocalDb(p, db)

  # The db has to stay open until every worker is done reading from it.
  await allFutures(gossipWorkers)

  db.close()

  return ok()
func contentDataToKeys(
    contentData: seq[ContentDataDist]):
    (Opt[NodeId], ContentKeysList, seq[seq[byte]]) =
  ## Splits seed db entries into two parallel lists: the content keys and the
  ## matching content values. The node id component of the result is always
  ## none.
  var
    keys = newSeqOfCap[ByteList](contentData.len)
    values = newSeqOfCap[seq[byte]](contentData.len)

  for item in contentData:
    keys.add(ByteList.init(item.contentKey))
    values.add(item.content)

  (Opt.none(NodeId), ContentKeysList(keys), values)
proc breadthContentPropagate*(
    p: PortalProtocol, seedDbPath: string):
    Future[Result[void, string]] {.async.} =
  ## Iterates over the whole seed database in batches. Every batch is stored
  ## in the local database and then queued `gossipsPerBatch` times, shuffled
  ## each time, for neighborhood gossip performed by `concurrentGossips`
  ## concurrent workers.
  ##
  ## Returns an error when `seedDbPath` is not a valid sqlite database path.
  ## Otherwise returns ok once every queued batch has been handed to
  ## `neighborhoodGossip` and all workers have finished.

  const concurrentGossips = 20
  const gossipsPerBatch = 5

  # Validate the path before any worker is started, so that the error return
  # does not leave worker futures pending forever on an empty queue.
  let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
  if maybePathAndDbName.isNone():
    return err("Provided path is not valid sqlite database path")

  var gossipQueue =
    newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](concurrentGossips)

  var gossipWorkers: seq[Future[void]]

  proc gossipWorker(p: PortalProtocol) {.async.} =
    while true:
      let (srcNodeId, keys, content) = await gossipQueue.popFirst()

      if len(content) == 0:
        # An empty batch is the shutdown marker. Real batches are never empty,
        # as the producer loop stops before queueing an empty database page.
        break

      discard await p.neighborhoodGossip(srcNodeId, keys, content)

  for i in 0 ..< concurrentGossips:
    gossipWorkers.add(gossipWorker(p))

  let
    (dbPath, dbName) = maybePathAndDbName.unsafeGet()
    batchSize = maxItemsPerOffer
    db = SeedDb.new(path = dbPath, name = dbName)
    target = p.localNode.id

  var offset = 0

  while true:
    # Setting radius to `UInt256.high` and using batchSize and offset means
    # we will iterate over the whole database in batches of
    # `maxItemsPerOffer` items.
    var contentData = db.getContentInRange(
      target, UInt256.high, batchSize, offset)

    if len(contentData) == 0:
      break

    for cd in contentData:
      p.storeContent(
        ByteList.init(cd.contentKey),
        UInt256.fromBytesBE(cd.contentId),
        cd.content
      )

    # TODO: this is a bit hacky way to make sure we will engage more valid
    # peers for each batch of data. This may be removed after improving
    # neighborhoodGossip to better choose peers based on propagated content.
    for i in 0 ..< gossipsPerBatch:
      p.baseProtocol.rng[].shuffle(contentData)
      let keysWithContent = contentDataToKeys(contentData)
      await gossipQueue.put(keysWithContent)

    if len(contentData) < batchSize:
      # A short page means the end of the database was reached.
      break

    offset = offset + batchSize

  # All reads are done, the workers only operate on already queued copies.
  db.close()

  # Queue one shutdown marker per worker. The queue is FIFO, so the markers
  # are only reached after every real batch, and each worker exits after
  # taking exactly one of them. Without this the workers would stay pending
  # on `popFirst` forever after this proc returns.
  let shutdownMarker = contentDataToKeys(newSeq[ContentDataDist]())
  for i in 0 ..< concurrentGossips:
    await gossipQueue.put(shutdownMarker)

  await allFutures(gossipWorkers)

  return ok()
proc offerContentInNodeRange*(
    p: PortalProtocol,
    seedDbPath: string,
    nodeId: NodeId,
    max: uint32,
    starting: uint32): Future[PortalResult[int]] {.async.} =
  ## Offers up to `max` of the closest seed db elements, beginning at index
  ## `starting`, to the peer with the given `nodeId`.
  ##
  ## `max` is capped at the single offer limit (64 items). The seed db is
  ## read-only, so paging is driven by the caller through `starting`: use 0 to
  ## offer the 64 closest elements, 64 to offer the next 64 elements, etc.
  ##
  ## Returns the number of items read from the seed db and included in the
  ## offer. Returns an error when the seed db path is invalid, the node can
  ## not be resolved or the offer itself fails.

  let offerLimit = min(int(max), maxItemsPerOffer)

  let dbLocation = getDbBasePathAndName(seedDbPath)
  if dbLocation.isNone():
    return err("Provided path is not valid sqlite database path")
  let (dbPath, dbName) = dbLocation.unsafeGet()

  let resolved = await p.resolveWithRadius(nodeId)
  if resolved.isNone():
    return err("Could not find node with provided nodeId")
  let (peer, peerRadius) = resolved.unsafeGet()

  let db = SeedDb.new(path = dbPath, name = dbName)
  let content = db.getContentInRange(
    peer.id, peerRadius, int64(offerLimit), int64(starting))
  # Everything needed was read from the seed db, so it can be closed before
  # the offer is sent.
  db.close()

  var offers = newSeqOfCap[ContentKV](content.len)
  for item in content:
    offers.add(
      ContentKV(
        contentKey: ByteList.init(item.contentKey),
        content: item.content
      )
    )

  # Wait for the offer result; by the end of this call the remote node should
  # have received the offered content.
  let offerResult = await p.offer(peer, offers)
  if offerResult.isErr():
    return err(offerResult.error)

  return ok(len(content))
proc storeContentInNodeRange*(
    p: PortalProtocol,
    seedDbPath: string,
    max: uint32,
    starting: uint32): PortalResult[void] =
  ## Reads seed db content in range of the local node (local node id and
  ## local data radius) and stores it in the local database. `max` and
  ## `starting` are passed to the seed db query as the item limit and the
  ## starting offset.
  ##
  ## Returns an error when `seedDbPath` is not a valid sqlite database path.

  let dbLocation = getDbBasePathAndName(seedDbPath)
  if dbLocation.isNone():
    return err("Provided path is not valid sqlite database path")

  let
    (dbPath, dbName) = dbLocation.unsafeGet()
    ownRadius = p.dataRadius
    db = SeedDb.new(path = dbPath, name = dbName)
    ownId = p.localNode.id
    inRange = db.getContentInRange(
      ownId, ownRadius, int64(max), int64(starting))

  # The query result is held in memory, so the seed db is no longer needed.
  db.close()

  for item in inRange:
    p.storeContent(
      ByteList.init(item.contentKey),
      UInt256.fromBytesBE(item.contentId),
      item.content
    )

  return ok()