diff --git a/fluffy/tools/beacon_lc_bridge/nim.cfg b/fluffy/tools/beacon_lc_bridge/nim.cfg index 701b7f2c3..11364b6ba 100644 --- a/fluffy/tools/beacon_lc_bridge/nim.cfg +++ b/fluffy/tools/beacon_lc_bridge/nim.cfg @@ -2,4 +2,3 @@ -d:"libp2p_pki_schemes=secp256k1" -d:"chronicles_sinks=textlines[dynamic],json[dynamic]" --d:"chronicles_sinks=textlines[dynamic]" diff --git a/fluffy/tools/portal_bridge/nim.cfg b/fluffy/tools/portal_bridge/nim.cfg index 9647eec90..0745d2ac5 100644 --- a/fluffy/tools/portal_bridge/nim.cfg +++ b/fluffy/tools/portal_bridge/nim.cfg @@ -1,2 +1 @@ -d:"chronicles_sinks=textlines[dynamic],json[dynamic]" --d:"chronicles_sinks=textlines[dynamic]" diff --git a/fluffy/tools/portal_bridge/portal_bridge_state.nim b/fluffy/tools/portal_bridge/portal_bridge_state.nim index 75e02b606..23c104d57 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_state.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_state.nim @@ -8,7 +8,7 @@ {.push raises: [].} import - std/sequtils, + std/[sequtils, algorithm], chronicles, chronos, stint, @@ -231,7 +231,7 @@ proc runBackfillBuildBlockOffersLoop( db.putLastPersistedBlockNumber(blockData.blockNumber) proc collectOffer( - offersMap: TableRef[seq[byte], seq[byte]], + offersMap: OrderedTableRef[seq[byte], seq[byte]], offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey, ) {.inline.} = @@ -239,7 +239,7 @@ proc collectOffer( offersMap[keyBytes] = offerWithKey.offer.encode() proc recursiveCollectOffer( - offersMap: TableRef[seq[byte], seq[byte]], + offersMap: OrderedTableRef[seq[byte], seq[byte]], offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey, ) = offersMap.collectOffer(offerWithKey) @@ -254,6 +254,7 @@ proc recursiveCollectOffer( proc runBackfillGossipBlockOffersLoop( blockOffersQueue: AsyncQueue[BlockOffersRef], portalClient: RpcClient, + portalNodeId: NodeId, verifyGossip: bool, workerId: int, ) {.async: (raises: [CancelledError]).} = @@ -264,7 +265,7 @@ proc runBackfillGossipBlockOffersLoop( while true: # A table of offer key, value pairs is used to filter out duplicates so # that we don't gossip the same offer multiple times. - let offersMap = newTable[seq[byte], seq[byte]]() + let offersMap = newOrderedTable[seq[byte], seq[byte]]() for offerWithKey in blockOffers.accountTrieOffers: offersMap.recursiveCollectOffer(offerWithKey) @@ -273,6 +274,20 @@ proc runBackfillGossipBlockOffersLoop( for offerWithKey in blockOffers.contractCodeOffers: offersMap.collectOffer(offerWithKey) + # We need to use a closure here because nodeId is required to calculate the + # distance of each content id from the node + proc offersMapCmp(x, y: (seq[byte], seq[byte])): int = + let + xId = ContentKeyByteList.init(x[0]).toContentId() + yId = ContentKeyByteList.init(y[0]).toContentId() + xDistance = portalNodeId xor xId + yDistance = portalNodeId xor yId + if xDistance >= yDistance: 1 else: -1 + + # Sort the offers based on the distance from the node so that we will gossip + # content that is closest to the node first + offersMap.sort(offersMapCmp) + var retryGossip = false for k, v in offersMap: try: @@ -337,14 +352,22 @@ proc runBackfillMetricsLoop( proc runState*(config: PortalBridgeConf) = let portalClient = newRpcClientConnect(config.portalRpcUrl) - web3Client = newRpcClientConnect(config.web3UrlState) - db = DatabaseRef.init(config.stateDir.string).get() - defer: - db.close() + portalNodeId = + try: + (waitFor portalClient.portal_stateNodeInfo()).nodeId + except CatchableError as e: + fatal "Failed to connect to portal client", error = $e.msg + quit QuitFailure + info "Connected to portal client with nodeId", nodeId = portalNodeId + let web3Client = newRpcClientConnect(config.web3UrlState) if web3Client of RpcHttpClient: warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance" + let db = DatabaseRef.init(config.stateDir.string).get() + defer: + db.close() + let maybeLastPersistedBlock = db.getLastPersistedBlockNumber() if maybeLastPersistedBlock.isSome(): info "Last persisted block found in the database: ", @@ -376,7 +399,7 @@ proc runState*(config: PortalBridgeConf) = for workerId in 1 .. config.gossipWorkersCount.int: asyncSpawn runBackfillGossipBlockOffersLoop( - blockOffersQueue, portalClient, config.verifyGossip, workerId + blockOffersQueue, portalClient, portalNodeId, config.verifyGossip, workerId ) asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)