Add bulk seeding to multiple peers (#1170)
* Add bulk seeding to multiple peers
This commit is contained in:
parent
eb972fd598
commit
e3cabaff7f
|
@ -0,0 +1,256 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
chronos,
|
||||
eth/p2p/discoveryv5/[node, random2],
|
||||
./wire/portal_protocol,
|
||||
../seed_db
|
||||
|
||||
# Experimental module which implements different content seeding strategies.
|
||||
# Module is oblivious to content stored in seed database as all content related
|
||||
# parameters should be available in seed db i.e (contentId, contentKey, content)
|
||||
# One thing which might need to be parameterized per network basis in the future is
|
||||
# the distance function.
|
||||
# TODO: At this point all calls are one shot calls but we can also experiment with
|
||||
# approaches which start some process which continuously seeds data.
|
||||
# This would require creation of separate object which would manage started task
|
||||
# like:
|
||||
# type NetworkSeedingManager = ref object
|
||||
# seedTask: Future[void]
|
||||
# and creating few procs which would start/stop given seedTask or even few
|
||||
# seed tasks
|
||||
|
||||
proc depthContentPropagate*(
|
||||
p: PortalProtocol, seedDbPath: string, maxClosestNodes: uint32):
|
||||
Future[Result[void, string]] {.async.} =
|
||||
|
||||
## Choses `maxClosestNodes` closest known nodes with known radius and tries to
|
||||
## offer as much content as possible in their range from seed db. Offers are made conccurently
|
||||
## with at most one offer per peer at the time.
|
||||
|
||||
const batchSize = 64
|
||||
|
||||
var gossipWorkers: seq[Future[void]]
|
||||
|
||||
# TODO improve peer selection strategy, to be sure more network is covered, although
|
||||
# it still does not need to be perfect as nodes which receive content will still
|
||||
# propagate it further by neighbour gossip
|
||||
let closestWithRadius = p.getNClosestNodesWithRadius(
|
||||
p.localNode.id,
|
||||
int(maxClosestNodes),
|
||||
seenOnly = true
|
||||
)
|
||||
|
||||
proc worker(p: PortalProtocol, db: SeedDb, node: Node, radius: UInt256): Future[void] {.async.} =
|
||||
var offset = 0
|
||||
while true:
|
||||
let content = db.getContentInRange(node.id, radius, batchSize, offset)
|
||||
|
||||
if len(content) == 0:
|
||||
break
|
||||
|
||||
var contentInfo: seq[ContentInfo]
|
||||
for e in content:
|
||||
let info = ContentInfo(contentKey: ByteList.init(e.contentKey), content: e.content)
|
||||
contentInfo.add(info)
|
||||
|
||||
let offerResult = await p.offer(node, contentInfo)
|
||||
|
||||
if offerResult.isErr() or len(content) < batchSize:
|
||||
# peer failed or we reached end of database stop offering more content
|
||||
break
|
||||
|
||||
offset = offset + batchSize
|
||||
|
||||
proc saveDataToLocalDb(p: PortalProtocol, db: SeedDb) =
|
||||
let localBatchSize = 10000
|
||||
|
||||
var offset = 0
|
||||
while true:
|
||||
let content = db.getContentInRange(p.localNode.id, p.dataRadius, localBatchSize, offset)
|
||||
|
||||
if len(content) == 0:
|
||||
break
|
||||
|
||||
for e in content:
|
||||
p.storeContent(UInt256.fromBytesBE(e.contentId), e.content)
|
||||
|
||||
if len(content) < localBatchSize:
|
||||
# got to the end of db.
|
||||
break
|
||||
|
||||
offset = offset + localBatchSize
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let
|
||||
(dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
|
||||
for n in closestWithRadius:
|
||||
gossipWorkers.add(p.worker(db, n[0], n[1]))
|
||||
|
||||
p.saveDataToLocalDb(db)
|
||||
|
||||
await allFutures(gossipWorkers)
|
||||
|
||||
db.close()
|
||||
|
||||
return ok()
|
||||
|
||||
func contentDataToKeys(contentData: seq[ContentDataDist]): (ContentKeysList, seq[seq[byte]]) =
|
||||
var contentKeys: seq[ByteList]
|
||||
var content: seq[seq[byte]]
|
||||
for cd in contentData:
|
||||
contentKeys.add(ByteList.init(cd.contentKey))
|
||||
content.add(cd.content)
|
||||
return (ContentKeysList(contentKeys), content)
|
||||
|
||||
proc breadthContentPropagate*(
|
||||
p: PortalProtocol, seedDbPath: string):
|
||||
Future[Result[void, string]] {.async.} =
|
||||
|
||||
## Iterates over whole seed database, and offer batches of content to different
|
||||
## set of nodes
|
||||
|
||||
const concurrentGossips = 20
|
||||
|
||||
const gossipsPerBatch = 5
|
||||
|
||||
var gossipQueue =
|
||||
newAsyncQueue[(ContentKeysList, seq[seq[byte]])](concurrentGossips)
|
||||
|
||||
var gossipWorkers: seq[Future[void]]
|
||||
|
||||
proc gossipWorker(p: PortalProtocol) {.async.} =
|
||||
while true:
|
||||
let (keys, content) = await gossipQueue.popFirst()
|
||||
|
||||
await p.neighborhoodGossip(keys, content)
|
||||
|
||||
for i in 0 ..< concurrentGossips:
|
||||
gossipWorkers.add(gossipWorker(p))
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let
|
||||
(dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
batchSize = 64
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
target = p.localNode.id
|
||||
|
||||
var offset = 0
|
||||
|
||||
while true:
|
||||
# Setting radius to `UInt256.high` and using batchSize and offset, means
|
||||
# we will iterate over whole database in batches of 64 items
|
||||
var contentData = db.getContentInRange(target, UInt256.high, batchSize, offset)
|
||||
|
||||
if len(contentData) == 0:
|
||||
break
|
||||
|
||||
for cd in contentData:
|
||||
p.storeContent(UInt256.fromBytesBE(cd.contentId), cd.content)
|
||||
|
||||
# TODO this a bit hacky way to make sure we will engage more valid peers for each
|
||||
# batch of data. This maybe removed after improving neighborhoodGossip
|
||||
# to better chose peers based on propagated content
|
||||
for i in 0 ..< gossipsPerBatch:
|
||||
p.baseProtocol.rng[].shuffle(contentData)
|
||||
let keysWithContent = contentDataToKeys(contentData)
|
||||
await gossipQueue.put(keysWithContent)
|
||||
|
||||
if len(contentData) < batchSize:
|
||||
break
|
||||
|
||||
offset = offset + batchSize
|
||||
|
||||
db.close()
|
||||
|
||||
return ok()
|
||||
|
||||
proc offerContentInNodeRange*(
|
||||
p: PortalProtocol,
|
||||
seedDbPath: string,
|
||||
nodeId: NodeId,
|
||||
max: uint32,
|
||||
starting: uint32): Future[PortalResult[void]] {.async.} =
|
||||
## Offers `max` closest elements starting from `starting` index to peer
|
||||
## with given `nodeId`.
|
||||
## Maximum value of `max` is 64 , as this is limit for single offer.
|
||||
## `starting` argument is needed as seed_db is read only, so if there is
|
||||
## more content in peer range than max, then to offer 64 closest elements
|
||||
# it needs to be set to 0. To offer next 64 elements it need to be set to
|
||||
# 64 etc.
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let maybeNodeAndRadius = await p.resolveWithRadius(nodeId)
|
||||
|
||||
if maybeNodeAndRadius.isNone():
|
||||
return err("Could not find node with provided nodeId")
|
||||
|
||||
let
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
(node, radius) = maybeNodeAndRadius.unsafeGet()
|
||||
content = db.getContentInRange(node.id, radius, int64(max), int64(starting))
|
||||
|
||||
# We got all we wanted from seed_db, it can be closed now.
|
||||
db.close()
|
||||
|
||||
var ci: seq[ContentInfo]
|
||||
|
||||
for cont in content:
|
||||
let k = ByteList.init(cont.contentKey)
|
||||
let info = ContentInfo(contentKey: k, content: cont.content)
|
||||
ci.add(info)
|
||||
|
||||
let offerResult = await p.offer(node, ci)
|
||||
|
||||
# waiting for offer result, by the end of this call remote node should
|
||||
# have received offered content
|
||||
return offerResult
|
||||
|
||||
proc storeContentInNodeRange*(
|
||||
p: PortalProtocol,
|
||||
seedDbPath: string,
|
||||
max: uint32,
|
||||
starting: uint32): PortalResult[void] =
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let
|
||||
localRadius = p.dataRadius
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
localId = p.localNode.id
|
||||
contentInRange = db.getContentInRange(localId, localRadius, int64(max), int64(starting))
|
||||
|
||||
db.close()
|
||||
|
||||
for contentData in contentInRange:
|
||||
let cid = UInt256.fromBytesBE(contentData.contentId)
|
||||
p.storeContent(cid, contentData.content)
|
||||
|
||||
return ok()
|
|
@ -1071,6 +1071,21 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
|
|||
## Perform a query for a random target, return all nodes discovered.
|
||||
p.query(NodeId.random(p.baseProtocol.rng[]))
|
||||
|
||||
proc getNClosestNodesWithRadius*(
|
||||
p: PortalProtocol,
|
||||
targetId: NodeId,
|
||||
n: int,
|
||||
seenOnly: bool = false): seq[(Node, UInt256)] =
|
||||
let closestLocalNodes = p.routingTable.neighbours(
|
||||
targetId, k = n, seenOnly = seenOnly)
|
||||
|
||||
var nodesWithRadiuses: seq[(Node, UInt256)]
|
||||
for node in closestLocalNodes:
|
||||
let radius = p.radiusCache.get(node.id)
|
||||
if radius.isSome():
|
||||
nodesWithRadiuses.add((node, radius.unsafeGet()))
|
||||
return nodesWithRadiuses
|
||||
|
||||
proc neighborhoodGossip*(
|
||||
p: PortalProtocol, contentKeys: ContentKeysList, content: seq[seq[byte]])
|
||||
{.async.} =
|
||||
|
@ -1371,76 +1386,3 @@ proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UIn
|
|||
return some((node, maybeRadius.unsafeGet()))
|
||||
else:
|
||||
return none((Node, UInt256))
|
||||
|
||||
proc offerContentInNodeRange*(
|
||||
p: PortalProtocol,
|
||||
seedDbPath: string,
|
||||
nodeId: NodeId,
|
||||
max: uint32,
|
||||
starting: uint32): Future[PortalResult[void]] {.async.} =
|
||||
## Offers `max` closest elements starting from `starting` index to peer
|
||||
## with given `nodeId`.
|
||||
## Maximum value of `max` is 64 , as this is limit for single offer.
|
||||
## `starting` argument is needed as seed_db is read only, so if there is
|
||||
## more content in peer range than max, then to offer 64 closest elements
|
||||
# it needs to be set to 0. To offer next 64 elements it need to be set to
|
||||
# 64 etc.
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let maybeNodeAndRadius = await p.resolveWithRadius(nodeId)
|
||||
|
||||
if maybeNodeAndRadius.isNone():
|
||||
return err("Could not find node with provided nodeId")
|
||||
|
||||
let
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
(node, radius) = maybeNodeAndRadius.unsafeGet()
|
||||
content = db.getContentInRange(node.id, radius, int64(max), int64(starting))
|
||||
|
||||
# We got all we wanted from seed_db, it can be closed now.
|
||||
db.close()
|
||||
|
||||
var ci: seq[ContentInfo]
|
||||
|
||||
for cont in content:
|
||||
let k = ByteList.init(cont.contentKey)
|
||||
let info = ContentInfo(contentKey: k, content: cont.content)
|
||||
ci.add(info)
|
||||
|
||||
let offerResult = await p.offer(node, ci)
|
||||
|
||||
# waiting for offer result, by the end of this call remote node should
|
||||
# have received offered content
|
||||
return offerResult
|
||||
|
||||
proc storeContentInNodeRange*(
|
||||
p: PortalProtocol,
|
||||
seedDbPath: string,
|
||||
max: uint32,
|
||||
starting: uint32): PortalResult[void] =
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let
|
||||
localRadius = p.dataRadius
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
localId = p.localNode.id
|
||||
contentInRange = db.getContentInRange(localId, localRadius, int64(max), int64(starting))
|
||||
|
||||
db.close()
|
||||
|
||||
for contentData in contentInRange:
|
||||
let cid = UInt256.fromBytesBE(contentData.contentId)
|
||||
p.storeContent(cid, contentData.content)
|
||||
|
||||
return ok()
|
||||
|
|
|
@ -13,3 +13,10 @@ proc portal_history_offerContentInNodeRange(
|
|||
nodeId: NodeId,
|
||||
max: uint32,
|
||||
starting: uint32): bool
|
||||
|
||||
proc portal_history_depthContentPropagate(
|
||||
dbPath: string,
|
||||
max: uint32): bool
|
||||
|
||||
proc portal_history_breadthContentPropagate(
|
||||
dbPath: string): bool
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
import
|
||||
json_rpc/[rpcproxy, rpcserver], stew/byteutils,
|
||||
../network/wire/portal_protocol,
|
||||
../network/network_seed,
|
||||
".."/[content_db, seed_db]
|
||||
|
||||
export rpcserver
|
||||
|
@ -80,3 +81,28 @@ proc installPortalDebugApiHandlers*(
|
|||
return true
|
||||
else:
|
||||
raise newException(ValueError, $offerResult.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_depthContentPropagate") do(
|
||||
dbPath: string,
|
||||
max: uint32) -> bool:
|
||||
|
||||
# TODO Consider making this call asynchronously without waiting for result
|
||||
# as for big seed db size it could take a loot of time.
|
||||
let propagateResult = await p.depthContentPropagate(dbPath, max)
|
||||
|
||||
if propagateResult.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $propagateResult.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_breadthContentPropagate") do(
|
||||
dbPath: string) -> bool:
|
||||
|
||||
# TODO Consider making this call asynchronously without waiting for result
|
||||
# as for big seed db size it could take a loot of time.
|
||||
let propagateResult = await p.breadthContentPropagate(dbPath)
|
||||
|
||||
if propagateResult.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $propagateResult.error)
|
||||
|
|
|
@ -51,7 +51,8 @@ proc withRetries[A](
|
|||
f: FutureCallback[A],
|
||||
check: CheckCallback[A],
|
||||
numRetries: int,
|
||||
initialWait: Duration): Future[A] {.async.} =
|
||||
initialWait: Duration,
|
||||
checkFailMessage: string): Future[A] {.async.} =
|
||||
## Retries given future callback until either:
|
||||
## it returns successfuly and given check is true
|
||||
## or
|
||||
|
@ -63,15 +64,16 @@ proc withRetries[A](
|
|||
while true:
|
||||
try:
|
||||
let res = await f()
|
||||
|
||||
if check(res):
|
||||
return res
|
||||
else:
|
||||
raise newException(ValueError, checkFailMessage)
|
||||
except CatchableError as exc:
|
||||
inc tries
|
||||
if tries > numRetries:
|
||||
# if we reached max number of retries fail
|
||||
raise exc
|
||||
|
||||
inc tries
|
||||
# wait before new retry
|
||||
await sleepAsync(currentDuration)
|
||||
currentDuration = currentDuration * 2
|
||||
|
@ -79,9 +81,12 @@ proc withRetries[A](
|
|||
# Sometimes we need to wait till data will be propagated over the network.
|
||||
# To avoid long sleeps, this combinator can be used to retry some calls until
|
||||
# success or until some condition hold (or both)
|
||||
proc retryUntilDataPropagated[A](f: FutureCallback[A], c: CheckCallback[A]): Future[A] =
|
||||
proc retryUntilDataPropagated[A](
|
||||
f: FutureCallback[A],
|
||||
c: CheckCallback[A],
|
||||
checkFailMessage: string): Future[A] =
|
||||
# some reasonable limits, which will cause waits as: 1, 2, 4, 8, 16 seconds
|
||||
return withRetries(f, c, 5, seconds(1))
|
||||
return withRetries(f, c, 5, seconds(1), checkFailMessage)
|
||||
|
||||
# Note:
|
||||
# When doing json-rpc requests following `RpcPostError` can occur:
|
||||
|
@ -251,7 +256,8 @@ procSuite "Portal testnet tests":
|
|||
await client.close()
|
||||
raise exc
|
||||
,
|
||||
proc (mc: Option[BlockObject]): bool = return mc.isSome()
|
||||
proc (mc: Option[BlockObject]): bool = return mc.isSome(),
|
||||
"Did not receive expected Block with hash " & $hash
|
||||
)
|
||||
check content.isSome()
|
||||
let blockObj = content.get()
|
||||
|
@ -276,7 +282,8 @@ procSuite "Portal testnet tests":
|
|||
await client.close()
|
||||
raise exc
|
||||
,
|
||||
proc (mc: seq[FilterLog]): bool = return true
|
||||
proc (mc: seq[FilterLog]): bool = return true,
|
||||
""
|
||||
)
|
||||
|
||||
for l in logs:
|
||||
|
@ -344,7 +351,74 @@ procSuite "Portal testnet tests":
|
|||
await client.close()
|
||||
raise exc
|
||||
,
|
||||
proc (mc: Option[BlockObject]): bool = return mc.isSome()
|
||||
proc (mc: Option[BlockObject]): bool = return mc.isSome(),
|
||||
"Did not receive expected Block with hash " & $hash
|
||||
)
|
||||
check content.isSome()
|
||||
|
||||
let blockObj = content.get()
|
||||
check blockObj.hash.get() == hash
|
||||
|
||||
for tx in blockObj.transactions:
|
||||
var txObj: TransactionObject
|
||||
tx.fromJson("tx", txObj)
|
||||
check txObj.blockHash.get() == hash
|
||||
|
||||
await client.close()
|
||||
finally:
|
||||
db.close()
|
||||
removeDir(dbFile)
|
||||
|
||||
asyncTest "Portal History - Propagate content from seed db in depth first fashion":
|
||||
let clients = await connectToRpcServers(config)
|
||||
|
||||
var nodeInfos: seq[NodeInfo]
|
||||
for client in clients:
|
||||
let nodeInfo = await client.portal_history_nodeInfo()
|
||||
await client.close()
|
||||
nodeInfos.add(nodeInfo)
|
||||
|
||||
# different set of data for each test as tests are statefull so previously propagated
|
||||
# block are already in the network
|
||||
const dataPath = "./fluffy/tests/blocks/mainnet_blocks_1000040_1000050.json"
|
||||
|
||||
# path for temporary db, separate dir is used as sqlite usually also creates
|
||||
# wal files, and we do not want for those to linger in filesystem
|
||||
const tempDbPath = "./fluffy/tests/blocks/tempDir/mainnet_blocks_1000040_100050.sqlite3"
|
||||
|
||||
let (dbFile, dbName) = getDbBasePathAndName(tempDbPath).unsafeGet()
|
||||
|
||||
let blockData = readBlockDataTable(dataPath)
|
||||
check blockData.isOk()
|
||||
let bd = blockData.get()
|
||||
|
||||
createDir(dbFile)
|
||||
let db = SeedDb.new(path = dbFile, name = dbName)
|
||||
|
||||
try:
|
||||
# populate temp database from json file
|
||||
for t in blocksContent(bd, false):
|
||||
db.put(t[0], t[1], t[2])
|
||||
|
||||
check (await clients[0].portal_history_depthContentPropagate(tempDbPath, 64))
|
||||
await clients[0].close()
|
||||
|
||||
for client in clients:
|
||||
# Note: Once there is the Canonical Indices Network, we don't need to
|
||||
# access this file anymore here for the block hashes.
|
||||
for hash in bd.blockHashes():
|
||||
let content = await retryUntilDataPropagated(
|
||||
proc (): Future[Option[BlockObject]] {.async.} =
|
||||
try:
|
||||
let res = await client.eth_getBlockByHash(hash.ethHashStr(), false)
|
||||
await client.close()
|
||||
return res
|
||||
except CatchableError as exc:
|
||||
await client.close()
|
||||
raise exc
|
||||
,
|
||||
proc (mc: Option[BlockObject]): bool = return mc.isSome(),
|
||||
"Did not receive expected Block with hash " & $hash
|
||||
)
|
||||
check content.isSome()
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ type
|
|||
contentKey: seq[byte]
|
||||
content: seq[byte]
|
||||
|
||||
ContentDataDist = tuple
|
||||
ContentDataDist* = tuple
|
||||
contentId: array[32, byte]
|
||||
contentKey: seq[byte]
|
||||
content: seq[byte]
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue