mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-28 05:19:28 +00:00
feat(dht-proxy): add TooBusy error + per-node in-flight cap
Part of https://github.com/logos-storage/logos-storage-pm/issues/13 Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
This commit is contained in:
parent
01019b8700
commit
d00f8bbe47
@ -45,6 +45,7 @@ import ./presets
|
||||
import ./utils/natutils
|
||||
|
||||
from ./blockexchange/engine/downloadmanager import DefaultBlockRetries
|
||||
from ./dht_proxy/protocol import DefaultMaxInFlightLookups
|
||||
|
||||
export
|
||||
units, net, storagetypes, logutils, presets, completeCmdArg, parseCmdArg, NatConfig
|
||||
@ -225,6 +226,14 @@ type
|
||||
name: "mix-pool-json"
|
||||
.}: string
|
||||
|
||||
dhtProxyMaxInFlight* {.
|
||||
desc:
|
||||
"Max concurrent DHT proxy lookups handled by this node " &
|
||||
"(omit to use the protocol default: " & $DefaultMaxInFlightLookups & ")",
|
||||
defaultValue: int.none,
|
||||
name: "dht-proxy-max-inflight"
|
||||
.}: Option[int]
|
||||
|
||||
maxPeers* {.
|
||||
desc: "The maximum number of peers to connect to",
|
||||
defaultValue: 160,
|
||||
|
||||
@ -25,6 +25,8 @@ logScope:
|
||||
|
||||
type DhtProxyProtocol* = ref object of LPProtocol
|
||||
discovery*: Discovery
|
||||
inFlight: int
|
||||
maxInFlight: int
|
||||
|
||||
proc handleFindProviders(
|
||||
self: DhtProxyProtocol, queryBytes: seq[byte]
|
||||
@ -60,6 +62,18 @@ proc handleLookupRequest(
|
||||
self: DhtProxyProtocol, conn: Connection
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
if self.inFlight >= self.maxInFlight:
|
||||
debug "DHT proxy at capacity, replying TooBusy",
|
||||
inFlight = self.inFlight, max = self.maxInFlight
|
||||
await conn.writeLp(
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.TooBusy).encode()
|
||||
)
|
||||
return
|
||||
|
||||
inc self.inFlight
|
||||
defer:
|
||||
dec self.inFlight
|
||||
|
||||
let
|
||||
reqBytes = await conn.readLp(MaxLookupRequestBytes)
|
||||
req = LookupRequest.decode(reqBytes).valueOr:
|
||||
@ -84,8 +98,12 @@ proc handleLookupRequest(
|
||||
except CatchableError as exc:
|
||||
warn "Handler error", err = exc.msg
|
||||
|
||||
proc new*(T: type DhtProxyProtocol, discovery: Discovery): DhtProxyProtocol =
|
||||
let self = DhtProxyProtocol(discovery: discovery)
|
||||
proc new*(
|
||||
T: type DhtProxyProtocol,
|
||||
discovery: Discovery,
|
||||
maxInFlight: int = DefaultMaxInFlightLookups,
|
||||
): DhtProxyProtocol =
|
||||
let self = DhtProxyProtocol(discovery: discovery, maxInFlight: maxInFlight)
|
||||
|
||||
proc handler(
|
||||
conn: Connection, proto: string
|
||||
|
||||
@ -17,6 +17,8 @@ import ../logutils
|
||||
|
||||
const DhtProxyCodec* = "/storage/dht-proxy/1.0.0"
|
||||
|
||||
const DefaultMaxInFlightLookups* = 100
|
||||
|
||||
let MaxLookupRequestBytes* = getMaxMessageSizeForCodec(DhtProxyCodec, 1).expect(
|
||||
"DhtProxyCodec framing leaves no room for a Sphinx forward payload"
|
||||
)
|
||||
@ -39,6 +41,7 @@ type
|
||||
InvalidCid = 1
|
||||
Internal = 2
|
||||
ResponseTooLarge = 3
|
||||
TooBusy = 4
|
||||
|
||||
LookupRequest* = object
|
||||
queryType*: QueryType
|
||||
|
||||
@ -110,7 +110,11 @@ proc start*(s: StorageServer) {.async.} =
|
||||
await mixProto.start()
|
||||
switch.mount(mixProto)
|
||||
|
||||
let dhtProxyProto = DhtProxyProtocol.new(s.storageNode.discovery)
|
||||
let dhtProxyProto =
|
||||
if cap =? s.config.dhtProxyMaxInFlight:
|
||||
DhtProxyProtocol.new(s.storageNode.discovery, maxInFlight = cap)
|
||||
else:
|
||||
DhtProxyProtocol.new(s.storageNode.discovery)
|
||||
await dhtProxyProto.start()
|
||||
switch.mount(dhtProxyProto)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user