From 1e73213a3604b0113a13b1ca2157db3276c78a4d Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Fri, 28 Nov 2025 10:41:20 +0100 Subject: [PATCH] chore: Lightpush minor refactor (#3538) * chore: refactor Lightpush (more DRY) * chore: apply review suggestions Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- .../lightpush_mix/lightpush_publisher_mix.nim | 6 +- tests/waku_lightpush/test_ratelimit.nim | 8 +- waku/node/kernel_api/lightpush.nim | 4 +- waku/waku_lightpush/client.nim | 145 +++++++----------- waku/waku_lightpush/common.nim | 15 +- waku/waku_lightpush/protocol.nim | 12 +- 6 files changed, 76 insertions(+), 114 deletions(-) diff --git a/examples/lightpush_mix/lightpush_publisher_mix.nim b/examples/lightpush_mix/lightpush_publisher_mix.nim index 4219cd665..104de8552 100644 --- a/examples/lightpush_mix/lightpush_publisher_mix.nim +++ b/examples/lightpush_mix/lightpush_publisher_mix.nim @@ -163,9 +163,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} ephemeral: true, # tell store nodes to not store it timestamp: getNowInNanosecondTime(), ) # current timestamp - let res = await node.wakuLightpushClient.publishWithConn( - LightpushPubsubTopic, message, conn, dPeerId - ) + + let res = + await node.wakuLightpushClient.publish(some(LightpushPubsubTopic), message, conn) let startTime = getNowInNanosecondTime() diff --git a/tests/waku_lightpush/test_ratelimit.nim b/tests/waku_lightpush/test_ratelimit.nim index b2dcdc7b5..7420a4e56 100644 --- a/tests/waku_lightpush/test_ratelimit.nim +++ b/tests/waku_lightpush/test_ratelimit.nim @@ -37,7 +37,7 @@ suite "Rate limited push service": handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, serverPeerId) check await handlerFuture.withTimeout(50.millis) @@ -66,7 +66,7 @@ suite "Rate limited push service": var endTime = Moment.now() var elapsed: Duration = (endTime - startTime) await sleepAsync(tokenPeriod - elapsed + firstWaitExtend) - firstWaitEXtend = 100.millis + firstWaitExtend = 100.millis ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) @@ -99,7 +99,7 @@ suite "Rate limited push service": let message = fakeWakuMessage() handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, serverPeerId) discard await handlerFuture.withTimeout(10.millis) check: @@ -114,7 +114,7 @@ suite "Rate limited push service": let message = fakeWakuMessage() handlerFuture = newFuture[(string, WakuMessage)]() let requestRes = - await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) + await client.publish(some(DefaultPubsubTopic), message, serverPeerId) discard await handlerFuture.withTimeout(10.millis) check: diff --git a/waku/node/kernel_api/lightpush.nim b/waku/node/kernel_api/lightpush.nim index 8df6291b1..9451767ac 100644 --- a/waku/node/kernel_api/lightpush.nim +++ b/waku/node/kernel_api/lightpush.nim @@ -210,9 +210,7 @@ proc lightpushPublishHandler( "Waku lightpush with mix not available", ) - return await node.wakuLightpushClient.publishWithConn( - pubsubTopic, message, conn, peer.peerId - ) + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn) else: return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index d68552304..b528b4c76 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -17,8 +17,8 @@ logScope: topics = "waku lightpush client" type WakuLightPushClient* = ref object - peerManager*: PeerManager rng*: ref rand.HmacDrbgContext + peerManager*: PeerManager publishObservers: seq[PublishObserver] proc new*( @@ -29,33 +29,31 @@ proc new*( proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = wl.publishObservers.add(obs) -proc sendPushRequest( - wl: WakuLightPushClient, - req: LightPushRequest, - peer: PeerId | RemotePeerInfo, - conn: Option[Connection] = none(Connection), -): Future[WakuLightPushResult] {.async.} = - let connection = conn.valueOr: - (await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr: - waku_lightpush_v3_errors.inc(labelValues = [dialFailure]) - return lighpushErrorResult( - LightPushErrorCode.NO_PEERS_TO_RELAY, - dialFailure & ": " & $peer & " is not accessible", - ) +proc ensureTimestampSet(message: var WakuMessage) = + if message.timestamp == 0: + message.timestamp = getNowInNanosecondTime() - defer: - await connection.closeWithEOF() +## Short log string for peer identifiers (overloads for convenience) +func shortPeerId(peer: PeerId): string = + shortLog(peer) + +func shortPeerId(peer: RemotePeerInfo): string = + shortLog(peer.peerId) + +proc sendPushRequestToConn( + wl: WakuLightPushClient, request: LightPushRequest, conn: Connection +): Future[WakuLightPushResult] {.async.} = try: - await connection.writeLP(req.encode().buffer) - except CatchableError: - error "failed to send push request", error = getCurrentExceptionMsg() + await conn.writeLp(request.encode().buffer) + except LPStreamRemoteClosedError: + error "Failed to write request to peer", error = getCurrentExceptionMsg() return lightpushResultInternalError( - "failed to send push request: " & getCurrentExceptionMsg() + "Failed to write request to peer: " & getCurrentExceptionMsg() ) var buffer: seq[byte] try: - buffer = await connection.readLp(DefaultMaxRpcSize.int) + buffer = await conn.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: error "Failed to read response from peer", error = getCurrentExceptionMsg() return lightpushResultInternalError( @@ -66,10 +64,12 @@ proc sendPushRequest( waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) return lightpushResultInternalError(decodeRpcFailure) - if response.requestId != req.requestId and - response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS: + let requestIdMismatch = response.requestId != request.requestId + let tooManyRequests = response.statusCode == LightPushErrorCode.TOO_MANY_REQUESTS + if requestIdMismatch and (not tooManyRequests): + # response with TOO_MANY_REQUESTS error code has no requestId by design error "response failure, requestId mismatch", - requestId = req.requestId, responseRequestId = response.requestId + requestId = request.requestId, responseRequestId = response.requestId return lightpushResultInternalError("response failure, requestId mismatch") return toPushResult(response) @@ -78,88 +78,49 @@ proc publish*( wl: WakuLightPushClient, pubSubTopic: Option[PubsubTopic] = none(PubsubTopic), wakuMessage: WakuMessage, - peer: PeerId | RemotePeerInfo, + dest: Connection | PeerId | RemotePeerInfo, ): Future[WakuLightPushResult] {.async, gcsafe.} = + let conn = + when dest is Connection: + dest + else: + (await wl.peerManager.dialPeer(dest, WakuLightPushCodec)).valueOr: + waku_lightpush_v3_errors.inc(labelValues = [dialFailure]) + return lighpushErrorResult( + LightPushErrorCode.NO_PEERS_TO_RELAY, + "Peer is not accessible: " & dialFailure & " - " & $dest, + ) + + defer: + await conn.closeWithEOF() + var message = wakuMessage - if message.timestamp == 0: - message.timestamp = getNowInNanosecondTime() + ensureTimestampSet(message) - when peer is PeerId: - info "publish", - peerId = shortLog(peer), - msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex - else: - info "publish", - peerId = shortLog(peer.peerId), - msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex + let msgHash = computeMessageHash(pubSubTopic.get(""), message).to0xHex() + info "publish", + myPeerId = wl.peerManager.switch.peerInfo.peerId, + peerId = shortPeerId(conn.peerId), + msgHash = msgHash, + sentTime = getNowInNanosecondTime() - let pushRequest = LightpushRequest( - requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message + let request = LightpushRequest( + requestId: generateRequestId(wl.rng), pubsubTopic: pubSubTopic, message: message ) - let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) + let relayPeerCount = ?await wl.sendPushRequestToConn(request, conn) for obs in wl.publishObservers: obs.onMessagePublished(pubSubTopic.get(""), message) - return lightpushSuccessResult(publishedCount) + return lightpushSuccessResult(relayPeerCount) proc publishToAny*( - wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage + wl: WakuLightPushClient, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage ): Future[WakuLightPushResult] {.async, gcsafe.} = - ## This proc is similar to the publish one but in this case - ## we don't specify a particular peer and instead we get it from peer manager - - var message = wakuMessage - if message.timestamp == 0: - message.timestamp = getNowInNanosecondTime() - + # Like publish, but selects a peer automatically from the peer manager let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: # TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side? return lighpushErrorResult( LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" ) - - info "publishToAny", - my_peer_id = wl.peerManager.switch.peerInfo.peerId, - peer_id = peer.peerId, - msg_hash = computeMessageHash(pubsubTopic, message).to0xHex, - sentTime = getNowInNanosecondTime() - - let pushRequest = LightpushRequest( - requestId: generateRequestId(wl.rng), - pubSubTopic: some(pubSubTopic), - message: message, - ) - let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) - - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - - return lightpushSuccessResult(publishedCount) - -proc publishWithConn*( - wl: WakuLightPushClient, - pubSubTopic: PubsubTopic, - message: WakuMessage, - conn: Connection, - destPeer: PeerId, -): Future[WakuLightPushResult] {.async, gcsafe.} = - info "publishWithConn", - my_peer_id = wl.peerManager.switch.peerInfo.peerId, - peer_id = destPeer, - msg_hash = computeMessageHash(pubsubTopic, message).to0xHex, - sentTime = getNowInNanosecondTime() - - let pushRequest = LightpushRequest( - requestId: generateRequestId(wl.rng), - pubSubTopic: some(pubSubTopic), - message: message, - ) - #TODO: figure out how to not pass destPeer as this is just a hack - let publishedCount = - ?await wl.sendPushRequest(pushRequest, destPeer, conn = some(conn)) - - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - - return lightpushSuccessResult(publishedCount) + return await wl.publish(some(pubsubTopic), wakuMessage, peer) diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim index f2687834e..9c2ea7ced 100644 --- a/waku/waku_lightpush/common.nim +++ b/waku/waku_lightpush/common.nim @@ -35,7 +35,15 @@ func isSuccess*(response: LightPushResponse): bool = func toPushResult*(response: LightPushResponse): WakuLightPushResult = if isSuccess(response): - return ok(response.relayPeerCount.get(0)) + let relayPeerCount = response.relayPeerCount.get(0) + return ( + if (relayPeerCount == 0): + # Consider publishing to zero peers an error even if the service node + # sent us a "successful" response with zero peers + err((LightPushErrorCode.NO_PEERS_TO_RELAY, response.statusDesc)) + else: + ok(relayPeerCount) + ) else: return err((response.statusCode, response.statusDesc)) @@ -51,11 +59,6 @@ func lightpushResultBadRequest*(msg: string): WakuLightPushResult = func lightpushResultServiceUnavailable*(msg: string): WakuLightPushResult = return err((LightPushErrorCode.SERVICE_NOT_AVAILABLE, some(msg))) -func lighpushErrorResult*( - statusCode: LightpushStatusCode, desc: Option[string] -): WakuLightPushResult = - return err((statusCode, desc)) - func lighpushErrorResult*( statusCode: LightpushStatusCode, desc: string ): WakuLightPushResult = diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 2e8c9c2f1..95bfc003e 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -78,9 +78,9 @@ proc handleRequest( proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] ): Future[LightPushResponse] {.async.} = - let pushRequest = LightPushRequest.decode(buffer).valueOr: + let request = LightPushRequest.decode(buffer).valueOr: let desc = decodeRpcFailure & ": " & $error - error "failed to push message", error = desc + error "failed to decode Lightpush request", error = desc let errorCode = LightPushErrorCode.BAD_REQUEST waku_lightpush_v3_errors.inc(labelValues = [$errorCode]) return LightPushResponse( @@ -89,16 +89,16 @@ proc handleRequest*( statusDesc: some(desc), ) - let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr: + let relayPeerCount = (await wl.handleRequest(peerId, request)).valueOr: let desc = error.desc waku_lightpush_v3_errors.inc(labelValues = [$error.code]) error "failed to push message", error = desc return LightPushResponse( - requestId: pushRequest.requestId, statusCode: error.code, statusDesc: desc + requestId: request.requestId, statusCode: error.code, statusDesc: desc ) return LightPushResponse( - requestId: pushRequest.requestId, + requestId: request.requestId, statusCode: LightPushSuccessCode.SUCCESS, statusDesc: none[string](), relayPeerCount: some(relayPeerCount), @@ -123,7 +123,7 @@ proc initProtocolHandler(wl: WakuLightPush) = ) try: - rpc = await handleRequest(wl, conn.peerId, buffer) + rpc = await wl.handleRequest(conn.peerId, buffer) except CatchableError: error "lightpush failed handleRequest", error = getCurrentExceptionMsg() do: