diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index caed145ad..5605743e6 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -102,6 +102,8 @@ type maxInactivityAllowed*: Duration netThroughput: AverageThroughput score*: int + requestQuota*: float + lastReqTime*: Moment connections*: int disconnectedFut: Future[void] @@ -200,6 +202,8 @@ type else: discard + InvalidInputsError* = object of CatchableError + NetRes*[T] = Result[T, Eth2NetworkingError] ## This is type returned from all network requests @@ -216,6 +220,10 @@ const ## Score after which peer will be kicked PeerScoreHighLimit* = 1000 ## Max value of peer's score + PeerScoreInvalidRequest* = -500 + ## This peer is sending malformed or nonsensical data + PeerScoreFlooder* = -250 + ## This peer is sending too many expensive requests ConcurrentConnections* = 10 ## Maximum number of active concurrent connection requests. @@ -368,6 +376,28 @@ proc `<`*(a, b: Peer): bool = else: false +const + maxRequestQuota = 1000000.0 + fullReplenishTime = 5.seconds + replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float) + requestFloodingThreshold = -500000.0 + +proc updateRequestQuota*(peer: Peer, reqCost: float) = + let + currentTime = now(chronos.Moment) + nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime) + replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate + + peer.lastReqTime = currentTime + peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost + + if peer.requestQuota < requestFloodingThreshold: + peer.updateScore(PeerScoreFlooder) + peer.requestQuota = 0.0 + +func allowedOpsPerSecondCost*(n: int): float = + (replenishRate * 1000000000'f / n.float) + proc isSeen*(network: ETh2Node, peerId: PeerID): bool = ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## yet expired. @@ -617,15 +647,15 @@ proc handleIncomingStream(network: Eth2Node, # defer: setLogLevel(LogLevel.DEBUG) # trace "incoming " & `msgNameLit` & " conn" + let peer = peerFromStream(network, conn) try: - let peer = peerFromStream(network, conn) - # TODO peer connection setup is broken, update info in some better place # whenever race is fix: # https://github.com/status-im/nimbus-eth2/issues/1157 peer.info = conn.peerInfo template returnInvalidRequest(msg: ErrorMsg) = + peer.updateScore(PeerScoreInvalidRequest) await sendErrorResponse(peer, conn, InvalidRequest, msg) return @@ -691,6 +721,10 @@ proc handleIncomingStream(network: Eth2Node, try: logReceivedMsg(peer, MsgType(msg.get)) await callUserHandler(MsgType, peer, conn, msg.get) + except InvalidInputsError as err: + returnInvalidRequest err.msg + await sendErrorResponse(peer, conn, ServerError, + ErrorMsg err.msg.toBytes) except CatchableError as err: await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes) @@ -700,6 +734,7 @@ proc handleIncomingStream(network: Eth2Node, finally: await conn.closeWithEOF() + discard network.peerPool.checkPeerScore(peer) proc toPeerAddr*(r: enr.TypedRecord): Result[PeerAddr, cstring] {.raises: [Defect].} = @@ -1024,6 +1059,7 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer = result.network = network result.connectionState = Connected result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config + result.lastReqTime = now(chronos.Moment) newSeq result.protocolStates, allProtocols.len for i in 0 ..< allProtocols.len: let proto = allProtocols[i] diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim index 27ae91de1..0edf469c6 100644 --- a/beacon_chain/peer_pool.nim +++ b/beacon_chain/peer_pool.nim @@ -131,8 +131,8 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B], proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, maxOutgoingPeers = -1, - scoreCheckCb: PeerScoreCheckCallback[A] = nil, - peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] = + scoreCheckCb: PeerScoreCheckCallback[A] = nil, + peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] = ## Create new PeerPool. ## ## ``maxPeers`` - maximum number of peers allowed. All the peers which @@ -253,7 +253,7 @@ proc shortLogSpace*[A, B](pool: PeerPool[A, B]): string = proc shortLogCurrent*[A, B](pool: PeerPool[A, B]): string = $pool.curIncPeersCount & "/" & $pool.curOutPeersCount -proc checkPeerScore[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = +proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = ## Returns ``true`` if peer passing score check. if not(isNil(pool.scoreCheck)): pool.scoreCheck(peer) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 18771a75d..8cfe69aee 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -11,6 +11,10 @@ logScope: const MAX_REQUEST_BLOCKS = 1024 + blockByRootLookupCost = allowedOpsPerSecondCost(50) + blockResponseCost = allowedOpsPerSecondCost(100) + blockByRangeLookupCost = allowedOpsPerSecondCost(20) + type StatusMsg* = object forkDigest*: ForkDigest @@ -139,7 +143,7 @@ p2pProtocol BeaconSync(version = 1, {.async, libp2pProtocol("beacon_blocks_by_range", 1).} = trace "got range request", peer, startSlot, count = reqCount, step = reqStep - if reqCount > 0'u64: + if reqCount > 0'u64 and reqStep > 0'u64: var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] let chainDag = peer.networkState.chainDag @@ -151,6 +155,9 @@ p2pProtocol BeaconSync(version = 1, startIndex = chainDag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) + peer.updateRequestQuota( + blockByRangeLookupCost + + max(0, endIndex - startIndex + 1).float * blockResponseCost) for i in startIndex..endIndex: doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only" @@ -160,6 +167,8 @@ p2pProtocol BeaconSync(version = 1, debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex + else: + raise newException(InvalidInputsError, "Potential DoS attack: empty blocksByRange") proc beaconBlocksByRoot( peer: Peer, @@ -168,18 +177,24 @@ p2pProtocol BeaconSync(version = 1, blockRoots: BlockRootsList, response: MultipleChunksResponse[SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_root", 1).} = + if blockRoots.len == 0: + raise newException(InvalidInputsError, "Potential DoS attack: empty blocksByRoot") + let chainDag = peer.networkState.chainDag count = blockRoots.len - var found = 0 + peer.updateRequestQuota(count.float * blockByRootLookupCost) + var found = 0 for i in 0..