diff --git a/storage/conf.nim b/storage/conf.nim index ca743daa..ec95dd7d 100644 --- a/storage/conf.nim +++ b/storage/conf.nim @@ -45,6 +45,7 @@ import ./presets import ./utils/natutils from ./blockexchange/engine/downloadmanager import DefaultBlockRetries +from ./dht_proxy/protocol import DefaultMaxInFlightLookups export units, net, storagetypes, logutils, presets, completeCmdArg, parseCmdArg, NatConfig @@ -225,6 +226,14 @@ type name: "mix-pool-json" .}: string + dhtProxyMaxInFlight* {. + desc: + "Max concurrent DHT proxy lookups handled by this node " & + "(omit to use the protocol default: " & $DefaultMaxInFlightLookups & ")", + defaultValue: int.none, + name: "dht-proxy-max-inflight" + .}: Option[int] + maxPeers* {. desc: "The maximum number of peers to connect to", defaultValue: 160, diff --git a/storage/dht_proxy/handler.nim b/storage/dht_proxy/handler.nim index cf6356a1..c8d2543e 100644 --- a/storage/dht_proxy/handler.nim +++ b/storage/dht_proxy/handler.nim @@ -25,6 +25,8 @@ logScope: type DhtProxyProtocol* = ref object of LPProtocol discovery*: Discovery + inFlight: int + maxInFlight: int proc handleFindProviders( self: DhtProxyProtocol, queryBytes: seq[byte] @@ -60,6 +62,18 @@ proc handleLookupRequest( self: DhtProxyProtocol, conn: Connection ) {.async: (raises: [CancelledError]).} = try: + if self.inFlight >= self.maxInFlight: + debug "DHT proxy at capacity, replying TooBusy", + inFlight = self.inFlight, max = self.maxInFlight + await conn.writeLp( + LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.TooBusy).encode() + ) + return + + inc self.inFlight + defer: + dec self.inFlight + let reqBytes = await conn.readLp(MaxLookupRequestBytes) req = LookupRequest.decode(reqBytes).valueOr: @@ -84,8 +98,12 @@ proc handleLookupRequest( except CatchableError as exc: warn "Handler error", err = exc.msg -proc new*(T: type DhtProxyProtocol, discovery: Discovery): DhtProxyProtocol = - let self = DhtProxyProtocol(discovery: discovery) +proc new*( + T: type DhtProxyProtocol, + discovery: Discovery, + maxInFlight: int = DefaultMaxInFlightLookups, +): DhtProxyProtocol = + let self = DhtProxyProtocol(discovery: discovery, maxInFlight: maxInFlight) proc handler( conn: Connection, proto: string diff --git a/storage/dht_proxy/protocol.nim b/storage/dht_proxy/protocol.nim index c57bd34e..e98abe6c 100644 --- a/storage/dht_proxy/protocol.nim +++ b/storage/dht_proxy/protocol.nim @@ -17,6 +17,8 @@ import ../logutils const DhtProxyCodec* = "/storage/dht-proxy/1.0.0" +const DefaultMaxInFlightLookups* = 100 + let MaxLookupRequestBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 1).expect( "DhtProxyCodec framing leaves no room for a Sphinx forward payload" ) @@ -39,6 +41,7 @@ type InvalidCid = 1 Internal = 2 ResponseTooLarge = 3 + TooBusy = 4 LookupRequest* = object queryType*: QueryType diff --git a/storage/storage.nim b/storage/storage.nim index 91f102ba..cb6aa629 100644 --- a/storage/storage.nim +++ b/storage/storage.nim @@ -110,7 +110,11 @@ proc start*(s: StorageServer) {.async.} = await mixProto.start() switch.mount(mixProto) - let dhtProxyProto = DhtProxyProtocol.new(s.storageNode.discovery) + let dhtProxyProto = + if cap =? s.config.dhtProxyMaxInFlight: + DhtProxyProtocol.new(s.storageNode.discovery, maxInFlight = cap) + else: + DhtProxyProtocol.new(s.storageNode.discovery) await dhtProxyProto.start() switch.mount(dhtProxyProto)