Mirror of https://github.com/status-im/nimbus-eth1.git

Fluffy: Portal subnetwork peer ban list (#3007)

parent a1a9c6b027, commit 296b319f9f
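This commit threads a temporary peer ban list through discv5 and the Portal sub-protocols: peers whose content fails validation, or that answer with the wrong response type, are banned in the routing table for a fixed duration; messages from banned peers are dropped, requests to them fail fast, their ENRs are filtered out of responses, and expired bans are cleaned up in the refresh loop. As orientation before the diff, here is a hypothetical, self-contained Nim sketch of the bookkeeping that routingTable.banNode, isBanned and cleanupExpiredBans are expected to perform (the routing-table side is not part of this diff; NodeId is simplified to uint64 and std/times stands in for chronos):

# Hypothetical sketch; not the actual routing-table implementation.
import std/[tables, times]

type
  NodeId = uint64
  BanList = object
    banned: Table[NodeId, Time] # node id -> ban expiry timestamp

proc banNode(b: var BanList, id: NodeId, period: Duration) =
  b.banned[id] = getTime() + period

proc isBanned(b: BanList, id: NodeId): bool =
  id in b.banned and getTime() < b.banned[id]

proc cleanupExpiredBans(b: var BanList) =
  # drop expired entries to bound memory, as the refresh loop does
  let now = getTime()
  var expired: seq[NodeId]
  for id, expiry in b.banned:
    if now >= expiry:
      expired.add(id)
  for id in expired:
    b.banned.del(id)

when isMainModule:
  var bans = BanList()
  bans.banNode(42, initDuration(minutes = 60))
  assert bans.isBanned(42)
  bans.cleanupExpiredBans() # entry stays until the hour is up
  assert bans.isBanned(42)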
@@ -372,6 +372,15 @@ type
       name: "disable-state-root-validation"
     .}: bool
 
+    disableBanNodes* {.
+      hidden,
+      desc:
+        "Disable node banning functionality for both discv5 and portal sub-protocols",
+      defaultValue: defaultDisableBanNodes,
+      defaultValueDesc: $defaultDisableBanNodes,
+      name: "debug-disable-ban-nodes"
+    .}: bool
+
   case cmd* {.command, defaultValue: noCommand.}: PortalCmd
   of noCommand:
     discard
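Note the double negative in the flag plumbing: the hidden debug flag disables banning, and defaultDisableBanNodes (introduced later in this diff) is true, so banning stays off unless the operator opts in. A hypothetical two-liner showing the boolean that run() hands to discv5 below:

# Hypothetical illustration; names mirror the diff, values are the defaults.
let disableBanNodes = true           # defaultDisableBanNodes in this commit
let banNodes = not disableBanNodes   # what run() passes to discv5
assert banNodes == false             # banning is opt-in for now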
@@ -148,6 +148,7 @@ proc run(
       enrAutoUpdate = config.enrAutoUpdate,
       config = discoveryConfig,
       rng = rng,
+      banNodes = not config.disableBanNodes,
     )
 
   d.open()
@@ -184,6 +185,7 @@ proc run(
       config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.alpha,
       config.radiusConfig, config.disablePoke, config.maxGossipNodes,
       config.contentCacheSize, config.disableContentCache, config.maxConcurrentOffers,
+      config.disableBanNodes,
     )
 
   portalNodeConfig = PortalNodeConfig(
@@ -147,6 +147,9 @@ proc getVerifiedBlockHeader*(
         return Opt.none(Header)
 
       header = validateCanonicalHeaderBytes(headerContent.content, id, n.accumulator).valueOr:
+        n.portalProtocol.banNode(
+          headerContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
+        )
         warn "Validation of block header failed",
           error = error, node = headerContent.receivedFrom.record.toURI()
         continue
@@ -192,6 +195,9 @@ proc getBlockBody*(
         return Opt.none(BlockBody)
 
       body = validateBlockBodyBytes(bodyContent.content, header).valueOr:
+        n.portalProtocol.banNode(
+          bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
+        )
         warn "Validation of block body failed",
           error, node = bodyContent.receivedFrom.record.toURI()
         continue
@@ -266,7 +272,11 @@ proc getReceipts*(
       receiptsContent = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
         debug "Failed fetching receipts from the network"
         return Opt.none(seq[Receipt])
 
       receipts = validateReceiptsBytes(receiptsContent.content, header.receiptsRoot).valueOr:
+        n.portalProtocol.banNode(
+          receiptsContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
+        )
         warn "Validation of receipts failed",
           error, node = receiptsContent.receivedFrom.record.toURI()
         continue
@@ -384,6 +394,9 @@ proc validateContent(
 
       debug "Received offered content validated successfully", srcNodeId, contentKey
     else:
+      if srcNodeId.isSome():
+        n.portalProtocol.banNode(srcNodeId.get(), NodeBanDurationOfferFailedValidation)
+
       debug "Received offered content failed validation",
         srcNodeId, contentKey, error = res.error
       return false
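The history-network changes above all follow one pattern: when validateCanonicalHeaderBytes, validateBlockBodyBytes or validateReceiptsBytes rejects the bytes, the peer that served them is banned before the lookup loop retries. A hypothetical, self-contained sketch of that shape (Result/valueOr replaced by std/options, the network types by stand-ins):

# Hypothetical sketch; std/options stands in for nim-results' valueOr.
import std/options

type Lookup = object
  content: string
  receivedFrom: uint64 # id of the peer that served the content

var banned: seq[uint64]

proc banNode(id: uint64) =
  # NodeBanDurationContentLookupFailedValidation (60 minutes) in the real code
  banned.add(id)

proc validate(content: string): bool =
  content.len > 0 # stand-in for header/body/receipts validation

proc fetch(res: Lookup): Option[string] =
  if not validate(res.content):
    banNode(res.receivedFrom)
    return none(string) # the real code warns and retries the lookup loop
  some(res.content)

when isMainModule:
  assert fetch(Lookup(content: "", receivedFrom: 7)).isNone
  assert 7'u64 in banned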
@@ -112,6 +112,9 @@ proc getContent(
         continue
 
       validateRetrieval(key, contentValue).isOkOr:
+        n.portalProtocol.banNode(
+          lookupRes.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
+        )
         error "Validation of retrieved state content failed"
         continue
 
@@ -246,6 +249,10 @@ proc processContentLoop(n: StateNetwork) {.async: (raises: []).} =
         debug "Received offered content validated successfully",
           srcNodeId, contentKeyBytes
       else:
+        if srcNodeId.isSome():
+          n.portalProtocol.banNode(
+            srcNodeId.get(), NodeBanDurationOfferFailedValidation
+          )
         state_network_offers_failed.inc(labelValues = [$n.portalProtocol.protocolId])
         error "Received offered content failed validation",
           srcNodeId, contentKeyBytes, error = offerRes.error()
@@ -126,6 +126,11 @@ const
     ## value in milliseconds
   initialLookups = 1 ## Amount of lookups done when populating the routing table
 
+  ## Ban durations for banned nodes in the routing table
+  NodeBanDurationInvalidResponse = 30.minutes
+  NodeBanDurationContentLookupFailedValidation* = 60.minutes
+  NodeBanDurationOfferFailedValidation* = 60.minutes
+
 type
   ToContentIdHandler* =
     proc(contentKey: ContentKeyByteList): results.Opt[ContentId] {.raises: [], gcsafe.}
@@ -285,6 +290,13 @@ func getProtocolId*(
   of PortalSubnetwork.transactionGossip:
     [portalPrefix, 0x4F]
 
+proc banNode*(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) =
+  if not p.config.disableBanNodes:
+    p.routingTable.banNode(nodeId, period)
+
+proc isBanned*(p: PortalProtocol, nodeId: NodeId): bool =
+  p.config.disableBanNodes == false and p.routingTable.isBanned(nodeId)
+
 func `$`(id: PortalProtocolId): string =
   id.toHex()
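banNode and isBanned are the only entry points to the ban list, and both collapse to no-ops when disableBanNodes is set. A hypothetical usage sketch with minimal stand-in types:

# Hypothetical stand-ins for PortalProtocol and the routing table.
type
  NodeId = uint64
  PortalProtocol = object
    disableBanNodes: bool
    bannedIds: seq[NodeId] # stand-in for the routing table's ban list

proc banNode(p: var PortalProtocol, nodeId: NodeId) =
  if not p.disableBanNodes:
    p.bannedIds.add(nodeId)

func isBanned(p: PortalProtocol, nodeId: NodeId): bool =
  not p.disableBanNodes and nodeId in p.bannedIds

when isMainModule:
  var disabled = PortalProtocol(disableBanNodes: true)
  disabled.banNode(1)
  assert not disabled.isBanned(1) # a no-op while banning is disabled

  var enabled = PortalProtocol(disableBanNodes: false)
  enabled.banNode(1)
  assert enabled.isBanned(1)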
@@ -300,8 +312,10 @@ func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
 func localNode*(p: PortalProtocol): Node =
   p.baseProtocol.localNode
 
-func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
-  p.routingTable.neighbours(id = id, seenOnly = seenOnly)
+template neighbours*(
+    p: PortalProtocol, id: NodeId, k: int = BUCKET_SIZE, seenOnly = false
+): seq[Node] =
+  p.routingTable.neighbours(id, k, seenOnly)
 
 func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
   p.routingTable.distance(a, b)
@@ -480,7 +494,7 @@ proc handleFindContent(
   # Node does not have the content, or content is not even in radius,
   # send closest neighbours to the requested content id.
   let
-    closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true)
+    closestNodes = p.neighbours(NodeId(contentId), seenOnly = true)
     enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
   portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])
 
@@ -557,6 +571,12 @@ proc messageHandler(
 
   let p = PortalProtocol(protocol)
 
+  if p.isBanned(srcId):
+    # The sender of the message is in the temporary node ban list
+    # so we don't process the message
+    debug "Dropping message from banned node", srcId, srcUdpAddress
+    return @[] # Reply with an empty response message
+
   let decoded = decodeMessage(request)
   if decoded.isOk():
     let message = decoded.get()
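With the messageHandler check above, an inbound request from a banned peer is answered with an empty byte sequence, which the sender surfaces as "No message data, peer might not support this talk protocol" (exercised by the tests at the end of this diff). A hypothetical, self-contained sketch:

# Hypothetical sketch; Protocol stands in for PortalProtocol.
type Protocol = object
  banned: seq[uint64]

func isBanned(p: Protocol, id: uint64): bool =
  id in p.banned

proc messageHandler(p: Protocol, srcId: uint64, request: seq[byte]): seq[byte] =
  if p.isBanned(srcId):
    # sender is in the temporary ban list: don't decode, reply empty
    return @[]
  request # stand-in for decodeMessage + dispatch

when isMainModule:
  let p = Protocol(banned: @[9'u64])
  assert p.messageHandler(9, @[1'u8]).len == 0
  assert p.messageHandler(8, @[1'u8]).len == 1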
@@ -661,7 +681,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
       labelValues = [$p.protocolId, $messageKind(Request)]
     )
 
-  let talkresp =
+  let talkResp =
     await talkReq(p.baseProtocol, dst, @(p.protocolId), encodeMessage(request))
 
   # Note: Failure of `decodeMessage` might also simply mean that the peer is
@@ -669,7 +689,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
   # an empty response needs to be send in that case.
   # See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05
 
-  let messageResponse = talkresp
+  let messageResponse = talkResp
     .mapErr(
       proc(x: cstring): string =
         $x
@@ -680,7 +700,11 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
     )
     .flatMap(
       proc(m: Message): Result[Response, string] =
-        getInnerMessage[Response](m)
+        let r = getInnerMessage[Response](m)
+        # Ban nodes that that send wrong type of response message
+        if r.isErr():
+          p.banNode(dst.id, NodeBanDurationInvalidResponse)
+        return r
     )
 
   if messageResponse.isOk():
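The reqResponse change above bans a peer whose reply decodes but carries the wrong inner message type (NodeBanDurationInvalidResponse, 30 minutes). A hypothetical sketch of wrapping a decode step that way, with std/options standing in for Result:

# Hypothetical sketch; std/options stands in for nim-results.
import std/options

var banned: seq[uint64]

proc banNode(id: uint64) =
  # NodeBanDurationInvalidResponse (30 minutes) in the real code
  banned.add(id)

proc getInnerMessage(raw: string): Option[string] =
  # stand-in: a valid reply starts with the expected message-type byte
  if raw.len > 0 and raw[0] == 'R':
    some(raw)
  else:
    none(string)

proc decodeResponse(dstId: uint64, raw: string): Option[string] =
  let r = getInnerMessage(raw)
  if r.isNone:
    banNode(dstId) # peer answered with the wrong message type
  r

when isMainModule:
  discard decodeResponse(5, "bogus")
  assert 5'u64 in banned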
@@ -758,6 +782,9 @@ proc ping*(
 ): Future[PortalResult[(uint64, CapabilitiesPayload)]] {.
     async: (raises: [CancelledError])
 .} =
+  if p.isBanned(dst.id):
+    return err("destination node is banned")
+
   let pong = ?(await p.pingImpl(dst))
 
   # Update last time we pinged this node
@@ -778,11 +805,18 @@ proc ping*(
 proc findNodes*(
     p: PortalProtocol, dst: Node, distances: seq[uint16]
 ): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
+  if p.isBanned(dst.id):
+    return err("destination node is banned")
+
   let response = ?(await p.findNodesImpl(dst, List[uint16, 256](distances)))
 
   let records = ?recordsFromBytes(response.enrs)
   # TODO: distance function is wrong here for state, fix + tests
-  ok(verifyNodesRecords(records, dst, enrsResultLimit, distances))
+  ok(
+    verifyNodesRecords(records, dst, enrsResultLimit, distances).filterIt(
+      not p.isBanned(it.id)
+    )
+  )
 
 proc findContent*(
     p: PortalProtocol, dst: Node, contentKey: ContentKeyByteList
@@ -791,6 +825,9 @@ proc findContent*(
     node = dst
     contentKey
 
+  if p.isBanned(dst.id):
+    return err("destination node is banned")
+
   let response = ?(await p.findContentImpl(dst, contentKey))
 
   case response.contentMessageType
@@ -855,7 +892,11 @@ proc findContent*(
       let records = ?recordsFromBytes(response.enrs)
       let verifiedNodes = verifyNodesRecords(records, dst, enrsResultLimit)
 
-      ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
+      ok(
+        FoundContent(
+          src: dst, kind: Nodes, nodes: verifiedNodes.filterIt(not p.isBanned(it.id))
+        )
+      )
 
 proc getContentKeys(o: OfferRequest): ContentKeysList =
   case o.kind
@@ -915,6 +956,9 @@ proc offer(
       contentKeys.len().int64, labelValues = [$p.protocolId]
     )
 
+  if p.isBanned(o.dst.id):
+    return err("destination node is banned")
+
   let response = ?(await p.offerImpl(o.dst, contentKeys))
 
   let contentKeysLen =
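The outgoing path is now symmetric with the incoming one: ping, findNodes, findContent and offer all refuse a banned destination before any packet is sent, and node records returned by peers are filtered with sequtils.filterIt so banned nodes cannot re-enter lookups. A hypothetical, self-contained sketch of both checks:

# Hypothetical sketch of the fail-fast guard and the filterIt post-filter.
import std/sequtils

type Node = object
  id: uint64

var banned = @[3'u64]

proc isBanned(id: uint64): bool =
  id in banned

proc findNodes(dstId: uint64, returned: seq[Node]): (string, seq[Node]) =
  if isBanned(dstId):
    return ("destination node is banned", newSeq[Node]()) # fail fast, nothing sent
  # keep only verified records from peers that are not banned
  ("", returned.filterIt(not isBanned(it.id)))

when isMainModule:
  let peers = @[Node(id: 2), Node(id: 3)]
  assert findNodes(3, peers)[0] == "destination node is banned"
  let res = findNodes(2, peers)
  assert res[1].len == 1 and res[1][0].id == 2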
@@ -1061,7 +1105,7 @@ proc lookup*(
   ## target. Maximum value for n is `BUCKET_SIZE`.
   # `closestNodes` holds the k closest nodes to target found, sorted by distance
   # Unvalidated nodes are used for requests as a form of validation.
-  var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
+  var closestNodes = p.neighbours(target, BUCKET_SIZE, seenOnly = false)
 
   var asked, seen = HashSet[NodeId]()
   asked.incl(p.localNode.id) # No need to ask our own node
@@ -1163,7 +1207,7 @@ proc contentLookup*(
   ## target.
   # `closestNodes` holds the k closest nodes to target found, sorted by distance
   # Unvalidated nodes are used for requests as a form of validation.
-  var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
+  var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
 
   # Shuffling the order of the nodes in order to not always hit the same node
   # first for the same request.
@@ -1290,7 +1334,7 @@ proc traceContentLookup*(
   # Need to use a system clock and not the mono clock for this.
   let startedAtMs = int64(times.epochTime() * 1000)
 
-  var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
+  var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
   # Shuffling the order of the nodes in order to not always hit the same node
   # first for the same request.
   p.baseProtocol.rng[].shuffle(closestNodes)
@@ -1488,7 +1532,7 @@ proc query*(
   ## This will take k nodes from the routing table closest to target and
   ## query them for nodes closest to target. If there are less than k nodes in
   ## the routing table, nodes returned by the first queries will be used.
-  var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
+  var queryBuffer = p.neighbours(target, k, seenOnly = false)
 
   var asked, seen = HashSet[NodeId]()
   asked.incl(p.localNode.id) # No need to ask our own node
@@ -1579,7 +1623,7 @@ proc neighborhoodGossip*(
   # It might still cause issues in data getting propagated in a wider id range.
 
   var closestLocalNodes =
-    p.routingTable.neighbours(NodeId(contentId), k = 16, seenOnly = true)
+    p.routingTable.neighbours(NodeId(contentId), BUCKET_SIZE, seenOnly = true)
 
   # Shuffling the order of the nodes in order to not always hit the same node
   # first for the same request.
@@ -1787,6 +1831,9 @@ proc refreshLoop(p: PortalProtocol) {.async: (raises: []).} =
       trace "Discovered nodes in random target query", nodes = randomQuery.len
       debug "Total nodes in routing table", total = p.routingTable.len()
 
+      # Remove the expired bans from routing table to limit memory usage
+      p.routingTable.cleanupExpiredBans()
+
       await sleepAsync(refreshInterval)
   except CancelledError:
     trace "refreshLoop canceled"
@@ -1839,6 +1886,12 @@ proc resolve*(
   if id == p.localNode.id:
     return Opt.some(p.localNode)
 
+  # No point in trying to resolve a banned node because it won't exist in the
+  # routing table and it will be filtered out of any respones in the lookup call
+  if p.isBanned(id):
+    debug "Not resolving banned node", nodeId = id
+    return Opt.none(Node)
+
   let node = p.getNode(id)
   if node.isSome():
     let nodesMessage = await p.findNodes(node.get(), @[0'u16])
@@ -1,5 +1,5 @@
 # Fluffy
-# Copyright (c) 2021-2024 Status Research & Development GmbH
+# Copyright (c) 2021-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -45,6 +45,7 @@ type
     contentCacheSize*: int
     disableContentCache*: bool
     maxConcurrentOffers*: int
+    disableBanNodes*: bool
 
 const
   defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
@@ -56,6 +57,7 @@ const
   defaultMaxConcurrentOffers* = 50
   defaultAlpha* = 3
   revalidationTimeout* = chronos.seconds(30)
+  defaultDisableBanNodes* = true
 
   defaultPortalProtocolConfig* = PortalProtocolConfig(
     tableIpLimits: DefaultTableIpLimits,
@@ -67,6 +69,7 @@ const
     contentCacheSize: defaultContentCacheSize,
     disableContentCache: defaultDisableContentCache,
     maxConcurrentOffers: defaultMaxConcurrentOffers,
+    disableBanNodes: defaultDisableBanNodes,
   )
 
 proc init*(
@@ -81,6 +84,7 @@ proc init*(
     contentCacheSize: int,
     disableContentCache: bool,
     maxConcurrentOffers: int,
+    disableBanNodes: bool,
 ): T =
   PortalProtocolConfig(
     tableIpLimits:
@@ -93,6 +97,7 @@ proc init*(
     contentCacheSize: contentCacheSize,
     disableContentCache: disableContentCache,
     maxConcurrentOffers: maxConcurrentOffers,
+    disableBanNodes: disableBanNodes,
   )
 
 func fromLogRadius*(T: type UInt256, logRadius: uint16): T =
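Since defaultDisableBanNodes is true, any component that wants banning has to opt in through the config; a minimal sketch of the pattern the wire-protocol tests below use (type reduced to the one relevant field):

# Hypothetical sketch; the real PortalProtocolConfig has many more fields.
type PortalProtocolConfig = object
  disableBanNodes: bool

const defaultPortalProtocolConfig = PortalProtocolConfig(disableBanNodes: true)

var config = defaultPortalProtocolConfig
config.disableBanNodes = false # opt this node in to banning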
@@ -46,6 +46,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
       of Content:
         let valueBytes = foundContentResult.content
         validateRetrieval(key, valueBytes).isOkOr:
+          p.banNode(node.id, NodeBanDurationContentLookupFailedValidation)
           raise invalidValueErr()
 
         let res = ContentInfo(
@@ -97,6 +98,10 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
         valueBytes = contentLookupResult.content
 
       validateRetrieval(key, valueBytes).isOkOr:
+        p.banNode(
+          contentLookupResult.receivedFrom.id,
+          NodeBanDurationContentLookupFailedValidation,
+        )
         raise invalidValueErr()
       p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)
 
@@ -132,6 +137,10 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
         raise contentNotFoundErrWithTrace(data)
 
       validateRetrieval(key, valueBytes).isOkOr:
+        if res.trace.receivedFrom.isSome():
+          p.banNode(
+            res.trace.receivedFrom.get(), NodeBanDurationContentLookupFailedValidation
+          )
         raise invalidValueErr()
       p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)
@@ -1,5 +1,5 @@
 # Fluffy
-# Copyright (c) 2022-2024 Status Research & Development GmbH
+# Copyright (c) 2022-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -161,7 +161,7 @@ procSuite "History Content Network":
     await historyNode1.stop()
     await historyNode2.stop()
 
-  asyncTest "Offer - Maximum Content Keys in 1 Message":
+  asyncTest "Offer - Maximum plus one Content Keys in 1 Message":
     # Need to provide enough headers to have the accumulator "finished".
     const lastBlockNumber = int(mergeBlockNumber - 1)
@@ -204,36 +204,75 @@ procSuite "History Content Network":
 
     # Offering 1 content item too much which should result in a discv5 packet
     # that is too large and thus not get any response.
     block:
       let offerResult =
         await historyNode1.portalProtocol.offer(historyNode2.localNode(), contentKVs)
 
       # Fail due timeout, as remote side must drop the too large discv5 packet
       check offerResult.isErr()
 
       for contentKV in contentKVs:
         let id = toContentId(contentKV.contentKey)
         check historyNode2.containsId(id) == false
 
     await historyNode1.stop()
     await historyNode2.stop()
 
+  asyncTest "Offer - Maximum Content Keys in 1 Message":
+    # Need to provide enough headers to have the accumulator "finished".
+    const lastBlockNumber = int(mergeBlockNumber - 1)
+
+    let headers = createEmptyHeaders(0, lastBlockNumber)
+    let accumulatorRes = buildAccumulatorData(headers)
+    check accumulatorRes.isOk()
+
+    let
+      (masterAccumulator, epochRecords) = accumulatorRes.get()
+      historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
+      historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
+
+    check:
+      historyNode1.portalProtocol().addNode(historyNode2.localNode()) == Added
+      historyNode2.portalProtocol().addNode(historyNode1.localNode()) == Added
+
+      (await historyNode1.portalProtocol().ping(historyNode2.localNode())).isOk()
+      (await historyNode2.portalProtocol().ping(historyNode1.localNode())).isOk()
+
+    # Need to run start to get the processContentLoop running
+    historyNode1.start()
+    historyNode2.start()
+
+    let maxOfferedHistoryContent =
+      getMaxOfferedContentKeys(uint32(len(PortalProtocolId)), maxContentKeySize)
+
+    let headersWithProof =
+      buildHeadersWithProof(headers[0 ..< maxOfferedHistoryContent], epochRecords)
+    check headersWithProof.isOk()
+
+    # This is equal to maxOfferedHistoryContent
+    let contentKVs = headersToContentKV(headersWithProof.get())
+
+    # node 1 will offer the content so it needs to have it in its database
+    for contentKV in contentKVs:
+      let id = toContentId(contentKV.contentKey)
+      historyNode1.portalProtocol.storeContent(
+        contentKV.contentKey, id, contentKV.content
+      )
+
     # One content key less should make offer be succesful and should result
     # in the content being transferred and stored on the other node.
     block:
       let offerResult = await historyNode1.portalProtocol.offer(
         historyNode2.localNode(), contentKVs[0 ..< maxOfferedHistoryContent]
       )
 
       check offerResult.isOk()
 
       # Make sure the content got processed out of content queue
       while not historyNode2.historyNetwork.contentQueue.empty():
         await sleepAsync(1.milliseconds)
 
-      for i, contentKV in contentKVs:
-        let id = toContentId(contentKV.contentKey)
-        if i < len(contentKVs) - 1:
-          await historyNode2.checkContainsIdWithRetry(id)
-        else:
-          check historyNode2.containsId(id) == false
+      for i, contentKV in contentKVs:
+        let id = toContentId(contentKV.contentKey)
+        await historyNode2.checkContainsIdWithRetry(id)
 
     await historyNode1.stop()
     await historyNode2.stop()
@@ -1,5 +1,5 @@
 # Nimbus - Portal Network
-# Copyright (c) 2021-2024 Status Research & Development GmbH
+# Copyright (c) 2021-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -38,6 +38,7 @@ proc initDiscoveryNode*(
     previousRecord = previousRecord,
     config = config,
     rng = rng,
+    banNodes = true,
   )
 
   result.open()
@@ -45,17 +45,21 @@ proc initPortalProtocol(
     q = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
     stream = manager.registerNewStream(q, connectionTimeout = 2.seconds)
 
-    proto = PortalProtocol.new(
-      d,
-      protocolId,
-      toContentId,
-      createGetHandler(db),
-      createStoreHandler(db, defaultRadiusConfig),
-      createContainsHandler(db),
-      createRadiusHandler(db),
-      stream,
-      bootstrapRecords = bootstrapRecords,
-    )
+  var config = defaultPortalProtocolConfig
+  config.disableBanNodes = false
+
+  let proto = PortalProtocol.new(
+    d,
+    protocolId,
+    toContentId,
+    createGetHandler(db),
+    createStoreHandler(db, defaultRadiusConfig),
+    createContainsHandler(db),
+    createRadiusHandler(db),
+    stream,
+    bootstrapRecords = bootstrapRecords,
+    config = config,
+  )
 
   return proto
@@ -502,3 +506,129 @@ procSuite "Portal Wire Protocol Tests":
 
     await proto1.stopPortalProtocol()
     await proto2.stopPortalProtocol()
+
+  asyncTest "Banned nodes are removed and cannot be added":
+    let (proto1, proto2) = defaultTestSetup(rng)
+
+    # add the node
+    check:
+      proto1.addNode(proto2.localNode) == Added
+      proto1.getNode(proto2.localNode.id).isSome()
+
+    # banning the node should remove it from the routing table
+    proto1.banNode(proto2.localNode.id, 1.minutes)
+    check proto1.getNode(proto2.localNode.id).isNone()
+
+    # cannot add a banned node
+    check:
+      proto1.addNode(proto2.localNode) == Banned
+      proto1.getNode(proto2.localNode.id).isNone()
+
+    await proto1.stopPortalProtocol()
+    await proto2.stopPortalProtocol()
+
+  asyncTest "Banned nodes are filtered out in FindNodes/Nodes":
+    let
+      proto1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
+      proto2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
+      proto3 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20304))
+      distance = logDistance(proto2.localNode.id, proto3.localNode.id)
+
+    check proto2.addNode(proto3.localNode) == Added
+    check (await proto2.ping(proto3.localNode)).isOk()
+    check (await proto3.ping(proto2.localNode)).isOk()
+
+    # before banning the node it is returned in the response
+    block:
+      let res = await proto1.findNodes(proto2.localNode, @[distance])
+      check:
+        res.isOk()
+        res.get().len() == 1
+
+    proto1.banNode(proto3.localNode.id, 1.minutes)
+
+    # after banning the node, it is not returned in the response
+    block:
+      let res = await proto1.findNodes(proto2.localNode, @[distance])
+      check:
+        res.isOk()
+        res.get().len() == 0
+
+    await proto1.stopPortalProtocol()
+    await proto2.stopPortalProtocol()
+    await proto3.stopPortalProtocol()
+
+  asyncTest "Banned nodes are filtered out in FindContent/Content - send enrs":
+    let
+      proto1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
+      proto2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
+      proto3 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20304))
+
+    check proto2.addNode(proto3.localNode) == Added
+    check (await proto2.ping(proto3.localNode)).isOk()
+    check (await proto3.ping(proto2.localNode)).isOk()
+
+    let contentKey = ContentKeyByteList.init(@[1'u8])
+
+    block:
+      let res = await proto1.findContent(proto2.localNode, contentKey)
+      check:
+        res.isOk()
+        res.get().nodes.len() == 1
+
+    proto1.banNode(proto3.localNode.id, 1.minutes)
+
+    block:
+      let res = await proto1.findContent(proto2.localNode, contentKey)
+      check:
+        res.isOk()
+        res.get().nodes.len() == 0
+
+    await proto1.stopPortalProtocol()
+    await proto2.stopPortalProtocol()
+    await proto3.stopPortalProtocol()
+
+  asyncTest "Drop messages from banned nodes":
+    let
+      proto1 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20302))
+      proto2 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20303))
+      proto3 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20304))
+      proto4 = initPortalProtocol(rng, PrivateKey.random(rng[]), localAddress(20305))
+      contentKey = ContentKeyByteList.init(@[1'u8])
+
+    proto2.banNode(proto1.localNode.id, 1.minutes)
+    proto3.banNode(proto1.localNode.id, 1.minutes)
+    proto4.banNode(proto1.localNode.id, 1.minutes)
+
+    check:
+      (await proto1.ping(proto2.localNode)).error() ==
+        "No message data, peer might not support this talk protocol"
+      (await proto1.findNodes(proto3.localNode, @[0.uint16])).error() ==
+        "No message data, peer might not support this talk protocol"
+      (await proto1.findContent(proto4.localNode, contentKey)).error() ==
+        "No message data, peer might not support this talk protocol"
+
+    await proto1.stopPortalProtocol()
+    await proto2.stopPortalProtocol()
+
+  asyncTest "Cannot send message to banned nodes":
+    let
+      (proto1, proto2) = defaultTestSetup(rng)
+      contentKey = ContentKeyByteList.init(@[1'u8])
+
+    check:
+      (await proto1.ping(proto2.localNode)).isOk()
+      (await proto1.findNodes(proto2.localNode, @[0.uint16])).isOk()
+      (await proto1.findContent(proto2.localNode, contentKey)).isOk()
+
+    proto1.banNode(proto2.localNode.id, 1.minutes)
+
+    check:
+      (await proto1.ping(proto2.localNode)).error() == "destination node is banned"
+      (await proto1.findNodes(proto2.localNode, @[0.uint16])).error() ==
+        "destination node is banned"
+      (await proto1.findContent(proto2.localNode, contentKey)).error() ==
+        "destination node is banned"
+
+    await proto1.stopPortalProtocol()
+    await proto2.stopPortalProtocol()
@@ -1,5 +1,5 @@
 # Fluffy
-# Copyright (c) 2021-2024 Status Research & Development GmbH
+# Copyright (c) 2021-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -238,6 +238,7 @@ proc run(config: PortalCliConf) =
     bindPort = udpPort,
     enrAutoUpdate = config.enrAutoUpdate,
     rng = rng,
+    banNodes = true,
   )
 
   d.open()
@@ -1,4 +1,4 @@
-# Copyright (c) 2022-2024 Status Research & Development GmbH
+# Copyright (c) 2022-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -151,6 +151,7 @@ when isMainModule:
     bindPort = conf.udpPort,
     enrAutoUpdate = true,
     rng = rng,
+    banNodes = true,
   )
 
   d.open()