mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-06-26 12:29:30 +00:00
refactor(dht-proxy): unify ResponseStatus+ErrorKind into single LookupCode enum (#1473)
Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
This commit is contained in:
parent
c6e6fa30be
commit
fe8a3e6625
@ -29,8 +29,7 @@ logScope:
|
||||
topics = "storage dht-proxy client"
|
||||
|
||||
type LookupResult = object
|
||||
status: ResponseStatus
|
||||
errorKind: ErrorKind
|
||||
code: LookupCode
|
||||
providers: seq[SignedPeerRecord]
|
||||
|
||||
proc requestLookup(
|
||||
@ -57,10 +56,7 @@ proc requestLookup(
|
||||
providers.add(res.get)
|
||||
else:
|
||||
warn "Failed to decode SignedPeerRecord from response", err = $res.error
|
||||
|
||||
return success LookupResult(
|
||||
status: resp.status, errorKind: resp.errorKind, providers: providers
|
||||
)
|
||||
return success LookupResult(code: resp.code, providers: providers)
|
||||
except LPStreamError as exc:
|
||||
return failure("Stream error: " & exc.msg)
|
||||
except CatchableError as exc:
|
||||
@ -103,13 +99,14 @@ proc lookupProviders*(
|
||||
return failure(lookupRes.error)
|
||||
let lookup = lookupRes.get()
|
||||
|
||||
case lookup.status
|
||||
of ResponseStatus.Ok:
|
||||
case lookup.code
|
||||
of LookupCode.Ok:
|
||||
return success lookup.providers
|
||||
of ResponseStatus.NotFound:
|
||||
of LookupCode.NotFound:
|
||||
return success newSeq[SignedPeerRecord]()
|
||||
of ResponseStatus.Error:
|
||||
return failure("Remote returned error: " & $lookup.errorKind)
|
||||
of LookupCode.ErrDecodeFailed, LookupCode.ErrInvalidCid, LookupCode.ErrInternal,
|
||||
LookupCode.ErrResponseTooLarge, LookupCode.ErrTooBusy:
|
||||
return failure("Remote returned error: " & $lookup.code)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
|
||||
@ -34,14 +34,13 @@ proc handleFindProviders(
|
||||
let
|
||||
cid = Cid.init(queryBytes).valueOr:
|
||||
warn "Invalid CID in lookup request"
|
||||
return
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.InvalidCid)
|
||||
return LookupResponse(code: LookupCode.ErrInvalidCid)
|
||||
providers = (await self.discovery.findDirect(cid)).valueOr:
|
||||
warn "Direct lookup failed", cid, err = error.msg
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
return LookupResponse(code: LookupCode.ErrInternal)
|
||||
|
||||
if providers.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.NotFound)
|
||||
return LookupResponse(code: LookupCode.NotFound)
|
||||
|
||||
var encoded = newSeqOfCap[seq[byte]](providers.len)
|
||||
for spr in providers:
|
||||
@ -51,23 +50,21 @@ proc handleFindProviders(
|
||||
encoded.add(bytes)
|
||||
|
||||
if encoded.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
return LookupResponse(code: LookupCode.ErrInternal)
|
||||
|
||||
let packed = packProviders(encoded, MaxLookupResponseBytes).valueOr:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: error)
|
||||
return LookupResponse(code: error)
|
||||
|
||||
LookupResponse(status: ResponseStatus.Ok, providers: packed)
|
||||
LookupResponse(code: LookupCode.Ok, providers: packed)
|
||||
|
||||
proc handleLookupRequest(
|
||||
self: DhtProxyProtocol, conn: Connection
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
if self.inFlight >= self.maxInFlight:
|
||||
debug "DHT proxy at capacity, replying TooBusy",
|
||||
debug "DHT proxy at capacity, replying ErrTooBusy",
|
||||
inFlight = self.inFlight, max = self.maxInFlight
|
||||
await conn.writeLp(
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.TooBusy).encode()
|
||||
)
|
||||
await conn.writeLp(LookupResponse(code: LookupCode.ErrTooBusy).encode())
|
||||
return
|
||||
|
||||
inc self.inFlight
|
||||
@ -78,11 +75,7 @@ proc handleLookupRequest(
|
||||
reqBytes = await conn.readLp(MaxLookupRequestBytes)
|
||||
req = LookupRequest.decode(reqBytes).valueOr:
|
||||
warn "Failed to decode lookup request"
|
||||
await conn.writeLp(
|
||||
LookupResponse(
|
||||
status: ResponseStatus.Error, errorKind: ErrorKind.DecodeFailed
|
||||
).encode()
|
||||
)
|
||||
await conn.writeLp(LookupResponse(code: LookupCode.ErrDecodeFailed).encode())
|
||||
return
|
||||
|
||||
let resp =
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
import pkg/libp2p/protobuf/minprotobuf
|
||||
import pkg/libp2p_mix
|
||||
import pkg/libp2p/routing_record
|
||||
import pkg/stew/objects
|
||||
|
||||
import ../logutils
|
||||
|
||||
@ -31,25 +32,21 @@ type
|
||||
QueryType* {.pure.} = enum
|
||||
FindProviders = 0
|
||||
|
||||
ResponseStatus* {.pure.} = enum
|
||||
LookupCode* {.pure.} = enum
|
||||
Ok = 0
|
||||
NotFound = 1
|
||||
Error = 2
|
||||
|
||||
ErrorKind* {.pure.} = enum
|
||||
DecodeFailed = 0
|
||||
InvalidCid = 1
|
||||
Internal = 2
|
||||
ResponseTooLarge = 3
|
||||
TooBusy = 4
|
||||
ErrDecodeFailed = 100
|
||||
ErrInvalidCid = 101
|
||||
ErrInternal = 200
|
||||
ErrResponseTooLarge = 201
|
||||
ErrTooBusy = 202
|
||||
|
||||
LookupRequest* = object
|
||||
queryType*: QueryType
|
||||
queryBytes*: seq[byte]
|
||||
|
||||
LookupResponse* = object
|
||||
status*: ResponseStatus
|
||||
errorKind*: ErrorKind
|
||||
code*: LookupCode
|
||||
providers*: seq[seq[byte]]
|
||||
|
||||
proc encode*(req: LookupRequest): seq[byte] =
|
||||
@ -61,11 +58,9 @@ proc encode*(req: LookupRequest): seq[byte] =
|
||||
|
||||
proc encode*(resp: LookupResponse): seq[byte] =
|
||||
var pb = initProtoBuffer()
|
||||
pb.write(1, resp.status.uint32)
|
||||
if resp.status == ResponseStatus.Error:
|
||||
pb.write(2, resp.errorKind.uint32)
|
||||
pb.write(1, resp.code.uint32)
|
||||
for spr in resp.providers:
|
||||
pb.write(3, spr)
|
||||
pb.write(2, spr)
|
||||
pb.finish()
|
||||
pb.buffer
|
||||
|
||||
@ -76,9 +71,8 @@ proc decode*(_: type LookupRequest, data: openArray[byte]): ProtoResult[LookupRe
|
||||
qt: uint32
|
||||
|
||||
if ?pb.getField(1, qt):
|
||||
if qt > QueryType.high.uint32:
|
||||
if not checkedEnumAssign(req.queryType, qt):
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
req.queryType = QueryType(qt)
|
||||
|
||||
discard ?pb.getField(2, req.queryBytes)
|
||||
ok(req)
|
||||
@ -89,34 +83,25 @@ proc decode*(
|
||||
let pb = initProtoBuffer(data)
|
||||
var
|
||||
resp = LookupResponse()
|
||||
status: uint32
|
||||
codeOrd: uint32
|
||||
|
||||
if ?pb.getField(1, status):
|
||||
if status > ResponseStatus.high.uint32:
|
||||
if ?pb.getField(1, codeOrd):
|
||||
if not checkedEnumAssign(resp.code, codeOrd):
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
resp.status = ResponseStatus(status)
|
||||
|
||||
if resp.status == ResponseStatus.Error:
|
||||
var ek: uint32
|
||||
if ?pb.getField(2, ek):
|
||||
if ek > ErrorKind.high.uint32:
|
||||
return err(ProtoError.IncorrectBlob)
|
||||
resp.errorKind = ErrorKind(ek)
|
||||
|
||||
discard ?pb.getRepeatedField(3, resp.providers)
|
||||
|
||||
discard ?pb.getRepeatedField(2, resp.providers)
|
||||
ok(resp)
|
||||
|
||||
proc packProviders*(
|
||||
providers: seq[seq[byte]], budget_bytes: int
|
||||
): Result[seq[seq[byte]], ErrorKind] =
|
||||
): Result[seq[seq[byte]], LookupCode] =
|
||||
if providers.len == 0:
|
||||
error "packProviders called with no providers"
|
||||
return err(ErrorKind.Internal)
|
||||
return err(LookupCode.ErrInternal)
|
||||
|
||||
let single = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< 1])
|
||||
let single = LookupResponse(code: LookupCode.Ok, providers: providers[0 ..< 1])
|
||||
if single.encode().len > budget_bytes:
|
||||
return err(ErrorKind.ResponseTooLarge)
|
||||
return err(LookupCode.ErrResponseTooLarge)
|
||||
|
||||
var
|
||||
lo = 1
|
||||
@ -124,7 +109,7 @@ proc packProviders*(
|
||||
while lo < hi:
|
||||
let
|
||||
mid = (lo + hi + 1) div 2
|
||||
test = LookupResponse(status: ResponseStatus.Ok, providers: providers[0 ..< mid])
|
||||
test = LookupResponse(code: LookupCode.Ok, providers: providers[0 ..< mid])
|
||||
if test.encode().len <= budget_bytes:
|
||||
lo = mid
|
||||
else:
|
||||
|
||||
@ -126,22 +126,21 @@ proc handleFindProviders(
|
||||
): Future[LookupResponse] {.async: (raises: [CancelledError]).} =
|
||||
let c = Cid.init(queryBytes).valueOr:
|
||||
warn "Invalid CID in lookup request"
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.InvalidCid)
|
||||
return LookupResponse(code: LookupCode.ErrInvalidCid)
|
||||
|
||||
let providers =
|
||||
try:
|
||||
(await self.dht.getProviders(c.toNodeId())).valueOr:
|
||||
warn "discv5 getProviders failed", err = $error
|
||||
return
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
return LookupResponse(code: LookupCode.ErrInternal)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "discv5 getProviders raised", err = exc.msg
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
return LookupResponse(code: LookupCode.ErrInternal)
|
||||
|
||||
if providers.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.NotFound)
|
||||
return LookupResponse(code: LookupCode.NotFound)
|
||||
|
||||
var encoded = newSeqOfCap[seq[byte]](providers.len)
|
||||
for rec in providers:
|
||||
@ -151,23 +150,21 @@ proc handleFindProviders(
|
||||
encoded.add(bytes)
|
||||
|
||||
if encoded.len == 0:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.Internal)
|
||||
return LookupResponse(code: LookupCode.ErrInternal)
|
||||
|
||||
let packed = packProviders(encoded, MaxLookupResponseBytes).valueOr:
|
||||
return LookupResponse(status: ResponseStatus.Error, errorKind: error)
|
||||
return LookupResponse(code: error)
|
||||
|
||||
LookupResponse(status: ResponseStatus.Ok, providers: packed)
|
||||
LookupResponse(code: LookupCode.Ok, providers: packed)
|
||||
|
||||
proc handleLookupRequest(
|
||||
self: DhtProxyProtocol, conn: Connection
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
if self.inFlight >= self.maxInFlight:
|
||||
debug "DHT proxy at capacity, replying TooBusy",
|
||||
debug "DHT proxy at capacity, replying ErrTooBusy",
|
||||
inFlight = self.inFlight, max = self.maxInFlight
|
||||
await conn.writeLp(
|
||||
LookupResponse(status: ResponseStatus.Error, errorKind: ErrorKind.TooBusy).encode()
|
||||
)
|
||||
await conn.writeLp(LookupResponse(code: LookupCode.ErrTooBusy).encode())
|
||||
return
|
||||
|
||||
inc self.inFlight
|
||||
@ -178,11 +175,7 @@ proc handleLookupRequest(
|
||||
reqBytes = await conn.readLp(MaxLookupRequestBytes)
|
||||
req = LookupRequest.decode(reqBytes).valueOr:
|
||||
warn "Failed to decode lookup request"
|
||||
await conn.writeLp(
|
||||
LookupResponse(
|
||||
status: ResponseStatus.Error, errorKind: ErrorKind.DecodeFailed
|
||||
).encode()
|
||||
)
|
||||
await conn.writeLp(LookupResponse(code: LookupCode.ErrDecodeFailed).encode())
|
||||
return
|
||||
|
||||
let resp =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user