mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 00:13:06 +00:00
fix: lightpush metrics (#3486)
* fix: lightpush metrics Some light push errors were not reported in metrics due to an early return. * Small improvements * Bound metrics value by using error codes
This commit is contained in:
parent
4b186a4b28
commit
0ed3fc8079
@ -28,94 +28,80 @@ type WakuLightPush* = ref object of LPProtocol
|
|||||||
requestRateLimiter*: RequestRateLimiter
|
requestRateLimiter*: RequestRateLimiter
|
||||||
sharding: Sharding
|
sharding: Sharding
|
||||||
|
|
||||||
|
proc handleRequest(
|
||||||
|
wl: WakuLightPush, peerId: PeerId, pushRequest: LightPushRequest
|
||||||
|
): Future[WakuLightPushResult] {.async.} =
|
||||||
|
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
|
||||||
|
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
|
||||||
|
let msg = "Invalid content-topic:" & $error
|
||||||
|
error "lightpush request handling error", error = msg
|
||||||
|
return WakuLightPushResult.err(
|
||||||
|
(code: LightPushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg))
|
||||||
|
)
|
||||||
|
|
||||||
|
wl.sharding.getShard(parsedTopic).valueOr:
|
||||||
|
let msg = "Sharding error: " & error
|
||||||
|
error "lightpush request handling error", error = msg
|
||||||
|
return WakuLightPushResult.err(
|
||||||
|
(code: LightPushStatusCode.INTERNAL_SERVER_ERROR, desc: some(msg))
|
||||||
|
)
|
||||||
|
|
||||||
|
# ensure checking topic will not cause error at gossipsub level
|
||||||
|
if pubsubTopic.isEmptyOrWhitespace():
|
||||||
|
let msg = "topic must not be empty"
|
||||||
|
error "lightpush request handling error", error = msg
|
||||||
|
return
|
||||||
|
WakuLightPushResult.err((code: LightPushStatusCode.BAD_REQUEST, desc: some(msg)))
|
||||||
|
|
||||||
|
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
|
||||||
|
|
||||||
|
let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex()
|
||||||
|
notice "handling lightpush request",
|
||||||
|
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
|
||||||
|
peer_id = peerId,
|
||||||
|
requestId = pushRequest.requestId,
|
||||||
|
pubsubTopic = pushRequest.pubsubTopic,
|
||||||
|
msg_hash = msg_hash,
|
||||||
|
receivedTime = getNowInNanosecondTime()
|
||||||
|
|
||||||
|
let res = (await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)).valueOr:
|
||||||
|
return err((code: error.code, desc: error.desc))
|
||||||
|
return ok(res)
|
||||||
|
|
||||||
proc handleRequest*(
|
proc handleRequest*(
|
||||||
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
|
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
|
||||||
): Future[LightPushResponse] {.async.} =
|
): Future[LightPushResponse] {.async.} =
|
||||||
let reqDecodeRes = LightpushRequest.decode(buffer)
|
var pushResponse: LightPushResponse
|
||||||
var isSuccess = false
|
|
||||||
var pushResponse: LightpushResponse
|
|
||||||
|
|
||||||
if reqDecodeRes.isErr():
|
let pushRequest = LightPushRequest.decode(buffer).valueOr:
|
||||||
pushResponse = LightpushResponse(
|
let desc = decodeRpcFailure & ": " & $error
|
||||||
|
error "failed to push message", error = desc
|
||||||
|
let errorCode = LightPushStatusCode.BAD_REQUEST.uint32
|
||||||
|
waku_lightpush_v3_errors.inc(labelValues = [$errorCode])
|
||||||
|
return LightPushResponse(
|
||||||
requestId: "N/A", # due to decode failure we don't know requestId
|
requestId: "N/A", # due to decode failure we don't know requestId
|
||||||
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
|
statusCode: errorCode.uint32,
|
||||||
statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error),
|
statusDesc: some(desc),
|
||||||
)
|
|
||||||
else:
|
|
||||||
let pushRequest = reqDecodeRes.get()
|
|
||||||
|
|
||||||
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
|
|
||||||
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
|
|
||||||
let msg = "Invalid content-topic:" & $error
|
|
||||||
error "lightpush request handling error", error = msg
|
|
||||||
return LightpushResponse(
|
|
||||||
requestId: pushRequest.requestId,
|
|
||||||
statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32,
|
|
||||||
statusDesc: some(msg),
|
|
||||||
)
|
|
||||||
|
|
||||||
wl.sharding.getShard(parsedTopic).valueOr:
|
|
||||||
let msg = "Autosharding error: " & error
|
|
||||||
error "lightpush request handling error", error = msg
|
|
||||||
return LightpushResponse(
|
|
||||||
requestId: pushRequest.requestId,
|
|
||||||
statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32,
|
|
||||||
statusDesc: some(msg),
|
|
||||||
)
|
|
||||||
|
|
||||||
# ensure checking topic will not cause error at gossipsub level
|
|
||||||
if pubsubTopic.isEmptyOrWhitespace():
|
|
||||||
let msg = "topic must not be empty"
|
|
||||||
error "lightpush request handling error", error = msg
|
|
||||||
return LightPushResponse(
|
|
||||||
requestId: pushRequest.requestId,
|
|
||||||
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
|
|
||||||
statusDesc: some(msg),
|
|
||||||
)
|
|
||||||
|
|
||||||
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
|
|
||||||
|
|
||||||
let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex()
|
|
||||||
notice "handling lightpush request",
|
|
||||||
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
|
|
||||||
peer_id = peerId,
|
|
||||||
requestId = pushRequest.requestId,
|
|
||||||
pubsubTopic = pushRequest.pubsubTopic,
|
|
||||||
msg_hash = msg_hash,
|
|
||||||
receivedTime = getNowInNanosecondTime()
|
|
||||||
|
|
||||||
let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)
|
|
||||||
|
|
||||||
isSuccess = handleRes.isOk()
|
|
||||||
pushResponse = LightpushResponse(
|
|
||||||
requestId: pushRequest.requestId,
|
|
||||||
statusCode:
|
|
||||||
if isSuccess:
|
|
||||||
LightpushStatusCode.SUCCESS.uint32
|
|
||||||
else:
|
|
||||||
handleRes.error.code.uint32,
|
|
||||||
statusDesc:
|
|
||||||
if isSuccess:
|
|
||||||
none[string]()
|
|
||||||
else:
|
|
||||||
handleRes.error.desc,
|
|
||||||
relayPeerCount:
|
|
||||||
if isSuccess:
|
|
||||||
some(handleRes.get())
|
|
||||||
else:
|
|
||||||
none[uint32](),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not isSuccess:
|
let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr:
|
||||||
waku_lightpush_v3_errors.inc(
|
let desc = error.desc
|
||||||
labelValues = [pushResponse.statusDesc.valueOr("unknown")]
|
waku_lightpush_v3_errors.inc(labelValues = [$error.code])
|
||||||
|
error "failed to push message", error = desc
|
||||||
|
return LightPushResponse(
|
||||||
|
requestId: pushRequest.requestId, statusCode: error.code.uint32, statusDesc: desc
|
||||||
)
|
)
|
||||||
error "failed to push message", error = pushResponse.statusDesc
|
|
||||||
return pushResponse
|
return LightPushResponse(
|
||||||
|
requestId: pushRequest.requestId,
|
||||||
|
statusCode: LightPushStatusCode.SUCCESS.uint32,
|
||||||
|
statusDesc: none[string](),
|
||||||
|
relayPeerCount: some(relayPeerCount),
|
||||||
|
)
|
||||||
|
|
||||||
proc initProtocolHandler(wl: WakuLightPush) =
|
proc initProtocolHandler(wl: WakuLightPush) =
|
||||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||||
var rpc: LightpushResponse
|
var rpc: LightPushResponse
|
||||||
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
||||||
var buffer: seq[byte]
|
var buffer: seq[byte]
|
||||||
try:
|
try:
|
||||||
@ -137,7 +123,7 @@ proc initProtocolHandler(wl: WakuLightPush) =
|
|||||||
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
|
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
|
||||||
|
|
||||||
rpc = static(
|
rpc = static(
|
||||||
LightpushResponse(
|
LightPushResponse(
|
||||||
## We will not copy and decode RPC buffer from stream only for requestId
|
## We will not copy and decode RPC buffer from stream only for requestId
|
||||||
## in reject case as it is comparably too expensive and opens possible
|
## in reject case as it is comparably too expensive and opens possible
|
||||||
## attack surface
|
## attack surface
|
||||||
@ -152,8 +138,8 @@ proc initProtocolHandler(wl: WakuLightPush) =
|
|||||||
except LPStreamError:
|
except LPStreamError:
|
||||||
error "lightpush write stream failed", error = getCurrentExceptionMsg()
|
error "lightpush write stream failed", error = getCurrentExceptionMsg()
|
||||||
|
|
||||||
## For lightpush might not worth to measure outgoing trafic as it is only
|
## For lightpush might not worth to measure outgoing traffic as it is only
|
||||||
## small respones about success/failure
|
## small response about success/failure
|
||||||
|
|
||||||
wl.handler = handler
|
wl.handler = handler
|
||||||
wl.codec = WakuLightPushCodec
|
wl.codec = WakuLightPushCodec
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user