NagyZoltanPeter 2786ef6079 chore: update lite-protocol-tester for handling shard argument. (#3371)
* chore: replace pubsub topic with shard configuration across the lite protocol tester
* chore: enhance protocol performance - response time - metrics
* fix filter-client double mounting possibility.
2025-04-16 17:04:52 +02:00

168 lines
5.2 KiB
Nim

{.push raises: [].}
import
std/[options, strutils],
results,
stew/byteutils,
chronicles,
chronos,
metrics,
bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_core/topics/sharding,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
logScope:
topics = "waku lightpush"
type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
sharding: Sharding
proc handleRequest*(
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
): Future[LightPushResponse] {.async.} =
let reqDecodeRes = LightpushRequest.decode(buffer)
var isSuccess = false
var pushResponse: LightpushResponse
if reqDecodeRes.isErr():
pushResponse = LightpushResponse(
requestId: "N/A", # due to decode failure we don't know requestId
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error),
)
else:
let pushRequest = reqDecodeRes.get()
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
return LightpushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32,
statusDesc: some(msg),
)
wl.sharding.getShard(parsedTopic).valueOr:
let msg = "Autosharding error: " & error
error "lightpush request handling error", error = msg
return LightpushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32,
statusDesc: some(msg),
)
# ensure checking topic will not cause error at gossipsub level
if pubsubTopic.isEmptyOrWhitespace():
let msg = "topic must not be empty"
error "lightpush request handling error", error = msg
return LightPushResponse(
requestId: pushRequest.requestId,
statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
statusDesc: some(msg),
)
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex()
notice "handling lightpush request",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peerId,
requestId = pushRequest.requestId,
pubsubTopic = pushRequest.pubsubTopic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)
isSuccess = handleRes.isOk()
pushResponse = LightpushResponse(
requestId: pushRequest.requestId,
statusCode:
if isSuccess:
LightpushStatusCode.SUCCESS.uint32
else:
handleRes.error.code.uint32,
statusDesc:
if isSuccess:
none[string]()
else:
handleRes.error.desc,
relayPeerCount:
if isSuccess:
some(handleRes.get())
else:
none[uint32](),
)
if not isSuccess:
waku_lightpush_v3_errors.inc(
labelValues = [pushResponse.statusDesc.valueOr("unknown")]
)
error "failed to push message", error = pushResponse.statusDesc
return pushResponse
proc initProtocolHandler(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
var rpc: LightpushResponse
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
let buffer = await conn.readLp(DefaultMaxRpcSize)
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuLightPushCodec, "in"]
)
rpc = await handleRequest(wl, conn.peerId, buffer)
do:
debug "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
LightpushResponse(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32,
statusDesc: some(TooManyRequestsMessage),
)
)
await conn.writeLp(rpc.encode().buffer)
## For lightpush might not worth to measure outgoing trafic as it is only
## small respones about success/failure
wl.handler = handle
wl.codec = WakuLightPushCodec
proc new*(
T: type WakuLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
sharding: Sharding,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
sharding: sharding,
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)
return wl