From 4e8c5f292e07ed3a6727f3a9e681ab23493d46db Mon Sep 17 00:00:00 2001
From: KonradStaniec
Date: Thu, 12 May 2022 18:04:37 +0200
Subject: [PATCH] Dynamic radius adjustments (#1079)

* Add config for node radius
* Adjust radius when deleting content
---
 fluffy/conf.nim                             | 22 +++--
 fluffy/content_db.nim                       |  4 +-
 fluffy/fluffy.nim                           | 16 ++--
 fluffy/network/history/history_network.nim  | 20 +---
 fluffy/network/state/state_network.nim      |  8 +-
 fluffy/network/wire/portal_protocol.nim     | 91 +++++++++++++++++--
 .../network/wire/portal_protocol_config.nim | 57 +++++++++++-
 fluffy/scripts/launch_local_testnet.sh      |  2 +-
 fluffy/tests/test_content_db.nim            | 11 +--
 fluffy/tests/test_helpers.nim               |  8 ++
 fluffy/tests/test_portal_wire_protocol.nim  | 36 ++++++++
 11 files changed, 213 insertions(+), 62 deletions(-)

diff --git a/fluffy/conf.nim b/fluffy/conf.nim
index 17ddf72dd..b5688d810 100644
--- a/fluffy/conf.nim
+++ b/fluffy/conf.nim
@@ -11,7 +11,8 @@ import
   std/os, uri,
   confutils, confutils/std/net, chronicles,
   eth/keys, eth/net/nat, eth/p2p/discoveryv5/[enr, node],
-  json_rpc/rpcproxy
+  json_rpc/rpcproxy,
+  ./network/wire/portal_protocol_config
 
 proc defaultDataDir*(): string =
   let dataDir = when defined(windows):
@@ -36,6 +37,7 @@ const
   # 100mb seems a bit small; we may consider increasing the defaults after
   # some network measurements
   defaultStorageSize* = uint32(1000 * 1000 * 100)
+  defaultStorageSizeDesc* = $defaultStorageSize
 
 type
   PortalCmd* = enum
@@ -162,13 +164,6 @@ type
       desc: "URI of eth client where to proxy unimplemented rpc methods to"
       name: "proxy-uri" .}: ClientConfig
 
-    logRadius* {.
-      desc: "Hardcoded (logarithmic) radius for each Portal network. This is " &
-        "a temporary development option which will be replaced in the " &
-        "future by e.g. a storage size limit"
-      defaultValue: 256
-      name: "radius" .}: uint16
-
     tableIpLimit* {.
      hidden
      desc: "Maximum amount of nodes with the same IP in the routing tables"
@@ -187,12 +182,23 @@ type
      defaultValue: DefaultBitsPerHop
      name: "bits-per-hop" .}: int
 
+    radiusConfig* {.
+      hidden
+      desc: "Radius configuration for a fluffy node. The radius can either " &
+        "be `dynamic`, where the node adjusts it based on the storage " &
+        "size limit, or `static:logRadius`, where the node uses a " &
+        "hardcoded logRadius value. Warning: `static:logRadius` disables " &
+        "the storage size limit and makes the fluffy node store its " &
+        "fraction of the network."
+      defaultValue: defaultRadiusConfig
+      name: "radius-config" .}: RadiusConfig
+
     # TODO: maybe it is worth defining a minimal storage size and throwing
     # an error if the provided value is smaller than that minimum
     storageSize* {.
      desc: "Maximum amount (in bytes) of content which will be stored " &
        "in local database."
      defaultValue: defaultStorageSize
+      defaultValueDesc: $defaultStorageSizeDesc
      name: "storage-size" .}: uint32
 
     case cmd* {.
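For reference, the two accepted values of the new `--radius-config` option map
onto the `RadiusConfig` variants introduced in portal_protocol_config.nim
further down. A minimal illustrative sketch (the constructions below are an
assumption for clarity, not code from this patch):

    import ./network/wire/portal_protocol_config

    let
      # --radius-config=dynamic (the default): radius follows the storage limit
      dynamicCfg = RadiusConfig(kind: Dynamic)
      # --radius-config=static:254: hardcoded log-radius, no storage limit
      staticCfg = RadiusConfig(kind: Static, logRadius: 254)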
diff --git a/fluffy/content_db.nim b/fluffy/content_db.nim
index 29a0489b1..6dd09d5e3 100644
--- a/fluffy/content_db.nim
+++ b/fluffy/content_db.nim
@@ -200,8 +200,6 @@ proc get*(db: ContentDB, key: ContentId): Option[seq[byte]] =
   # TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
   db.get(key.toByteArrayBE())
 
-# TODO: Public due to usage in populating portal db, should be made private after
-# improving db populating to use local node id
 proc put*(db: ContentDB, key: ContentId, value: openArray[byte]) =
   db.put(key.toByteArrayBE(), value)
 
@@ -250,7 +248,9 @@ proc put*(
     key: ContentId,
     value: openArray[byte],
     target: UInt256): PutResult =
+
   db.put(key, value)
+
   let dbSize = db.size()
 
   if dbSize < int64(db.maxSize):
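The pruning-aware `put` above returns a `PutResult` that `storeContent` in
portal_protocol.nim inspects further down. Its definition is not part of this
diff; the following is only a sketch of the assumed shape, limited to the
fields this patch actually uses:

    type
      PutResultKind = enum
        ContentStored, DbPruned

      PutResult = object
        case kind: PutResultKind
        of ContentStored:
          discard
        of DbPruned:
          # distance of the furthest element left in the db after pruning
          furthestStoredElementDistance: UInt256
          # share of the stored content removed by the prune, in (0.0, 1.0]
          fractionOfDeletedContent: float64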
diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim
index 81bf14759..b8d679a97 100644
--- a/fluffy/fluffy.nim
+++ b/fluffy/fluffy.nim
@@ -22,11 +22,6 @@ import
   ./network/wire/[portal_stream, portal_protocol_config],
   "."/[content_db, populate_db]
 
-proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
-  # Get the max value of the logRadius range
-  pow((2).stuint(256), logRadius) - 1
-  # For the min value do `pow((2).stuint(256), logRadius - 1)`
-
 proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
   try:
     if (maybeUri.isSome()):
@@ -98,15 +93,18 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
   # This is done because the content in the db is dependent on the `NodeId` and
   # the selected `Radius`.
   let
-    radius = UInt256.fromLogRadius(config.logRadius)
     db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
       d.localNode.id.toByteArrayBE().toOpenArray(0, 8).toHex(),
       maxSize = config.storageSize)
     portalConfig = PortalProtocolConfig.init(
-      config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
-    stateNetwork = StateNetwork.new(d, db, radius,
+      config.tableIpLimit,
+      config.bucketIpLimit,
+      config.bitsPerHop,
+      config.radiusConfig
+    )
+    stateNetwork = StateNetwork.new(d, db,
       bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
-    historyNetwork = HistoryNetwork.new(d, db, radius,
+    historyNetwork = HistoryNetwork.new(d, db,
       bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
 
   # One instance of UtpDiscv5Protocol is shared over all the PortalStreams.
diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim
index 4dfd377af..545bd4384 100644
--- a/fluffy/network/history/history_network.nim
+++ b/fluffy/network/history/history_network.nim
@@ -146,14 +146,7 @@ proc getBlockHeader*(
       headerContent.content
     )
 
-  if h.portalProtocol.inRange(contentId):
-    # content is valid and in our range, save it into our db
-    # TODO handle radius adjustments
-    discard h.contentDB.put(
-      contentId,
-      headerContent.content,
-      h.portalProtocol.localNode.id
-    )
+  h.portalProtocol.storeContent(contentId, headerContent.content)
 
   return maybeHeader
 
@@ -202,12 +195,7 @@ proc getBlock*(
     )
 
   # content is valid; storeContent puts it into the db if it is in range
-  if h.portalProtocol.inRange(contentId):
-    # TODO handle radius adjustments
-    discard h.contentDB.put(
-      contentId, bodyContent.content,
-      h.portalProtocol.localNode.id
-    )
+  h.portalProtocol.storeContent(contentId, bodyContent.content)
 
   return some[Block]((header, blockBody))
 
@@ -233,13 +221,11 @@ proc new*(
     T: type HistoryNetwork,
     baseProtocol: protocol.Protocol,
     contentDB: ContentDB,
-    dataRadius = UInt256.high(),
     bootstrapRecords: openArray[Record] = [],
     portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
   let portalProtocol = PortalProtocol.new(
     baseProtocol, historyProtocolId, contentDB,
-    toContentIdHandler, validateContent,
-    dataRadius, bootstrapRecords,
+    toContentIdHandler, validateContent, bootstrapRecords,
     config = portalConfig)
 
   return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
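`storeContent`, added in portal_protocol.nim below, replaces the open-coded
`inRange` checks removed here. The range check itself is the usual XOR-metric
comparison; a minimal sketch assuming the default XOR distance function
(`inRangeSketch` is a hypothetical name, the real check lives in
portal_protocol.nim):

    import stint, eth/p2p/discoveryv5/node

    # A content id is in the node's range when its XOR distance to the local
    # node id does not exceed the node's current radius.
    proc inRangeSketch(localId: NodeId, radius: UInt256,
        contentId: UInt256): bool =
      let distance = localId xor contentId
      distance <= radius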
diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim
index ffbf8331e..b79a06af3 100644
--- a/fluffy/network/state/state_network.nim
+++ b/fluffy/network/state/state_network.nim
@@ -53,7 +53,9 @@ proc getContent*(n: StateNetwork, key: ContentKey):
   # When content is found on the network and is in the radius range, store it.
   if content.isSome() and contentInRange:
     # TODO Add poke when working on state network
-    discard n.contentDB.put(contentId, contentResult.content, n.portalProtocol.localNode.id)
+    # TODO When working on the state network, make it possible to pass
+    # different distance functions when storing content
+    n.portalProtocol.storeContent(contentId, contentResult.content)
 
   # TODO: for now returning bytes, ultimately it would be nice to return proper
   # domain types.
@@ -66,14 +68,12 @@ proc new*(
     T: type StateNetwork,
     baseProtocol: protocol.Protocol,
     contentDB: ContentDB,
-    dataRadius = UInt256.high(),
     bootstrapRecords: openArray[Record] = [],
     portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
   let portalProtocol = PortalProtocol.new(
     baseProtocol, stateProtocolId, contentDB,
     toContentIdHandler, validateContent,
-    dataRadius, bootstrapRecords, stateDistanceCalculator,
-    config = portalConfig)
+    bootstrapRecords, stateDistanceCalculator, config = portalConfig)
 
   return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim
index 88a9acbdf..721daa854 100644
--- a/fluffy/network/wire/portal_protocol.nim
+++ b/fluffy/network/wire/portal_protocol.nim
@@ -153,6 +153,7 @@ type
     contentDB*: ContentDB
     toContentId: ToContentIdHandler
     validateContent: ContentValidationHandler
+    radiusConfig: RadiusConfig
     dataRadius*: UInt256
     bootstrapRecords*: seq[Record]
     lastLookup: chronos.Moment
@@ -420,18 +421,36 @@ proc processContent(
     stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
     {.gcsafe, raises: [Defect].}
 
+proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
+  # Get the max value of the logRadius range
+  pow((2).stuint(256), logRadius) - 1
+
+proc getInitialRadius(rc: RadiusConfig): UInt256 =
+  case rc.kind
+  of Static:
+    return UInt256.fromLogRadius(rc.logRadius)
+  of Dynamic:
+    # In the case of a dynamic radius we start from the maximum value to
+    # quickly gather as much data as possible, and also to make sure each
+    # data piece in the database is in our range after a node restart.
+    # An alternative would be to store the node's radius in the database
+    # and initialize it from there after a restart.
+    return UInt256.high()
+
+
 proc new*(T: type PortalProtocol,
           baseProtocol: protocol.Protocol,
           protocolId: PortalProtocolId,
           contentDB: ContentDB,
           toContentId: ToContentIdHandler,
           validateContent: ContentValidationHandler,
-          dataRadius = UInt256.high(),
           bootstrapRecords: openArray[Record] = [],
           distanceCalculator: DistanceCalculator = XorDistanceCalculator,
           config: PortalProtocolConfig = defaultPortalProtocolConfig
          ): T =
 
+  let initialRadius: UInt256 = config.radiusConfig.getInitialRadius()
+
   let proto = PortalProtocol(
     protocolHandler: messageHandler,
     protocolId: protocolId,
@@ -442,7 +461,8 @@ proc new*(T: type PortalProtocol,
     contentDB: contentDB,
     toContentId: toContentId,
     validateContent: validateContent,
-    dataRadius: dataRadius,
+    radiusConfig: config.radiusConfig,
+    dataRadius: initialRadius,
     bootstrapRecords: @bootstrapRecords,
     radiusCache: RadiusCache.init(256),
     offerQueue: newAsyncQueue[OfferRequest](concurrentOffers))
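A quick sanity check of the log-radius arithmetic behind `fromLogRadius` and
`getInitialRadius` above (an illustrative sketch; the coverage fractions
assume uniformly distributed content ids):

    import stint

    # fromLogRadius(r) = 2^r - 1, the largest distance accepted at
    # log-radius r. At r = 256 this covers the full 2^256-sized id space;
    # at r = 254 it is 2^254 - 1, about 1/4th of the id space, which is why
    # the testnet script below expects static:254 nodes to store ~1/4th of
    # the data set.
    let quarterRadius = pow((2).stuint(256), 254) - 1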
@@ -1065,6 +1085,64 @@ proc neighborhoodGossip*(
       let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
       await p.offerQueue.addLast(req)
 
+proc adjustRadius(
+    p: PortalProtocol,
+    fractionOfDeletedContent: float64,
+    furthestElementInDbDistance: UInt256) =
+
+  if fractionOfDeletedContent == 0.0:
+    # Even though pruning was triggered, no content was deleted. This can
+    # happen in the pathological case of a really small database with
+    # really big values. Log it as an error, as it should not happen.
+    error "Database pruning attempt resulted in no content deleted"
+    return
+
+  # We need to invert the fraction, as our UInt256 implementation does not
+  # support multiplication by float
+  let invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent)
+
+  let scaledRadius = p.dataRadius div u256(invertedFractionAsInt)
+
+  # Choose the larger value, to avoid the situation where
+  # furthestElementInDbDistance is very close to the local id and the local
+  # radius would end up too small to accept any more data into the local
+  # database. If scaledRadius is the larger value, it still contains all
+  # stored elements.
+  let newRadius = max(scaledRadius, furthestElementInDbDistance)
+
+  debug "Database pruned",
+    oldRadius = p.dataRadius,
+    newRadius = newRadius,
+    furthestDistanceInDb = furthestElementInDbDistance,
+    fractionOfDeletedContent = fractionOfDeletedContent
+
+  # Both scaledRadius and furthestElementInDbDistance are smaller than the
+  # current dataRadius, so the radius will only ever decrease through the
+  # node's lifetime
+  p.dataRadius = newRadius
+
+proc storeContent*(p: PortalProtocol, key: ContentId, content: openArray[byte]) =
+  # Always re-check that the key is in the node's range, to maintain the
+  # invariant that all keys in the database are in the node's range.
+  # TODO: the current silent assumption is that both contentDB and
+  # portalProtocol use the same xor distance function
+  if p.inRange(key):
+    case p.radiusConfig.kind:
+    of Dynamic:
+      # In the case of a dynamic radius setting we obey the storage limit
+      # and adjust the radius to store the network fraction corresponding
+      # to that limit.
+      let res = p.contentDB.put(key, content, p.baseProtocol.localNode.id)
+      if res.kind == DbPruned:
+        p.adjustRadius(
+          res.fractionOfDeletedContent,
+          res.furthestStoredElementDistance
+        )
+    of Static:
+      # If the radius is configured statically it is never adjusted and
+      # stays constant through the node's lifetime. The database max size
+      # is also disabled, so we will effectively store our fraction of the
+      # network.
+      p.contentDB.put(key, content)
+
 proc processContent(
     stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
     {.gcsafe, raises: [Defect].} =
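A worked example of the scaling performed by `adjustRadius`, with purely
illustrative values (not taken from this patch): if pruning deleted half of
the stored content, the radius is roughly halved, but it never shrinks below
the furthest element still in the database:

    import stint

    let
      oldRadius = UInt256.high()
      fractionOfDeletedContent = 0.5
      furthestElementInDbDistance = u256(1000)  # hypothetical distance
      invertedFractionAsInt = int64(1.0 / fractionOfDeletedContent)  # 2
      scaledRadius = oldRadius div u256(invertedFractionAsInt)  # ~half
      # the furthest stored element acts as a lower bound on the new radius
      newRadius = max(scaledRadius, furthestElementInDbDistance)

    doAssert newRadius == scaledRadius  # here the scaled radius dominates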
@@ -1083,13 +1161,8 @@ proc processContent(
       return
 
     let contentId = contentIdOpt.get()
-    # Store content, should we recheck radius?
-    # TODO handle radius adjustments
-    discard p.contentDB.put(
-      contentId,
-      content,
-      p.baseProtocol.localNode.id
-    )
+
+    p.storeContent(contentId, content)
 
     info "Received valid offered content", contentKey
 
diff --git a/fluffy/network/wire/portal_protocol_config.nim b/fluffy/network/wire/portal_protocol_config.nim
index 8edc4ae3c..a1653a26d 100644
--- a/fluffy/network/wire/portal_protocol_config.nim
+++ b/fluffy/network/wire/portal_protocol_config.nim
@@ -1,26 +1,77 @@
 import
+  std/strutils,
+  confutils,
   eth/p2p/discoveryv5/routing_table
 
 type
+  RadiusConfigKind* = enum
+    Static, Dynamic
+
+  RadiusConfig* = object
+    case kind*: RadiusConfigKind
+    of Static:
+      logRadius*: uint16
+    of Dynamic:
+      discard
+
   PortalProtocolConfig* = object
     tableIpLimits*: TableIpLimits
     bitsPerHop*: int
+    radiusConfig*: RadiusConfig
 
 const
+  defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
+
   defaultPortalProtocolConfig* = PortalProtocolConfig(
     tableIpLimits: DefaultTableIpLimits,
-    bitsPerHop: DefaultBitsPerHop)
+    bitsPerHop: DefaultBitsPerHop,
+    radiusConfig: defaultRadiusConfig
+  )
 
 proc init*(
     T: type PortalProtocolConfig,
     tableIpLimit: uint,
     bucketIpLimit: uint,
-    bitsPerHop: int): T =
+    bitsPerHop: int,
+    radiusConfig: RadiusConfig): T =
 
   PortalProtocolConfig(
     tableIpLimits: TableIpLimits(
       tableIpLimit: tableIpLimit,
       bucketIpLimit: bucketIpLimit),
-    bitsPerHop: bitsPerHop
+    bitsPerHop: bitsPerHop,
+    radiusConfig: radiusConfig
   )
+
+proc parseCmdArg*(T: type RadiusConfig, p: TaintedString): T
+    {.raises: [Defect, ConfigurationError].} =
+
+  if p.startsWith("dynamic") and len(p) == 7:
+    return RadiusConfig(kind: Dynamic)
+  elif p.startsWith("static:"):
+    let num = p[7..^1]
+    try:
+      let parsed = uint16.parseCmdArg(num)
+
+      if parsed > 256:
+        raise newException(
+          ConfigurationError, "Provided logRadius should be <= 256"
+        )
+
+      return RadiusConfig(kind: Static, logRadius: parsed)
+    except ValueError:
+      let msg = "Provided logRadius: " & num & " is not a valid number"
+      raise newException(
+        ConfigurationError, msg
+      )
+  else:
+    let msg =
+      "Unsupported radius config option: " & p & ". " &
+      "Supported options: dynamic, static:logRadius"
+    raise newException(
+      ConfigurationError,
+      msg
+    )
+
+proc completeCmdArg*(T: type RadiusConfig, val: TaintedString): seq[string] =
+  return @[]
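Expected behaviour of the parser above, sketched as hypothetical checks (these
asserts are illustrative and not part of the patch's test suite):

    doAssert RadiusConfig.parseCmdArg("dynamic").kind == Dynamic
    doAssert RadiusConfig.parseCmdArg("static:254").kind == Static
    doAssert RadiusConfig.parseCmdArg("static:254").logRadius == 254'u16
    # "static:257" (logRadius > 256), "static:abc" (not a number) and any
    # other form raise ConfigurationError.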
diff --git a/fluffy/scripts/launch_local_testnet.sh b/fluffy/scripts/launch_local_testnet.sh
index edbb6230e..4a877c13b 100755
--- a/fluffy/scripts/launch_local_testnet.sh
+++ b/fluffy/scripts/launch_local_testnet.sh
@@ -248,7 +248,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
     BOOTSTRAP_ARG="--bootstrap-file=${BOOTSTRAP_ENR_FILE}"
     # All nodes but the bootstrap node run with a log radius of 254, which
     # should result in ~1/4th of the data set stored.
-    RADIUS_ARG="--radius=254"
+    RADIUS_ARG="--radius-config=static:254"
 
     # Wait for the bootstrap node to write out its enr file
     START_TIMESTAMP=$(date +%s)
diff --git a/fluffy/tests/test_content_db.nim b/fluffy/tests/test_content_db.nim
index 9543d83ac..05387e420 100644
--- a/fluffy/tests/test_content_db.nim
+++ b/fluffy/tests/test_content_db.nim
@@ -12,15 +12,8 @@ import
   unittest2, stint,
   eth/keys,
   ../network/state/state_content,
-  ../content_db
-
-proc genByteSeq(length: int): seq[byte] =
-  var i = 0
-  var resultSeq = newSeq[byte](length)
-  while i < length:
-    resultSeq[i] = byte(i)
-    inc i
-  return resultSeq
+  ../content_db,
+  ./test_helpers
 
 proc generateNRandomU256(rng: var BrHmacDrbgContext, n: int): seq[UInt256] =
   var i = 0
diff --git a/fluffy/tests/test_helpers.nim b/fluffy/tests/test_helpers.nim
index 82b2d25f9..f22ebdf65 100644
--- a/fluffy/tests/test_helpers.nim
+++ b/fluffy/tests/test_helpers.nim
@@ -34,3 +34,11 @@ proc initDiscoveryNode*(rng: ref BrHmacDrbgContext,
     rng = rng)
 
   result.open()
+
+proc genByteSeq*(length: int): seq[byte] =
+  var i = 0
+  var resultSeq = newSeq[byte](length)
+  while i < length:
+    resultSeq[i] = byte(i)
+    inc i
+  return resultSeq
diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim
index 74c49ef49..7a6aeeb74 100644
--- a/fluffy/tests/test_portal_wire_protocol.nim
+++ b/fluffy/tests/test_portal_wire_protocol.nim
@@ -8,6 +8,7 @@
 {.used.}
 
 import
+  std/algorithm,
   chronos, testutils/unittests, stew/shims/net,
   eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
   eth/p2p/discoveryv5/protocol as discv5_protocol,
@@ -334,3 +335,38 @@ procSuite "Portal Wire Protocol Tests":
     proto2.stop()
     await node1.closeWait()
     await node2.closeWait()
+
+  asyncTest "Adjusting radius after hitting full database":
+    let
+      node1 = initDiscoveryNode(
+        rng, PrivateKey.random(rng[]), localAddress(20303))
+
+      dbLimit = 100000'u32
+      db = ContentDB.new("", dbLimit, inMemory = true)
+      proto1 = PortalProtocol.new(node1, protocolId, db, testHandler,
+        validateContent)
+
+    let item = genByteSeq(10000)
+    var distances: seq[UInt256] = @[]
+
+    for i in 0..8:
+      proto1.storeContent(u256(i), item)
+      distances.add(u256(i) xor proto1.localNode.id)
+
+    # With the current settings, i.e. a limit of 100000 bytes and 10000-byte
+    # elements, the two furthest elements should be deleted, i.e. indexes
+    # 0 and 1. Index 2 should still be in the database, and its distance
+    # should always be <= the updated radius.
+    distances.sort(order = SortOrder.Descending)
+
+    check:
+      db.get((distances[0] xor proto1.localNode.id)).isNone()
+      db.get((distances[1] xor proto1.localNode.id)).isNone()
+      db.get((distances[2] xor proto1.localNode.id)).isSome()
+      # our radius has been updated and is lower than the max
+      proto1.dataRadius < UInt256.high
+      # but higher than or equal to the furthest non-deleted element
+      proto1.dataRadius >= distances[2]
+
+    proto1.stop()
+    await node1.closeWait()
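As a possible follow-up, the invariant behind this test could be asserted for
every surviving element rather than only the third one. A hypothetical
extension of the check block above, assuming exactly the two furthest
elements were pruned (as the test expects):

    # Every element still in the database must lie within the adjusted
    # radius; distances is sorted descending, so indexes 2..8 survived.
    for i in 2 ..< distances.len:
      check:
        db.get(distances[i] xor proto1.localNode.id).isSome()
        distances[i] <= proto1.dataRadius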