2024-06-28 16:04:57 +05:30
|
|
|
{.push raises: [].}
|
2022-08-03 01:47:42 +02:00
|
|
|
|
2025-03-05 12:07:56 +01:00
|
|
|
import
|
|
|
|
|
std/[options, strutils],
|
|
|
|
|
results,
|
|
|
|
|
stew/byteutils,
|
|
|
|
|
chronicles,
|
|
|
|
|
chronos,
|
|
|
|
|
metrics,
|
|
|
|
|
bearssl/rand
|
2022-08-03 01:47:42 +02:00
|
|
|
import
|
2024-01-30 07:28:21 -05:00
|
|
|
../node/peer_manager/peer_manager,
|
2023-04-19 13:29:23 +02:00
|
|
|
../waku_core,
|
2025-03-05 12:07:56 +01:00
|
|
|
../waku_core/topics/sharding,
|
2024-01-30 07:28:21 -05:00
|
|
|
./common,
|
2022-08-03 01:47:42 +02:00
|
|
|
./rpc,
|
2022-10-25 14:55:31 +02:00
|
|
|
./rpc_codec,
|
2024-04-15 15:28:35 +02:00
|
|
|
./protocol_metrics,
|
2024-07-16 15:46:21 +02:00
|
|
|
../common/rate_limit/request_limiter
|
2024-03-16 00:08:47 +01:00
|
|
|
|
2022-08-03 01:47:42 +02:00
|
|
|
logScope:
|
2022-11-03 16:36:24 +01:00
|
|
|
topics = "waku lightpush"
|
2022-08-03 01:47:42 +02:00
|
|
|
|
2024-01-30 07:28:21 -05:00
|
|
|
type
  WakuLightPush* = ref object of LPProtocol
    ## State of the lightpush service protocol. Instances are created by
    ## `WakuLightPush.new`, which also installs the stream handler.
    rng*: ref rand.HmacDrbgContext
      ## Random number generator context supplied at construction.
    peerManager*: PeerManager
      ## Gives access to the local switch; `handleRequest` reads this node's
      ## own peer id from it for logging.
    pushHandler*: PushMessageHandler
      ## Callback awaited by `handleRequest` with the requesting peer, the
      ## resolved pubsub topic and the message to publish.
    requestRateLimiter*: RequestRateLimiter
      ## Consulted for every incoming stream before the request is read.
    sharding: Sharding
      ## Used to derive a shard from the message content topic when a request
      ## does not carry a pubsub topic. Not exported.
|
2022-08-03 01:47:42 +02:00
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
func failedPushResponse(
    requestId: string, statusCode: uint32, desc: string
): LightpushResponse =
  ## Builds a response that carries only a request id, a status code and a
  ## description; every other field keeps its default value.
  LightpushResponse(
    requestId: requestId, statusCode: statusCode, statusDesc: some(desc)
  )

proc handleRequest*(
    wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
): Future[LightPushResponse] {.async.} =
  ## Decodes `buffer` as a `LightpushRequest`, passes the contained message to
  ## `wl.pushHandler` and returns the `LightpushResponse` describing the
  ## outcome.
  ##
  ## - `wl`: protocol instance providing the push handler, the sharding
  ##   configuration and the peer manager.
  ## - `peerId`: peer the request came from; forwarded to the push handler.
  ## - `buffer`: encoded request bytes.
  ##
  ## Failure handling:
  ## - an undecodable buffer yields `BAD_REQUEST` with request id `"N/A"`;
  ## - when the request has no pubsub topic, one is derived from the message
  ##   content topic. An unparsable content topic returns
  ##   `INVALID_MESSAGE_ERROR` and a failed shard lookup returns
  ##   `INTERNAL_SERVER_ERROR`;
  ## - an empty or whitespace-only pubsub topic returns `BAD_REQUEST`;
  ## - a push handler error is mapped to its own status code and description.
  ##
  ## The topic-related failures return immediately after logging. Only the
  ## decode failure and the push handler failure reach the final section that
  ## increments `waku_lightpush_v3_errors`.
  var
    succeeded = false
    response: LightpushResponse

  let decoded = LightpushRequest.decode(buffer)

  if decoded.isErr():
    # The request id is part of the payload that failed to decode.
    response = failedPushResponse(
      "N/A",
      LightpushStatusCode.BAD_REQUEST.uint32,
      decodeRpcFailure & ": " & $decoded.error,
    )
  else:
    let req = decoded.get()

    # Use the topic from the request, otherwise derive it via autosharding.
    let pubsubTopic = req.pubSubTopic.valueOr:
      let nsTopic = NsContentTopic.parse(req.message.contentTopic).valueOr:
        let reason = "Invalid content-topic:" & $error
        error "lightpush request handling error", error = reason
        return failedPushResponse(
          req.requestId, LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32, reason
        )

      wl.sharding.getShard(nsTopic).valueOr:
        let reason = "Autosharding error: " & error
        error "lightpush request handling error", error = reason
        return failedPushResponse(
          req.requestId, LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32, reason
        )

    # Reject blank topics here so they do not cause an error at gossipsub level.
    if pubsubTopic.isEmptyOrWhitespace():
      let reason = "topic must not be empty"
      error "lightpush request handling error", error = reason
      return failedPushResponse(
        req.requestId, LightpushStatusCode.BAD_REQUEST.uint32, reason
      )

    waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])

    let hashHex = pubsubTopic.computeMessageHash(req.message).to0xHex()
    notice "handling lightpush request",
      my_peer_id = wl.peerManager.switch.peerInfo.peerId,
      peer_id = peerId,
      requestId = req.requestId,
      pubsubTopic = req.pubsubTopic,
      msg_hash = hashHex,
      receivedTime = getNowInNanosecondTime()

    let outcome = await wl.pushHandler(peerId, pubsubTopic, req.message)

    succeeded = outcome.isOk()
    response =
      if succeeded:
        LightpushResponse(
          requestId: req.requestId,
          statusCode: LightpushStatusCode.SUCCESS.uint32,
          statusDesc: none[string](),
          relayPeerCount: some(outcome.get()),
        )
      else:
        LightpushResponse(
          requestId: req.requestId,
          statusCode: outcome.error.code.uint32,
          statusDesc: outcome.error.desc,
          relayPeerCount: none[uint32](),
        )

  if succeeded:
    return response

  # Decode failures and push handler failures are counted and logged here.
  waku_lightpush_v3_errors.inc(
    labelValues = [response.statusDesc.valueOr("unknown")]
  )
  error "failed to push message", error = response.statusDesc
  return response
|
2023-10-09 16:38:23 +02:00
|
|
|
|
2024-03-04 15:31:37 +01:00
|
|
|
proc initProtocolHandler(wl: WakuLightPush) =
  ## Sets `wl.handler` to the stream handler defined below and `wl.codec` to
  ## `WakuLightPushCodec`.
  ##
  ## For each incoming stream the handler consults `wl.requestRateLimiter`.
  ## Within the limit it reads one length-prefixed request, counts its size
  ## in `waku_service_network_bytes` and processes it with `handleRequest`.
  ## Otherwise it answers with a `TOO_MANY_REQUESTS` response without reading
  ## the request. In both cases the encoded response is written back.
  proc handle(conn: Connection, proto: string) {.async.} =
    var reply: LightpushResponse

    wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
      let requestBytes = await conn.readLp(DefaultMaxRpcSize)

      waku_service_network_bytes.inc(
        amount = requestBytes.len().int64, labelValues = [WakuLightPushCodec, "in"]
      )

      reply = await wl.handleRequest(conn.peerId, requestBytes)
    do:
      debug "lightpush request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $wl.requestRateLimiter.setting

      # The request is deliberately not read and decoded just to recover its
      # requestId: in the reject case that would be comparatively expensive
      # and would open a possible attack surface.
      reply = static(
        LightpushResponse(
          requestId: "N/A",
          statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32,
          statusDesc: some(TooManyRequestsMessage),
        )
      )

    await conn.writeLp(reply.encode().buffer)

    # Outgoing traffic is not measured: lightpush responses are only small
    # success/failure messages.

  wl.handler = handle
  wl.codec = WakuLightPushCodec
|
|
|
|
|
|
2024-03-16 00:08:47 +01:00
|
|
|
proc new*(
    T: type WakuLightPush,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    pushHandler: PushMessageHandler,
    sharding: Sharding,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a `WakuLightPush` from the given collaborators, installs its
  ## protocol handler and passes `rateLimitSetting` to `setServiceLimitMetric`
  ## for the lightpush codec.
  ##
  ## - `peerManager`, `rng`, `pushHandler`, `sharding`: stored unchanged in the
  ##   fields of the same name.
  ## - `rateLimitSetting`: optional setting handed to `newRequestRateLimiter`;
  ##   defaults to `none`.
  let limiter = newRequestRateLimiter(rateLimitSetting)

  let protocol = WakuLightPush(
    rng: rng,
    peerManager: peerManager,
    pushHandler: pushHandler,
    requestRateLimiter: limiter,
    sharding: sharding,
  )

  initProtocolHandler(protocol)
  setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)

  protocol
|