Dynamic radius adjustments (#1079)
* Add config for node radius * Adjust radius when deleting content
This commit is contained in:
parent
41e9ea8d45
commit
4e8c5f292e
|
@ -11,7 +11,8 @@ import
|
|||
std/os,
|
||||
uri, confutils, confutils/std/net, chronicles,
|
||||
eth/keys, eth/net/nat, eth/p2p/discoveryv5/[enr, node],
|
||||
json_rpc/rpcproxy
|
||||
json_rpc/rpcproxy,
|
||||
./network/wire/portal_protocol_config
|
||||
|
||||
proc defaultDataDir*(): string =
|
||||
let dataDir = when defined(windows):
|
||||
|
@ -36,6 +37,7 @@ const
|
|||
# 100mb seems a bit smallish we may consider increasing defaults after some
|
||||
# network measurements
|
||||
defaultStorageSize* = uint32(1000 * 1000 * 100)
|
||||
defaultStorageSizeDesc* = $defaultStorageSize
|
||||
|
||||
type
|
||||
PortalCmd* = enum
|
||||
|
@ -162,13 +164,6 @@ type
|
|||
desc: "URI of eth client where to proxy unimplemented rpc methods to"
|
||||
name: "proxy-uri" .}: ClientConfig
|
||||
|
||||
logRadius* {.
|
||||
desc: "Hardcoded (logarithmic) radius for each Portal network. This is " &
|
||||
"a temporary development option which will be replaced in the " &
|
||||
"future by e.g. a storage size limit"
|
||||
defaultValue: 256
|
||||
name: "radius" .}: uint16
|
||||
|
||||
tableIpLimit* {.
|
||||
hidden
|
||||
desc: "Maximum amount of nodes with the same IP in the routing tables"
|
||||
|
@ -187,12 +182,23 @@ type
|
|||
defaultValue: DefaultBitsPerHop
|
||||
name: "bits-per-hop" .}: int
|
||||
|
||||
radiusConfig* {.
|
||||
hidden
|
||||
desc: "Radius configuration for a fluffy node. Radius can be either `dynamic`" &
|
||||
"where node adjust radius based on storage size limit," &
|
||||
"or `static:logRadius` where node have hardcoded logRadius value. " &
|
||||
"Warning: Setting it `static:logRadius` disable storage size limits and" &
|
||||
"makes fluffy node to store fraction of the network."
|
||||
defaultValue: defaultRadiusConfig
|
||||
name: "radius-config" .}: RadiusConfig
|
||||
|
||||
# TODO maybe it is worth defining minimal storage size and throw error if
|
||||
# value provided is smaller than minimum
|
||||
storageSize* {.
|
||||
desc: "Maximum amount (in bytes) of content which will be stored " &
|
||||
"in local database."
|
||||
defaultValue: defaultStorageSize
|
||||
defaultValueDesc: $defaultStorageSizeDesc
|
||||
name: "storage-size" .}: uint32
|
||||
|
||||
case cmd* {.
|
||||
|
|
|
@ -200,8 +200,6 @@ proc get*(db: ContentDB, key: ContentId): Option[seq[byte]] =
|
|||
# TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
|
||||
db.get(key.toByteArrayBE())
|
||||
|
||||
# TODO: Public due to usage in populating portal db, should be made private after
|
||||
# improving db populating to use local node id
|
||||
proc put*(db: ContentDB, key: ContentId, value: openArray[byte]) =
|
||||
db.put(key.toByteArrayBE(), value)
|
||||
|
||||
|
@ -250,7 +248,9 @@ proc put*(
|
|||
key: ContentId,
|
||||
value: openArray[byte],
|
||||
target: UInt256): PutResult =
|
||||
|
||||
db.put(key, value)
|
||||
|
||||
let dbSize = db.size()
|
||||
|
||||
if dbSize < int64(db.maxSize):
|
||||
|
|
|
@ -22,11 +22,6 @@ import
|
|||
./network/wire/[portal_stream, portal_protocol_config],
|
||||
"."/[content_db, populate_db]
|
||||
|
||||
proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
|
||||
# Get the max value of the logRadius range
|
||||
pow((2).stuint(256), logRadius) - 1
|
||||
# For the min value do `pow((2).stuint(256), logRadius - 1)`
|
||||
|
||||
proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
|
||||
try:
|
||||
if (maybeUri.isSome()):
|
||||
|
@ -98,15 +93,18 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
|||
# This is done because the content in the db is dependant on the `NodeId` and
|
||||
# the selected `Radius`.
|
||||
let
|
||||
radius = UInt256.fromLogRadius(config.logRadius)
|
||||
db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
|
||||
d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex(), maxSize = config.storageSize)
|
||||
|
||||
portalConfig = PortalProtocolConfig.init(
|
||||
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
|
||||
stateNetwork = StateNetwork.new(d, db, radius,
|
||||
config.tableIpLimit,
|
||||
config.bucketIpLimit,
|
||||
config.bitsPerHop,
|
||||
config.radiusConfig
|
||||
)
|
||||
stateNetwork = StateNetwork.new(d, db,
|
||||
bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
|
||||
historyNetwork = HistoryNetwork.new(d, db, radius,
|
||||
historyNetwork = HistoryNetwork.new(d, db,
|
||||
bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
|
||||
|
||||
# One instance of UtpDiscv5Protocol is shared over all the PortalStreams.
|
||||
|
|
|
@ -146,14 +146,7 @@ proc getBlockHeader*(
|
|||
headerContent.content
|
||||
)
|
||||
|
||||
if h.portalProtocol.inRange(contentId):
|
||||
# content is valid and in our range, save it into our db
|
||||
# TODO handle radius adjustments
|
||||
discard h.contentDB.put(
|
||||
contentId,
|
||||
headerContent.content,
|
||||
h.portalProtocol.localNode.id
|
||||
)
|
||||
h.portalProtocol.storeContent(contentId, headerContent.content)
|
||||
|
||||
return maybeHeader
|
||||
|
||||
|
@ -202,12 +195,7 @@ proc getBlock*(
|
|||
)
|
||||
|
||||
# content is in range and valid, put into db
|
||||
if h.portalProtocol.inRange(contentId):
|
||||
# TODO handle radius adjustments
|
||||
discard h.contentDB.put(
|
||||
contentId, bodyContent.content,
|
||||
h.portalProtocol.localNode.id
|
||||
)
|
||||
h.portalProtocol.storeContent(contentId, bodyContent.content)
|
||||
|
||||
return some[Block]((header, blockBody))
|
||||
|
||||
|
@ -233,13 +221,11 @@ proc new*(
|
|||
T: type HistoryNetwork,
|
||||
baseProtocol: protocol.Protocol,
|
||||
contentDB: ContentDB,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = [],
|
||||
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, historyProtocolId, contentDB,
|
||||
toContentIdHandler, validateContent,
|
||||
dataRadius, bootstrapRecords,
|
||||
toContentIdHandler, validateContent, bootstrapRecords,
|
||||
config = portalConfig)
|
||||
|
||||
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||
|
|
|
@ -53,7 +53,9 @@ proc getContent*(n: StateNetwork, key: ContentKey):
|
|||
# When content is found on the network and is in the radius range, store it.
|
||||
if content.isSome() and contentInRange:
|
||||
# TODO Add poke when working on state network
|
||||
discard n.contentDB.put(contentId, contentResult.content, n.portalProtocol.localNode.id)
|
||||
# TODO When working on state network, make it possible to pass different
|
||||
# distance functions to store content
|
||||
n.portalProtocol.storeContent(contentId, contentResult.content)
|
||||
|
||||
# TODO: for now returning bytes, ultimately it would be nice to return proper
|
||||
# domain types.
|
||||
|
@ -66,14 +68,12 @@ proc new*(
|
|||
T: type StateNetwork,
|
||||
baseProtocol: protocol.Protocol,
|
||||
contentDB: ContentDB,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = [],
|
||||
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, stateProtocolId, contentDB,
|
||||
toContentIdHandler, validateContent,
|
||||
dataRadius, bootstrapRecords, stateDistanceCalculator,
|
||||
config = portalConfig)
|
||||
bootstrapRecords, stateDistanceCalculator, config = portalConfig)
|
||||
|
||||
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
|
||||
|
||||
|
|
|
@ -153,6 +153,7 @@ type
|
|||
contentDB*: ContentDB
|
||||
toContentId: ToContentIdHandler
|
||||
validateContent: ContentValidationHandler
|
||||
radiusConfig: RadiusConfig
|
||||
dataRadius*: UInt256
|
||||
bootstrapRecords*: seq[Record]
|
||||
lastLookup: chronos.Moment
|
||||
|
@ -420,18 +421,36 @@ proc processContent(
|
|||
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
|
||||
{.gcsafe, raises: [Defect].}
|
||||
|
||||
proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
|
||||
# Get the max value of the logRadius range
|
||||
pow((2).stuint(256), logRadius) - 1
|
||||
|
||||
proc getInitialRadius(rc: RadiusConfig): UInt256 =
|
||||
case rc.kind
|
||||
of Static:
|
||||
return UInt256.fromLogRadius(rc.logRadius)
|
||||
of Dynamic:
|
||||
# In case of a dynamic radius we start from the maximum value to quickly
|
||||
# gather as much data as possible, and also make sure each data piece in
|
||||
# the database is in our range after a node restart.
|
||||
# Alternative would be to store node the radius in database, and initialize it
|
||||
# from database after a restart
|
||||
return UInt256.high()
|
||||
|
||||
|
||||
proc new*(T: type PortalProtocol,
|
||||
baseProtocol: protocol.Protocol,
|
||||
protocolId: PortalProtocolId,
|
||||
contentDB: ContentDB,
|
||||
toContentId: ToContentIdHandler,
|
||||
validateContent: ContentValidationHandler,
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openArray[Record] = [],
|
||||
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
|
||||
config: PortalProtocolConfig = defaultPortalProtocolConfig
|
||||
): T =
|
||||
|
||||
let initialRadius: UInt256 = config.radiusConfig.getInitialRadius()
|
||||
|
||||
let proto = PortalProtocol(
|
||||
protocolHandler: messageHandler,
|
||||
protocolId: protocolId,
|
||||
|
@ -442,7 +461,8 @@ proc new*(T: type PortalProtocol,
|
|||
contentDB: contentDB,
|
||||
toContentId: toContentId,
|
||||
validateContent: validateContent,
|
||||
dataRadius: dataRadius,
|
||||
radiusConfig: config.radiusConfig,
|
||||
dataRadius: initialRadius,
|
||||
bootstrapRecords: @bootstrapRecords,
|
||||
radiusCache: RadiusCache.init(256),
|
||||
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers))
|
||||
|
@ -1065,6 +1085,64 @@ proc neighborhoodGossip*(
|
|||
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
|
||||
await p.offerQueue.addLast(req)
|
||||
|
||||
proc adjustRadius(
|
||||
p: PortalProtocol,
|
||||
fractionOfDeletedContent: float64,
|
||||
furthestElementInDbDistance: UInt256) =
|
||||
|
||||
if fractionOfDeletedContent == 0.0:
|
||||
# even though pruning was triggered no content was deleted, it could happen
|
||||
# in pathological case of really small database with really big values.
|
||||
# log it as error as it should not happenn
|
||||
error "Database pruning attempt resulted in no content deleted"
|
||||
return
|
||||
|
||||
# we need to invert fraction as our Uin256 implementation does not support
|
||||
# multiplication by float
|
||||
let invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent)
|
||||
|
||||
let scaledRadius = p.dataRadius div u256(invertedFractionAsInt)
|
||||
|
||||
# Chose larger value to avoid situation, where furthestElementInDbDistance
|
||||
# is super close to local id, so local radius would end up too small
|
||||
# to accept any more data to local database
|
||||
# If scaledRadius radius will be larger it will still contain all elements
|
||||
let newRadius = max(scaledRadius, furthestElementInDbDistance)
|
||||
|
||||
|
||||
debug "Database pruned",
|
||||
oldRadius = p.dataRadius,
|
||||
newRadius = newRadius,
|
||||
furthestDistanceInDb = furthestElementInDbDistance,
|
||||
fractionOfDeletedContent = fractionOfDeletedContent
|
||||
|
||||
# both scaledRadius and furthestElementInDbDistance are smaller than current
|
||||
# dataRadius, so the radius will constantly decrease through the node
|
||||
# life time
|
||||
p.dataRadius = newRadius
|
||||
|
||||
proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte]) =
|
||||
# always re-check that key is in node range, to make sure that invariant that
|
||||
# all keys in database are always in node range hold.
|
||||
# TODO current silent assumption is that both contentDb and portalProtocol are
|
||||
# using the same xor distance function
|
||||
if p.inRange(key):
|
||||
case p.radiusConfig.kind:
|
||||
of Dynamic:
|
||||
# In case of dynamic radius setting we obey storage limits and adjust
|
||||
# radius to store network fraction corresponding to those storage limits.
|
||||
let res = p.contentDB.put(key, content, p.baseProtocol.localNode.id)
|
||||
if res.kind == DbPruned:
|
||||
p.adjustRadius(
|
||||
res.fractionOfDeletedContent,
|
||||
res.furthestStoredElementDistance
|
||||
)
|
||||
of Static:
|
||||
# If the config is set statically, radius is not adjusted, and is kept
|
||||
# constant thorugh node life time, also database max size is disabled
|
||||
# so we will effectivly store fraction of the network
|
||||
p.contentDB.put(key, content)
|
||||
|
||||
proc processContent(
|
||||
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
|
||||
{.gcsafe, raises: [Defect].} =
|
||||
|
@ -1083,13 +1161,8 @@ proc processContent(
|
|||
return
|
||||
|
||||
let contentId = contentIdOpt.get()
|
||||
# Store content, should we recheck radius?
|
||||
# TODO handle radius adjustments
|
||||
discard p.contentDB.put(
|
||||
contentId,
|
||||
content,
|
||||
p.baseProtocol.localNode.id
|
||||
)
|
||||
|
||||
p.storeContent(contentId, content)
|
||||
|
||||
info "Received valid offered content", contentKey
|
||||
|
||||
|
|
|
@ -1,26 +1,77 @@
|
|||
import
|
||||
std/strutils,
|
||||
confutils,
|
||||
eth/p2p/discoveryv5/routing_table
|
||||
|
||||
type
|
||||
RadiusConfigKind* = enum
|
||||
Static, Dynamic
|
||||
|
||||
RadiusConfig* = object
|
||||
case kind*: RadiusConfigKind
|
||||
of Static:
|
||||
logRadius*: uint16
|
||||
of Dynamic:
|
||||
discard
|
||||
|
||||
PortalProtocolConfig* = object
|
||||
tableIpLimits*: TableIpLimits
|
||||
bitsPerHop*: int
|
||||
radiusConfig*: RadiusConfig
|
||||
|
||||
const
|
||||
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
|
||||
|
||||
defaultPortalProtocolConfig* = PortalProtocolConfig(
|
||||
tableIpLimits: DefaultTableIpLimits,
|
||||
bitsPerHop: DefaultBitsPerHop)
|
||||
bitsPerHop: DefaultBitsPerHop,
|
||||
radiusConfig: defaultRadiusConfig
|
||||
)
|
||||
|
||||
proc init*(
|
||||
T: type PortalProtocolConfig,
|
||||
tableIpLimit: uint,
|
||||
bucketIpLimit: uint,
|
||||
bitsPerHop: int): T =
|
||||
bitsPerHop: int,
|
||||
radiusConfig: RadiusConfig): T =
|
||||
|
||||
PortalProtocolConfig(
|
||||
tableIpLimits: TableIpLimits(
|
||||
tableIpLimit: tableIpLimit,
|
||||
bucketIpLimit: bucketIpLimit),
|
||||
bitsPerHop: bitsPerHop
|
||||
bitsPerHop: bitsPerHop,
|
||||
radiusConfig: radiusConfig
|
||||
)
|
||||
|
||||
proc parseCmdArg*(T: type RadiusConfig, p: TaintedString): T
|
||||
{.raises: [Defect, ConfigurationError].} =
|
||||
|
||||
if p.startsWith("dynamic") and len(p) == 7:
|
||||
return RadiusConfig(kind: Dynamic)
|
||||
elif p.startsWith("static:"):
|
||||
let num = p[7..^1]
|
||||
try:
|
||||
let parsed = uint16.parseCmdArg(num)
|
||||
|
||||
if parsed > 256:
|
||||
raise newException(
|
||||
ConfigurationError, "Provided logRadius should be <= 256"
|
||||
)
|
||||
|
||||
return RadiusConfig(kind: Static, logRadius: parsed)
|
||||
except ValueError:
|
||||
let msg = "Provided logRadius: " & num & " is not a valid number"
|
||||
raise newException(
|
||||
ConfigurationError, msg
|
||||
)
|
||||
else:
|
||||
let msg =
|
||||
"Not supported radius config option: " & p & " . " &
|
||||
"Supported options: dynamic, static:logRadius"
|
||||
raise newException(
|
||||
ConfigurationError,
|
||||
msg
|
||||
)
|
||||
|
||||
proc completeCmdArg*(T: type RadiusConfig, val: TaintedString): seq[string] =
|
||||
return @[]
|
||||
|
|
|
@ -248,7 +248,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
|
|||
BOOTSTRAP_ARG="--bootstrap-file=${BOOTSTRAP_ENR_FILE}"
|
||||
# All nodes but bootstrap node run with log. radius of 254 which should
|
||||
# result in ~1/4th of the data set stored.
|
||||
RADIUS_ARG="--radius=254"
|
||||
RADIUS_ARG="--radius-config=static:254"
|
||||
|
||||
# Wait for the bootstrap node to write out its enr file
|
||||
START_TIMESTAMP=$(date +%s)
|
||||
|
|
|
@ -12,15 +12,8 @@ import
|
|||
unittest2, stint,
|
||||
eth/keys,
|
||||
../network/state/state_content,
|
||||
../content_db
|
||||
|
||||
proc genByteSeq(length: int): seq[byte] =
|
||||
var i = 0
|
||||
var resultSeq = newSeq[byte](length)
|
||||
while i < length:
|
||||
resultSeq[i] = byte(i)
|
||||
inc i
|
||||
return resultSeq
|
||||
../content_db,
|
||||
./test_helpers
|
||||
|
||||
proc generateNRandomU256(rng: var BrHmacDrbgContext, n: int): seq[UInt256] =
|
||||
var i = 0
|
||||
|
|
|
@ -34,3 +34,11 @@ proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
|
|||
rng = rng)
|
||||
|
||||
result.open()
|
||||
|
||||
proc genByteSeq*(length: int): seq[byte] =
|
||||
var i = 0
|
||||
var resultSeq = newSeq[byte](length)
|
||||
while i < length:
|
||||
resultSeq[i] = byte(i)
|
||||
inc i
|
||||
return resultSeq
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/algorithm,
|
||||
chronos, testutils/unittests, stew/shims/net,
|
||||
eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
|
@ -334,3 +335,38 @@ procSuite "Portal Wire Protocol Tests":
|
|||
proto2.stop()
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
|
||||
asyncTest "Adjusting radius after hitting full database":
|
||||
let
|
||||
node1 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
dbLimit = 100000'u32
|
||||
db = ContentDB.new("", dbLimit, inMemory = true)
|
||||
proto1 = PortalProtocol.new(node1, protocolId, db, testHandler,
|
||||
validateContent)
|
||||
|
||||
let item = genByteSeq(10000)
|
||||
var distances: seq[UInt256] = @[]
|
||||
|
||||
for i in 0..8:
|
||||
proto1.storeContent(u256(i), item)
|
||||
distances.add(u256(i) xor proto1.localNode.id)
|
||||
|
||||
# With current setting i.e limit 100000bytes and 10000 byte element each
|
||||
# two furthest elements should be delted i.e index 0 and 1.
|
||||
# index 2 should be still be in database and it distance should always be
|
||||
# <= updated radius
|
||||
distances.sort(order = SortOrder.Descending)
|
||||
|
||||
check:
|
||||
db.get((distances[0] xor proto1.localNode.id)).isNone()
|
||||
db.get((distances[1] xor proto1.localNode.id)).isNone()
|
||||
db.get((distances[2] xor proto1.localNode.id)).isSome()
|
||||
# our radius have been updated and is lower than max
|
||||
proto1.dataRadius < UInt256.high
|
||||
# but higher or equal to furthest non deleted element
|
||||
proto1.dataRadius >= distances[2]
|
||||
|
||||
proto1.stop()
|
||||
await node1.closeWait()
|
||||
|
|
Loading…
Reference in New Issue