Simple cost model for sync requests; Penalize peers perfoming flooding or invalid requests
This commit is contained in:
parent
6b9419e547
commit
8a6281aad2
|
@ -102,6 +102,8 @@ type
|
|||
maxInactivityAllowed*: Duration
|
||||
netThroughput: AverageThroughput
|
||||
score*: int
|
||||
requestQuota*: float
|
||||
lastReqTime*: Moment
|
||||
connections*: int
|
||||
disconnectedFut: Future[void]
|
||||
|
||||
|
@ -200,6 +202,8 @@ type
|
|||
else:
|
||||
discard
|
||||
|
||||
InvalidInputsError* = object of CatchableError
|
||||
|
||||
NetRes*[T] = Result[T, Eth2NetworkingError]
|
||||
## This is type returned from all network requests
|
||||
|
||||
|
@ -216,6 +220,10 @@ const
|
|||
## Score after which peer will be kicked
|
||||
PeerScoreHighLimit* = 1000
|
||||
## Max value of peer's score
|
||||
PeerScoreInvalidRequest* = -500
|
||||
## This peer is sending malformed or nonsensical data
|
||||
PeerScoreFlooder* = -250
|
||||
## This peer is sending too many expensive requests
|
||||
|
||||
ConcurrentConnections* = 10
|
||||
## Maximum number of active concurrent connection requests.
|
||||
|
@ -368,6 +376,28 @@ proc `<`*(a, b: Peer): bool =
|
|||
else:
|
||||
false
|
||||
|
||||
const
|
||||
maxRequestQuota = 1000000.0
|
||||
fullReplenishTime = 5.seconds
|
||||
replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float)
|
||||
requestFloodingThreshold = -500000.0
|
||||
|
||||
proc updateRequestQuota*(peer: Peer, reqCost: float) =
|
||||
let
|
||||
currentTime = now(chronos.Moment)
|
||||
nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime)
|
||||
replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate
|
||||
|
||||
peer.lastReqTime = currentTime
|
||||
peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost
|
||||
|
||||
if peer.requestQuota < requestFloodingThreshold:
|
||||
peer.updateScore(PeerScoreFlooder)
|
||||
peer.requestQuota = 0.0
|
||||
|
||||
func allowedOpsPerSecondCost*(n: int): float =
|
||||
(replenishRate * 1000000000'f / n.float)
|
||||
|
||||
proc isSeen*(network: ETh2Node, peerId: PeerID): bool =
|
||||
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
|
||||
## yet expired.
|
||||
|
@ -617,15 +647,15 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
# defer: setLogLevel(LogLevel.DEBUG)
|
||||
# trace "incoming " & `msgNameLit` & " conn"
|
||||
|
||||
let peer = peerFromStream(network, conn)
|
||||
try:
|
||||
let peer = peerFromStream(network, conn)
|
||||
|
||||
# TODO peer connection setup is broken, update info in some better place
|
||||
# whenever race is fix:
|
||||
# https://github.com/status-im/nimbus-eth2/issues/1157
|
||||
peer.info = conn.peerInfo
|
||||
|
||||
template returnInvalidRequest(msg: ErrorMsg) =
|
||||
peer.updateScore(PeerScoreInvalidRequest)
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, msg)
|
||||
return
|
||||
|
||||
|
@ -691,6 +721,10 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
try:
|
||||
logReceivedMsg(peer, MsgType(msg.get))
|
||||
await callUserHandler(MsgType, peer, conn, msg.get)
|
||||
except InvalidInputsError as err:
|
||||
returnInvalidRequest err.msg
|
||||
await sendErrorResponse(peer, conn, ServerError,
|
||||
ErrorMsg err.msg.toBytes)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, ServerError,
|
||||
ErrorMsg err.msg.toBytes)
|
||||
|
@ -700,6 +734,7 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
discard network.peerPool.checkPeerScore(peer)
|
||||
|
||||
proc toPeerAddr*(r: enr.TypedRecord):
|
||||
Result[PeerAddr, cstring] {.raises: [Defect].} =
|
||||
|
@ -1024,6 +1059,7 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
|
|||
result.network = network
|
||||
result.connectionState = Connected
|
||||
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
|
||||
result.lastReqTime = now(chronos.Moment)
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for i in 0 ..< allProtocols.len:
|
||||
let proto = allProtocols[i]
|
||||
|
|
|
@ -131,8 +131,8 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
|
|||
|
||||
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
|
||||
maxOutgoingPeers = -1,
|
||||
scoreCheckCb: PeerScoreCheckCallback[A] = nil,
|
||||
peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
|
||||
scoreCheckCb: PeerScoreCheckCallback[A] = nil,
|
||||
peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
|
||||
## Create new PeerPool.
|
||||
##
|
||||
## ``maxPeers`` - maximum number of peers allowed. All the peers which
|
||||
|
@ -253,7 +253,7 @@ proc shortLogSpace*[A, B](pool: PeerPool[A, B]): string =
|
|||
proc shortLogCurrent*[A, B](pool: PeerPool[A, B]): string =
|
||||
$pool.curIncPeersCount & "/" & $pool.curOutPeersCount
|
||||
|
||||
proc checkPeerScore[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
## Returns ``true`` if peer passing score check.
|
||||
if not(isNil(pool.scoreCheck)):
|
||||
pool.scoreCheck(peer)
|
||||
|
|
|
@ -11,6 +11,10 @@ logScope:
|
|||
const
|
||||
MAX_REQUEST_BLOCKS = 1024
|
||||
|
||||
blockByRootLookupCost = allowedOpsPerSecondCost(50)
|
||||
blockResponseCost = allowedOpsPerSecondCost(100)
|
||||
blockByRangeLookupCost = allowedOpsPerSecondCost(20)
|
||||
|
||||
type
|
||||
StatusMsg* = object
|
||||
forkDigest*: ForkDigest
|
||||
|
@ -139,7 +143,7 @@ p2pProtocol BeaconSync(version = 1,
|
|||
{.async, libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||
trace "got range request", peer, startSlot,
|
||||
count = reqCount, step = reqStep
|
||||
if reqCount > 0'u64:
|
||||
if reqCount > 0'u64 and reqStep > 0'u64:
|
||||
var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
|
||||
let
|
||||
chainDag = peer.networkState.chainDag
|
||||
|
@ -151,6 +155,9 @@ p2pProtocol BeaconSync(version = 1,
|
|||
startIndex =
|
||||
chainDag.getBlockRange(startSlot, reqStep,
|
||||
blocks.toOpenArray(0, endIndex))
|
||||
peer.updateRequestQuota(
|
||||
blockByRangeLookupCost +
|
||||
max(0, endIndex - startIndex + 1).float * blockResponseCost)
|
||||
|
||||
for i in startIndex..endIndex:
|
||||
doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only"
|
||||
|
@ -160,6 +167,8 @@ p2pProtocol BeaconSync(version = 1,
|
|||
|
||||
debug "Block range request done",
|
||||
peer, startSlot, count, reqStep, found = count - startIndex
|
||||
else:
|
||||
raise newException(InvalidInputsError, "Potential DoS attack: empty blocksByRange")
|
||||
|
||||
proc beaconBlocksByRoot(
|
||||
peer: Peer,
|
||||
|
@ -168,18 +177,24 @@ p2pProtocol BeaconSync(version = 1,
|
|||
blockRoots: BlockRootsList,
|
||||
response: MultipleChunksResponse[SignedBeaconBlock])
|
||||
{.async, libp2pProtocol("beacon_blocks_by_root", 1).} =
|
||||
if blockRoots.len == 0:
|
||||
raise newException(InvalidInputsError, "Potential DoS attack: empty blocksByRoot")
|
||||
|
||||
let
|
||||
chainDag = peer.networkState.chainDag
|
||||
count = blockRoots.len
|
||||
|
||||
var found = 0
|
||||
peer.updateRequestQuota(count.float * blockByRootLookupCost)
|
||||
|
||||
var found = 0
|
||||
for i in 0..<count:
|
||||
let blockRef = chainDag.getRef(blockRoots[i])
|
||||
if not isNil(blockRef):
|
||||
await response.write(chainDag.get(blockRef).data)
|
||||
inc found
|
||||
|
||||
peer.updateRequestQuota(found.float * blockResponseCost)
|
||||
|
||||
debug "Block root request done",
|
||||
peer, roots = blockRoots.len, count, found
|
||||
|
||||
|
|
Loading…
Reference in New Issue