fryorcraken 16bda047e1
chore!: make sharding configuration explicit (#3468)
* Reserve `networkconfig` name to waku network related settings

* Rename cluster conf to network conf

 A `NetworkConf` is a Waku network configuration.

# Conflicts:
#	tests/factory/test_waku_conf.nim

# Conflicts:
#	tests/factory/test_waku_conf.nim

* Improve sharding configuration

A smarter data types simplifies the logic.

* Fixing tests

* fixup! rename to endpointConf

* wip: autosharding is a specific configuration state and treat it like
it

# Conflicts:
#	waku/factory/external_config.nim

* refactor lightpush handler

some metrics error reporting were missing

# Conflicts:
#	waku/waku_lightpush/protocol.nim

* test_node_factory tests pass

* remove warnings

* fix tests

* Revert eager previous replace-all command

* fix up build tools compilation

* metadata is used to store cluster id

* Mount relay routes in static sharding

* Rename activeRelayShards to subscribeShards

To make it clearer that these are the shards the node will subscribe to.

* Remove unused msg var

* Improve error handling

* Set autosharding as default, with 1 shard in network

Also makes shards to subscribe to all shards in auto sharding, none in
static sharding.
2025-07-04 17:10:53 +10:00

172 lines
5.8 KiB
Nim

{.push raises: [].}
import
std/[options, strutils],
results,
stew/byteutils,
chronicles,
chronos,
metrics,
bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_core/topics/sharding,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
logScope:
topics = "waku lightpush"
type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
autoSharding: Option[Sharding]
proc handleRequest(
wl: WakuLightPush, peerId: PeerId, pushRequest: LightpushRequest
): Future[WakuLightPushResult] {.async.} =
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
if wl.autoSharding.isNone():
let msg = "Pubsub topic must be specified when static sharding is enabled"
error "lightpush request handling error", error = msg
return WakuLightPushResult.err(
(code: LightpushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg))
)
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
return WakuLightPushResult.err(
(code: LightPushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg))
)
wl.autoSharding.get().getShard(parsedTopic).valueOr:
let msg = "Auto-sharding error: " & error
error "lightpush request handling error", error = msg
return WakuLightPushResult.err(
(code: LightPushStatusCode.INTERNAL_SERVER_ERROR, desc: some(msg))
)
# ensure checking topic will not cause error at gossipsub level
if pubsubTopic.isEmptyOrWhitespace():
let msg = "topic must not be empty"
error "lightpush request handling error", error = msg
return
WakuLightPushResult.err((code: LightPushStatusCode.BAD_REQUEST, desc: some(msg)))
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex()
notice "handling lightpush request",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peerId,
requestId = pushRequest.requestId,
pubsubTopic = pushRequest.pubsubTopic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let res = (await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)).valueOr:
return err((code: error.code, desc: error.desc))
return ok(res)
proc handleRequest*(
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
): Future[LightPushResponse] {.async.} =
var pushResponse: LightPushResponse
let pushRequest = LightPushRequest.decode(buffer).valueOr:
let desc = decodeRpcFailure & ": " & $error
error "failed to push message", error = desc
let errorCode = LightPushStatusCode.BAD_REQUEST.uint32
waku_lightpush_v3_errors.inc(labelValues = [$errorCode])
return LightPushResponse(
requestId: "N/A", # due to decode failure we don't know requestId
statusCode: errorCode.uint32,
statusDesc: some(desc),
)
let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr:
let desc = error.desc
waku_lightpush_v3_errors.inc(labelValues = [$error.code])
error "failed to push message", error = desc
return LightPushResponse(
requestId: pushRequest.requestId, statusCode: error.code.uint32, statusDesc: desc
)
return LightPushResponse(
requestId: pushRequest.requestId,
statusCode: LightPushStatusCode.SUCCESS.uint32,
statusDesc: none[string](),
relayPeerCount: some(relayPeerCount),
)
proc initProtocolHandler(wl: WakuLightPush) =
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var rpc: LightPushResponse
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
var buffer: seq[byte]
try:
buffer = await conn.readLp(DefaultMaxRpcSize)
except LPStreamError:
error "lightpush read stream failed", error = getCurrentExceptionMsg()
return
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuLightPushCodec, "in"]
)
try:
rpc = await handleRequest(wl, conn.peerId, buffer)
except CatchableError:
error "lightpush failed handleRequest", error = getCurrentExceptionMsg()
do:
debug "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
LightPushResponse(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
statusCode: LightpushStatusCode.TOO_MANY_REQUESTS.uint32,
statusDesc: some(TooManyRequestsMessage),
)
)
try:
await conn.writeLp(rpc.encode().buffer)
except LPStreamError:
error "lightpush write stream failed", error = getCurrentExceptionMsg()
## For lightpush might not worth to measure outgoing traffic as it is only
## small response about success/failure
wl.handler = handler
wl.codec = WakuLightPushCodec
proc new*(
T: type WakuLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
autoSharding: Option[Sharding],
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
autoSharding: autoSharding,
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)
return wl