NagyZoltanPeter e0b563ffe5
feat: lightpush v3 (#3279)
* Separate new lightpush protocol
New RPC defined
Rename all occurrences of old lightpush to legacy lightpush, fix REST tests of lightpush
New lightpush protocol added back
Setup new lightpush protocol, mounting and rest api for it

	modified:   apps/chat2/chat2.nim
	modified:   tests/node/test_wakunode_lightpush.nim
	modified:   tests/node/test_wakunode_sharding.nim
	modified:   tests/test_peer_manager.nim
	modified:   tests/test_wakunode_lightpush.nim
	renamed:    tests/waku_lightpush/lightpush_utils.nim -> tests/waku_lightpush_legacy/lightpush_utils.nim
	renamed:    tests/waku_lightpush/test_all.nim -> tests/waku_lightpush_legacy/test_all.nim
	renamed:    tests/waku_lightpush/test_client.nim -> tests/waku_lightpush_legacy/test_client.nim
	renamed:    tests/waku_lightpush/test_ratelimit.nim -> tests/waku_lightpush_legacy/test_ratelimit.nim
	modified:   tests/wakunode_rest/test_all.nim
	renamed:    tests/wakunode_rest/test_rest_lightpush.nim -> tests/wakunode_rest/test_rest_lightpush_legacy.nim
	modified:   waku/factory/node_factory.nim
	modified:   waku/node/waku_node.nim
	modified:   waku/waku_api/rest/admin/handlers.nim
	modified:   waku/waku_api/rest/builder.nim
	new file:   waku/waku_api/rest/legacy_lightpush/client.nim
	new file:   waku/waku_api/rest/legacy_lightpush/handlers.nim
	new file:   waku/waku_api/rest/legacy_lightpush/types.nim
	modified:   waku/waku_api/rest/lightpush/client.nim
	modified:   waku/waku_api/rest/lightpush/handlers.nim
	modified:   waku/waku_api/rest/lightpush/types.nim
	modified:   waku/waku_core/codecs.nim
	modified:   waku/waku_lightpush.nim
	modified:   waku/waku_lightpush/callbacks.nim
	modified:   waku/waku_lightpush/client.nim
	modified:   waku/waku_lightpush/common.nim
	modified:   waku/waku_lightpush/protocol.nim
	modified:   waku/waku_lightpush/rpc.nim
	modified:   waku/waku_lightpush/rpc_codec.nim
	modified:   waku/waku_lightpush/self_req_handler.nim
	new file:   waku/waku_lightpush_legacy.nim
	renamed:    waku/waku_lightpush/README.md -> waku/waku_lightpush_legacy/README.md
	new file:   waku/waku_lightpush_legacy/callbacks.nim
	new file:   waku/waku_lightpush_legacy/client.nim
	new file:   waku/waku_lightpush_legacy/common.nim
	new file:   waku/waku_lightpush_legacy/protocol.nim
	new file:   waku/waku_lightpush_legacy/protocol_metrics.nim
	new file:   waku/waku_lightpush_legacy/rpc.nim
	new file:   waku/waku_lightpush_legacy/rpc_codec.nim
	new file:   waku/waku_lightpush_legacy/self_req_handler.nim

Adapt to non-invasive libp2p observers

cherry pick latest lightpush (v1) changes into legacy lightpush code after rebase to latest master

Fix vendor dependencies from origin/master after failed rebase of them

Adjust examples, test to new lightpush - keep using of legacy

Fixup error code mappings

Fix REST admin interface with distinct legacy and new lightpush

Fix lightpush v2 tests

* Utilize new publishEx interface of pubsub libp2p

* Adapt to latest libp2p publish design changes. publish returns an outcome as Result error.

* Fix review findings

* Fix tests, re-added lost one

* Fix rebase

* Apply suggestions from code review

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Addressing review comments

* Fix incentivization tests

* Fix build failed on libwaku

* Change new lightpush endpoint version to 3 instead of 2. Noticed that old and new lightpush metrics can cause trouble in monitoring dashboards, so decided to name the new lightpush metrics v3 and change the legacy ones back - temporarily, until the old lightpush is decommissioned

* Fixing flaky test with rate limit timing

* Fixing logscope of lightpush and legacy lightpush

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2025-03-05 12:07:56 +01:00

162 lines
5.0 KiB
Nim

# Strict exception tracking for the rest of this module: routines declared
# below are checked against an empty `raises` list unless they carry their own
# annotation.
{.push raises: [].}
import
std/[options, strutils],
results,
stew/byteutils,
chronicles,
chronos,
metrics,
bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_core/topics/sharding,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
# Log scope for this module: log statements below are tagged with this
# `topics` value (presumably used for log filtering - confirm against the
# chronicles configuration).
logScope:
  topics = "waku lightpush"
type
  WakuLightPush* = ref object of LPProtocol
    ## Service side of the lightpush protocol: receives push requests from
    ## peers, validates them and forwards the message to `pushHandler`.

    # Random number generator context; stored by `new`, not used by the procs
    # visible in this section of the file.
    rng*: ref rand.HmacDrbgContext
    # Gives access to the local switch; `handleRequest` reads the local peer
    # id from it for logging.
    peerManager*: PeerManager
    # Callback awaited by `handleRequest` with (peerId, pubsubTopic, message);
    # its result decides the status code of the response.
    pushHandler*: PushMessageHandler
    # Consulted by the stream handler before a request is read, to decide
    # whether the request is served or rejected.
    requestRateLimiter*: RequestRateLimiter
    # Used to derive the shard (pubsub topic) from the message content topic
    # when the request does not carry a pubsub topic. Not exported.
    sharding: Sharding
proc handleRequest*(
    wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
): Future[LightPushResponse] {.async.} =
  ## Decodes a lightpush request from `buffer`, resolves the pubsub topic it
  ## targets, passes the message to `wl.pushHandler` and returns the
  ## `LightpushResponse` describing the outcome.
  ##
  ## The pubsub topic is taken from the request when present; otherwise it is
  ## derived from the message content topic through `wl.sharding`.
  ##
  ## Failures are reported through the response, not raised by this proc:
  ## - undecodable buffer: `BAD_REQUEST`, `requestId` is `"N/A"`
  ## - unparsable content topic: `INVALID_MESSAGE_ERROR`
  ## - shard cannot be derived: `INTERNAL_SERVER_ERROR`
  ## - empty / whitespace-only pubsub topic: `BAD_REQUEST`
  ## - push handler error: the code and description returned by the handler
  ##
  ## Every failure path increments `waku_lightpush_v3_errors`, labelled with
  ## the status description, so the metric also counts requests rejected
  ## during validation (these early-return paths used to skip the counter).
  let reqDecodeRes = LightpushRequest.decode(buffer)
  var isSuccess = false
  var pushResponse: LightpushResponse

  if reqDecodeRes.isErr():
    pushResponse = LightpushResponse(
      requestId: "N/A", # due to decode failure we don't know requestId
      statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
      statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error),
    )
  else:
    let pushRequest = reqDecodeRes.get()

    # Use the pubsub topic given in the request; fall back to autosharding
    # based on the content topic when it is absent.
    let pubsubTopic = pushRequest.pubSubTopic.valueOr:
      let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
        let msg = "Invalid content-topic:" & $error
        error "lightpush request handling error", error = msg
        # count the rejection: this path returns before the common tail below
        waku_lightpush_v3_errors.inc(labelValues = [msg])
        return LightpushResponse(
          requestId: pushRequest.requestId,
          statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32,
          statusDesc: some(msg),
        )

      wl.sharding.getShard(parsedTopic).valueOr:
        let msg = "Autosharding error: " & error
        error "lightpush request handling error", error = msg
        # count the rejection: this path returns before the common tail below
        waku_lightpush_v3_errors.inc(labelValues = [msg])
        return LightpushResponse(
          requestId: pushRequest.requestId,
          statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32,
          statusDesc: some(msg),
        )

    # ensure checking topic will not cause error at gossipsub level
    if pubsubTopic.isEmptyOrWhitespace():
      let msg = "topic must not be empty"
      error "lightpush request handling error", error = msg
      # count the rejection: this path returns before the common tail below
      waku_lightpush_v3_errors.inc(labelValues = [msg])
      return LightPushResponse(
        requestId: pushRequest.requestId,
        statusCode: LightpushStatusCode.BAD_REQUEST.uint32,
        statusDesc: some(msg),
      )

    waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])

    # Note: the `pubsubTopic` log field is the optional topic as received in
    # the request, while the message hash is computed with the resolved topic.
    notice "handling lightpush request",
      my_peer_id = wl.peerManager.switch.peerInfo.peerId,
      peer_id = peerId,
      requestId = pushRequest.requestId,
      pubsubTopic = pushRequest.pubsubTopic,
      msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex(),
      receivedTime = getNowInNanosecondTime()

    let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)

    isSuccess = handleRes.isOk()
    pushResponse = LightpushResponse(
      requestId: pushRequest.requestId,
      statusCode:
        if isSuccess:
          LightpushStatusCode.SUCCESS.uint32
        else:
          handleRes.error.code.uint32,
      statusDesc:
        if isSuccess:
          none[string]()
        else:
          handleRes.error.desc,
    )

  # Common tail for decode failures and push handler failures.
  if not isSuccess:
    waku_lightpush_v3_errors.inc(
      labelValues = [pushResponse.statusDesc.valueOr("unknown")]
    )
    error "failed to push message", error = pushResponse.statusDesc
  return pushResponse
proc initProtocolHandler(wl: WakuLightPush) =
  ## Builds the per-connection handler and registers it on `wl` together with
  ## the `WakuLightPushCodec` codec.
  ##
  ## For each incoming connection the handler checks the request rate limit.
  ## Within the limit it reads one request (at most `DefaultMaxRpcSize`),
  ## counts the received bytes and processes it with `handleRequest`; otherwise
  ## it answers with a fixed `TOO_MANY_REQUESTS` response. In both cases the
  ## encoded response is written back on the same connection.
  proc handle(conn: Connection, proto: string) {.async.} =
    var response: LightpushResponse

    wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
      # Within the rate limit: read, account for and serve the request.
      let payload = await conn.readLp(DefaultMaxRpcSize)

      waku_service_network_bytes.inc(
        amount = payload.len().int64, labelValues = [WakuLightPushCodec, "in"]
      )

      response = await wl.handleRequest(conn.peerId, payload)
    do:
      # Rate limit exceeded: reject without reading the request.
      debug "lightpush request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $wl.requestRateLimiter.setting

      # The RPC buffer is deliberately not copied and decoded from the stream
      # just to recover the requestId: in the reject case that is comparably
      # too expensive and opens a possible attack surface.
      response = static(
        LightpushResponse(
          requestId: "N/A",
          statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32,
          statusDesc: some(TooManyRequestsMessage),
        )
      )

    # Outgoing traffic is not measured for lightpush: responses are only small
    # success/failure messages, so it is probably not worth it.
    await conn.writeLp(response.encode().buffer)

  wl.handler = handle
  wl.codec = WakuLightPushCodec
proc new*(
    T: type WakuLightPush,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    pushHandler: PushMessageHandler,
    sharding: Sharding,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a `WakuLightPush` instance with its protocol handler installed.
  ##
  ## - `peerManager`, `rng`, `pushHandler`, `sharding`: stored on the instance
  ##   as given.
  ## - `rateLimitSetting`: optional request rate limit; it configures the
  ##   instance's request rate limiter and is also passed to
  ##   `setServiceLimitMetric` for the lightpush codec.
  let limiter = newRequestRateLimiter(rateLimitSetting)
  let protocol = WakuLightPush(
    peerManager: peerManager,
    pushHandler: pushHandler,
    requestRateLimiter: limiter,
    rng: rng,
    sharding: sharding,
  )

  initProtocolHandler(protocol)
  setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)

  protocol