Fluffy: Make the Kademlia concurrency factor configurable via a debug parameter (#2868)

This commit is contained in:
bhartnett 2024-11-25 20:16:27 +08:00 committed by GitHub
parent fbfc1611d7
commit 6e7e63adf5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 17 deletions

View File

@@ -45,6 +45,7 @@ const
   defaultTableIpLimitDesc* = $defaultPortalProtocolConfig.tableIpLimits.tableIpLimit
   defaultBucketIpLimitDesc* = $defaultPortalProtocolConfig.tableIpLimits.bucketIpLimit
   defaultBitsPerHopDesc* = $defaultPortalProtocolConfig.bitsPerHop
+  defaultAlphaDesc* = $defaultPortalProtocolConfig.alpha
   defaultMaxGossipNodesDesc* = $defaultPortalProtocolConfig.maxGossipNodes
   defaultRpcApis* = @["eth", "portal"]
   defaultRpcApisDesc* = "eth,portal"
@@ -254,6 +255,14 @@ type
       name: "bits-per-hop"
     .}: int

+    alpha* {.
+      hidden,
+      desc: "The Kademlia concurrency factor",
+      defaultValue: defaultPortalProtocolConfig.alpha,
+      defaultValueDesc: $defaultAlphaDesc,
+      name: "debug-alpha"
+    .}: int
+
     maxGossipNodes* {.
       hidden,
       desc: "The maximum number of nodes to send content to during gossip",

View File

@@ -181,9 +181,9 @@ proc run(

   ## Portal node setup
   let
     portalProtocolConfig = PortalProtocolConfig.init(
-      config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
-      config.disablePoke, config.maxGossipNodes, config.contentCacheSize,
-      config.disableContentCache, config.maxConcurrentOffers,
+      config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.alpha,
+      config.radiusConfig, config.disablePoke, config.maxGossipNodes,
+      config.contentCacheSize, config.disableContentCache, config.maxConcurrentOffers,
     )
     portalNodeConfig = PortalNodeConfig(

View File

@@ -116,7 +116,6 @@ logScope:
   topics = "portal_wire"

 const
-  alpha = 3 ## Kademlia concurrency factor
   enrsResultLimit* = 32 ## Maximum amount of ENRs in the total Nodes messages
     ## that will be processed
   refreshInterval = 5.minutes ## Interval of launching a random query to
@@ -1035,14 +1034,15 @@ proc lookup*(
   for node in closestNodes:
     seen.incl(node.id)

-  var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
+  var pendingQueries =
+    newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](p.config.alpha)
   var requestAmount = 0'i64

   while true:
     var i = 0
-    # Doing `alpha` amount of requests at once as long as closer non queried
+    # Doing `p.config.alpha` amount of requests at once as long as closer non queried
     # nodes are discovered.
-    while i < closestNodes.len and pendingQueries.len < alpha:
+    while i < closestNodes.len and pendingQueries.len < p.config.alpha:
       let n = closestNodes[i]
       if not asked.containsOrIncl(n.id):
         pendingQueries.add(p.lookupWorker(n, target))
@@ -1156,17 +1156,18 @@ proc contentLookup*(
   for node in closestNodes:
     seen.incl(node.id)

-  var pendingQueries =
-    newSeqOfCap[Future[PortalResult[FoundContent]].Raising([CancelledError])](alpha)
+  var pendingQueries = newSeqOfCap[
+    Future[PortalResult[FoundContent]].Raising([CancelledError])
+  ](p.config.alpha)
   var requestAmount = 0'i64

   var nodesWithoutContent: seq[Node] = newSeq[Node]()

   while true:
     var i = 0
-    # Doing `alpha` amount of requests at once as long as closer non queried
+    # Doing `p.config.alpha` amount of requests at once as long as closer non queried
     # nodes are discovered.
-    while i < closestNodes.len and pendingQueries.len < alpha:
+    while i < closestNodes.len and pendingQueries.len < p.config.alpha:
       let n = closestNodes[i]
       if not asked.containsOrIncl(n.id):
         pendingQueries.add(p.findContent(n, target))
@@ -1277,8 +1278,9 @@ proc traceContentLookup*(
     metadata["0x" & $cn.id] =
       NodeMetadata(enr: cn.record, distance: p.distance(cn.id, targetId))

-  var pendingQueries =
-    newSeqOfCap[Future[PortalResult[FoundContent]].Raising([CancelledError])](alpha)
+  var pendingQueries = newSeqOfCap[
+    Future[PortalResult[FoundContent]].Raising([CancelledError])
+  ](p.config.alpha)
   var pendingNodes = newSeq[Node]()
   var requestAmount = 0'i64

@@ -1286,9 +1288,9 @@ proc traceContentLookup*(

   while true:
     var i = 0
-    # Doing `alpha` amount of requests at once as long as closer non queried
+    # Doing `p.config.alpha` amount of requests at once as long as closer non queried
     # nodes are discovered.
-    while i < closestNodes.len and pendingQueries.len < alpha:
+    while i < closestNodes.len and pendingQueries.len < p.config.alpha:
       let n = closestNodes[i]
       if not asked.containsOrIncl(n.id):
         pendingQueries.add(p.findContent(n, target))
@@ -1439,11 +1441,12 @@ proc query*(
   for node in queryBuffer:
     seen.incl(node.id)

-  var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
+  var pendingQueries =
+    newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](p.config.alpha)

   while true:
     var i = 0
-    while i < min(queryBuffer.len, k) and pendingQueries.len < alpha:
+    while i < min(queryBuffer.len, k) and pendingQueries.len < p.config.alpha:
       let n = queryBuffer[i]
       if not asked.containsOrIncl(n.id):
         pendingQueries.add(p.lookupWorker(n, target))

View File

@@ -38,6 +38,7 @@ type
   PortalProtocolConfig* = object
     tableIpLimits*: TableIpLimits
     bitsPerHop*: int
+    alpha*: int
     radiusConfig*: RadiusConfig
     disablePoke*: bool
     maxGossipNodes*: int
@@ -53,11 +54,13 @@ const
   defaultContentCacheSize* = 100
   defaultDisableContentCache* = false
   defaultMaxConcurrentOffers* = 50
+  defaultAlpha* = 3
   revalidationTimeout* = chronos.seconds(30)

   defaultPortalProtocolConfig* = PortalProtocolConfig(
     tableIpLimits: DefaultTableIpLimits,
     bitsPerHop: DefaultBitsPerHop,
+    alpha: defaultAlpha,
     radiusConfig: defaultRadiusConfig,
     disablePoke: defaultDisablePoke,
     maxGossipNodes: defaultMaxGossipNodes,
@@ -71,6 +74,7 @@ proc init*(
     tableIpLimit: uint,
     bucketIpLimit: uint,
     bitsPerHop: int,
+    alpha: int,
     radiusConfig: RadiusConfig,
     disablePoke: bool,
     maxGossipNodes: int,
@@ -82,6 +86,7 @@ proc init*(
     tableIpLimits:
       TableIpLimits(tableIpLimit: tableIpLimit, bucketIpLimit: bucketIpLimit),
     bitsPerHop: bitsPerHop,
+    alpha: alpha,
     radiusConfig: radiusConfig,
     disablePoke: disablePoke,
     maxGossipNodes: maxGossipNodes,